abhishekagarwal87 commented on code in PR #14407:
URL: https://github.com/apache/druid/pull/14407#discussion_r1349647983
##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -977,6 +1045,175 @@ private void insertPendingSegmentIntoMetastore(
.execute();
}
+ /**
+ * Allocates and returns any extra versions that need to be committed for the
+ * given append segments.
+ * <p>
+ * This is typically needed when a REPLACE task started and finished after
+ * these append segments had already been allocated. As such,
+ * there would be some used segments in the DB with versions higher than
these
+ * append segments.
+ */
+ private Set<DataSegment> getSegmentsToUpgradeOnAppend(
+ Handle handle,
+ String dataSource,
+ Set<DataSegment> segmentsToAppend
+ ) throws IOException
+ {
+ if (segmentsToAppend.isEmpty()) {
+ return Collections.emptySet();
+ }
+
+ final Set<Interval> appendIntervals = new HashSet<>();
+ final TreeMap<String, Set<DataSegment>> appendVersionToSegments = new
TreeMap<>();
+ for (DataSegment segment : segmentsToAppend) {
+ appendIntervals.add(segment.getInterval());
+ appendVersionToSegments.computeIfAbsent(segment.getVersion(), v -> new
HashSet<>())
+ .add(segment);
+ }
+
+ // Fetch all used segments that overlap with any of the append intervals
+ final Collection<DataSegment> overlappingSegments =
retrieveUsedSegmentsForIntervals(
+ dataSource,
+ new ArrayList<>(appendIntervals),
+ Segments.INCLUDING_OVERSHADOWED
+ );
+
+ final Map<String, Set<Interval>> committedVersionToIntervals = new
HashMap<>();
+ final Map<Interval, Set<DataSegment>> committedIntervalToSegments = new
HashMap<>();
+ for (DataSegment segment : overlappingSegments) {
+ committedVersionToIntervals.computeIfAbsent(segment.getVersion(), v ->
new HashSet<>())
+ .add(segment.getInterval());
+ committedIntervalToSegments.computeIfAbsent(segment.getInterval(), i ->
new HashSet<>())
+ .add(segment);
+ }
+
+ final Set<DataSegment> upgradedSegments = new HashSet<>();
+ for (Map.Entry<String, Set<Interval>> entry :
committedVersionToIntervals.entrySet()) {
+ final String upgradeVersion = entry.getKey();
+ Map<Interval, Set<DataSegment>> segmentsToUpgrade =
getSegmentsWithVersionLowerThan(
Review Comment:
never mind. I was asking that versions in the db will always be higher or
equal to the append segment version
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]