kfaraz commented on code in PR #15039:
URL: https://github.com/apache/druid/pull/15039#discussion_r1354342661


##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -601,6 +599,157 @@ public SegmentIdWithShardSpec allocatePendingSegment(
     );
   }
 
+  @Override
+  public Set<SegmentIdWithShardSpec> upgradePendingSegments(Set<DataSegment> 
replaceSegments)
+  {
+    if (replaceSegments.isEmpty()) {
+      return Collections.emptySet();
+    }
+
+    // Any replace interval has exactly one version of segments
+    final Map<Interval, DataSegment> replaceIntervalToMaxId = new HashMap<>();
+    for (DataSegment segment : replaceSegments) {
+      DataSegment committedMaxId = 
replaceIntervalToMaxId.get(segment.getInterval());
+      if (committedMaxId == null
+          || committedMaxId.getShardSpec().getPartitionNum() < 
segment.getShardSpec().getPartitionNum()) {
+        replaceIntervalToMaxId.put(segment.getInterval(), segment);
+      }
+    }
+
+    final String datasource = 
replaceSegments.iterator().next().getDataSource();
+    return connector.retryWithHandle(
+        handle -> upgradePendingSegments(handle, datasource, 
replaceIntervalToMaxId)
+    );
+  }
+
+  @Override
+  public Set<SegmentIdWithShardSpec> 
findAllVersionsOfPendingSegment(SegmentIdWithShardSpec pendingSegment)
+  {
+    return connector.retryWithHandle(
+        handle -> findAllVersionsOfPendingSegment(handle, pendingSegment)
+    );
+  }
+
+  private Set<SegmentIdWithShardSpec> findAllVersionsOfPendingSegment(
+      Handle handle,
+      SegmentIdWithShardSpec pendingSegment
+  ) throws IOException
+  {
+    final Interval interval = pendingSegment.getInterval();
+    final Query<Map<String, Object>> query = handle
+        .createQuery(
+            StringUtils.format(
+                "SELECT payload "
+                + "FROM %s WHERE "
+                + "dataSource = :dataSource AND "
+                + "start = :start AND "
+                + "%2$send%2$s = :end AND "
+                + "sequence_prev_id = :sequence_prev_id",
+                dbTables.getPendingSegmentsTable(),
+                connector.getQuoteString()
+            )
+        )
+        .bind("dataSource", pendingSegment.getDataSource())
+        .bind("sequence_prev_id", pendingSegment.asSegmentId().toString())
+        .bind("start", interval.getStart().toString())
+        .bind("end", interval.getEnd().toString());
+
+    final ResultIterator<byte[]> dbSegments = query
+        .map(ByteArrayMapper.FIRST)
+        .iterator();
+
+    final Set<SegmentIdWithShardSpec> allVersions = new HashSet<>();
+    while (dbSegments.hasNext()) {
+      final byte[] payload = dbSegments.next();
+      final SegmentIdWithShardSpec segmentId =
+          jsonMapper.readValue(payload, SegmentIdWithShardSpec.class);
+      allVersions.add(segmentId);
+    }
+
+    return allVersions;
+  }
+
+  /**
+   * Finds pending segments contained in each replace interval and upgrades 
them
+   * to the replace version.
+   */
+  private Set<SegmentIdWithShardSpec> upgradePendingSegments(
+      Handle handle,
+      String datasource,
+      Map<Interval, DataSegment> replaceIntervalToMaxId
+  ) throws IOException
+  {
+    final Map<SegmentCreateRequest, SegmentIdWithShardSpec> 
newPendingSegmentVersions = new HashMap<>();
+
+    for (Map.Entry<Interval, DataSegment> entry : 
replaceIntervalToMaxId.entrySet()) {
+      final Interval replaceInterval = entry.getKey();
+      final DataSegment maxSegmentId = entry.getValue();
+      final String replaceVersion = maxSegmentId.getVersion();
+
+      final int numCorePartitions = 
maxSegmentId.getShardSpec().getNumCorePartitions();
+      int currentPartitionNumber = 
maxSegmentId.getShardSpec().getPartitionNum();
+
+      final Map<SegmentIdWithShardSpec, String> overlappingPendingSegments
+          = getPendingSegmentsForIntervalWithHandle(handle, datasource, 
replaceInterval);
+
+      for (Map.Entry<SegmentIdWithShardSpec, String> overlappingPendingSegment
+          : overlappingPendingSegments.entrySet()) {
+        final SegmentIdWithShardSpec pendingSegmentId = 
overlappingPendingSegment.getKey();
+        final String pendingSegmentSequence = 
overlappingPendingSegment.getValue();
+        if (shouldUpgradePendingSegment(pendingSegmentId, 
pendingSegmentSequence, replaceInterval, replaceVersion)) {
+          // There cannot be any duplicates because this version has not been 
committed before
+          newPendingSegmentVersions.put(
+              new SegmentCreateRequest(
+                  UPGRADED_PENDING_SEGMENT_PREFIX + replaceVersion,

Review Comment:
   No, that wouldn't work because the combo of sequence_name and 
prev_segment_id has to be unique. So we want all the versions of a given 
pending segment to have the same prev_segment_id but different sequence_names.



##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -601,6 +599,157 @@ public SegmentIdWithShardSpec allocatePendingSegment(
     );
   }
 
+  @Override
+  public Set<SegmentIdWithShardSpec> upgradePendingSegments(Set<DataSegment> 
replaceSegments)
+  {
+    if (replaceSegments.isEmpty()) {
+      return Collections.emptySet();
+    }
+
+    // Any replace interval has exactly one version of segments
+    final Map<Interval, DataSegment> replaceIntervalToMaxId = new HashMap<>();
+    for (DataSegment segment : replaceSegments) {
+      DataSegment committedMaxId = 
replaceIntervalToMaxId.get(segment.getInterval());
+      if (committedMaxId == null
+          || committedMaxId.getShardSpec().getPartitionNum() < 
segment.getShardSpec().getPartitionNum()) {
+        replaceIntervalToMaxId.put(segment.getInterval(), segment);
+      }
+    }
+
+    final String datasource = 
replaceSegments.iterator().next().getDataSource();
+    return connector.retryWithHandle(
+        handle -> upgradePendingSegments(handle, datasource, 
replaceIntervalToMaxId)
+    );
+  }
+
+  @Override
+  public Set<SegmentIdWithShardSpec> 
findAllVersionsOfPendingSegment(SegmentIdWithShardSpec pendingSegment)
+  {
+    return connector.retryWithHandle(
+        handle -> findAllVersionsOfPendingSegment(handle, pendingSegment)
+    );
+  }
+
+  private Set<SegmentIdWithShardSpec> findAllVersionsOfPendingSegment(
+      Handle handle,
+      SegmentIdWithShardSpec pendingSegment
+  ) throws IOException
+  {
+    final Interval interval = pendingSegment.getInterval();
+    final Query<Map<String, Object>> query = handle
+        .createQuery(
+            StringUtils.format(
+                "SELECT payload "
+                + "FROM %s WHERE "
+                + "dataSource = :dataSource AND "
+                + "start = :start AND "
+                + "%2$send%2$s = :end AND "
+                + "sequence_prev_id = :sequence_prev_id",
+                dbTables.getPendingSegmentsTable(),
+                connector.getQuoteString()
+            )
+        )
+        .bind("dataSource", pendingSegment.getDataSource())
+        .bind("sequence_prev_id", pendingSegment.asSegmentId().toString())
+        .bind("start", interval.getStart().toString())
+        .bind("end", interval.getEnd().toString());
+
+    final ResultIterator<byte[]> dbSegments = query
+        .map(ByteArrayMapper.FIRST)
+        .iterator();
+
+    final Set<SegmentIdWithShardSpec> allVersions = new HashSet<>();
+    while (dbSegments.hasNext()) {
+      final byte[] payload = dbSegments.next();
+      final SegmentIdWithShardSpec segmentId =
+          jsonMapper.readValue(payload, SegmentIdWithShardSpec.class);
+      allVersions.add(segmentId);
+    }
+
+    return allVersions;
+  }
+
+  /**
+   * Finds pending segments contained in each replace interval and upgrades 
them
+   * to the replace version.
+   */
+  private Set<SegmentIdWithShardSpec> upgradePendingSegments(
+      Handle handle,
+      String datasource,
+      Map<Interval, DataSegment> replaceIntervalToMaxId
+  ) throws IOException
+  {
+    final Map<SegmentCreateRequest, SegmentIdWithShardSpec> 
newPendingSegmentVersions = new HashMap<>();
+
+    for (Map.Entry<Interval, DataSegment> entry : 
replaceIntervalToMaxId.entrySet()) {
+      final Interval replaceInterval = entry.getKey();
+      final DataSegment maxSegmentId = entry.getValue();
+      final String replaceVersion = maxSegmentId.getVersion();
+
+      final int numCorePartitions = 
maxSegmentId.getShardSpec().getNumCorePartitions();
+      int currentPartitionNumber = 
maxSegmentId.getShardSpec().getPartitionNum();
+
+      final Map<SegmentIdWithShardSpec, String> overlappingPendingSegments
+          = getPendingSegmentsForIntervalWithHandle(handle, datasource, 
replaceInterval);
+
+      for (Map.Entry<SegmentIdWithShardSpec, String> overlappingPendingSegment
+          : overlappingPendingSegments.entrySet()) {
+        final SegmentIdWithShardSpec pendingSegmentId = 
overlappingPendingSegment.getKey();
+        final String pendingSegmentSequence = 
overlappingPendingSegment.getValue();
+        if (shouldUpgradePendingSegment(pendingSegmentId, 
pendingSegmentSequence, replaceInterval, replaceVersion)) {
+          // There cannot be any duplicates because this version has not been 
committed before
+          newPendingSegmentVersions.put(
+              new SegmentCreateRequest(
+                  UPGRADED_PENDING_SEGMENT_PREFIX + replaceVersion,

Review Comment:
   No, that wouldn't work because the combo of sequence_name and 
prev_segment_id has to be unique. So we want all the versions of a given 
pending segment to have the same prev_segment_id but different sequence_names.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to