This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new f67ff92d07d [bugfix] Run cold schema refresh thread periodically (#16873)
f67ff92d07d is described below
commit f67ff92d07d4838e6acd5fb4761baf3c9de6c3e2
Author: Rishabh Singh <[email protected]>
AuthorDate: Tue Aug 13 11:44:01 2024 +0530
[bugfix] Run cold schema refresh thread periodically (#16873)
* Fix build
* Run coldSchemaExec thread periodically
* Bugfix: Run cold schema refresh periodically
* Rename metrics for deep storage only segment schema process
---
docs/operations/metrics.md | 3 +
.../metadata/CoordinatorSegmentMetadataCache.java | 82 ++++++++++++++++------
.../CoordinatorSegmentMetadataCacheTest.java | 64 +++++++++++++++--
3 files changed, 121 insertions(+), 28 deletions(-)
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index ec97f44fe39..83cc9550690 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -382,6 +382,9 @@ These metrics are emitted by the Druid Coordinator in every run of the correspon
|`metadatacache/finalizedSchemaPayload/count`|Number of finalized segment schema cached.||Depends on the number of distinct schema in the cluster.|
|`metadatacache/temporaryMetadataQueryResults/count`|Number of segments for which schema was fetched by executing segment metadata query.||Eventually it should be 0.|
|`metadatacache/temporaryPublishedMetadataQueryResults/count`|Number of segments for which schema is cached after back filling in the database.||This value gets reset after each database poll. Eventually it should be 0.|
+|`metadatacache/deepStorageOnly/segment/count`|Number of available segments present only in deep storage.|`dataSource`||
+|`metadatacache/deepStorageOnly/refresh/count`|Number of deep storage only segments with cached schema.|`dataSource`||
+|`metadatacache/deepStorageOnly/process/time`|Time taken in milliseconds to process deep storage only segment schema.||Under a minute|
## General Health
diff --git
a/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java
b/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java
index 3a4f548b8ba..0c03f7af73c 100644
---
a/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java
+++
b/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java
@@ -24,7 +24,6 @@ import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import org.apache.druid.client.CoordinatorServerView;
import org.apache.druid.client.ImmutableDruidDataSource;
@@ -35,12 +34,15 @@ import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Stopwatch;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
import org.apache.druid.metadata.SqlSegmentsMetadataManager;
+import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.metadata.metadata.SegmentAnalysis;
import org.apache.druid.segment.SchemaPayloadPlus;
@@ -69,7 +71,6 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -100,12 +101,15 @@ public class CoordinatorSegmentMetadataCache extends
AbstractSegmentMetadataCach
private static final EmittingLogger log = new
EmittingLogger(CoordinatorSegmentMetadataCache.class);
private static final Long COLD_SCHEMA_PERIOD_MULTIPLIER = 3L;
private static final Long COLD_SCHEMA_SLOWNESS_THRESHOLD_MILLIS =
TimeUnit.SECONDS.toMillis(50);
+ private static final String DEEP_STORAGE_ONLY_METRIC_PREFIX =
"metadatacache/deepStorageOnly/";
private final SegmentMetadataCacheConfig config;
private final ColumnTypeMergePolicy columnTypeMergePolicy;
private final SegmentSchemaCache segmentSchemaCache;
private final SegmentSchemaBackFillQueue segmentSchemaBackfillQueue;
private final SqlSegmentsMetadataManager sqlSegmentsMetadataManager;
+ private final Supplier<SegmentsMetadataManagerConfig>
segmentsMetadataManagerConfigSupplier;
+ private final ServiceEmitter emitter;
private volatile SegmentReplicationStatus segmentReplicationStatus = null;
// Datasource schema built from only cold segments.
@@ -114,7 +118,6 @@ public class CoordinatorSegmentMetadataCache extends
AbstractSegmentMetadataCach
// Period for cold schema processing thread. This is a multiple of segment
polling period.
// Cold schema processing runs slower than the segment poll to save
processing cost of all segments.
// The downside is a delay in columns from cold segment reflecting in the
datasource schema.
- private final long coldSchemaExecPeriodMillis;
private final ScheduledExecutorService coldSchemaExec;
private @Nullable Future<?> cacheExecFuture = null;
private @Nullable Future<?> coldSchemaExecFuture = null;
@@ -139,18 +142,19 @@ public class CoordinatorSegmentMetadataCache extends
AbstractSegmentMetadataCach
this.segmentSchemaCache = segmentSchemaCache;
this.segmentSchemaBackfillQueue = segmentSchemaBackfillQueue;
this.sqlSegmentsMetadataManager = sqlSegmentsMetadataManager;
- this.coldSchemaExecPeriodMillis =
-
segmentsMetadataManagerConfigSupplier.get().getPollDuration().getMillis() *
COLD_SCHEMA_PERIOD_MULTIPLIER;
- coldSchemaExec = Executors.newSingleThreadScheduledExecutor(
- new ThreadFactoryBuilder()
- .setNameFormat("DruidColdSchema-ScheduledExecutor-%d")
- .setDaemon(false)
- .build()
- );
+ this.segmentsMetadataManagerConfigSupplier =
segmentsMetadataManagerConfigSupplier;
+ this.emitter = emitter;
+ this.coldSchemaExec =
Execs.scheduledSingleThreaded("DruidColdSchema-ScheduledExecutor-%d");
initServerViewTimelineCallback(serverView);
}
+ long getColdSchemaExecPeriodMillis()
+ {
+ return
(segmentsMetadataManagerConfigSupplier.get().getPollDuration().toStandardDuration().getMillis())
+ * COLD_SCHEMA_PERIOD_MULTIPLIER;
+ }
+
private void initServerViewTimelineCallback(final CoordinatorServerView
serverView)
{
serverView.registerTimelineCallback(
@@ -232,9 +236,10 @@ public class CoordinatorSegmentMetadataCache extends
AbstractSegmentMetadataCach
try {
segmentSchemaBackfillQueue.onLeaderStart();
cacheExecFuture = cacheExec.submit(this::cacheExecLoop);
- coldSchemaExecFuture = coldSchemaExec.schedule(
+ coldSchemaExecFuture = coldSchemaExec.scheduleWithFixedDelay(
this::coldDatasourceSchemaExec,
- coldSchemaExecPeriodMillis,
+ getColdSchemaExecPeriodMillis(),
+ getColdSchemaExecPeriodMillis(),
TimeUnit.MILLISECONDS
);
@@ -558,9 +563,7 @@ public class CoordinatorSegmentMetadataCache extends
AbstractSegmentMetadataCach
Set<String> dataSourceWithColdSegmentSet = new HashSet<>();
- int datasources = 0;
- int segments = 0;
- int dataSourceWithColdSegments = 0;
+ int datasources = 0, dataSourceWithColdSegments = 0, totalColdSegments = 0;
Collection<ImmutableDruidDataSource> immutableDataSources =
sqlSegmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments();
@@ -571,6 +574,9 @@ public class CoordinatorSegmentMetadataCache extends
AbstractSegmentMetadataCach
final Map<String, ColumnType> columnTypes = new LinkedHashMap<>();
+ int coldSegments = 0;
+ int coldSegmentWithSchema = 0;
+
for (DataSegment segment : dataSegments) {
Integer replicationFactor = getReplicationFactor(segment.getId());
if (replicationFactor != null && replicationFactor != 0) {
@@ -580,36 +586,66 @@ public class CoordinatorSegmentMetadataCache extends
AbstractSegmentMetadataCach
if (optionalSchema.isPresent()) {
RowSignature rowSignature =
optionalSchema.get().getSchemaPayload().getRowSignature();
mergeRowSignature(columnTypes, rowSignature);
+ coldSegmentWithSchema++;
}
- segments++;
+ coldSegments++;
}
- if (columnTypes.isEmpty()) {
+ if (coldSegments == 0) {
// this datasource doesn't have any cold segment
continue;
}
+ totalColdSegments += coldSegments;
+
+ String dataSourceName = dataSource.getName();
+
+ ServiceMetricEvent.Builder metricBuilder =
+ new
ServiceMetricEvent.Builder().setDimension(DruidMetrics.DATASOURCE,
dataSourceName);
+
+ emitter.emit(metricBuilder.setMetric(DEEP_STORAGE_ONLY_METRIC_PREFIX +
"segment/count", coldSegments));
+
+ if (columnTypes.isEmpty()) {
+ // this datasource doesn't have schema for cold segments
+ continue;
+ }
+
final RowSignature.Builder builder = RowSignature.builder();
columnTypes.forEach(builder::add);
RowSignature coldSignature = builder.build();
- String dataSourceName = dataSource.getName();
dataSourceWithColdSegmentSet.add(dataSourceName);
dataSourceWithColdSegments++;
- log.debug("[%s] signature from cold segments is [%s]", dataSourceName,
coldSignature);
+ DataSourceInformation druidTable = new
DataSourceInformation(dataSourceName, coldSignature);
+ DataSourceInformation oldTable = coldSchemaTable.put(dataSourceName,
druidTable);
- coldSchemaTable.put(dataSourceName, new
DataSourceInformation(dataSourceName, coldSignature));
+ if (oldTable == null ||
!oldTable.getRowSignature().equals(druidTable.getRowSignature())) {
+ log.info("[%s] has new cold signature: %s.", dataSource,
druidTable.getRowSignature());
+ } else {
+ log.debug("[%s] signature is unchanged.", dataSource);
+ }
+
+ emitter.emit(metricBuilder.setMetric(DEEP_STORAGE_ONLY_METRIC_PREFIX +
"refresh/count", coldSegmentWithSchema));
+
+ log.debug("[%s] signature from cold segments is [%s]", dataSourceName,
coldSignature);
}
// remove any stale datasource from the map
coldSchemaTable.keySet().retainAll(dataSourceWithColdSegmentSet);
+ emitter.emit(
+ new ServiceMetricEvent.Builder().setMetric(
+ DEEP_STORAGE_ONLY_METRIC_PREFIX + "process/time",
+ stopwatch.millisElapsed()
+ )
+ );
+
String executionStatsLog = StringUtils.format(
"Cold schema processing took [%d] millis. "
- + "Processed total [%d] datasources, [%d] segments. Found [%d]
datasources with cold segments.",
- stopwatch.millisElapsed(), datasources, segments,
dataSourceWithColdSegments
+ + "Processed total [%d] datasources, [%d] segments. Found [%d]
datasources with cold segment schema.",
+ stopwatch.millisElapsed(), datasources, totalColdSegments,
dataSourceWithColdSegments
);
if (stopwatch.millisElapsed() > COLD_SCHEMA_SLOWNESS_THRESHOLD_MILLIS) {
log.info(executionStatsLog);
diff --git
a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
index ef1fb1e8edd..8fbc78a7412 100644
---
a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
+++
b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
@@ -38,6 +38,7 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
@@ -1788,7 +1789,7 @@ public class CoordinatorSegmentMetadataCacheTest extends
CoordinatorSegmentMetad
Assert.assertEquals(existingMetadata.getNumReplicas(),
currentMetadata.getNumReplicas());
}
- private CoordinatorSegmentMetadataCache setupForColdDatasourceSchemaTest()
+ private CoordinatorSegmentMetadataCache
setupForColdDatasourceSchemaTest(ServiceEmitter emitter)
{
// foo has both hot and cold segments
DataSegment coldSegment =
@@ -1862,7 +1863,7 @@ public class CoordinatorSegmentMetadataCacheTest extends
CoordinatorSegmentMetad
SEGMENT_CACHE_CONFIG_DEFAULT,
new NoopEscalator(),
new InternalQueryConfig(),
- new NoopServiceEmitter(),
+ emitter,
segmentSchemaCache,
backFillQueue,
sqlSegmentsMetadataManager,
@@ -1893,10 +1894,17 @@ public class CoordinatorSegmentMetadataCacheTest
extends CoordinatorSegmentMetad
@Test
public void testColdDatasourceSchema_refreshAfterColdSchemaExec() throws
IOException
{
- CoordinatorSegmentMetadataCache schema =
setupForColdDatasourceSchemaTest();
+ StubServiceEmitter emitter = new StubServiceEmitter("coordinator", "host");
+ CoordinatorSegmentMetadataCache schema =
setupForColdDatasourceSchemaTest(emitter);
schema.coldDatasourceSchemaExec();
+ emitter.verifyEmitted("metadatacache/deepStorageOnly/segment/count",
ImmutableMap.of(DruidMetrics.DATASOURCE, "foo"), 1);
+ emitter.verifyEmitted("metadatacache/deepStorageOnly/refresh/count",
ImmutableMap.of(DruidMetrics.DATASOURCE, "foo"), 1);
+ emitter.verifyEmitted("metadatacache/deepStorageOnly/segment/count",
ImmutableMap.of(DruidMetrics.DATASOURCE, "cold"), 1);
+ emitter.verifyEmitted("metadatacache/deepStorageOnly/refresh/count",
ImmutableMap.of(DruidMetrics.DATASOURCE, "cold"), 1);
+ emitter.verifyEmitted("metadatacache/deepStorageOnly/process/time", 1);
+
Assert.assertEquals(new HashSet<>(Arrays.asList("foo", "cold")),
schema.getDataSourceInformationMap().keySet());
// verify that cold schema for both foo and cold is present
@@ -1955,7 +1963,8 @@ public class CoordinatorSegmentMetadataCacheTest extends
CoordinatorSegmentMetad
@Test
public void testColdDatasourceSchema_coldSchemaExecAfterRefresh() throws
IOException
{
- CoordinatorSegmentMetadataCache schema =
setupForColdDatasourceSchemaTest();
+ StubServiceEmitter emitter = new StubServiceEmitter("coordinator", "host");
+ CoordinatorSegmentMetadataCache schema =
setupForColdDatasourceSchemaTest(emitter);
Set<SegmentId> segmentIds = new HashSet<>();
segmentIds.add(segment1.getId());
@@ -1971,7 +1980,13 @@ public class CoordinatorSegmentMetadataCacheTest extends
CoordinatorSegmentMetad
schema.coldDatasourceSchemaExec();
- // could datasource should be present now
+ emitter.verifyEmitted("metadatacache/deepStorageOnly/segment/count",
ImmutableMap.of(DruidMetrics.DATASOURCE, "foo"), 1);
+ emitter.verifyEmitted("metadatacache/deepStorageOnly/refresh/count",
ImmutableMap.of(DruidMetrics.DATASOURCE, "foo"), 1);
+ emitter.verifyEmitted("metadatacache/deepStorageOnly/segment/count",
ImmutableMap.of(DruidMetrics.DATASOURCE, "cold"), 1);
+ emitter.verifyEmitted("metadatacache/deepStorageOnly/refresh/count",
ImmutableMap.of(DruidMetrics.DATASOURCE, "cold"), 1);
+ emitter.verifyEmitted("metadatacache/deepStorageOnly/process/time", 1);
+
+ // cold datasource should be present now
Assert.assertEquals(new HashSet<>(Arrays.asList("foo", "cold")),
schema.getDataSourceInformationMap().keySet());
RowSignature coldSignature =
schema.getDatasource("cold").getRowSignature();
@@ -2160,6 +2175,45 @@ public class CoordinatorSegmentMetadataCacheTest extends
CoordinatorSegmentMetad
Assert.assertEquals(Collections.singleton("beta"),
schema.getDataSourceInformationMap().keySet());
}
+ @Test
+ public void testColdDatasourceSchemaExecRunsPeriodically() throws
InterruptedException
+ {
+ // Make sure the thread runs more than once
+ CountDownLatch latch = new CountDownLatch(2);
+
+ CoordinatorSegmentMetadataCache schema = new
CoordinatorSegmentMetadataCache(
+ getQueryLifecycleFactory(walker),
+ serverView,
+ SEGMENT_CACHE_CONFIG_DEFAULT,
+ new NoopEscalator(),
+ new InternalQueryConfig(),
+ new NoopServiceEmitter(),
+ segmentSchemaCache,
+ backFillQueue,
+ sqlSegmentsMetadataManager,
+ segmentsMetadataManagerConfigSupplier
+ ) {
+ @Override
+ long getColdSchemaExecPeriodMillis()
+ {
+ return 10;
+ }
+
+ @Override
+ protected void coldDatasourceSchemaExec()
+ {
+ latch.countDown();
+ super.coldDatasourceSchemaExec();
+ }
+ };
+
+ schema.onLeaderStart();
+ schema.awaitInitialization();
+
+ latch.await(1, TimeUnit.SECONDS);
+ Assert.assertEquals(0, latch.getCount());
+ }
+
private void verifyFooDSSchema(CoordinatorSegmentMetadataCache schema, int
columns)
{
final DataSourceInformation fooDs = schema.getDatasource("foo");
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]