kfaraz commented on code in PR #16144:
URL: https://github.com/apache/druid/pull/16144#discussion_r1566658230


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java:
##########
@@ -115,6 +127,13 @@ public TypeReference<SegmentPublishResult> getReturnTypeReference()
   @Override
   public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
   {
+    if (!(task instanceof PendingSegmentAllocatingTask)) {
+      throw new IAE(

Review Comment:
   Use `DruidException` instead.



##########
server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java:
##########
@@ -32,6 +30,15 @@
 import javax.annotation.Nullable;
 import java.sql.ResultSet;
 
+/**
+ * Representation of a record in the pending segments table. <br/>
+ * Mapping of column in table to field:
+ * <li/>  id -> id (Unique identifier for pending segment)
+ * <li/>  sequence_name -> sequenceName (sequence name used for segment allocation)
+ * <li/>  sequence_prev_id -> sequencePrevId (previous segment id used for segment allocation)
+ * <li/>  upgraded_from_segment_id -> upgradedFromSegmentId (Id of the root segment from which this was upgraded)
+ * <li/>  task_allocator_id -> taskAllocatorId (Associates a task / task group / replica group with the pending segment)

Review Comment:
   Does this render correctly as a javadoc?
   The `li` tags should ideally open and close like standard HTML tags: `<li>content</li>`
   
   The mapping of column name to field is not needed as it is pretty self-explanatory.
   
   Also, the field descriptions should just be added in the respective getters and/or the constructor rather than at the class level.
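A minimal sketch of the layout suggested above: no column-to-field mapping at the class level, and each field described on its own getter. `PendingSegmentRecordSketch` is a hypothetical, simplified stand-in; every field is a plain `String` here, which may not match the real `PendingSegmentRecord`, and treating `upgradedFromSegmentId` as nullable is an assumption.

```java
/**
 * Representation of a record in the pending segments table.
 * (Hypothetical, simplified sketch; field types are assumed.)
 */
final class PendingSegmentRecordSketch
{
  private final String id;
  private final String sequenceName;
  private final String sequencePrevId;
  private final String upgradedFromSegmentId;
  private final String taskAllocatorId;

  PendingSegmentRecordSketch(
      String id,
      String sequenceName,
      String sequencePrevId,
      String upgradedFromSegmentId,
      String taskAllocatorId
  )
  {
    this.id = id;
    this.sequenceName = sequenceName;
    this.sequencePrevId = sequencePrevId;
    this.upgradedFromSegmentId = upgradedFromSegmentId;
    this.taskAllocatorId = taskAllocatorId;
  }

  /**
   * Unique identifier of the pending segment.
   */
  String getId()
  {
    return id;
  }

  /**
   * Sequence name used for segment allocation.
   */
  String getSequenceName()
  {
    return sequenceName;
  }

  /**
   * Previous segment ID used for segment allocation.
   */
  String getSequencePrevId()
  {
    return sequencePrevId;
  }

  /**
   * ID of the root segment from which this pending segment was upgraded.
   * May be null (nullability assumed for this sketch).
   */
  String getUpgradedFromSegmentId()
  {
    return upgradedFromSegmentId;
  }

  /**
   * Associates a task, task group or replica group with this pending segment.
   */
  String getTaskAllocatorId()
  {
    return taskAllocatorId;
  }
}
```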



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/PendingSegmentAllocatingTask.java:
##########
@@ -19,6 +19,9 @@
 
 package org.apache.druid.indexing.common.task;
 
+/**
+ * An interface to be implemented by every appending task that allocates pending segments.
+ */

Review Comment:
   ```suggestion
    * An append task that can allocate pending segments. All concrete {@link Task} implementations that need to allocate pending segments must implement this interface.
    */
   ```



##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -1473,183 +1427,6 @@ private void insertPendingSegmentIntoMetastore(
           .execute();
   }
 
-  /**
-   * Creates new IDs for the given append segments if a REPLACE task started and
-   * finished after these append segments had already been allocated. The newly
-   * created IDs belong to the same interval and version as the segments committed
-   * by the REPLACE task.
-   */
-  private Set<DataSegment> createNewIdsForAppendSegments(
-      Handle handle,
-      String dataSource,
-      Set<DataSegment> segmentsToAppend
-  )
-  {
-    if (segmentsToAppend.isEmpty()) {
-      return Collections.emptySet();
-    }
-
-    final Set<Interval> appendIntervals = new HashSet<>();
-    final TreeMap<String, Set<DataSegment>> appendVersionToSegments = new TreeMap<>();
-    for (DataSegment segment : segmentsToAppend) {
-      appendIntervals.add(segment.getInterval());
-      appendVersionToSegments.computeIfAbsent(segment.getVersion(), v -> new HashSet<>())
-                             .add(segment);
-    }
-
-    // Fetch all used segments that overlap with any of the append intervals
-    final Collection<DataSegment> overlappingSegments = retrieveUsedSegmentsForIntervals(
-        dataSource,
-        new ArrayList<>(appendIntervals),
-        Segments.INCLUDING_OVERSHADOWED
-    );
-
-    final Map<String, Set<Interval>> overlappingVersionToIntervals = new HashMap<>();
-    final Map<Interval, Set<DataSegment>> overlappingIntervalToSegments = new HashMap<>();
-    for (DataSegment segment : overlappingSegments) {
-      overlappingVersionToIntervals.computeIfAbsent(segment.getVersion(), v -> new HashSet<>())
-                                   .add(segment.getInterval());
-      overlappingIntervalToSegments.computeIfAbsent(segment.getInterval(), i -> new HashSet<>())
-                                   .add(segment);
-    }
-
-    final Set<DataSegment> upgradedSegments = new HashSet<>();
-    for (Map.Entry<String, Set<Interval>> entry : overlappingVersionToIntervals.entrySet()) {
-      final String upgradeVersion = entry.getKey();
-      Map<Interval, Set<DataSegment>> segmentsToUpgrade = getSegmentsWithVersionLowerThan(
-          upgradeVersion,
-          entry.getValue(),
-          appendVersionToSegments
-      );
-      for (Map.Entry<Interval, Set<DataSegment>> upgradeEntry : segmentsToUpgrade.entrySet()) {
-        final Interval upgradeInterval = upgradeEntry.getKey();
-        final Set<DataSegment> segmentsAlreadyOnVersion
-            = overlappingIntervalToSegments.getOrDefault(upgradeInterval, Collections.emptySet())
-                                           .stream()
-                                           .filter(s -> s.getVersion().equals(upgradeVersion))
-                                           .collect(Collectors.toSet());
-        Set<DataSegment> segmentsUpgradedToVersion = createNewIdsForAppendSegmentsWithVersion(
-            handle,
-            upgradeVersion,
-            upgradeInterval,
-            upgradeEntry.getValue(),
-            segmentsAlreadyOnVersion
-        );
-        log.info("Upgraded [%d] segments to version[%s].", segmentsUpgradedToVersion.size(), upgradeVersion);
-        upgradedSegments.addAll(segmentsUpgradedToVersion);
-      }
-    }
-
-    return upgradedSegments;
-  }
-
-  /**
-   * Creates a Map from eligible interval to Set of segments that are fully
-   * contained in that interval and have a version strictly lower than {@code #cutoffVersion}.
-   */
-  private Map<Interval, Set<DataSegment>> getSegmentsWithVersionLowerThan(

Review Comment:
   Cool cleanup! I assume none of these methods are needed anymore?
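For reference while checking that nothing still depends on them, the removed helpers did roughly the following according to their javadocs: group the append segments by version, find used segments overlapping the append intervals, and, for each higher version, pick the append segments with a strictly lower version that are fully contained in an interval of that version. Below is a simplified, self-contained sketch of the grouping and `getSegmentsWithVersionLowerThan` steps only. It assumes half-open `[start, end)` millisecond intervals and plain string ordering of versions (the removed code kept versions in a `TreeMap<String, ...>`); `Span` and `Seg` are hypothetical stand-ins for Joda's `Interval` and Druid's `DataSegment`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/**
 * Simplified sketch of the removed version-filtering helpers.
 * Span and Seg are hypothetical stand-ins, not Druid types.
 */
final class VersionUpgradeSketch
{
  /** Half-open interval [start, end) in milliseconds. */
  record Span(long start, long end)
  {
    boolean contains(Span other)
    {
      return start <= other.start() && other.end() <= end;
    }
  }

  /** Only the segment fields this sketch needs. */
  record Seg(String id, Span interval, String version)
  {
  }

  /** Groups segments by version; the TreeMap keeps versions in string order. */
  static TreeMap<String, Set<Seg>> groupByVersion(Set<Seg> segments)
  {
    final TreeMap<String, Set<Seg>> byVersion = new TreeMap<>();
    for (Seg segment : segments) {
      byVersion.computeIfAbsent(segment.version(), v -> new HashSet<>()).add(segment);
    }
    return byVersion;
  }

  /**
   * Maps each eligible interval to the segments that are fully contained in it
   * and have a version strictly lower than cutoffVersion.
   */
  static Map<Span, Set<Seg>> segmentsWithVersionLowerThan(
      String cutoffVersion,
      Set<Span> eligibleIntervals,
      TreeMap<String, Set<Seg>> segmentsByVersion
  )
  {
    final Map<Span, Set<Seg>> result = new HashMap<>();
    // headMap(key) excludes the key itself, so only strictly lower versions are visited.
    for (Set<Seg> segments : segmentsByVersion.headMap(cutoffVersion).values()) {
      for (Seg segment : segments) {
        for (Span interval : eligibleIntervals) {
          // Each segment is assigned to the first eligible interval that fully contains it.
          if (interval.contains(segment.interval())) {
            result.computeIfAbsent(interval, i -> new HashSet<>()).add(segment);
            break;
          }
        }
      }
    }
    return result;
  }
}
```

Using `TreeMap.headMap` keeps the "strictly lower than" comparison in a single call instead of comparing every version by hand.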



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java:
##########
@@ -210,6 +211,13 @@ public SegmentIdWithShardSpec perform(
       final TaskActionToolbox toolbox
   )
   {
+    if (!(task instanceof PendingSegmentAllocatingTask)) {
+      throw new IAE(
+          "Task[%s] of type[%s] cannot allocate segments as it does not implement PendingSegmentAllocatingTask.",
+          task.getId(),
+          task.getType()

Review Comment:
   ```suggestion
             task.getId(), task.getType()
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java:
##########
@@ -210,6 +211,13 @@ public SegmentIdWithShardSpec perform(
       final TaskActionToolbox toolbox
   )
   {
+    if (!(task instanceof PendingSegmentAllocatingTask)) {
+      throw new IAE(

Review Comment:
   Use a `DruidException` instead.
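A self-contained sketch of the guard with the single-line argument formatting suggested above. `Task`, `PendingSegmentAllocatingTask` and `GuardException` here are hypothetical, simplified stand-ins: the sketch's `PendingSegmentAllocatingTask` extends `Task` only to keep it short, and `GuardException` merely marks where the reviewer's `DruidException` would be thrown instead of `IAE`. Which `DruidException` persona, category, or factory method to use is left to the author and is not reproduced here.

```java
/**
 * Simplified sketch of the instanceof guard in SegmentAllocateAction.perform().
 * All types below are hypothetical stand-ins, not Druid classes.
 */
final class AllocationGuardSketch
{
  /** Minimal view of a task: only the accessors the error message uses. */
  interface Task
  {
    String getId();

    String getType();
  }

  /** Marker interface for tasks allowed to allocate pending segments. */
  interface PendingSegmentAllocatingTask extends Task
  {
  }

  /** Stand-in for DruidException, which Druid constructs differently. */
  static final class GuardException extends RuntimeException
  {
    GuardException(String message)
    {
      super(message);
    }
  }

  /** Throws if the given task is not allowed to allocate pending segments. */
  static void validateCanAllocate(Task task)
  {
    if (!(task instanceof PendingSegmentAllocatingTask)) {
      throw new GuardException(String.format(
          "Task[%s] of type[%s] cannot allocate segments as it does not implement PendingSegmentAllocatingTask.",
          task.getId(), task.getType()
      ));
    }
  }
}
```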



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java:
##########
@@ -37,14 +37,28 @@
 import org.apache.druid.timeline.DataSegment;
 
 import java.util.HashMap;
-import java.util.List;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
  * Replace segments in metadata storage. The segment versions must all be less than or equal to a lock held by
  * your task for the segment intervals.
+ *
+ * <pre>
+ *  Pseudo code (for a single interval)

Review Comment:
   Same comment regarding pseudo-code as on `SegmentTransactionalAppendAction`.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java:
##########
@@ -44,6 +45,17 @@
 /**
  * Append segments to metadata storage. The segment versions must all be less than or equal to a lock held by
  * your task for the segment intervals.
+ *
+ * <pre>
+ * Pseudo code (for a single interval):
+ * For an append lock held over an interval:
+ *     transaction {
+ *       commit input segments contained within interval
+ *       if there is an active replace lock over the interval:
+ *         add an entry for the inputSegment corresponding to the replace lock's task in the upgradeSegments table
+ *       fetch pending segments with parent contained within the input segments, and commit them
+ *     }

Review Comment:
   It doesn't seem appropriate to have the implementation described as pseudo-code. Someone might as well read the code. It is better to briefly describe the key points of the implementation in a list fashion. (This is not a blocker for the PR.)
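A sketch of the list-style description suggested above, drawn only from the pseudo-code in the `SegmentTransactionalAppendAction` diff. The wording is illustrative, not the author's, and the empty class `SegmentTransactionalAppendActionDocSketch` is a hypothetical placeholder that only carries the javadoc. The list also uses properly opened and closed `<ul>`/`<li>` tags, in line with the rendering question raised on `PendingSegmentRecord`.

```java
/**
 * Append segments to metadata storage. The segment versions must all be less than or equal to a lock held by
 * your task for the segment intervals.
 *
 * <p>For an interval covered by an append lock, the following happen in a single transaction:
 * <ul>
 *   <li>The input segments contained within the interval are committed.</li>
 *   <li>If a replace lock is active over the interval, an entry for each input segment is added to the
 *   upgradeSegments table against the replace lock's task.</li>
 *   <li>Pending segments whose parent is one of the input segments are fetched and committed.</li>
 * </ul>
 */
final class SegmentTransactionalAppendActionDocSketch
{
}
```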


