This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new a6ebb963c72 Fix NPE in SegmentSchemaCache (#16404)
a6ebb963c72 is described below
commit a6ebb963c726094f2acee651274176af12a5fe78
Author: Rishabh Singh <[email protected]>
AuthorDate: Thu May 9 11:13:53 2024 +0530
Fix NPE in SegmentSchemaCache (#16404)
Verify that schema backfill count metric is emitted for each datasource.
Fix potential NPE in SegmentSchemaCache#markMetadataQueryResultPublished.
---
.../metadata/SegmentSchemaBackFillQueue.java | 2 +-
.../druid/segment/metadata/SegmentSchemaCache.java | 8 ++--
.../metadata/SegmentSchemaBackFillQueueTest.java | 54 +++++++++++++++++++---
.../segment/metadata/SegmentSchemaCacheTest.java | 8 +++-
4 files changed, 60 insertions(+), 12 deletions(-)
diff --git
a/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaBackFillQueue.java
b/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaBackFillQueue.java
index c2995e3087e..66ce9ed4bde 100644
---
a/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaBackFillQueue.java
+++
b/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaBackFillQueue.java
@@ -178,7 +178,7 @@ public class SegmentSchemaBackFillQueue
segmentSchemaManager.persistSchemaAndUpdateSegmentsTable(entry.getKey(),
entry.getValue(), CentralizedDatasourceSchemaConfig.SCHEMA_VERSION);
// Mark the segments as published in the cache.
for (SegmentSchemaMetadataPlus plus : entry.getValue()) {
-
segmentSchemaCache.markInMetadataQueryResultPublished(plus.getSegmentId());
+
segmentSchemaCache.markMetadataQueryResultPublished(plus.getSegmentId());
}
emitter.emit(
ServiceMetricEvent.builder()
diff --git
a/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaCache.java
b/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaCache.java
index e2fb1681792..c28e2b693bb 100644
---
a/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaCache.java
+++
b/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaCache.java
@@ -173,13 +173,15 @@ public class SegmentSchemaCache
* After, metadata query result is published to the DB, it is removed from
temporaryMetadataQueryResults
* and added to temporaryPublishedMetadataQueryResults.
*/
- public void markInMetadataQueryResultPublished(SegmentId segmentId)
+ public void markMetadataQueryResultPublished(SegmentId segmentId)
{
- if (!temporaryMetadataQueryResults.containsKey(segmentId)) {
+ SchemaPayloadPlus temporaryMetadataQueryResult =
temporaryMetadataQueryResults.get(segmentId);
+ if (temporaryMetadataQueryResult == null) {
log.error("SegmentId [%s] not found in temporaryMetadataQueryResults
map.", segmentId);
+ } else {
+ temporaryPublishedMetadataQueryResults.put(segmentId,
temporaryMetadataQueryResult);
}
- temporaryPublishedMetadataQueryResults.put(segmentId,
temporaryMetadataQueryResults.get(segmentId));
temporaryMetadataQueryResults.remove(segmentId);
}
diff --git
a/server/src/test/java/org/apache/druid/segment/metadata/SegmentSchemaBackFillQueueTest.java
b/server/src/test/java/org/apache/druid/segment/metadata/SegmentSchemaBackFillQueueTest.java
index 238b5acc544..2229525c30a 100644
---
a/server/src/test/java/org/apache/druid/segment/metadata/SegmentSchemaBackFillQueueTest.java
+++
b/server/src/test/java/org/apache/druid/segment/metadata/SegmentSchemaBackFillQueueTest.java
@@ -26,7 +26,9 @@ import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.metadata.TestDerbyConnector;
+import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.aggregation.AggregatorFactory;
import
org.apache.druid.query.aggregation.firstlast.first.LongFirstAggregatorFactory;
import org.apache.druid.segment.SchemaPayload;
@@ -41,7 +43,9 @@ import org.junit.Test;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CountDownLatch;
public class SegmentSchemaBackFillQueueTest
@@ -51,7 +55,8 @@ public class SegmentSchemaBackFillQueueTest
}
@Rule
- public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new
TestDerbyConnector.DerbyConnectorRule(getEnabledConfig());
+ public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule =
+ new TestDerbyConnector.DerbyConnectorRule(getEnabledConfig());
private final ObjectMapper mapper = TestHelper.makeJsonMapper();
@@ -78,13 +83,15 @@ public class SegmentSchemaBackFillQueueTest
CountDownLatch latch = new CountDownLatch(1);
+ StubServiceEmitter emitter = new StubServiceEmitter("coordinator", "host");
+
SegmentSchemaBackFillQueue segmentSchemaBackFillQueue =
new SegmentSchemaBackFillQueue(
segmentSchemaManager,
ScheduledExecutors::fixed,
segmentSchemaCache,
new FingerprintGenerator(mapper),
- new NoopServiceEmitter(),
+ emitter,
config
) {
@Override
@@ -95,7 +102,7 @@ public class SegmentSchemaBackFillQueueTest
}
};
- final DataSegment segment = new DataSegment(
+ final DataSegment segment1 = new DataSegment(
"foo",
Intervals.of("2023-01-01/2023-01-02"),
"2023-01-01",
@@ -106,18 +113,53 @@ public class SegmentSchemaBackFillQueueTest
9,
100
);
- segmentSchemaTestUtils.insertUsedSegments(Collections.singleton(segment),
Collections.emptyMap());
+
+ final DataSegment segment2 = new DataSegment(
+ "foo",
+ Intervals.of("2023-01-02/2023-01-03"),
+ "2023-02-01",
+ ImmutableMap.of("path", "a-1"),
+ ImmutableList.of("dim1"),
+ ImmutableList.of("m1"),
+ new LinearShardSpec(0),
+ 9,
+ 100
+ );
+
+ final DataSegment segment3 = new DataSegment(
+ "foo1",
+ Intervals.of("2023-01-01/2023-01-02"),
+ "2023-01-01",
+ ImmutableMap.of("path", "a-1"),
+ ImmutableList.of("dim1"),
+ ImmutableList.of("m1"),
+ new LinearShardSpec(0),
+ 9,
+ 100
+ );
+ Set<DataSegment> segments = new HashSet<>();
+ segments.add(segment1);
+ segments.add(segment2);
+ segments.add(segment3);
+ segmentSchemaTestUtils.insertUsedSegments(segments,
Collections.emptyMap());
final Map<String, Pair<SchemaPayload, Integer>> segmentIdSchemaMap = new
HashMap<>();
RowSignature rowSignature = RowSignature.builder().add("cx",
ColumnType.FLOAT).build();
Map<String, AggregatorFactory> aggregatorFactoryMap = new HashMap<>();
aggregatorFactoryMap.put("longFirst", new
LongFirstAggregatorFactory("longFirst", "long-col", null));
- segmentIdSchemaMap.put(segment.getId().toString(), Pair.of(new
SchemaPayload(rowSignature, aggregatorFactoryMap), 20));
- segmentSchemaBackFillQueue.add(segment.getId(), rowSignature,
aggregatorFactoryMap, 20);
+ segmentIdSchemaMap.put(segment1.getId().toString(), Pair.of(new
SchemaPayload(rowSignature, aggregatorFactoryMap), 20));
+ segmentIdSchemaMap.put(segment2.getId().toString(), Pair.of(new
SchemaPayload(rowSignature, aggregatorFactoryMap), 20));
+ segmentIdSchemaMap.put(segment3.getId().toString(), Pair.of(new
SchemaPayload(rowSignature, aggregatorFactoryMap), 20));
+
+ segmentSchemaBackFillQueue.add(segment1.getId(), rowSignature,
aggregatorFactoryMap, 20);
+ segmentSchemaBackFillQueue.add(segment2.getId(), rowSignature,
aggregatorFactoryMap, 20);
+ segmentSchemaBackFillQueue.add(segment3.getId(), rowSignature,
aggregatorFactoryMap, 20);
segmentSchemaBackFillQueue.onLeaderStart();
latch.await();
segmentSchemaTestUtils.verifySegmentSchema(segmentIdSchemaMap);
+ emitter.verifyValue("metadatacache/backfill/count",
ImmutableMap.of(DruidMetrics.DATASOURCE, "foo"), 2);
+ emitter.verifyValue("metadatacache/backfill/count",
ImmutableMap.of(DruidMetrics.DATASOURCE, "foo1"), 1);
}
private CentralizedDatasourceSchemaConfig getEnabledConfig()
diff --git
a/server/src/test/java/org/apache/druid/segment/metadata/SegmentSchemaCacheTest.java
b/server/src/test/java/org/apache/druid/segment/metadata/SegmentSchemaCacheTest.java
index 234b16bd9b5..f89c305b9db 100644
---
a/server/src/test/java/org/apache/druid/segment/metadata/SegmentSchemaCacheTest.java
+++
b/server/src/test/java/org/apache/druid/segment/metadata/SegmentSchemaCacheTest.java
@@ -56,13 +56,17 @@ public class SegmentSchemaCacheTest
}
@Test
- public void testCacheInTransitSMQResult()
+ public void testCacheTemporaryMetadataQueryResults()
{
SegmentSchemaCache cache = new SegmentSchemaCache(new
NoopServiceEmitter());
RowSignature rowSignature = RowSignature.builder().add("cx",
ColumnType.FLOAT).build();
SchemaPayloadPlus expected = new SchemaPayloadPlus(new
SchemaPayload(rowSignature, Collections.emptyMap()), 20L);
SegmentId id = SegmentId.dummy("ds");
+
+ // this call shouldn't result in any error
+ cache.markMetadataQueryResultPublished(id);
+
cache.addTemporaryMetadataQueryResult(id, rowSignature,
Collections.emptyMap(), 20);
Assert.assertTrue(cache.isSchemaCached(id));
@@ -70,7 +74,7 @@ public class SegmentSchemaCacheTest
Assert.assertTrue(schema.isPresent());
Assert.assertEquals(expected, schema.get());
- cache.markInMetadataQueryResultPublished(id);
+ cache.markMetadataQueryResultPublished(id);
schema = cache.getSchemaForSegment(id);
Assert.assertTrue(schema.isPresent());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]