This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new a18f582ef0a Skip refresh for unused segments in metadata cache (#16990)
a18f582ef0a is described below
commit a18f582ef0a1ad8ab096a5f261a454374c9fdd0b
Author: Rishabh Singh <[email protected]>
AuthorDate: Thu Sep 12 10:39:59 2024 +0530
Skip refresh for unused segments in metadata cache (#16990)
* Skip refresh for unused segments in metadata cache
* Cover the condition where a used segment missing schema is marked for
refresh
* Fix test
---
.../metadata/CoordinatorSegmentMetadataCache.java | 11 ++-
.../CoordinatorSegmentMetadataCacheTest.java | 96 ++++++++++++++++++++++
2 files changed, 105 insertions(+), 2 deletions(-)
diff --git a/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java b/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java
index 0ed81f6c1e8..321c33fa1db 100644
--- a/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java
+++ b/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java
@@ -692,9 +692,16 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCach
RowSignature rowSignature = optionalSchema.get().getSchemaPayload().getRowSignature();
mergeRowSignature(columnTypes, rowSignature);
} else {
- // mark it for refresh, however, this case shouldn't arise by design
- markSegmentAsNeedRefresh(segmentId);
log.debug("SchemaMetadata for segmentId [%s] is absent.", segmentId);
+
+ ImmutableDruidDataSource druidDataSource =
+ sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(segmentId.getDataSource());
+
+ if (druidDataSource != null && druidDataSource.getSegment(segmentId) != null) {
+ // mark it for refresh only if it is used
+ // however, this case shouldn't arise by design
+ markSegmentAsNeedRefresh(segmentId);
+ }
}
}
} else {
} else {
diff --git a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
index 283c1687a97..0c099cb551c 100644
--- a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
+++ b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
@@ -2290,6 +2290,102 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
Assert.assertEquals(0, metadatas.size());
}
+ @Test
+ public void testUnusedSegmentIsNotRefreshed() throws InterruptedException, IOException
+ {
+ String dataSource = "xyz";
+ CountDownLatch latch = new CountDownLatch(1);
+ CoordinatorSegmentMetadataCache schema = new CoordinatorSegmentMetadataCache(
+ getQueryLifecycleFactory(walker),
+ serverView,
+ SEGMENT_CACHE_CONFIG_DEFAULT,
+ new NoopEscalator(),
+ new InternalQueryConfig(),
+ new NoopServiceEmitter(),
+ segmentSchemaCache,
+ backFillQueue,
+ sqlSegmentsMetadataManager,
+ segmentsMetadataManagerConfigSupplier
+ ) {
+ @Override
+ public void refresh(Set<SegmentId> segmentsToRefresh, Set<String> dataSourcesToRebuild)
+ throws IOException
+ {
+ super.refresh(segmentsToRefresh, dataSourcesToRebuild);
+ latch.countDown();
+ }
+ };
+
+ List<DataSegment> segments = ImmutableList.of(
+ newSegment(dataSource, 1),
+ newSegment(dataSource, 2),
+ newSegment(dataSource, 3)
+ );
+
+ final DruidServer historicalServer = druidServers.stream()
+ .filter(s -> s.getType().equals(ServerType.HISTORICAL))
+ .findAny()
+ .orElse(null);
+
+ Assert.assertNotNull(historicalServer);
+ final DruidServerMetadata historicalServerMetadata = historicalServer.getMetadata();
+
+ ImmutableMap.Builder<SegmentId, SegmentMetadata> segmentStatsMap = new ImmutableMap.Builder<>();
+ segmentStatsMap.put(segments.get(0).getId(), new SegmentMetadata(20L, "fp"));
+ segmentStatsMap.put(segments.get(1).getId(), new SegmentMetadata(20L, "fp"));
+ segmentStatsMap.put(segments.get(2).getId(), new SegmentMetadata(20L, "fp"));
+
+ ImmutableMap.Builder<String, SchemaPayload> schemaPayloadMap = new ImmutableMap.Builder<>();
+ schemaPayloadMap.put("fp", new SchemaPayload(RowSignature.builder().add("c1", ColumnType.DOUBLE).build()));
+ segmentSchemaCache.updateFinalizedSegmentSchema(
+ new SegmentSchemaCache.FinalizedSegmentSchemaInfo(segmentStatsMap.build(), schemaPayloadMap.build())
+ );
+
+ schema.addSegment(historicalServerMetadata, segments.get(0));
+ schema.addSegment(historicalServerMetadata, segments.get(1));
+ schema.addSegment(historicalServerMetadata, segments.get(2));
+
+ serverView.addSegment(segments.get(0), ServerType.HISTORICAL);
+ serverView.addSegment(segments.get(1), ServerType.HISTORICAL);
+ serverView.addSegment(segments.get(2), ServerType.HISTORICAL);
+
+ schema.onLeaderStart();
+ schema.awaitInitialization();
+
+ Assert.assertTrue(latch.await(2, TimeUnit.SECONDS));
+
+ // make segment3 unused
+ segmentStatsMap = new ImmutableMap.Builder<>();
+ segmentStatsMap.put(segments.get(0).getId(), new SegmentMetadata(20L, "fp"));
+
+ segmentSchemaCache.updateFinalizedSegmentSchema(
+ new SegmentSchemaCache.FinalizedSegmentSchemaInfo(segmentStatsMap.build(), schemaPayloadMap.build())
+ );
+
+ Map<SegmentId, DataSegment> segmentMap = new HashMap<>();
+ segmentMap.put(segments.get(0).getId(), segments.get(0));
+ segmentMap.put(segments.get(1).getId(), segments.get(1));
+
+ ImmutableDruidDataSource druidDataSource =
+ new ImmutableDruidDataSource(
+ "xyz",
+ Collections.emptyMap(),
+ segmentMap
+ );
+
+ Mockito.when(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(ArgumentMatchers.anyString()))
+ .thenReturn(druidDataSource);
+
+ Set<SegmentId> segmentsToRefresh = segments.stream().map(DataSegment::getId).collect(Collectors.toSet());
+ segmentsToRefresh.remove(segments.get(1).getId());
+ segmentsToRefresh.remove(segments.get(2).getId());
+
+ schema.refresh(segmentsToRefresh, Sets.newHashSet(dataSource));
+
+ Assert.assertTrue(schema.getSegmentsNeedingRefresh().contains(segments.get(1).getId()));
+ Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(segments.get(2).getId()));
+ }
+
private void verifyFooDSSchema(CoordinatorSegmentMetadataCache schema, int columns)
{
final DataSourceInformation fooDs = schema.getDatasource("foo");
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]