This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new a6ebb963c72 Fix NPE in SegmentSchemaCache (#16404)
a6ebb963c72 is described below

commit a6ebb963c726094f2acee651274176af12a5fe78
Author: Rishabh Singh <[email protected]>
AuthorDate: Thu May 9 11:13:53 2024 +0530

    Fix NPE in SegmentSchemaCache (#16404)
    
    
    
        Verify that schema backfill count metric is emitted for each datasource.
        Fix potential NPE in 
SegmentSchemaCache#markMetadataQueryResultPublished.
---
 .../metadata/SegmentSchemaBackFillQueue.java       |  2 +-
 .../druid/segment/metadata/SegmentSchemaCache.java |  8 ++--
 .../metadata/SegmentSchemaBackFillQueueTest.java   | 54 +++++++++++++++++++---
 .../segment/metadata/SegmentSchemaCacheTest.java   |  8 +++-
 4 files changed, 60 insertions(+), 12 deletions(-)

diff --git 
a/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaBackFillQueue.java
 
b/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaBackFillQueue.java
index c2995e3087e..66ce9ed4bde 100644
--- 
a/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaBackFillQueue.java
+++ 
b/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaBackFillQueue.java
@@ -178,7 +178,7 @@ public class SegmentSchemaBackFillQueue
         
segmentSchemaManager.persistSchemaAndUpdateSegmentsTable(entry.getKey(), 
entry.getValue(), CentralizedDatasourceSchemaConfig.SCHEMA_VERSION);
         // Mark the segments as published in the cache.
         for (SegmentSchemaMetadataPlus plus : entry.getValue()) {
-          
segmentSchemaCache.markInMetadataQueryResultPublished(plus.getSegmentId());
+          
segmentSchemaCache.markMetadataQueryResultPublished(plus.getSegmentId());
         }
         emitter.emit(
             ServiceMetricEvent.builder()
diff --git 
a/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaCache.java
 
b/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaCache.java
index e2fb1681792..c28e2b693bb 100644
--- 
a/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaCache.java
+++ 
b/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaCache.java
@@ -173,13 +173,15 @@ public class SegmentSchemaCache
   * After the metadata query result is published to the DB, it is removed from 
temporaryMetadataQueryResults
    * and added to temporaryPublishedMetadataQueryResults.
    */
-  public void markInMetadataQueryResultPublished(SegmentId segmentId)
+  public void markMetadataQueryResultPublished(SegmentId segmentId)
   {
-    if (!temporaryMetadataQueryResults.containsKey(segmentId)) {
+    SchemaPayloadPlus temporaryMetadataQueryResult = 
temporaryMetadataQueryResults.get(segmentId);
+    if (temporaryMetadataQueryResult == null) {
       log.error("SegmentId [%s] not found in temporaryMetadataQueryResults 
map.", segmentId);
+    } else {
+      temporaryPublishedMetadataQueryResults.put(segmentId, 
temporaryMetadataQueryResult);
     }
 
-    temporaryPublishedMetadataQueryResults.put(segmentId, 
temporaryMetadataQueryResults.get(segmentId));
     temporaryMetadataQueryResults.remove(segmentId);
   }
 
diff --git 
a/server/src/test/java/org/apache/druid/segment/metadata/SegmentSchemaBackFillQueueTest.java
 
b/server/src/test/java/org/apache/druid/segment/metadata/SegmentSchemaBackFillQueueTest.java
index 238b5acc544..2229525c30a 100644
--- 
a/server/src/test/java/org/apache/druid/segment/metadata/SegmentSchemaBackFillQueueTest.java
+++ 
b/server/src/test/java/org/apache/druid/segment/metadata/SegmentSchemaBackFillQueueTest.java
@@ -26,7 +26,9 @@ import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
 import org.apache.druid.metadata.TestDerbyConnector;
+import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import 
org.apache.druid.query.aggregation.firstlast.first.LongFirstAggregatorFactory;
 import org.apache.druid.segment.SchemaPayload;
@@ -41,7 +43,9 @@ import org.junit.Test;
 
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 
 public class SegmentSchemaBackFillQueueTest
@@ -51,7 +55,8 @@ public class SegmentSchemaBackFillQueueTest
   }
 
   @Rule
-  public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new 
TestDerbyConnector.DerbyConnectorRule(getEnabledConfig());
+  public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule =
+      new TestDerbyConnector.DerbyConnectorRule(getEnabledConfig());
 
   private final ObjectMapper mapper = TestHelper.makeJsonMapper();
 
@@ -78,13 +83,15 @@ public class SegmentSchemaBackFillQueueTest
 
     CountDownLatch latch = new CountDownLatch(1);
 
+    StubServiceEmitter emitter = new StubServiceEmitter("coordinator", "host");
+
     SegmentSchemaBackFillQueue segmentSchemaBackFillQueue =
         new SegmentSchemaBackFillQueue(
             segmentSchemaManager,
             ScheduledExecutors::fixed,
             segmentSchemaCache,
             new FingerprintGenerator(mapper),
-            new NoopServiceEmitter(),
+            emitter,
             config
         ) {
           @Override
@@ -95,7 +102,7 @@ public class SegmentSchemaBackFillQueueTest
           }
         };
 
-    final DataSegment segment = new DataSegment(
+    final DataSegment segment1 = new DataSegment(
         "foo",
         Intervals.of("2023-01-01/2023-01-02"),
         "2023-01-01",
@@ -106,18 +113,53 @@ public class SegmentSchemaBackFillQueueTest
         9,
         100
     );
-    segmentSchemaTestUtils.insertUsedSegments(Collections.singleton(segment), 
Collections.emptyMap());
+
+    final DataSegment segment2 = new DataSegment(
+        "foo",
+        Intervals.of("2023-01-02/2023-01-03"),
+        "2023-02-01",
+        ImmutableMap.of("path", "a-1"),
+        ImmutableList.of("dim1"),
+        ImmutableList.of("m1"),
+        new LinearShardSpec(0),
+        9,
+        100
+    );
+
+    final DataSegment segment3 = new DataSegment(
+        "foo1",
+        Intervals.of("2023-01-01/2023-01-02"),
+        "2023-01-01",
+        ImmutableMap.of("path", "a-1"),
+        ImmutableList.of("dim1"),
+        ImmutableList.of("m1"),
+        new LinearShardSpec(0),
+        9,
+        100
+    );
+    Set<DataSegment> segments = new HashSet<>();
+    segments.add(segment1);
+    segments.add(segment2);
+    segments.add(segment3);
+    segmentSchemaTestUtils.insertUsedSegments(segments, 
Collections.emptyMap());
 
     final Map<String, Pair<SchemaPayload, Integer>> segmentIdSchemaMap = new 
HashMap<>();
     RowSignature rowSignature = RowSignature.builder().add("cx", 
ColumnType.FLOAT).build();
     Map<String, AggregatorFactory> aggregatorFactoryMap = new HashMap<>();
     aggregatorFactoryMap.put("longFirst", new 
LongFirstAggregatorFactory("longFirst", "long-col", null));
 
-    segmentIdSchemaMap.put(segment.getId().toString(), Pair.of(new 
SchemaPayload(rowSignature, aggregatorFactoryMap), 20));
-    segmentSchemaBackFillQueue.add(segment.getId(), rowSignature, 
aggregatorFactoryMap, 20);
+    segmentIdSchemaMap.put(segment1.getId().toString(), Pair.of(new 
SchemaPayload(rowSignature, aggregatorFactoryMap), 20));
+    segmentIdSchemaMap.put(segment2.getId().toString(), Pair.of(new 
SchemaPayload(rowSignature, aggregatorFactoryMap), 20));
+    segmentIdSchemaMap.put(segment3.getId().toString(), Pair.of(new 
SchemaPayload(rowSignature, aggregatorFactoryMap), 20));
+
+    segmentSchemaBackFillQueue.add(segment1.getId(), rowSignature, 
aggregatorFactoryMap, 20);
+    segmentSchemaBackFillQueue.add(segment2.getId(), rowSignature, 
aggregatorFactoryMap, 20);
+    segmentSchemaBackFillQueue.add(segment3.getId(), rowSignature, 
aggregatorFactoryMap, 20);
     segmentSchemaBackFillQueue.onLeaderStart();
     latch.await();
     segmentSchemaTestUtils.verifySegmentSchema(segmentIdSchemaMap);
+    emitter.verifyValue("metadatacache/backfill/count", 
ImmutableMap.of(DruidMetrics.DATASOURCE, "foo"), 2);
+    emitter.verifyValue("metadatacache/backfill/count", 
ImmutableMap.of(DruidMetrics.DATASOURCE, "foo1"), 1);
   }
 
   private CentralizedDatasourceSchemaConfig getEnabledConfig()
diff --git 
a/server/src/test/java/org/apache/druid/segment/metadata/SegmentSchemaCacheTest.java
 
b/server/src/test/java/org/apache/druid/segment/metadata/SegmentSchemaCacheTest.java
index 234b16bd9b5..f89c305b9db 100644
--- 
a/server/src/test/java/org/apache/druid/segment/metadata/SegmentSchemaCacheTest.java
+++ 
b/server/src/test/java/org/apache/druid/segment/metadata/SegmentSchemaCacheTest.java
@@ -56,13 +56,17 @@ public class SegmentSchemaCacheTest
   }
 
   @Test
-  public void testCacheInTransitSMQResult()
+  public void testCacheTemporaryMetadataQueryResults()
   {
     SegmentSchemaCache cache = new SegmentSchemaCache(new 
NoopServiceEmitter());
 
     RowSignature rowSignature = RowSignature.builder().add("cx", 
ColumnType.FLOAT).build();
     SchemaPayloadPlus expected = new SchemaPayloadPlus(new 
SchemaPayload(rowSignature, Collections.emptyMap()), 20L);
     SegmentId id = SegmentId.dummy("ds");
+
+    // this call shouldn't result in any error
+    cache.markMetadataQueryResultPublished(id);
+
     cache.addTemporaryMetadataQueryResult(id, rowSignature, 
Collections.emptyMap(), 20);
 
     Assert.assertTrue(cache.isSchemaCached(id));
@@ -70,7 +74,7 @@ public class SegmentSchemaCacheTest
     Assert.assertTrue(schema.isPresent());
     Assert.assertEquals(expected, schema.get());
 
-    cache.markInMetadataQueryResultPublished(id);
+    cache.markMetadataQueryResultPublished(id);
 
     schema = cache.getSchemaForSegment(id);
     Assert.assertTrue(schema.isPresent());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to