abhishekrb19 commented on code in PR #19571:
URL: https://github.com/apache/druid/pull/19571#discussion_r3418423881
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -693,6 +735,27 @@ public void run()
);
if (addResult.isOk()) {
+ // Accumulate observed dimension values per segment for StreamRangeShardSpec at publish time.
+ if (!partitionDimensions.isEmpty()) {
+ final SegmentId segmentId = addResult.getSegmentIdentifier().asSegmentId();
+ final Map<String, Set<String>> segValues = observedPartitionDimValuesBySegment
+     .computeIfAbsent(segmentId, k -> new ConcurrentHashMap<>());
+ for (String dim : partitionDimensions) {
+ final Set<String> dimSet = segValues.computeIfAbsent(
+ dim,
+ k -> Collections.synchronizedSet(new HashSet<>())
+ );
+ // Empty getDimension result means a null/missing value; record null so IS NULL is not pruned
+ // (distinct from "", which getDimension returns as [""]).
+ final List<String> dimValues = row.getDimension(dim);
+ if (dimValues == null || dimValues.isEmpty()) {
+ dimSet.add(null);
+ } else {
+ dimSet.addAll(dimValues);
+ }
+ }
+ }
Review Comment:
Yeah agreed, I think that should be doable. We could spec out the specifics
of the interface contract sometime. Thanks for taking a look!
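For readers following the thread, here is a minimal, self-contained sketch of the bookkeeping pattern in the hunk above: a per-segment, per-dimension set of observed values, where a missing value is recorded as `null` so that an `IS NULL` filter cannot wrongly prune the segment. Everything in it is hypothetical and simplified. `ObservedDimValues`, `record`, and `mayContain` are illustrative names, segment ids are plain strings, and rows are `Map<String, List<String>>` rather than Druid's `SegmentId` and `InputRow`. The pruning rule in `mayContain` is an assumption about how publish-time metadata *could* be consumed; the PR's actual `StreamRangeShardSpec` logic is not shown here. Requires Java 9+ for `List.of`/`Map.of`.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical, simplified stand-in for the per-segment accumulation in the hunk above.
 * Not Druid's API: segment ids are strings and rows are dimension -> values maps.
 */
class ObservedDimValues
{
  private final List<String> partitionDimensions;
  // segment id -> dimension -> distinct observed values (a null element marks a missing value)
  private final Map<String, Map<String, Set<String>>> bySegment = new ConcurrentHashMap<>();

  ObservedDimValues(List<String> partitionDimensions)
  {
    this.partitionDimensions = partitionDimensions;
  }

  /** Records the partition-dimension values of one row that was added to a segment. */
  public void record(String segmentId, Map<String, List<String>> row)
  {
    if (partitionDimensions.isEmpty()) {
      return;
    }
    final Map<String, Set<String>> segValues =
        bySegment.computeIfAbsent(segmentId, k -> new ConcurrentHashMap<>());
    for (String dim : partitionDimensions) {
      // ConcurrentHashMap rejects null values, but a synchronized HashSet accepts null elements.
      final Set<String> dimSet =
          segValues.computeIfAbsent(dim, k -> Collections.synchronizedSet(new HashSet<>()));
      final List<String> values = row.get(dim);
      if (values == null || values.isEmpty()) {
        dimSet.add(null); // missing value: keep the segment eligible for "dim IS NULL"
      } else {
        dimSet.addAll(values);
      }
    }
  }

  /**
   * Hypothetical pruning check: returns false only when the segment provably has no row
   * whose dimension equals the given value (a null value stands for an IS NULL filter).
   */
  public boolean mayContain(String segmentId, String dim, String value)
  {
    final Map<String, Set<String>> segValues = bySegment.get(segmentId);
    if (segValues == null || !segValues.containsKey(dim)) {
      return true; // nothing recorded, so the segment cannot be pruned
    }
    return segValues.get(dim).contains(value);
  }

  public static void main(String[] args)
  {
    final ObservedDimValues observed = new ObservedDimValues(List.of("country"));
    observed.record("seg-1", Map.of("country", List.of("US")));
    observed.record("seg-1", Map.of()); // row without "country"
    observed.record("seg-2", Map.of("country", List.of("", "FR")));

    System.out.println(observed.mayContain("seg-1", "country", null)); // true
    System.out.println(observed.mayContain("seg-2", "country", null)); // false
    System.out.println(observed.mayContain("seg-2", "country", ""));   // true
    System.out.println(observed.mayContain("seg-1", "country", "FR")); // false
  }
}
```

The sketch mirrors the hunk's distinction between a missing value (recorded as `null`) and an empty string (recorded as `""`), so `seg-2` can be pruned for `IS NULL` but not for `= ''`.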