abhishekagarwal87 commented on code in PR #14407:
URL: https://github.com/apache/druid/pull/14407#discussion_r1349505977
##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -977,6 +1045,175 @@ private void insertPendingSegmentIntoMetastore(
.execute();
}
+ /**
+ * Allocates and returns any extra versions that need to be committed for the
+ * given append segments.
+ * <p>
+ * This is typically needed when a REPLACE task started and finished after
+ * these append segments had already been allocated. As such,
+ * there would be some used segments in the DB with versions higher than
these
+ * append segments.
+ */
+ private Set<DataSegment> getSegmentsToUpgradeOnAppend(
Review Comment:
This could be renamed to `getExtraVersionsForAppendSegments`.
##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -977,6 +1045,175 @@ private void insertPendingSegmentIntoMetastore(
.execute();
}
+ /**
+ * Allocates and returns any extra versions that need to be committed for the
+ * given append segments.
+ * <p>
+ * This is typically needed when a REPLACE task started and finished after
+ * these append segments had already been allocated. As such,
+ * there would be some used segments in the DB with versions higher than
these
+ * append segments.
+ */
+ private Set<DataSegment> getSegmentsToUpgradeOnAppend(
+ Handle handle,
+ String dataSource,
+ Set<DataSegment> segmentsToAppend
+ ) throws IOException
+ {
+ if (segmentsToAppend.isEmpty()) {
+ return Collections.emptySet();
+ }
+
+ final Set<Interval> appendIntervals = new HashSet<>();
+ final TreeMap<String, Set<DataSegment>> appendVersionToSegments = new
TreeMap<>();
+ for (DataSegment segment : segmentsToAppend) {
+ appendIntervals.add(segment.getInterval());
+ appendVersionToSegments.computeIfAbsent(segment.getVersion(), v -> new
HashSet<>())
+ .add(segment);
+ }
+
+ // Fetch all used segments that overlap with any of the append intervals
+ final Collection<DataSegment> overlappingSegments =
retrieveUsedSegmentsForIntervals(
+ dataSource,
+ new ArrayList<>(appendIntervals),
+ Segments.INCLUDING_OVERSHADOWED
+ );
+
+ final Map<String, Set<Interval>> committedVersionToIntervals = new
HashMap<>();
+ final Map<Interval, Set<DataSegment>> committedIntervalToSegments = new
HashMap<>();
Review Comment:
Variable naming: `committed` -> `overlapping`.
##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -977,6 +1045,175 @@ private void insertPendingSegmentIntoMetastore(
.execute();
}
+ /**
+ * Allocates and returns any extra versions that need to be committed for the
+ * given append segments.
+ * <p>
+ * This is typically needed when a REPLACE task started and finished after
+ * these append segments had already been allocated. As such,
+ * there would be some used segments in the DB with versions higher than
these
+ * append segments.
+ */
+ private Set<DataSegment> getSegmentsToUpgradeOnAppend(
+ Handle handle,
+ String dataSource,
+ Set<DataSegment> segmentsToAppend
+ ) throws IOException
+ {
+ if (segmentsToAppend.isEmpty()) {
+ return Collections.emptySet();
+ }
+
+ final Set<Interval> appendIntervals = new HashSet<>();
+ final TreeMap<String, Set<DataSegment>> appendVersionToSegments = new
TreeMap<>();
+ for (DataSegment segment : segmentsToAppend) {
+ appendIntervals.add(segment.getInterval());
+ appendVersionToSegments.computeIfAbsent(segment.getVersion(), v -> new
HashSet<>())
+ .add(segment);
+ }
+
+ // Fetch all used segments that overlap with any of the append intervals
+ final Collection<DataSegment> overlappingSegments =
retrieveUsedSegmentsForIntervals(
+ dataSource,
+ new ArrayList<>(appendIntervals),
+ Segments.INCLUDING_OVERSHADOWED
+ );
+
+ final Map<String, Set<Interval>> committedVersionToIntervals = new
HashMap<>();
+ final Map<Interval, Set<DataSegment>> committedIntervalToSegments = new
HashMap<>();
+ for (DataSegment segment : overlappingSegments) {
+ committedVersionToIntervals.computeIfAbsent(segment.getVersion(), v ->
new HashSet<>())
+ .add(segment.getInterval());
+ committedIntervalToSegments.computeIfAbsent(segment.getInterval(), i ->
new HashSet<>())
+ .add(segment);
+ }
+
+ final Set<DataSegment> upgradedSegments = new HashSet<>();
+ for (Map.Entry<String, Set<Interval>> entry :
committedVersionToIntervals.entrySet()) {
+ final String upgradeVersion = entry.getKey();
+ Map<Interval, Set<DataSegment>> segmentsToUpgrade =
getSegmentsWithVersionLowerThan(
Review Comment:
Just for my own understanding: a function such as
`getSegmentsWithVersionHigherThan`, called with the same arguments, should never return an empty value, correct?
##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -977,6 +1045,175 @@ private void insertPendingSegmentIntoMetastore(
.execute();
}
+ /**
+ * Allocates and returns any extra versions that need to be committed for the
+ * given append segments.
+ * <p>
+ * This is typically needed when a REPLACE task started and finished after
+ * these append segments had already been allocated. As such,
+ * there would be some used segments in the DB with versions higher than
these
+ * append segments.
+ */
+ private Set<DataSegment> getSegmentsToUpgradeOnAppend(
+ Handle handle,
+ String dataSource,
+ Set<DataSegment> segmentsToAppend
+ ) throws IOException
+ {
+ if (segmentsToAppend.isEmpty()) {
+ return Collections.emptySet();
+ }
+
+ final Set<Interval> appendIntervals = new HashSet<>();
+ final TreeMap<String, Set<DataSegment>> appendVersionToSegments = new
TreeMap<>();
+ for (DataSegment segment : segmentsToAppend) {
+ appendIntervals.add(segment.getInterval());
+ appendVersionToSegments.computeIfAbsent(segment.getVersion(), v -> new
HashSet<>())
+ .add(segment);
+ }
+
+ // Fetch all used segments that overlap with any of the append intervals
+ final Collection<DataSegment> overlappingSegments =
retrieveUsedSegmentsForIntervals(
+ dataSource,
+ new ArrayList<>(appendIntervals),
+ Segments.INCLUDING_OVERSHADOWED
+ );
+
+ final Map<String, Set<Interval>> committedVersionToIntervals = new
HashMap<>();
+ final Map<Interval, Set<DataSegment>> committedIntervalToSegments = new
HashMap<>();
+ for (DataSegment segment : overlappingSegments) {
+ committedVersionToIntervals.computeIfAbsent(segment.getVersion(), v ->
new HashSet<>())
+ .add(segment.getInterval());
+ committedIntervalToSegments.computeIfAbsent(segment.getInterval(), i ->
new HashSet<>())
+ .add(segment);
+ }
+
+ final Set<DataSegment> upgradedSegments = new HashSet<>();
+ for (Map.Entry<String, Set<Interval>> entry :
committedVersionToIntervals.entrySet()) {
+ final String upgradeVersion = entry.getKey();
+ Map<Interval, Set<DataSegment>> segmentsToUpgrade =
getSegmentsWithVersionLowerThan(
Review Comment:
Naming: `segmentsToUpgrade` --> `extraSegmentVersions`.
##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -977,6 +1045,175 @@ private void insertPendingSegmentIntoMetastore(
.execute();
}
+ /**
+ * Allocates and returns any extra versions that need to be committed for the
+ * given append segments.
+ * <p>
+ * This is typically needed when a REPLACE task started and finished after
+ * these append segments had already been allocated. As such,
+ * there would be some used segments in the DB with versions higher than
these
+ * append segments.
+ */
+ private Set<DataSegment> getSegmentsToUpgradeOnAppend(
+ Handle handle,
+ String dataSource,
+ Set<DataSegment> segmentsToAppend
+ ) throws IOException
+ {
+ if (segmentsToAppend.isEmpty()) {
+ return Collections.emptySet();
+ }
+
+ final Set<Interval> appendIntervals = new HashSet<>();
+ final TreeMap<String, Set<DataSegment>> appendVersionToSegments = new
TreeMap<>();
+ for (DataSegment segment : segmentsToAppend) {
+ appendIntervals.add(segment.getInterval());
+ appendVersionToSegments.computeIfAbsent(segment.getVersion(), v -> new
HashSet<>())
+ .add(segment);
+ }
+
+ // Fetch all used segments that overlap with any of the append intervals
+ final Collection<DataSegment> overlappingSegments =
retrieveUsedSegmentsForIntervals(
+ dataSource,
+ new ArrayList<>(appendIntervals),
+ Segments.INCLUDING_OVERSHADOWED
+ );
+
+ final Map<String, Set<Interval>> committedVersionToIntervals = new
HashMap<>();
+ final Map<Interval, Set<DataSegment>> committedIntervalToSegments = new
HashMap<>();
+ for (DataSegment segment : overlappingSegments) {
+ committedVersionToIntervals.computeIfAbsent(segment.getVersion(), v ->
new HashSet<>())
+ .add(segment.getInterval());
+ committedIntervalToSegments.computeIfAbsent(segment.getInterval(), i ->
new HashSet<>())
+ .add(segment);
+ }
+
+ final Set<DataSegment> upgradedSegments = new HashSet<>();
+ for (Map.Entry<String, Set<Interval>> entry :
committedVersionToIntervals.entrySet()) {
+ final String upgradeVersion = entry.getKey();
+ Map<Interval, Set<DataSegment>> segmentsToUpgrade =
getSegmentsWithVersionLowerThan(
+ upgradeVersion,
+ entry.getValue(),
+ appendVersionToSegments
+ );
+ for (Map.Entry<Interval, Set<DataSegment>> upgradeEntry :
segmentsToUpgrade.entrySet()) {
+ Set<DataSegment> segmentsUpgradedToVersion = upgradeSegmentsToVersion(
+ handle,
+ upgradeVersion,
+ upgradeEntry.getKey(),
+ upgradeEntry.getValue(),
+ committedIntervalToSegments
+ );
+ log.info("Upgraded [%d] segments to version[%s].",
segmentsUpgradedToVersion.size(), upgradeVersion);
+ upgradedSegments.addAll(segmentsUpgradedToVersion);
+ }
+ }
+
+ return upgradedSegments;
+ }
+
+ /**
+ * Creates a Map from eligible interval to Set of segments that are fully
+ * contained in that interval and have a version strictly lower than {@code
#cutoffVersion}.
+ */
+ private Map<Interval, Set<DataSegment>> getSegmentsWithVersionLowerThan(
+ String cutoffVersion,
+ Set<Interval> eligibleIntervals,
+ TreeMap<String, Set<DataSegment>> versionToSegments
+ )
+ {
+ final Set<DataSegment> eligibleSegments
+ = versionToSegments.headMap(cutoffVersion).values().stream()
+ .flatMap(Collection::stream)
+ .collect(Collectors.toSet());
+
+ final Map<Interval, Set<DataSegment>> eligibleIntervalToSegments = new
HashMap<>();
+
+ for (DataSegment segment : eligibleSegments) {
+ final Interval segmentInterval = segment.getInterval();
+ for (Interval eligibleInterval : eligibleIntervals) {
+ if (eligibleInterval.contains(segmentInterval)) {
+ eligibleIntervalToSegments.computeIfAbsent(eligibleInterval, itvl ->
new HashSet<>())
+ .add(segment);
+ break;
+ } else if (eligibleInterval.overlaps(segmentInterval)) {
+ // Committed interval overlaps only partially
+ throw new ISE(
+ "Committed interval[%s] conflicts with interval[%s] of append
segment[%s].",
+ eligibleInterval, segmentInterval, segment.getId()
+ );
+ }
+ }
+ }
+
+ return eligibleIntervalToSegments;
+ }
+
+ /**
+ * Computes new Segment IDs for the {@code segmentsToUpgrade} being upgraded
+ * to the given {@code upgradeVersion}.
+ */
+ private Set<DataSegment> upgradeSegmentsToVersion(
Review Comment:
The name is a bit confusing because it doesn't actually upgrade anything
yet; it just creates the extra versions.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]