This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new f67ff92d07d [bugfix] Run cold schema refresh thread periodically  (#16873)
f67ff92d07d is described below

commit f67ff92d07d4838e6acd5fb4761baf3c9de6c3e2
Author: Rishabh Singh <[email protected]>
AuthorDate: Tue Aug 13 11:44:01 2024 +0530

    [bugfix] Run cold schema refresh thread periodically  (#16873)
    
    * Fix build
    
    * Run coldSchemaExec thread periodically
    
    * Bugfix: Run cold schema refresh periodically
    
    * Rename metrics for deep storage only segment schema process
---
 docs/operations/metrics.md                         |  3 +
 .../metadata/CoordinatorSegmentMetadataCache.java  | 82 ++++++++++++++++------
 .../CoordinatorSegmentMetadataCacheTest.java       | 64 +++++++++++++++--
 3 files changed, 121 insertions(+), 28 deletions(-)

diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index ec97f44fe39..83cc9550690 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -382,6 +382,9 @@ These metrics are emitted by the Druid Coordinator in every run of the correspon
 |`metadatacache/finalizedSchemaPayload/count`|Number of finalized segment schema cached.||Depends on the number of distinct schema in the cluster.|
 |`metadatacache/temporaryMetadataQueryResults/count`|Number of segments for which schema was fetched by executing segment metadata query.||Eventually it should be 0.|
 |`metadatacache/temporaryPublishedMetadataQueryResults/count`|Number of segments for which schema is cached after back filling in the database.||This value gets reset after each database poll. Eventually it should be 0.|
+|`metadatacache/deepStorageOnly/segment/count`|Number of available segments present only in deep storage.|`dataSource`||
+|`metadatacache/deepStorageOnly/refresh/count`|Number of deep storage only segments with cached schema.|`dataSource`||
+|`metadatacache/deepStorageOnly/process/time`|Time taken in milliseconds to process deep storage only segment schema.||Under a minute|
 
 ## General Health
 
diff --git 
a/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java
 
b/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java
index 3a4f548b8ba..0c03f7af73c 100644
--- 
a/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java
+++ 
b/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java
@@ -24,7 +24,6 @@ import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.inject.Inject;
 import org.apache.druid.client.CoordinatorServerView;
 import org.apache.druid.client.ImmutableDruidDataSource;
@@ -35,12 +34,15 @@ import org.apache.druid.guice.ManageLifecycle;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Stopwatch;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
 import org.apache.druid.metadata.SqlSegmentsMetadataManager;
+import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.metadata.metadata.SegmentAnalysis;
 import org.apache.druid.segment.SchemaPayloadPlus;
@@ -69,7 +71,6 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -100,12 +101,15 @@ public class CoordinatorSegmentMetadataCache extends 
AbstractSegmentMetadataCach
   private static final EmittingLogger log = new 
EmittingLogger(CoordinatorSegmentMetadataCache.class);
   private static final Long COLD_SCHEMA_PERIOD_MULTIPLIER = 3L;
   private static final Long COLD_SCHEMA_SLOWNESS_THRESHOLD_MILLIS = 
TimeUnit.SECONDS.toMillis(50);
+  private static final String DEEP_STORAGE_ONLY_METRIC_PREFIX = 
"metadatacache/deepStorageOnly/";
 
   private final SegmentMetadataCacheConfig config;
   private final ColumnTypeMergePolicy columnTypeMergePolicy;
   private final SegmentSchemaCache segmentSchemaCache;
   private final SegmentSchemaBackFillQueue segmentSchemaBackfillQueue;
   private final SqlSegmentsMetadataManager sqlSegmentsMetadataManager;
+  private final Supplier<SegmentsMetadataManagerConfig> 
segmentsMetadataManagerConfigSupplier;
+  private final ServiceEmitter emitter;
   private volatile SegmentReplicationStatus segmentReplicationStatus = null;
 
   // Datasource schema built from only cold segments.
@@ -114,7 +118,6 @@ public class CoordinatorSegmentMetadataCache extends 
AbstractSegmentMetadataCach
   // Period for cold schema processing thread. This is a multiple of segment 
polling period.
   // Cold schema processing runs slower than the segment poll to save 
processing cost of all segments.
   // The downside is a delay in columns from cold segment reflecting in the 
datasource schema.
-  private final long coldSchemaExecPeriodMillis;
   private final ScheduledExecutorService coldSchemaExec;
   private @Nullable Future<?> cacheExecFuture = null;
   private @Nullable Future<?> coldSchemaExecFuture = null;
@@ -139,18 +142,19 @@ public class CoordinatorSegmentMetadataCache extends 
AbstractSegmentMetadataCach
     this.segmentSchemaCache = segmentSchemaCache;
     this.segmentSchemaBackfillQueue = segmentSchemaBackfillQueue;
     this.sqlSegmentsMetadataManager = sqlSegmentsMetadataManager;
-    this.coldSchemaExecPeriodMillis =
-        
segmentsMetadataManagerConfigSupplier.get().getPollDuration().getMillis() * 
COLD_SCHEMA_PERIOD_MULTIPLIER;
-    coldSchemaExec = Executors.newSingleThreadScheduledExecutor(
-        new ThreadFactoryBuilder()
-            .setNameFormat("DruidColdSchema-ScheduledExecutor-%d")
-            .setDaemon(false)
-            .build()
-    );
+    this.segmentsMetadataManagerConfigSupplier = 
segmentsMetadataManagerConfigSupplier;
+    this.emitter = emitter;
+    this.coldSchemaExec = 
Execs.scheduledSingleThreaded("DruidColdSchema-ScheduledExecutor-%d");
 
     initServerViewTimelineCallback(serverView);
   }
 
+  long getColdSchemaExecPeriodMillis()
+  {
+    return 
(segmentsMetadataManagerConfigSupplier.get().getPollDuration().toStandardDuration().getMillis())
+           * COLD_SCHEMA_PERIOD_MULTIPLIER;
+  }
+
   private void initServerViewTimelineCallback(final CoordinatorServerView 
serverView)
   {
     serverView.registerTimelineCallback(
@@ -232,9 +236,10 @@ public class CoordinatorSegmentMetadataCache extends 
AbstractSegmentMetadataCach
     try {
       segmentSchemaBackfillQueue.onLeaderStart();
       cacheExecFuture = cacheExec.submit(this::cacheExecLoop);
-      coldSchemaExecFuture = coldSchemaExec.schedule(
+      coldSchemaExecFuture = coldSchemaExec.scheduleWithFixedDelay(
           this::coldDatasourceSchemaExec,
-          coldSchemaExecPeriodMillis,
+          getColdSchemaExecPeriodMillis(),
+          getColdSchemaExecPeriodMillis(),
           TimeUnit.MILLISECONDS
       );
 
@@ -558,9 +563,7 @@ public class CoordinatorSegmentMetadataCache extends 
AbstractSegmentMetadataCach
 
     Set<String> dataSourceWithColdSegmentSet = new HashSet<>();
 
-    int datasources = 0;
-    int segments = 0;
-    int dataSourceWithColdSegments = 0;
+    int datasources = 0, dataSourceWithColdSegments = 0, totalColdSegments = 0;
 
     Collection<ImmutableDruidDataSource> immutableDataSources =
         
sqlSegmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments();
@@ -571,6 +574,9 @@ public class CoordinatorSegmentMetadataCache extends 
AbstractSegmentMetadataCach
 
       final Map<String, ColumnType> columnTypes = new LinkedHashMap<>();
 
+      int coldSegments = 0;
+      int coldSegmentWithSchema = 0;
+
       for (DataSegment segment : dataSegments) {
         Integer replicationFactor = getReplicationFactor(segment.getId());
         if (replicationFactor != null && replicationFactor != 0) {
@@ -580,36 +586,66 @@ public class CoordinatorSegmentMetadataCache extends 
AbstractSegmentMetadataCach
         if (optionalSchema.isPresent()) {
           RowSignature rowSignature = 
optionalSchema.get().getSchemaPayload().getRowSignature();
           mergeRowSignature(columnTypes, rowSignature);
+          coldSegmentWithSchema++;
         }
-        segments++;
+        coldSegments++;
       }
 
-      if (columnTypes.isEmpty()) {
+      if (coldSegments == 0) {
         // this datasource doesn't have any cold segment
         continue;
       }
 
+      totalColdSegments += coldSegments;
+
+      String dataSourceName = dataSource.getName();
+
+      ServiceMetricEvent.Builder metricBuilder =
+          new 
ServiceMetricEvent.Builder().setDimension(DruidMetrics.DATASOURCE, 
dataSourceName);
+
+      emitter.emit(metricBuilder.setMetric(DEEP_STORAGE_ONLY_METRIC_PREFIX + 
"segment/count", coldSegments));
+
+      if (columnTypes.isEmpty()) {
+        // this datasource doesn't have schema for cold segments
+        continue;
+      }
+
       final RowSignature.Builder builder = RowSignature.builder();
       columnTypes.forEach(builder::add);
 
       RowSignature coldSignature = builder.build();
 
-      String dataSourceName = dataSource.getName();
       dataSourceWithColdSegmentSet.add(dataSourceName);
       dataSourceWithColdSegments++;
 
-      log.debug("[%s] signature from cold segments is [%s]", dataSourceName, 
coldSignature);
+      DataSourceInformation druidTable = new 
DataSourceInformation(dataSourceName, coldSignature);
+      DataSourceInformation oldTable = coldSchemaTable.put(dataSourceName, 
druidTable);
 
-      coldSchemaTable.put(dataSourceName, new 
DataSourceInformation(dataSourceName, coldSignature));
+      if (oldTable == null || 
!oldTable.getRowSignature().equals(druidTable.getRowSignature())) {
+        log.info("[%s] has new cold signature: %s.", dataSource, 
druidTable.getRowSignature());
+      } else {
+        log.debug("[%s] signature is unchanged.", dataSource);
+      }
+
+      emitter.emit(metricBuilder.setMetric(DEEP_STORAGE_ONLY_METRIC_PREFIX + 
"refresh/count", coldSegmentWithSchema));
+
+      log.debug("[%s] signature from cold segments is [%s]", dataSourceName, 
coldSignature);
     }
 
     // remove any stale datasource from the map
     coldSchemaTable.keySet().retainAll(dataSourceWithColdSegmentSet);
 
+    emitter.emit(
+        new ServiceMetricEvent.Builder().setMetric(
+            DEEP_STORAGE_ONLY_METRIC_PREFIX + "process/time",
+            stopwatch.millisElapsed()
+        )
+    );
+
     String executionStatsLog = StringUtils.format(
         "Cold schema processing took [%d] millis. "
-        + "Processed total [%d] datasources, [%d] segments. Found [%d] 
datasources with cold segments.",
-        stopwatch.millisElapsed(), datasources, segments, 
dataSourceWithColdSegments
+        + "Processed total [%d] datasources, [%d] segments. Found [%d] 
datasources with cold segment schema.",
+        stopwatch.millisElapsed(), datasources, totalColdSegments, 
dataSourceWithColdSegments
     );
     if (stopwatch.millisElapsed() > COLD_SCHEMA_SLOWNESS_THRESHOLD_MILLIS) {
       log.info(executionStatsLog);
diff --git 
a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
 
b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
index ef1fb1e8edd..8fbc78a7412 100644
--- 
a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
+++ 
b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
@@ -38,6 +38,7 @@ import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
 import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.metrics.StubServiceEmitter;
 import org.apache.druid.metadata.MetadataStorageTablesConfig;
 import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
@@ -1788,7 +1789,7 @@ public class CoordinatorSegmentMetadataCacheTest extends 
CoordinatorSegmentMetad
     Assert.assertEquals(existingMetadata.getNumReplicas(), 
currentMetadata.getNumReplicas());
   }
 
-  private CoordinatorSegmentMetadataCache setupForColdDatasourceSchemaTest()
+  private CoordinatorSegmentMetadataCache 
setupForColdDatasourceSchemaTest(ServiceEmitter emitter)
   {
     // foo has both hot and cold segments
     DataSegment coldSegment =
@@ -1862,7 +1863,7 @@ public class CoordinatorSegmentMetadataCacheTest extends 
CoordinatorSegmentMetad
         SEGMENT_CACHE_CONFIG_DEFAULT,
         new NoopEscalator(),
         new InternalQueryConfig(),
-        new NoopServiceEmitter(),
+        emitter,
         segmentSchemaCache,
         backFillQueue,
         sqlSegmentsMetadataManager,
@@ -1893,10 +1894,17 @@ public class CoordinatorSegmentMetadataCacheTest 
extends CoordinatorSegmentMetad
   @Test
   public void testColdDatasourceSchema_refreshAfterColdSchemaExec() throws 
IOException
   {
-    CoordinatorSegmentMetadataCache schema = 
setupForColdDatasourceSchemaTest();
+    StubServiceEmitter emitter = new StubServiceEmitter("coordinator", "host");
+    CoordinatorSegmentMetadataCache schema = 
setupForColdDatasourceSchemaTest(emitter);
 
     schema.coldDatasourceSchemaExec();
 
+    emitter.verifyEmitted("metadatacache/deepStorageOnly/segment/count", 
ImmutableMap.of(DruidMetrics.DATASOURCE, "foo"), 1);
+    emitter.verifyEmitted("metadatacache/deepStorageOnly/refresh/count", 
ImmutableMap.of(DruidMetrics.DATASOURCE, "foo"), 1);
+    emitter.verifyEmitted("metadatacache/deepStorageOnly/segment/count", 
ImmutableMap.of(DruidMetrics.DATASOURCE, "cold"), 1);
+    emitter.verifyEmitted("metadatacache/deepStorageOnly/refresh/count", 
ImmutableMap.of(DruidMetrics.DATASOURCE, "cold"), 1);
+    emitter.verifyEmitted("metadatacache/deepStorageOnly/process/time", 1);
+
     Assert.assertEquals(new HashSet<>(Arrays.asList("foo", "cold")), 
schema.getDataSourceInformationMap().keySet());
 
     // verify that cold schema for both foo and cold is present
@@ -1955,7 +1963,8 @@ public class CoordinatorSegmentMetadataCacheTest extends 
CoordinatorSegmentMetad
   @Test
   public void testColdDatasourceSchema_coldSchemaExecAfterRefresh() throws 
IOException
   {
-    CoordinatorSegmentMetadataCache schema = 
setupForColdDatasourceSchemaTest();
+    StubServiceEmitter emitter = new StubServiceEmitter("coordinator", "host");
+    CoordinatorSegmentMetadataCache schema = 
setupForColdDatasourceSchemaTest(emitter);
 
     Set<SegmentId> segmentIds = new HashSet<>();
     segmentIds.add(segment1.getId());
@@ -1971,7 +1980,13 @@ public class CoordinatorSegmentMetadataCacheTest extends 
CoordinatorSegmentMetad
 
     schema.coldDatasourceSchemaExec();
 
-    // could datasource should be present now
+    emitter.verifyEmitted("metadatacache/deepStorageOnly/segment/count", 
ImmutableMap.of(DruidMetrics.DATASOURCE, "foo"), 1);
+    emitter.verifyEmitted("metadatacache/deepStorageOnly/refresh/count", 
ImmutableMap.of(DruidMetrics.DATASOURCE, "foo"), 1);
+    emitter.verifyEmitted("metadatacache/deepStorageOnly/segment/count", 
ImmutableMap.of(DruidMetrics.DATASOURCE, "cold"), 1);
+    emitter.verifyEmitted("metadatacache/deepStorageOnly/refresh/count", 
ImmutableMap.of(DruidMetrics.DATASOURCE, "cold"), 1);
+    emitter.verifyEmitted("metadatacache/deepStorageOnly/process/time", 1);
+
+    // cold datasource should be present now
     Assert.assertEquals(new HashSet<>(Arrays.asList("foo", "cold")), 
schema.getDataSourceInformationMap().keySet());
 
     RowSignature coldSignature = 
schema.getDatasource("cold").getRowSignature();
@@ -2160,6 +2175,45 @@ public class CoordinatorSegmentMetadataCacheTest extends 
CoordinatorSegmentMetad
     Assert.assertEquals(Collections.singleton("beta"), 
schema.getDataSourceInformationMap().keySet());
   }
 
+  @Test
+  public void testColdDatasourceSchemaExecRunsPeriodically() throws 
InterruptedException
+  {
+    // Make sure the thread runs more than once
+    CountDownLatch latch = new CountDownLatch(2);
+
+    CoordinatorSegmentMetadataCache schema = new 
CoordinatorSegmentMetadataCache(
+        getQueryLifecycleFactory(walker),
+        serverView,
+        SEGMENT_CACHE_CONFIG_DEFAULT,
+        new NoopEscalator(),
+        new InternalQueryConfig(),
+        new NoopServiceEmitter(),
+        segmentSchemaCache,
+        backFillQueue,
+        sqlSegmentsMetadataManager,
+        segmentsMetadataManagerConfigSupplier
+    ) {
+      @Override
+      long getColdSchemaExecPeriodMillis()
+      {
+        return 10;
+      }
+
+      @Override
+      protected void coldDatasourceSchemaExec()
+      {
+        latch.countDown();
+        super.coldDatasourceSchemaExec();
+      }
+    };
+
+    schema.onLeaderStart();
+    schema.awaitInitialization();
+
+    latch.await(1, TimeUnit.SECONDS);
+    Assert.assertEquals(0, latch.getCount());
+  }
+
   private void verifyFooDSSchema(CoordinatorSegmentMetadataCache schema, int 
columns)
   {
     final DataSourceInformation fooDs = schema.getDatasource("foo");


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to