This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new a18f582ef0a Skip refresh for unused segments in metadata cache (#16990)
a18f582ef0a is described below

commit a18f582ef0a1ad8ab096a5f261a454374c9fdd0b
Author: Rishabh Singh <[email protected]>
AuthorDate: Thu Sep 12 10:39:59 2024 +0530

    Skip refresh for unused segments in metadata cache (#16990)
    
    * Skip refresh for unused segments in metadata cache
    
    * Cover the condition where a used segment with a missing schema is marked
      for refresh
    
    * Fix test
---
 .../metadata/CoordinatorSegmentMetadataCache.java  | 11 ++-
 .../CoordinatorSegmentMetadataCacheTest.java       | 96 ++++++++++++++++++++++
 2 files changed, 105 insertions(+), 2 deletions(-)

diff --git 
a/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java
 
b/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java
index 0ed81f6c1e8..321c33fa1db 100644
--- 
a/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java
+++ 
b/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java
@@ -692,9 +692,16 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCache
           RowSignature rowSignature = 
optionalSchema.get().getSchemaPayload().getRowSignature();
           mergeRowSignature(columnTypes, rowSignature);
         } else {
-          // mark it for refresh, however, this case shouldn't arise by design
-          markSegmentAsNeedRefresh(segmentId);
           log.debug("SchemaMetadata for segmentId [%s] is absent.", segmentId);
+
+          ImmutableDruidDataSource druidDataSource =
+              
sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(segmentId.getDataSource());
+
+          if (druidDataSource != null && druidDataSource.getSegment(segmentId) 
!= null) {
+            // mark it for refresh only if it is used
+            // however, this case shouldn't arise by design
+            markSegmentAsNeedRefresh(segmentId);
+          }
         }
       }
     } else {
diff --git 
a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
 
b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
index 283c1687a97..0c099cb551c 100644
--- 
a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
+++ 
b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
@@ -2290,6 +2290,102 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetadataCacheTestBase
     Assert.assertEquals(0, metadatas.size());
   }
 
+  @Test
+  public void testUnusedSegmentIsNotRefreshed() throws InterruptedException, 
IOException
+  {
+    String dataSource = "xyz";
+    CountDownLatch latch = new CountDownLatch(1);
+    CoordinatorSegmentMetadataCache schema = new 
CoordinatorSegmentMetadataCache(
+        getQueryLifecycleFactory(walker),
+        serverView,
+        SEGMENT_CACHE_CONFIG_DEFAULT,
+        new NoopEscalator(),
+        new InternalQueryConfig(),
+        new NoopServiceEmitter(),
+        segmentSchemaCache,
+        backFillQueue,
+        sqlSegmentsMetadataManager,
+        segmentsMetadataManagerConfigSupplier
+    ) {
+      @Override
+      public void refresh(Set<SegmentId> segmentsToRefresh, Set<String> 
dataSourcesToRebuild)
+          throws IOException
+      {
+        super.refresh(segmentsToRefresh, dataSourcesToRebuild);
+        latch.countDown();
+      }
+    };
+
+    List<DataSegment> segments = ImmutableList.of(
+        newSegment(dataSource, 1),
+        newSegment(dataSource, 2),
+        newSegment(dataSource, 3)
+    );
+
+    final DruidServer historicalServer = druidServers.stream()
+                                                     .filter(s -> 
s.getType().equals(ServerType.HISTORICAL))
+                                                     .findAny()
+                                                     .orElse(null);
+
+    Assert.assertNotNull(historicalServer);
+    final DruidServerMetadata historicalServerMetadata = 
historicalServer.getMetadata();
+
+    ImmutableMap.Builder<SegmentId, SegmentMetadata> segmentStatsMap = new 
ImmutableMap.Builder<>();
+    segmentStatsMap.put(segments.get(0).getId(), new SegmentMetadata(20L, 
"fp"));
+    segmentStatsMap.put(segments.get(1).getId(), new SegmentMetadata(20L, 
"fp"));
+    segmentStatsMap.put(segments.get(2).getId(), new SegmentMetadata(20L, 
"fp"));
+
+    ImmutableMap.Builder<String, SchemaPayload> schemaPayloadMap = new 
ImmutableMap.Builder<>();
+    schemaPayloadMap.put("fp", new 
SchemaPayload(RowSignature.builder().add("c1", ColumnType.DOUBLE).build()));
+    segmentSchemaCache.updateFinalizedSegmentSchema(
+        new 
SegmentSchemaCache.FinalizedSegmentSchemaInfo(segmentStatsMap.build(), 
schemaPayloadMap.build())
+    );
+
+    schema.addSegment(historicalServerMetadata, segments.get(0));
+    schema.addSegment(historicalServerMetadata, segments.get(1));
+    schema.addSegment(historicalServerMetadata, segments.get(2));
+
+    serverView.addSegment(segments.get(0), ServerType.HISTORICAL);
+    serverView.addSegment(segments.get(1), ServerType.HISTORICAL);
+    serverView.addSegment(segments.get(2), ServerType.HISTORICAL);
+
+    schema.onLeaderStart();
+    schema.awaitInitialization();
+
+    Assert.assertTrue(latch.await(2, TimeUnit.SECONDS));
+
+    // make segment3 unused
+    segmentStatsMap = new ImmutableMap.Builder<>();
+    segmentStatsMap.put(segments.get(0).getId(), new SegmentMetadata(20L, 
"fp"));
+
+    segmentSchemaCache.updateFinalizedSegmentSchema(
+        new 
SegmentSchemaCache.FinalizedSegmentSchemaInfo(segmentStatsMap.build(), 
schemaPayloadMap.build())
+    );
+
+    Map<SegmentId, DataSegment> segmentMap = new HashMap<>();
+    segmentMap.put(segments.get(0).getId(), segments.get(0));
+    segmentMap.put(segments.get(1).getId(), segments.get(1));
+
+    ImmutableDruidDataSource druidDataSource =
+        new ImmutableDruidDataSource(
+            "xyz",
+            Collections.emptyMap(),
+            segmentMap
+        );
+
+    
Mockito.when(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(ArgumentMatchers.anyString()))
+           .thenReturn(druidDataSource);
+
+    Set<SegmentId> segmentsToRefresh = 
segments.stream().map(DataSegment::getId).collect(Collectors.toSet());
+    segmentsToRefresh.remove(segments.get(1).getId());
+    segmentsToRefresh.remove(segments.get(2).getId());
+
+    schema.refresh(segmentsToRefresh, Sets.newHashSet(dataSource));
+
+    
Assert.assertTrue(schema.getSegmentsNeedingRefresh().contains(segments.get(1).getId()));
+    
Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(segments.get(2).getId()));
+  }
+
   private void verifyFooDSSchema(CoordinatorSegmentMetadataCache schema, int 
columns)
   {
     final DataSourceInformation fooDs = schema.getDatasource("foo");


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to