This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 64104533acf Enable querying entirely cold datasources (#16676)
64104533acf is described below
commit 64104533acf744bf3b596a9770beb17aedc374df
Author: Rishabh Singh <[email protected]>
AuthorDate: Mon Jul 15 15:02:59 2024 +0530
Enable querying entirely cold datasources (#16676)
Add ability to query entirely cold datasources.
---
.../client/coordinator/CoordinatorClient.java | 5 +
.../client/coordinator/CoordinatorClientImpl.java | 13 +
.../metadata/AbstractSegmentMetadataCache.java | 16 +-
.../metadata/CoordinatorSegmentMetadataCache.java | 220 +++++++++-
.../druid/server/coordinator/DruidCoordinator.java | 3 +
.../client/coordinator/NoopCoordinatorClient.java | 6 +
...CoordinatorSegmentDataCacheConcurrencyTest.java | 23 +-
.../CoordinatorSegmentMetadataCacheTest.java | 479 ++++++++++++++++++++-
.../calcite/schema/BrokerSegmentMetadataCache.java | 41 +-
.../schema/BrokerSegmentMetadataCacheTest.java | 89 +++-
10 files changed, 853 insertions(+), 42 deletions(-)
diff --git a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java
index fdf16b2ac50..edeb16665ba 100644
--- a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java
+++ b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java
@@ -69,4 +69,9 @@ public interface CoordinatorClient
* Returns a new instance backed by a ServiceClient which follows the provided retryPolicy
*/
CoordinatorClient withRetryPolicy(ServiceRetryPolicy retryPolicy);
+
+ /**
+   * Retrieves the list of datasources with used segments.
+ */
+ ListenableFuture<Set<String>> fetchDataSourcesWithUsedSegments();
}
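
(For illustration only: a minimal sketch of how a caller might consume the new interface method. `coordinatorClient` is an assumed, already-wired CoordinatorClient instance, and the loop body is hypothetical. FutureUtils.getUnchecked is the same helper used elsewhere in this patch.)

    // Block on the async call; `true` cancels the underlying request if the wait is interrupted.
    Set<String> dataSources = FutureUtils.getUnchecked(
        coordinatorClient.fetchDataSourcesWithUsedSegments(),
        true
    );
    for (String dataSource : dataSources) {
      // e.g. seed a schema poll for each datasource that has used segments
    }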
diff --git a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java
index 4c795c9dbd4..fc3deee12ed 100644
--- a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java
+++ b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java
@@ -188,4 +188,17 @@ public class CoordinatorClientImpl implements CoordinatorClient
{
    return new CoordinatorClientImpl(client.withRetryPolicy(retryPolicy), jsonMapper);
}
+
+ @Override
+ public ListenableFuture<Set<String>> fetchDataSourcesWithUsedSegments()
+ {
+ final String path = "/druid/coordinator/v1/metadata/datasources";
+ return FutureUtils.transform(
+ client.asyncRequest(
+ new RequestBuilder(HttpMethod.GET, path),
+ new BytesFullResponseHandler()
+ ),
+        holder -> JacksonUtils.readValue(jsonMapper, holder.getContent(), new TypeReference<Set<String>>() {})
+ );
+ }
}
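
(For context: a sketch of the round trip performed above. The endpoint is the one named in the code; the datasource names in the example payload are hypothetical.)

    // GET /druid/coordinator/v1/metadata/datasources
    // Example response body: ["wikipedia", "koalas"]
    // The TypeReference<Set<String>> above deserializes it to the equivalent of:
    Set<String> expected = ImmutableSet.of("wikipedia", "koalas");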
diff --git a/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java b/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java
index 9cb2297db82..88e6ee97b98 100644
--- a/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java
+++ b/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java
@@ -200,7 +200,7 @@ public abstract class AbstractSegmentMetadataCache<T extends DataSourceInformati
* Map of datasource and generic object extending DataSourceInformation.
* This structure can be accessed by {@link #cacheExec} and {@link #callbackExec} threads.
*/
- protected final ConcurrentMap<String, T> tables = new ConcurrentHashMap<>();
+  protected final ConcurrentHashMap<String, T> tables = new ConcurrentHashMap<>();
/**
* This lock coordinates the access from multiple threads to those variables guarded by this lock.
@@ -269,9 +269,10 @@ public abstract class AbstractSegmentMetadataCache<T extends DataSourceInformati
final boolean wasRecentFailure = DateTimes.utc(lastFailure)
.plus(config.getMetadataRefreshPeriod())
.isAfterNow();
+
if (isServerViewInitialized &&
!wasRecentFailure &&
-          (!segmentsNeedingRefresh.isEmpty() || !dataSourcesNeedingRebuild.isEmpty()) &&
+ shouldRefresh() &&
          (refreshImmediately || nextRefresh < System.currentTimeMillis())) {
// We need to do a refresh. Break out of the waiting loop.
break;
@@ -334,6 +335,7 @@ public abstract class AbstractSegmentMetadataCache<T extends DataSourceInformati
}
}
+
/**
* Lifecycle start method.
*/
@@ -361,6 +363,15 @@ public abstract class AbstractSegmentMetadataCache<T extends DataSourceInformati
// noop
}
+  /**
+   * Refresh is executed only when there are segments or datasources needing refresh.
+   */
+  @SuppressWarnings("GuardedBy")
+  protected boolean shouldRefresh()
+  {
+    return (!segmentsNeedingRefresh.isEmpty() || !dataSourcesNeedingRebuild.isEmpty());
+  }
+
public void awaitInitialization() throws InterruptedException
{
initialized.await();
@@ -373,6 +384,7 @@ public abstract class AbstractSegmentMetadataCache<T extends DataSourceInformati
*
* @return schema information for the given datasource
*/
+ @Nullable
public T getDatasource(String name)
{
return tables.get(name);
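
(A small illustrative snippet, not part of the patch: with @Nullable now explicit, callers should guard against a datasource the cache does not know about. The variable `cache` and the datasource name are assumptions.)

    DataSourceInformation info = cache.getDatasource("wikipedia");
    if (info == null) {
      // no cached schema for this datasource (yet)
    } else {
      RowSignature signature = info.getRowSignature();
    }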
diff --git a/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java b/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java
index dad0b78ea77..3a4f548b8ba 100644
--- a/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java
+++ b/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java
@@ -20,19 +20,27 @@
package org.apache.druid.segment.metadata;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import org.apache.druid.client.CoordinatorServerView;
+import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.client.InternalQueryConfig;
import org.apache.druid.client.ServerView;
import org.apache.druid.client.TimelineServerView;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Stopwatch;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
+import org.apache.druid.metadata.SqlSegmentsMetadataManager;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.metadata.metadata.SegmentAnalysis;
import org.apache.druid.segment.SchemaPayloadPlus;
@@ -41,21 +49,30 @@ import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.realtime.appenderator.SegmentSchemas;
import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.server.coordination.DruidServerMetadata;
+import org.apache.druid.server.coordinator.loading.SegmentReplicaCount;
+import org.apache.druid.server.coordinator.loading.SegmentReplicationStatus;
import org.apache.druid.server.security.Escalator;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import javax.annotation.Nullable;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -71,17 +88,36 @@ import java.util.concurrent.atomic.AtomicBoolean;
* <ul><li>Metadata query is executed only for those non-realtime segments for which the schema is not cached.</li>
* <li>Datasources marked for refresh are then rebuilt.</li></ul>
* </li>
+ * <p>
+ * It is important to note that the datasource schema returned by {@link #getDatasource} and {@link #getDataSourceInformationMap()}
+ * also includes columns from cold segments.
+ * Cold segments are processed in a separate thread, and the datasource schema built from cold segments is stored separately.
+ * </p>
*/
@ManageLifecycle
public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCache<DataSourceInformation>
{
  private static final EmittingLogger log = new EmittingLogger(CoordinatorSegmentMetadataCache.class);
+  private static final Long COLD_SCHEMA_PERIOD_MULTIPLIER = 3L;
+  private static final Long COLD_SCHEMA_SLOWNESS_THRESHOLD_MILLIS = TimeUnit.SECONDS.toMillis(50);
private final SegmentMetadataCacheConfig config;
private final ColumnTypeMergePolicy columnTypeMergePolicy;
private final SegmentSchemaCache segmentSchemaCache;
private final SegmentSchemaBackFillQueue segmentSchemaBackfillQueue;
+ private final SqlSegmentsMetadataManager sqlSegmentsMetadataManager;
+ private volatile SegmentReplicationStatus segmentReplicationStatus = null;
+
+ // Datasource schema built from only cold segments.
+  private final ConcurrentHashMap<String, DataSourceInformation> coldSchemaTable = new ConcurrentHashMap<>();
+
+  // Period for the cold schema processing thread, a multiple of the segment polling period.
+  // Cold schema processing runs less often than the segment poll to save the cost of scanning all segments on every poll.
+  // The downside is a delay before columns from cold segments are reflected in the datasource schema.
+ private final long coldSchemaExecPeriodMillis;
+ private final ScheduledExecutorService coldSchemaExec;
private @Nullable Future<?> cacheExecFuture = null;
+ private @Nullable Future<?> coldSchemaExecFuture = null;
@Inject
public CoordinatorSegmentMetadataCache(
@@ -92,7 +128,9 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCach
InternalQueryConfig internalQueryConfig,
ServiceEmitter emitter,
SegmentSchemaCache segmentSchemaCache,
- SegmentSchemaBackFillQueue segmentSchemaBackfillQueue
+ SegmentSchemaBackFillQueue segmentSchemaBackfillQueue,
+ SqlSegmentsMetadataManager sqlSegmentsMetadataManager,
+      Supplier<SegmentsMetadataManagerConfig> segmentsMetadataManagerConfigSupplier
)
{
    super(queryLifecycleFactory, config, escalator, internalQueryConfig, emitter);
@@ -100,6 +138,15 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCach
this.columnTypeMergePolicy = config.getMetadataColumnTypeMergePolicy();
this.segmentSchemaCache = segmentSchemaCache;
this.segmentSchemaBackfillQueue = segmentSchemaBackfillQueue;
+ this.sqlSegmentsMetadataManager = sqlSegmentsMetadataManager;
+    this.coldSchemaExecPeriodMillis =
+        segmentsMetadataManagerConfigSupplier.get().getPollDuration().getMillis() * COLD_SCHEMA_PERIOD_MULTIPLIER;
+ coldSchemaExec = Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder()
+ .setNameFormat("DruidColdSchema-ScheduledExecutor-%d")
+ .setDaemon(false)
+ .build()
+ );
initServerViewTimelineCallback(serverView);
}
@@ -168,11 +215,15 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCach
{
callbackExec.shutdownNow();
cacheExec.shutdownNow();
+ coldSchemaExec.shutdownNow();
segmentSchemaCache.onLeaderStop();
segmentSchemaBackfillQueue.onLeaderStop();
if (cacheExecFuture != null) {
cacheExecFuture.cancel(true);
}
+ if (coldSchemaExecFuture != null) {
+ coldSchemaExecFuture.cancel(true);
+ }
}
public void onLeaderStart()
@@ -181,6 +232,12 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCach
try {
segmentSchemaBackfillQueue.onLeaderStart();
cacheExecFuture = cacheExec.submit(this::cacheExecLoop);
+ coldSchemaExecFuture = coldSchemaExec.schedule(
+ this::coldDatasourceSchemaExec,
+ coldSchemaExecPeriodMillis,
+ TimeUnit.MILLISECONDS
+ );
+
if (config.isAwaitInitializationOnStart()) {
awaitInitialization();
}
@@ -196,6 +253,9 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCach
if (cacheExecFuture != null) {
cacheExecFuture.cancel(true);
}
+ if (coldSchemaExecFuture != null) {
+ coldSchemaExecFuture.cancel(true);
+ }
segmentSchemaCache.onLeaderStop();
segmentSchemaBackfillQueue.onLeaderStop();
}
@@ -209,6 +269,11 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCach
segmentSchemaCache.awaitInitialization();
}
+  public void updateSegmentReplicationStatus(SegmentReplicationStatus segmentReplicationStatus)
+ {
+ this.segmentReplicationStatus = segmentReplicationStatus;
+ }
+
@Override
protected void unmarkSegmentAsMutable(SegmentId segmentId)
{
@@ -336,6 +401,62 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCach
return availableSegmentMetadata;
}
+ @Override
+ public DataSourceInformation getDatasource(String name)
+ {
+    return getMergedDatasourceInformation(tables.get(name), coldSchemaTable.get(name)).orElse(null);
+ }
+
+ @Override
+ public Map<String, DataSourceInformation> getDataSourceInformationMap()
+ {
+ Map<String, DataSourceInformation> hot = new HashMap<>(tables);
+ Map<String, DataSourceInformation> cold = new HashMap<>(coldSchemaTable);
+ Set<String> combinedDatasources = new HashSet<>(hot.keySet());
+ combinedDatasources.addAll(cold.keySet());
+    ImmutableMap.Builder<String, DataSourceInformation> combined = ImmutableMap.builder();
+
+ for (String dataSource : combinedDatasources) {
+ getMergedDatasourceInformation(hot.get(dataSource), cold.get(dataSource))
+ .ifPresent(merged -> combined.put(
+ dataSource,
+ merged
+ ));
+ }
+
+ return combined.build();
+ }
+
+ private Optional<DataSourceInformation> getMergedDatasourceInformation(
+ final DataSourceInformation hot,
+ final DataSourceInformation cold
+ )
+ {
+ if (hot == null && cold == null) {
+ return Optional.empty();
+ } else if (hot != null && cold == null) {
+ return Optional.of(hot);
+ } else if (hot == null && cold != null) {
+ return Optional.of(cold);
+ } else {
+ final Map<String, ColumnType> columnTypes = new LinkedHashMap<>();
+
+ List<RowSignature> signatures = new ArrayList<>();
+ // hot datasource schema takes precedence
+ signatures.add(hot.getRowSignature());
+ signatures.add(cold.getRowSignature());
+
+ for (RowSignature signature : signatures) {
+ mergeRowSignature(columnTypes, signature);
+ }
+
+ final RowSignature.Builder builder = RowSignature.builder();
+ columnTypes.forEach(builder::add);
+
+      return Optional.of(new DataSourceInformation(hot.getDataSource(), builder.build()));
+ }
+ }
+
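
(A worked illustration of the merge semantics above, with hypothetical signatures: the hot signature is merged first, so its column order wins, and cold-only columns are appended after it.)

    RowSignature hot = RowSignature.builder()
        .add("__time", ColumnType.LONG)
        .add("dim1", ColumnType.STRING)
        .build();
    RowSignature cold = RowSignature.builder()
        .add("dim1", ColumnType.STRING)
        .add("c1", ColumnType.STRING)
        .build();
    // merged signature: __time LONG, dim1 STRING, c1 STRING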
/**
* Executes SegmentMetadataQuery to fetch schema information for each segment in the refresh list.
* The schema information for individual segments is combined to construct a table schema, which is then cached.
@@ -382,6 +503,7 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCach
// Rebuild the datasources.
for (String dataSource : dataSourcesToRebuild) {
      final RowSignature rowSignature = buildDataSourceRowSignature(dataSource);
+
      if (rowSignature == null) {
        log.info("RowSignature null for dataSource [%s], implying that it no longer exists. All metadata removed.", dataSource);
tables.remove(dataSource);
@@ -419,6 +541,94 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCach
return cachedSegments;
}
+ @Nullable
+ private Integer getReplicationFactor(SegmentId segmentId)
+ {
+ if (segmentReplicationStatus == null) {
+ return null;
+ }
+    SegmentReplicaCount replicaCountsInCluster = segmentReplicationStatus.getReplicaCountsInCluster(segmentId);
+    return replicaCountsInCluster == null ? null : replicaCountsInCluster.required();
+ }
+
+ @VisibleForTesting
+ protected void coldDatasourceSchemaExec()
+ {
+ Stopwatch stopwatch = Stopwatch.createStarted();
+
+ Set<String> dataSourceWithColdSegmentSet = new HashSet<>();
+
+ int datasources = 0;
+ int segments = 0;
+ int dataSourceWithColdSegments = 0;
+
+    Collection<ImmutableDruidDataSource> immutableDataSources =
+        sqlSegmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments();
+
+ for (ImmutableDruidDataSource dataSource : immutableDataSources) {
+ datasources++;
+ Collection<DataSegment> dataSegments = dataSource.getSegments();
+
+ final Map<String, ColumnType> columnTypes = new LinkedHashMap<>();
+
+ for (DataSegment segment : dataSegments) {
+ Integer replicationFactor = getReplicationFactor(segment.getId());
+ if (replicationFactor != null && replicationFactor != 0) {
+ continue;
+ }
+        Optional<SchemaPayloadPlus> optionalSchema = segmentSchemaCache.getSchemaForSegment(segment.getId());
+        if (optionalSchema.isPresent()) {
+          RowSignature rowSignature = optionalSchema.get().getSchemaPayload().getRowSignature();
+ mergeRowSignature(columnTypes, rowSignature);
+ }
+ segments++;
+ }
+
+ if (columnTypes.isEmpty()) {
+        // this datasource doesn't have any cold segments
+ continue;
+ }
+
+ final RowSignature.Builder builder = RowSignature.builder();
+ columnTypes.forEach(builder::add);
+
+ RowSignature coldSignature = builder.build();
+
+ String dataSourceName = dataSource.getName();
+ dataSourceWithColdSegmentSet.add(dataSourceName);
+ dataSourceWithColdSegments++;
+
+      log.debug("[%s] signature from cold segments is [%s]", dataSourceName, coldSignature);
+
+      coldSchemaTable.put(dataSourceName, new DataSourceInformation(dataSourceName, coldSignature));
+ }
+
+ // remove any stale datasource from the map
+ coldSchemaTable.keySet().retainAll(dataSourceWithColdSegmentSet);
+
+    String executionStatsLog = StringUtils.format(
+        "Cold schema processing took [%d] millis. "
+        + "Processed total [%d] datasources, [%d] segments. Found [%d] datasources with cold segments.",
+        stopwatch.millisElapsed(), datasources, segments, dataSourceWithColdSegments
+    );
+ if (stopwatch.millisElapsed() > COLD_SCHEMA_SLOWNESS_THRESHOLD_MILLIS) {
+ log.info(executionStatsLog);
+ } else {
+ log.debug(executionStatsLog);
+ }
+ }
+
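
(In other words, the loop above treats a segment as cold when its required replica count in the cluster is zero or unknown, i.e. it is not required to be loaded on any historical. A restatement of that predicate as an illustrative helper, not code from the patch:)

    private boolean isCold(SegmentId segmentId)
    {
      Integer replicationFactor = getReplicationFactor(segmentId);
      // zero required replicas means the segment lives only in deep storage
      return replicationFactor == null || replicationFactor == 0;
    }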
+  private void mergeRowSignature(final Map<String, ColumnType> columnTypes, final RowSignature signature)
+ {
+ for (String column : signature.getColumnNames()) {
+      final ColumnType columnType =
+          signature.getColumnType(column)
+                   .orElseThrow(() -> new ISE("Encountered null type for column [%s]", column));
+
+      columnTypes.compute(column, (c, existingType) -> columnTypeMergePolicy.merge(existingType, columnType));
+ }
+ }
+
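
(A short worked example of the compute-and-merge pattern above, assuming the configured ColumnTypeMergePolicy resolves conflicting types; the exact merged type depends on the configured policy:)

    Map<String, ColumnType> columnTypes = new LinkedHashMap<>();
    // first signature reports m1 as LONG
    columnTypes.compute("m1", (c, existing) -> columnTypeMergePolicy.merge(existing, ColumnType.LONG));
    // a later signature reports m1 as DOUBLE; the policy decides the merged type
    columnTypes.compute("m1", (c, existing) -> columnTypeMergePolicy.merge(existing, ColumnType.DOUBLE));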
@VisibleForTesting
@Nullable
@Override
@@ -434,13 +644,7 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCach
      Optional<SchemaPayloadPlus> optionalSchema = segmentSchemaCache.getSchemaForSegment(segmentId);
      if (optionalSchema.isPresent()) {
        RowSignature rowSignature = optionalSchema.get().getSchemaPayload().getRowSignature();
-        for (String column : rowSignature.getColumnNames()) {
-          final ColumnType columnType =
-              rowSignature.getColumnType(column)
-                          .orElseThrow(() -> new ISE("Encountered null type for column [%s]", column));
-
-          columnTypes.compute(column, (c, existingType) -> columnTypeMergePolicy.merge(existingType, columnType));
-        }
+        mergeRowSignature(columnTypes, rowSignature);
} else {
// mark it for refresh, however, this case shouldn't arise by design
markSegmentAsNeedRefresh(segmentId);
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index 36cfac8089c..9710bda79b4 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -816,6 +816,9 @@ public class DruidCoordinator
{
broadcastSegments = params.getBroadcastSegments();
segmentReplicationStatus = params.getSegmentReplicationStatus();
+      if (coordinatorSegmentMetadataCache != null) {
+        coordinatorSegmentMetadataCache.updateSegmentReplicationStatus(segmentReplicationStatus);
+      }
// Collect stats for unavailable and under-replicated segments
final CoordinatorRunStats stats = params.getCoordinatorStats();
diff --git a/server/src/test/java/org/apache/druid/client/coordinator/NoopCoordinatorClient.java b/server/src/test/java/org/apache/druid/client/coordinator/NoopCoordinatorClient.java
index 5aee343a851..58f5af58a3e 100644
--- a/server/src/test/java/org/apache/druid/client/coordinator/NoopCoordinatorClient.java
+++ b/server/src/test/java/org/apache/druid/client/coordinator/NoopCoordinatorClient.java
@@ -75,4 +75,10 @@ public class NoopCoordinatorClient implements CoordinatorClient
// Ignore retryPolicy for the test client.
return this;
}
+
+ @Override
+ public ListenableFuture<Set<String>> fetchDataSourcesWithUsedSegments()
+ {
+ throw new UnsupportedOperationException();
+ }
}
diff --git a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentDataCacheConcurrencyTest.java b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentDataCacheConcurrencyTest.java
index 81f65acf84a..4cc4ac38184 100644
--- a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentDataCacheConcurrencyTest.java
+++ b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentDataCacheConcurrencyTest.java
@@ -20,6 +20,8 @@
package org.apache.druid.segment.metadata;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import org.apache.druid.client.BrokerServerView;
@@ -39,6 +41,8 @@ import org.apache.druid.java.util.common.NonnullPair;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
+import org.apache.druid.metadata.SqlSegmentsMetadataManager;
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.TableDataSource;
@@ -61,16 +65,19 @@ import org.apache.druid.timeline.DataSegment.PruneSpecsHolder;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.easymock.EasyMock;
+import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
+import org.mockito.Mockito;
import javax.annotation.Nullable;
import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -101,6 +108,8 @@ public class CoordinatorSegmentDataCacheConcurrencyTest extends SegmentMetadataC
private TestSegmentMetadataQueryWalker walker;
private SegmentSchemaCache segmentSchemaCache;
private SegmentSchemaBackFillQueue backFillQueue;
+ private SqlSegmentsMetadataManager sqlSegmentsMetadataManager;
+  private Supplier<SegmentsMetadataManagerConfig> segmentsMetadataManagerConfigSupplier;
private final ObjectMapper mapper = TestHelper.makeJsonMapper();
@Before
@@ -190,6 +199,12 @@ public class CoordinatorSegmentDataCacheConcurrencyTest extends SegmentMetadataC
}
);
+    sqlSegmentsMetadataManager = Mockito.mock(SqlSegmentsMetadataManager.class);
+    Mockito.when(sqlSegmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments()).thenReturn(Collections.emptyList());
+    SegmentsMetadataManagerConfig metadataManagerConfig = Mockito.mock(SegmentsMetadataManagerConfig.class);
+    Mockito.when(metadataManagerConfig.getPollDuration()).thenReturn(Period.millis(1000));
+    segmentsMetadataManagerConfigSupplier = Suppliers.ofInstance(metadataManagerConfig);
+
inventoryView.init();
initLatch.await();
exec = Execs.multiThreaded(4, "DruidSchemaConcurrencyTest-%d");
@@ -227,7 +242,9 @@ public class CoordinatorSegmentDataCacheConcurrencyTest extends SegmentMetadataC
new InternalQueryConfig(),
new NoopServiceEmitter(),
segmentSchemaCache,
- backFillQueue
+ backFillQueue,
+ sqlSegmentsMetadataManager,
+ segmentsMetadataManagerConfigSupplier
)
{
@Override
@@ -341,7 +358,9 @@ public class CoordinatorSegmentDataCacheConcurrencyTest extends SegmentMetadataC
new InternalQueryConfig(),
new NoopServiceEmitter(),
segmentSchemaCache,
- backFillQueue
+ backFillQueue,
+ sqlSegmentsMetadataManager,
+ segmentsMetadataManagerConfigSupplier
)
{
@Override
diff --git a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
index e5b6db1d42d..ef1fb1e8edd 100644
--- a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
+++ b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
@@ -22,11 +22,14 @@ package org.apache.druid.segment.metadata;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import org.apache.druid.client.DruidServer;
+import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.client.InternalQueryConfig;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.Intervals;
@@ -37,6 +40,8 @@ import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
+import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
+import org.apache.druid.metadata.SqlSegmentsMetadataManager;
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.QueryContexts;
@@ -66,6 +71,8 @@ import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.server.QueryResponse;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.coordinator.loading.SegmentReplicaCount;
+import org.apache.druid.server.coordinator.loading.SegmentReplicationStatus;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.AllowAllAuthenticator;
@@ -74,18 +81,23 @@ import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.easymock.EasyMock;
+import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
import org.skife.jdbi.v2.StatementContext;
import java.io.File;
import java.io.IOException;
import java.sql.ResultSet;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
@@ -106,12 +118,19 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
private CoordinatorSegmentMetadataCache runningSchema;
private CountDownLatch buildTableLatch = new CountDownLatch(1);
private CountDownLatch markDataSourceLatch = new CountDownLatch(1);
+ private SqlSegmentsMetadataManager sqlSegmentsMetadataManager;
+  private Supplier<SegmentsMetadataManagerConfig> segmentsMetadataManagerConfigSupplier;
@Before
@Override
public void setUp() throws Exception
{
super.setUp();
+    sqlSegmentsMetadataManager = Mockito.mock(SqlSegmentsMetadataManager.class);
+    Mockito.when(sqlSegmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments()).thenReturn(Collections.emptyList());
+    SegmentsMetadataManagerConfig metadataManagerConfig = Mockito.mock(SegmentsMetadataManagerConfig.class);
+    Mockito.when(metadataManagerConfig.getPollDuration()).thenReturn(Period.millis(1000));
+    segmentsMetadataManagerConfigSupplier = Suppliers.ofInstance(metadataManagerConfig);
}
@After
@@ -132,6 +151,7 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
  public CoordinatorSegmentMetadataCache buildSchemaMarkAndTableLatch(SegmentMetadataCacheConfig config) throws InterruptedException
{
Preconditions.checkState(runningSchema == null);
+
runningSchema = new CoordinatorSegmentMetadataCache(
getQueryLifecycleFactory(walker),
serverView,
@@ -140,7 +160,9 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
new InternalQueryConfig(),
new NoopServiceEmitter(),
segmentSchemaCache,
- backFillQueue
+ backFillQueue,
+ sqlSegmentsMetadataManager,
+ segmentsMetadataManagerConfigSupplier
)
{
@Override
@@ -178,7 +200,7 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
public void testGetTableMapFoo() throws InterruptedException
{
CoordinatorSegmentMetadataCache schema = buildSchemaMarkAndTableLatch();
- verifyFooDSSchema(schema);
+ verifyFooDSSchema(schema, 6);
}
@Test
@@ -312,7 +334,9 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
new InternalQueryConfig(),
new NoopServiceEmitter(),
segmentSchemaCache,
- backFillQueue
+ backFillQueue,
+ sqlSegmentsMetadataManager,
+ segmentsMetadataManagerConfigSupplier
)
{
@Override
@@ -523,7 +547,9 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
new InternalQueryConfig(),
new NoopServiceEmitter(),
segmentSchemaCache,
- backFillQueue
+ backFillQueue,
+ sqlSegmentsMetadataManager,
+ segmentsMetadataManagerConfigSupplier
)
{
@Override
@@ -558,6 +584,11 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
{
String datasource = "newSegmentAddTest";
CountDownLatch addSegmentLatch = new CountDownLatch(2);
+    SqlSegmentsMetadataManager sqlSegmentsMetadataManager = Mockito.mock(SqlSegmentsMetadataManager.class);
+    Mockito.when(sqlSegmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments()).thenReturn(Collections.emptyList());
+    SegmentsMetadataManagerConfig metadataManagerConfig = Mockito.mock(SegmentsMetadataManagerConfig.class);
+    Mockito.when(metadataManagerConfig.getPollDuration()).thenReturn(Period.millis(1000));
+    Supplier<SegmentsMetadataManagerConfig> segmentsMetadataManagerConfigSupplier = Suppliers.ofInstance(metadataManagerConfig);
    CoordinatorSegmentMetadataCache schema = new CoordinatorSegmentMetadataCache(
getQueryLifecycleFactory(walker),
serverView,
@@ -566,7 +597,9 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
new InternalQueryConfig(),
new NoopServiceEmitter(),
segmentSchemaCache,
- backFillQueue
+ backFillQueue,
+ sqlSegmentsMetadataManager,
+ segmentsMetadataManagerConfigSupplier
)
{
@Override
@@ -605,6 +638,11 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
{
String datasource = "newSegmentAddTest";
CountDownLatch addSegmentLatch = new CountDownLatch(1);
+    SqlSegmentsMetadataManager sqlSegmentsMetadataManager = Mockito.mock(SqlSegmentsMetadataManager.class);
+    Mockito.when(sqlSegmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments()).thenReturn(Collections.emptyList());
+    SegmentsMetadataManagerConfig metadataManagerConfig = Mockito.mock(SegmentsMetadataManagerConfig.class);
+    Mockito.when(metadataManagerConfig.getPollDuration()).thenReturn(Period.millis(1000));
+    Supplier<SegmentsMetadataManagerConfig> segmentsMetadataManagerConfigSupplier = Suppliers.ofInstance(metadataManagerConfig);
    CoordinatorSegmentMetadataCache schema = new CoordinatorSegmentMetadataCache(
getQueryLifecycleFactory(walker),
serverView,
@@ -613,7 +651,9 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
new InternalQueryConfig(),
new NoopServiceEmitter(),
segmentSchemaCache,
- backFillQueue
+ backFillQueue,
+ sqlSegmentsMetadataManager,
+ segmentsMetadataManagerConfigSupplier
)
{
@Override
@@ -649,6 +689,11 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
{
String datasource = "newSegmentAddTest";
CountDownLatch addSegmentLatch = new CountDownLatch(1);
+    SqlSegmentsMetadataManager sqlSegmentsMetadataManager = Mockito.mock(SqlSegmentsMetadataManager.class);
+    Mockito.when(sqlSegmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments()).thenReturn(Collections.emptyList());
+    SegmentsMetadataManagerConfig metadataManagerConfig = Mockito.mock(SegmentsMetadataManagerConfig.class);
+    Mockito.when(metadataManagerConfig.getPollDuration()).thenReturn(Period.millis(1000));
+    Supplier<SegmentsMetadataManagerConfig> segmentsMetadataManagerConfigSupplier = Suppliers.ofInstance(metadataManagerConfig);
    CoordinatorSegmentMetadataCache schema = new CoordinatorSegmentMetadataCache(
getQueryLifecycleFactory(walker),
serverView,
@@ -657,7 +702,9 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
new InternalQueryConfig(),
new NoopServiceEmitter(),
segmentSchemaCache,
- backFillQueue
+ backFillQueue,
+ sqlSegmentsMetadataManager,
+ segmentsMetadataManagerConfigSupplier
)
{
@Override
@@ -698,7 +745,9 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
new InternalQueryConfig(),
new NoopServiceEmitter(),
segmentSchemaCache,
- backFillQueue
+ backFillQueue,
+ sqlSegmentsMetadataManager,
+ segmentsMetadataManagerConfigSupplier
)
{
@Override
@@ -756,7 +805,9 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
new InternalQueryConfig(),
new NoopServiceEmitter(),
segmentSchemaCache,
- backFillQueue
+ backFillQueue,
+ sqlSegmentsMetadataManager,
+ segmentsMetadataManagerConfigSupplier
)
{
@Override
@@ -817,7 +868,9 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
new InternalQueryConfig(),
new NoopServiceEmitter(),
segmentSchemaCache,
- backFillQueue
+ backFillQueue,
+ sqlSegmentsMetadataManager,
+ segmentsMetadataManagerConfigSupplier
)
{
@Override
@@ -852,7 +905,9 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
new InternalQueryConfig(),
new NoopServiceEmitter(),
segmentSchemaCache,
- backFillQueue
+ backFillQueue,
+ sqlSegmentsMetadataManager,
+ segmentsMetadataManagerConfigSupplier
)
{
@Override
@@ -900,7 +955,9 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
new InternalQueryConfig(),
new NoopServiceEmitter(),
segmentSchemaCache,
- backFillQueue
+ backFillQueue,
+ sqlSegmentsMetadataManager,
+ segmentsMetadataManagerConfigSupplier
)
{
@Override
@@ -972,7 +1029,9 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
internalQueryConfig,
new NoopServiceEmitter(),
segmentSchemaCache,
- backFillQueue
+ backFillQueue,
+ sqlSegmentsMetadataManager,
+ segmentsMetadataManagerConfigSupplier
);
Map<String, Object> queryContext = ImmutableMap.of(
@@ -1141,7 +1200,9 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
new InternalQueryConfig(),
emitter,
segmentSchemaCache,
- backFillQueue
+ backFillQueue,
+ sqlSegmentsMetadataManager,
+ segmentsMetadataManagerConfigSupplier
)
{
@Override
@@ -1306,7 +1367,9 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
new InternalQueryConfig(),
new NoopServiceEmitter(),
segmentSchemaCache,
- backFillQueue
+ backFillQueue,
+ sqlSegmentsMetadataManager,
+ segmentsMetadataManagerConfigSupplier
) {
@Override
void updateSchemaForRealtimeSegments(SegmentSchemas segmentSchemas)
@@ -1385,7 +1448,9 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
new InternalQueryConfig(),
new NoopServiceEmitter(),
segmentSchemaCache,
- backFillQueue
+ backFillQueue,
+ sqlSegmentsMetadataManager,
+ segmentsMetadataManagerConfigSupplier
) {
@Override
    public void refresh(Set<SegmentId> segmentsToRefresh, Set<String> dataSourcesToRebuild)
@@ -1565,7 +1630,9 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
new InternalQueryConfig(),
new NoopServiceEmitter(),
segmentSchemaCache,
- backFillQueue
+ backFillQueue,
+ sqlSegmentsMetadataManager,
+ segmentsMetadataManagerConfigSupplier
) {
@Override
    public Set<SegmentId> refreshSegmentsForDataSource(String dataSource, Set<SegmentId> segments)
@@ -1594,7 +1661,7 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
Assert.assertEquals(0, refreshCount.get());
// verify that datasource schema is built
- verifyFooDSSchema(schema);
+ verifyFooDSSchema(schema, 6);
serverView.addSegment(segment3, ServerType.HISTORICAL);
@@ -1721,12 +1788,384 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
    Assert.assertEquals(existingMetadata.getNumReplicas(), currentMetadata.getNumReplicas());
}
- private void verifyFooDSSchema(CoordinatorSegmentMetadataCache schema)
+ private CoordinatorSegmentMetadataCache setupForColdDatasourceSchemaTest()
+ {
+ // foo has both hot and cold segments
+ DataSegment coldSegment =
+ DataSegment.builder()
+ .dataSource(DATASOURCE1)
+ .interval(Intervals.of("1998/P2Y"))
+ .version("1")
+ .shardSpec(new LinearShardSpec(0))
+ .size(0)
+ .build();
+
+ // cold has only cold segments
+ DataSegment singleColdSegment =
+ DataSegment.builder()
+ .dataSource("cold")
+ .interval(Intervals.of("2000/P2Y"))
+ .version("1")
+ .shardSpec(new LinearShardSpec(0))
+ .size(0)
+ .build();
+
+    ImmutableMap.Builder<SegmentId, SegmentMetadata> segmentStatsMap = new ImmutableMap.Builder<>();
+    segmentStatsMap.put(coldSegment.getId(), new SegmentMetadata(20L, "foo-fingerprint"));
+    segmentStatsMap.put(singleColdSegment.getId(), new SegmentMetadata(20L, "cold-fingerprint"));
+    ImmutableMap.Builder<String, SchemaPayload> schemaPayloadMap = new ImmutableMap.Builder<>();
+ schemaPayloadMap.put(
+ "foo-fingerprint",
+ new SchemaPayload(RowSignature.builder()
+ .add("dim1", ColumnType.STRING)
+ .add("c1", ColumnType.STRING)
+ .add("c2", ColumnType.LONG)
+ .build())
+ );
+ schemaPayloadMap.put(
+ "cold-fingerprint",
+ new SchemaPayload(
+ RowSignature.builder()
+ .add("f1", ColumnType.STRING)
+ .add("f2", ColumnType.DOUBLE)
+ .build()
+ )
+ );
+
+    segmentSchemaCache.updateFinalizedSegmentSchema(
+        new SegmentSchemaCache.FinalizedSegmentSchemaInfo(segmentStatsMap.build(), schemaPayloadMap.build())
+    );
+
+ List<ImmutableDruidDataSource> druidDataSources = new ArrayList<>();
+ Map<SegmentId, DataSegment> segmentMap = new HashMap<>();
+ segmentMap.put(coldSegment.getId(), coldSegment);
+ segmentMap.put(segment1.getId(), segment1);
+ segmentMap.put(segment2.getId(), segment2);
+ druidDataSources.add(new ImmutableDruidDataSource(
+ coldSegment.getDataSource(),
+ Collections.emptyMap(),
+ segmentMap
+ ));
+ druidDataSources.add(new ImmutableDruidDataSource(
+ singleColdSegment.getDataSource(),
+ Collections.emptyMap(),
+ Collections.singletonMap(singleColdSegment.getId(), singleColdSegment)
+ ));
+
+    Mockito.when(sqlSegmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments())
+           .thenReturn(druidDataSources);
+
+    CoordinatorSegmentMetadataCache schema = new CoordinatorSegmentMetadataCache(
+ getQueryLifecycleFactory(walker),
+ serverView,
+ SEGMENT_CACHE_CONFIG_DEFAULT,
+ new NoopEscalator(),
+ new InternalQueryConfig(),
+ new NoopServiceEmitter(),
+ segmentSchemaCache,
+ backFillQueue,
+ sqlSegmentsMetadataManager,
+ segmentsMetadataManagerConfigSupplier
+ );
+
+    SegmentReplicaCount zeroSegmentReplicaCount = Mockito.mock(SegmentReplicaCount.class);
+    SegmentReplicaCount nonZeroSegmentReplicaCount = Mockito.mock(SegmentReplicaCount.class);
+    Mockito.when(zeroSegmentReplicaCount.required()).thenReturn(0);
+    Mockito.when(nonZeroSegmentReplicaCount.required()).thenReturn(1);
+    SegmentReplicationStatus segmentReplicationStatus = Mockito.mock(SegmentReplicationStatus.class);
+    Mockito.when(segmentReplicationStatus.getReplicaCountsInCluster(ArgumentMatchers.eq(coldSegment.getId())))
+           .thenReturn(zeroSegmentReplicaCount);
+    Mockito.when(segmentReplicationStatus.getReplicaCountsInCluster(ArgumentMatchers.eq(singleColdSegment.getId())))
+           .thenReturn(zeroSegmentReplicaCount);
+    Mockito.when(segmentReplicationStatus.getReplicaCountsInCluster(ArgumentMatchers.eq(segment1.getId())))
+           .thenReturn(nonZeroSegmentReplicaCount);
+    Mockito.when(segmentReplicationStatus.getReplicaCountsInCluster(ArgumentMatchers.eq(segment2.getId())))
+           .thenReturn(nonZeroSegmentReplicaCount);
+
+    schema.updateSegmentReplicationStatus(segmentReplicationStatus);
+
+ return schema;
+ }
+
+ @Test
+  public void testColdDatasourceSchema_refreshAfterColdSchemaExec() throws IOException
+  {
+    CoordinatorSegmentMetadataCache schema = setupForColdDatasourceSchemaTest();
+
+ schema.coldDatasourceSchemaExec();
+
+    Assert.assertEquals(new HashSet<>(Arrays.asList("foo", "cold")), schema.getDataSourceInformationMap().keySet());
+
+ // verify that cold schema for both foo and cold is present
+ RowSignature fooSignature = schema.getDatasource("foo").getRowSignature();
+ List<String> columnNames = fooSignature.getColumnNames();
+
+ // verify that foo schema doesn't contain columns from hot segments
+ Assert.assertEquals(3, columnNames.size());
+
+ Assert.assertEquals("dim1", columnNames.get(0));
+ Assert.assertEquals(ColumnType.STRING,
fooSignature.getColumnType(columnNames.get(0)).get());
+
+ Assert.assertEquals("c1", columnNames.get(1));
+ Assert.assertEquals(ColumnType.STRING,
fooSignature.getColumnType(columnNames.get(1)).get());
+
+ Assert.assertEquals("c2", columnNames.get(2));
+ Assert.assertEquals(ColumnType.LONG,
fooSignature.getColumnType(columnNames.get(2)).get());
+
+    RowSignature coldSignature = schema.getDatasource("cold").getRowSignature();
+    columnNames = coldSignature.getColumnNames();
+    Assert.assertEquals("f1", columnNames.get(0));
+    Assert.assertEquals(ColumnType.STRING, coldSignature.getColumnType(columnNames.get(0)).get());
+
+    Assert.assertEquals("f2", columnNames.get(1));
+    Assert.assertEquals(ColumnType.DOUBLE, coldSignature.getColumnType(columnNames.get(1)).get());
+
+ Set<SegmentId> segmentIds = new HashSet<>();
+ segmentIds.add(segment1.getId());
+ segmentIds.add(segment2.getId());
+
+ schema.refresh(segmentIds, new HashSet<>());
+
+    Assert.assertEquals(new HashSet<>(Arrays.asList("foo", "cold")), schema.getDataSourceInformationMap().keySet());
+
+    coldSignature = schema.getDatasource("cold").getRowSignature();
+    columnNames = coldSignature.getColumnNames();
+    Assert.assertEquals("f1", columnNames.get(0));
+    Assert.assertEquals(ColumnType.STRING, coldSignature.getColumnType(columnNames.get(0)).get());
+
+    Assert.assertEquals("f2", columnNames.get(1));
+    Assert.assertEquals(ColumnType.DOUBLE, coldSignature.getColumnType(columnNames.get(1)).get());
+
+ // foo now contains schema from both hot and cold segments
+ verifyFooDSSchema(schema, 8);
+ RowSignature rowSignature = schema.getDatasource("foo").getRowSignature();
+
+ // cold columns should be present at the end
+ columnNames = rowSignature.getColumnNames();
+ Assert.assertEquals("c1", columnNames.get(6));
+ Assert.assertEquals(ColumnType.STRING,
rowSignature.getColumnType(columnNames.get(6)).get());
+
+ Assert.assertEquals("c2", columnNames.get(7));
+ Assert.assertEquals(ColumnType.LONG,
rowSignature.getColumnType(columnNames.get(7)).get());
+ }
+
+ @Test
+  public void testColdDatasourceSchema_coldSchemaExecAfterRefresh() throws IOException
+  {
+    CoordinatorSegmentMetadataCache schema = setupForColdDatasourceSchemaTest();
+
+ Set<SegmentId> segmentIds = new HashSet<>();
+ segmentIds.add(segment1.getId());
+ segmentIds.add(segment2.getId());
+
+ schema.refresh(segmentIds, new HashSet<>());
+ // cold datasource shouldn't be present
+ Assert.assertEquals(Collections.singleton("foo"),
schema.getDataSourceInformationMap().keySet());
+
+ // cold columns shouldn't be present
+ verifyFooDSSchema(schema, 6);
+ Assert.assertNull(schema.getDatasource("cold"));
+
+ schema.coldDatasourceSchemaExec();
+
+    // cold datasource should be present now
+    Assert.assertEquals(new HashSet<>(Arrays.asList("foo", "cold")), schema.getDataSourceInformationMap().keySet());
+
+    RowSignature coldSignature = schema.getDatasource("cold").getRowSignature();
+    List<String> columnNames = coldSignature.getColumnNames();
+    Assert.assertEquals("f1", columnNames.get(0));
+    Assert.assertEquals(ColumnType.STRING, coldSignature.getColumnType(columnNames.get(0)).get());
+
+    Assert.assertEquals("f2", columnNames.get(1));
+    Assert.assertEquals(ColumnType.DOUBLE, coldSignature.getColumnType(columnNames.get(1)).get());
+
+ // columns from cold datasource should be present
+ verifyFooDSSchema(schema, 8);
+ RowSignature rowSignature = schema.getDatasource("foo").getRowSignature();
+
+ columnNames = rowSignature.getColumnNames();
+ Assert.assertEquals("c1", columnNames.get(6));
+ Assert.assertEquals(ColumnType.STRING,
rowSignature.getColumnType(columnNames.get(6)).get());
+
+ Assert.assertEquals("c2", columnNames.get(7));
+ Assert.assertEquals(ColumnType.LONG,
rowSignature.getColumnType(columnNames.get(7)).get());
+ }
+
+ @Test
+ public void testColdDatasourceSchema_verifyStaleDatasourceRemoved()
+ {
+ DataSegment coldSegmentAlpha =
+ DataSegment.builder()
+ .dataSource("alpha")
+ .interval(Intervals.of("2000/P2Y"))
+ .version("1")
+ .shardSpec(new LinearShardSpec(0))
+ .size(0)
+ .build();
+
+ DataSegment coldSegmentBeta =
+ DataSegment.builder()
+ .dataSource("beta")
+ .interval(Intervals.of("2000/P2Y"))
+ .version("1")
+ .shardSpec(new LinearShardSpec(0))
+ .size(0)
+ .build();
+
+ DataSegment coldSegmentGamma =
+ DataSegment.builder()
+ .dataSource("gamma")
+ .interval(Intervals.of("2000/P2Y"))
+ .version("1")
+ .shardSpec(new LinearShardSpec(0))
+ .size(0)
+ .build();
+
+ DataSegment hotSegmentGamma =
+ DataSegment.builder()
+ .dataSource("gamma")
+ .interval(Intervals.of("2001/P2Y"))
+ .version("1")
+ .shardSpec(new LinearShardSpec(0))
+ .size(0)
+ .build();
+
+    ImmutableMap.Builder<SegmentId, SegmentMetadata> segmentStatsMap = new ImmutableMap.Builder<>();
+    segmentStatsMap.put(coldSegmentAlpha.getId(), new SegmentMetadata(20L, "cold"));
+    segmentStatsMap.put(coldSegmentBeta.getId(), new SegmentMetadata(20L, "cold"));
+    segmentStatsMap.put(hotSegmentGamma.getId(), new SegmentMetadata(20L, "hot"));
+    segmentStatsMap.put(coldSegmentGamma.getId(), new SegmentMetadata(20L, "cold"));
+
+    ImmutableMap.Builder<String, SchemaPayload> schemaPayloadMap = new ImmutableMap.Builder<>();
+ schemaPayloadMap.put(
+ "cold",
+ new SchemaPayload(RowSignature.builder()
+ .add("dim1", ColumnType.STRING)
+ .add("c1", ColumnType.STRING)
+ .add("c2", ColumnType.LONG)
+ .build())
+ );
+ schemaPayloadMap.put(
+ "hot",
+ new SchemaPayload(RowSignature.builder()
+ .add("c3", ColumnType.STRING)
+ .add("c4", ColumnType.STRING)
+ .build())
+ );
+    segmentSchemaCache.updateFinalizedSegmentSchema(
+        new SegmentSchemaCache.FinalizedSegmentSchemaInfo(segmentStatsMap.build(), schemaPayloadMap.build())
+    );
+
+ List<ImmutableDruidDataSource> druidDataSources = new ArrayList<>();
+ druidDataSources.add(
+ new ImmutableDruidDataSource(
+ "alpha",
+ Collections.emptyMap(),
+            Collections.singletonMap(coldSegmentAlpha.getId(), coldSegmentAlpha)
+ )
+ );
+
+ Map<SegmentId, DataSegment> gammaSegments = new HashMap<>();
+ gammaSegments.put(hotSegmentGamma.getId(), hotSegmentGamma);
+ gammaSegments.put(coldSegmentGamma.getId(), coldSegmentGamma);
+
+ druidDataSources.add(
+ new ImmutableDruidDataSource(
+ "gamma",
+ Collections.emptyMap(),
+ gammaSegments
+ )
+ );
+
+    Mockito.when(sqlSegmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments())
+           .thenReturn(druidDataSources);
+
+    CoordinatorSegmentMetadataCache schema = new CoordinatorSegmentMetadataCache(
+ getQueryLifecycleFactory(walker),
+ serverView,
+ SEGMENT_CACHE_CONFIG_DEFAULT,
+ new NoopEscalator(),
+ new InternalQueryConfig(),
+ new NoopServiceEmitter(),
+ segmentSchemaCache,
+ backFillQueue,
+ sqlSegmentsMetadataManager,
+ segmentsMetadataManagerConfigSupplier
+ );
+
+    SegmentReplicaCount zeroSegmentReplicaCount = Mockito.mock(SegmentReplicaCount.class);
+    SegmentReplicaCount nonZeroSegmentReplicaCount = Mockito.mock(SegmentReplicaCount.class);
+    Mockito.when(zeroSegmentReplicaCount.required()).thenReturn(0);
+    Mockito.when(nonZeroSegmentReplicaCount.required()).thenReturn(1);
+    SegmentReplicationStatus segmentReplicationStatus = Mockito.mock(SegmentReplicationStatus.class);
+    Mockito.when(segmentReplicationStatus.getReplicaCountsInCluster(ArgumentMatchers.eq(coldSegmentAlpha.getId())))
+           .thenReturn(zeroSegmentReplicaCount);
+    Mockito.when(segmentReplicationStatus.getReplicaCountsInCluster(ArgumentMatchers.eq(coldSegmentBeta.getId())))
+           .thenReturn(zeroSegmentReplicaCount);
+    Mockito.when(segmentReplicationStatus.getReplicaCountsInCluster(ArgumentMatchers.eq(coldSegmentGamma.getId())))
+           .thenReturn(zeroSegmentReplicaCount);
+    Mockito.when(segmentReplicationStatus.getReplicaCountsInCluster(ArgumentMatchers.eq(hotSegmentGamma.getId())))
+           .thenReturn(nonZeroSegmentReplicaCount);
+
+    schema.updateSegmentReplicationStatus(segmentReplicationStatus);
+
+ schema.coldDatasourceSchemaExec();
+ // alpha has only 1 cold segment
+ Assert.assertNotNull(schema.getDatasource("alpha"));
+ // gamma has both hot and cold segment
+ Assert.assertNotNull(schema.getDatasource("gamma"));
+    // assert that cold schema for gamma doesn't contain any columns from hot segment
+    RowSignature rowSignature = schema.getDatasource("gamma").getRowSignature();
+ Assert.assertTrue(rowSignature.contains("dim1"));
+ Assert.assertTrue(rowSignature.contains("c1"));
+ Assert.assertTrue(rowSignature.contains("c2"));
+ Assert.assertFalse(rowSignature.contains("c3"));
+ Assert.assertFalse(rowSignature.contains("c4"));
+
+    Assert.assertEquals(new HashSet<>(Arrays.asList("alpha", "gamma")), schema.getDataSourceInformationMap().keySet());
+
+ druidDataSources.clear();
+ druidDataSources.add(
+ new ImmutableDruidDataSource(
+ "beta",
+ Collections.emptyMap(),
+ Collections.singletonMap(coldSegmentBeta.getId(), coldSegmentBeta)
+ )
+ );
+
+ druidDataSources.add(
+ new ImmutableDruidDataSource(
+ "gamma",
+ Collections.emptyMap(),
+ Collections.singletonMap(hotSegmentGamma.getId(), hotSegmentGamma)
+ )
+ );
+
+    Mockito.when(sqlSegmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments())
+           .thenReturn(druidDataSources);
+
+ schema.coldDatasourceSchemaExec();
+ Assert.assertNotNull(schema.getDatasource("beta"));
+ // alpha doesn't have any segments
+ Assert.assertNull(schema.getDatasource("alpha"));
+ // gamma just has 1 hot segment
+ Assert.assertNull(schema.getDatasource("gamma"));
+
+ Assert.assertNull(schema.getDatasource("doesnotexist"));
+
+ Assert.assertEquals(Collections.singleton("beta"),
schema.getDataSourceInformationMap().keySet());
+ }
+
+  private void verifyFooDSSchema(CoordinatorSegmentMetadataCache schema, int columns)
{
final DataSourceInformation fooDs = schema.getDatasource("foo");
final RowSignature fooRowSignature = fooDs.getRowSignature();
List<String> columnNames = fooRowSignature.getColumnNames();
- Assert.assertEquals(6, columnNames.size());
+ Assert.assertEquals(columns, columnNames.size());
Assert.assertEquals("__time", columnNames.get(0));
    Assert.assertEquals(ColumnType.LONG, fooRowSignature.getColumnType(columnNames.get(0)).get());
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCache.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCache.java
index 7974ed460eb..628b6ea3978 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCache.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCache.java
@@ -173,6 +173,16 @@ public class BrokerSegmentMetadataCache extends AbstractSegmentMetadataCache<Phy
callbackExec.shutdownNow();
}
+  /**
+   * Execute refresh on the broker in each cycle if CentralizedDatasourceSchema is enabled;
+   * otherwise, only when there are segments or datasources to be refreshed.
+   */
+  @Override
+  protected boolean shouldRefresh()
+  {
+    return centralizedDatasourceSchemaConfig.isEnabled() || super.shouldRefresh();
+  }
+
/**
* Refreshes the set of segments in two steps:
* <ul>
@@ -196,6 +206,11 @@ public class BrokerSegmentMetadataCache extends AbstractSegmentMetadataCache<Phy
    // segmentMetadataInfo keys should be a superset of all other sets including datasources to refresh
    final Set<String> dataSourcesToQuery = new HashSet<>(segmentMetadataInfo.keySet());
+ // this is the complete set of datasources polled from the Coordinator
+ final Set<String> polledDatasources = queryDataSources();
+
+ dataSourcesToQuery.addAll(polledDatasources);
+
log.debug("Querying schema for [%s] datasources from Coordinator.",
dataSourcesToQuery);
// Fetch datasource information from the Coordinator
@@ -227,14 +242,7 @@ public class BrokerSegmentMetadataCache extends AbstractSegmentMetadataCache<Phy
      // Remove those datasources for which we received schema from the Coordinator.
      dataSourcesToRebuild.removeAll(polledDataSourceMetadata.keySet());
-      if (centralizedDatasourceSchemaConfig.isEnabled()) {
-        // this is a hacky way to ensure refresh is executed even if there are no new segments to refresh
-        // once, CentralizedDatasourceSchema feature is GA, brokers should simply poll schema for all datasources
-        dataSourcesNeedingRebuild.addAll(segmentMetadataInfo.keySet());
-      } else {
-        dataSourcesNeedingRebuild.clear();
-      }
-      log.debug("DatasourcesNeedingRebuild are [%s]", dataSourcesNeedingRebuild);
+      dataSourcesNeedingRebuild.clear();
}
// Rebuild the datasources.
@@ -267,6 +275,23 @@ public class BrokerSegmentMetadataCache extends AbstractSegmentMetadataCache<Phy
// noop, no additional action needed when segment is removed.
}
+ private Set<String> queryDataSources()
+ {
+ Set<String> dataSources = new HashSet<>();
+
+ try {
+      Set<String> polled = FutureUtils.getUnchecked(coordinatorClient.fetchDataSourcesWithUsedSegments(), true);
+ if (polled != null) {
+ dataSources.addAll(polled);
+ }
+ }
+ catch (Exception e) {
+ log.debug(e, "Failed to query datasources from the Coordinator.");
+ }
+
+ return dataSources;
+ }
+
  private Map<String, PhysicalDatasourceMetadata> queryDataSourceInformation(Set<String> dataSourcesToQuery)
{
Stopwatch stopwatch = Stopwatch.createStarted();
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java
index 23b2759286c..65610ce99f2 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java
@@ -74,11 +74,13 @@ import org.apache.druid.server.security.NoopEscalator;
import org.apache.druid.sql.calcite.table.DatasourceTable;
import org.apache.druid.sql.calcite.table.DruidTable;
import org.apache.druid.sql.calcite.util.CalciteTests;
+import org.apache.druid.sql.calcite.util.TestTimelineServerView;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.easymock.EasyMock;
+import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -332,6 +334,9 @@ public class BrokerSegmentMetadataCacheTest extends BrokerSegmentMetadataCacheTe
    ArgumentCaptor<Set<String>> argumentCaptor = ArgumentCaptor.forClass(Set.class);
    CoordinatorClient coordinatorClient = Mockito.mock(CoordinatorClient.class);
    Mockito.when(coordinatorClient.fetchDataSourceInformation(argumentCaptor.capture())).thenReturn(Futures.immediateFuture(null));
+
+    Set<String> datsources = Sets.newHashSet(DATASOURCE1, DATASOURCE2, DATASOURCE3, SOME_DATASOURCE, "xyz", "coldDS");
+    Mockito.when(coordinatorClient.fetchDataSourcesWithUsedSegments()).thenReturn(Futures.immediateFuture(datsources));
BrokerSegmentMetadataCache schema = new BrokerSegmentMetadataCache(
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
serverView,
@@ -347,7 +352,7 @@ public class BrokerSegmentMetadataCacheTest extends BrokerSegmentMetadataCacheTe
schema.start();
schema.awaitInitialization();
-    Assert.assertEquals(Sets.newHashSet(DATASOURCE1, DATASOURCE2, DATASOURCE3, SOME_DATASOURCE), argumentCaptor.getValue());
+    Assert.assertEquals(datsources, argumentCaptor.getValue());
refreshLatch = new CountDownLatch(1);
serverView.addSegment(newSegment("xyz", 0), ServerType.HISTORICAL);
@@ -355,7 +360,87 @@ public class BrokerSegmentMetadataCacheTest extends BrokerSegmentMetadataCacheTe
    refreshLatch.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS);
    // verify that previously refreshed are included in the last coordinator poll
-    Assert.assertEquals(Sets.newHashSet(DATASOURCE1, DATASOURCE2, DATASOURCE3, SOME_DATASOURCE, "xyz"), argumentCaptor.getValue());
+    Assert.assertEquals(datsources, argumentCaptor.getValue());
+ }
+
+  @Test
+  public void testRefreshOnEachCycleCentralizedDatasourceSchemaEnabled() throws InterruptedException
+  {
+    CentralizedDatasourceSchemaConfig config = CentralizedDatasourceSchemaConfig.create();
+ config.setEnabled(true);
+
+    serverView = new TestTimelineServerView(walker.getSegments(), Collections.emptyList());
+ druidServers = serverView.getDruidServers();
+
+    BrokerSegmentMetadataCacheConfig metadataCacheConfig = BrokerSegmentMetadataCacheConfig.create("PT1S");
+    metadataCacheConfig.setMetadataRefreshPeriod(Period.parse("PT0.001S"));
+ BrokerSegmentMetadataCache schema = new BrokerSegmentMetadataCache(
+ CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
+ serverView,
+ SEGMENT_CACHE_CONFIG_DEFAULT,
+ new NoopEscalator(),
+ new InternalQueryConfig(),
+ new NoopServiceEmitter(),
+        new PhysicalDatasourceMetadataFactory(globalTableJoinable, segmentManager),
+ new NoopCoordinatorClient(),
+ config
+ ) {
+      @Override
+      public void refresh(Set<SegmentId> segmentsToRefresh, Set<String> dataSourcesToRebuild)
+          throws IOException
+      {
+ super.refresh(segmentsToRefresh, dataSourcesToRebuild);
+ refreshLatch.countDown();
+ }
+ };
+
+    // refresh should be executed more than once; with the feature disabled, refresh is executed only once
+ refreshLatch = new CountDownLatch(3);
+ schema.start();
+ schema.awaitInitialization();
+
+ refreshLatch.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS);
+
+ Assert.assertEquals(0, refreshLatch.getCount());
+ }
+
+  @Test
+  public void testRefreshOnEachCycleCentralizedDatasourceSchemaDisabled() throws InterruptedException
+  {
+    BrokerSegmentMetadataCacheConfig metadataCacheConfig = BrokerSegmentMetadataCacheConfig.create("PT1S");
+    metadataCacheConfig.setMetadataRefreshPeriod(Period.parse("PT0.001S"));
+
+    serverView = new TestTimelineServerView(walker.getSegments(), Collections.emptyList());
+ druidServers = serverView.getDruidServers();
+
+ BrokerSegmentMetadataCache schema = new BrokerSegmentMetadataCache(
+ CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
+ serverView,
+ SEGMENT_CACHE_CONFIG_DEFAULT,
+ new NoopEscalator(),
+ new InternalQueryConfig(),
+ new NoopServiceEmitter(),
+        new PhysicalDatasourceMetadataFactory(globalTableJoinable, segmentManager),
+ new NoopCoordinatorClient(),
+ CentralizedDatasourceSchemaConfig.create()
+ ) {
+      @Override
+      public void refresh(Set<SegmentId> segmentsToRefresh, Set<String> dataSourcesToRebuild)
+          throws IOException
+      {
+ super.refresh(segmentsToRefresh, dataSourcesToRebuild);
+ refreshLatch.countDown();
+ }
+ };
+
+    // refresh should be executed only once, so two of the three latch counts remain
+ refreshLatch = new CountDownLatch(3);
+ schema.start();
+ schema.awaitInitialization();
+
+ refreshLatch.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS);
+
+ Assert.assertEquals(2, refreshLatch.getCount());
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]