kfaraz commented on code in PR #16162:
URL: https://github.com/apache/druid/pull/16162#discussion_r1578807492


##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java:
##########
@@ -377,7 +388,8 @@ public void onSuccess(Object result)
                             new SegmentsAndCommitMetadata(
                                 segments,
                                 ((AppenderatorDriverMetadata) metadata).getCallerMetadata(),
-                                segmentsAndCommitMetadata.getSegmentSchemaMapping()
+                                segmentsAndCommitMetadata.getSegmentSchemaMapping(),
+                                upgradedSegments

Review Comment:
   ```suggestion
                                   segmentsAndCommitMetadata.getUpgradedSegments()
   ```



##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -1334,6 +1372,18 @@ private SegmentPublishResult commitAppendSegmentsAndMetadataInTransaction(
            }
 
            insertIntoUpgradeSegmentsTable(handle, appendSegmentToReplaceLock);
+
+            // Delete the pending segments to be committed in this transaction in batches of at most 100
+            final List<List<String>> pendingSegmentIdBatches = Lists.partition(
+                allSegmentsToInsert.stream()
+                                   .map(pendingSegment -> pendingSegment.getId().toString())
+                                   .collect(Collectors.toList()),
+                100
+            );
+            for (List<String> pendingSegmentIdBatch : pendingSegmentIdBatches) {
+              deletePendingSegmentsById(handle, dataSource, pendingSegmentIdBatch);

Review Comment:
   Maybe add an info log with the total count of pending segment records deleted.
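   Something along these lines, for example (a rough sketch; it assumes `deletePendingSegmentsById` returns the number of rows it deleted, which may not match the actual signature):
   ```java
   int numDeletedPendingSegments = 0;
   for (List<String> pendingSegmentIdBatch : pendingSegmentIdBatches) {
     // Hypothetical: accumulate the per-batch delete count returned by the helper
     numDeletedPendingSegments += deletePendingSegmentsById(handle, dataSource, pendingSegmentIdBatch);
   }
   log.info("Deleted [%d] pending segment records in this transaction.", numDeletedPendingSegments);
   ```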



##########
server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java:
##########
@@ -221,10 +221,8 @@ public void testFailDuringDrop() throws IOException, InterruptedException, Timeo
 
     Assert.assertNull(driver.startJob(null));
 
-    for (int i = 0; i < ROWS.size(); i++) {
-      committerSupplier.setMetadata(i + 1);
-      Assert.assertTrue(driver.add(ROWS.get(i), "dummy", committerSupplier, false, true).isOk());
-    }
+    committerSupplier.setMetadata(1);

Review Comment:
   Can't we control the order in the test?



##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentsAndCommitMetadata.java:
##########
@@ -20,28 +20,50 @@
 package org.apache.druid.segment.realtime.appenderator;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import org.apache.druid.segment.SegmentUtils;
 import org.apache.druid.timeline.DataSegment;
 
 import javax.annotation.Nullable;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 
 public class SegmentsAndCommitMetadata
 {
  private static final SegmentsAndCommitMetadata NIL = new SegmentsAndCommitMetadata(Collections.emptyList(), null);
 
   private final Object commitMetadata;
   private final ImmutableList<DataSegment> segments;
+  private final ImmutableSet<DataSegment> upgradedSegments;

Review Comment:
   Better to have this as a javadoc on `getUpgradedSegments()`.
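   For instance, something along these lines (wording is just a rough guess; the nullable return is based on the null check done in `StreamAppenderatorDriver`):
   ```java
   /**
    * @return the set of upgraded versions of the committed segments,
    *         or null if upgraded segments were not tracked
    */
   @Nullable
   public Set<DataSegment> getUpgradedSegments()
   {
     return upgradedSegments;
   }
   ```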



##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java:
##########
@@ -1368,6 +1399,10 @@ private Object bootstrapSinksFromDisk()
         );
         rowsSoFar += currSink.getNumRows();
         sinks.put(identifier, currSink);
+        idToPendingSegment.put(identifier.asSegmentId().toString(), identifier);

Review Comment:
   Need a comment here.
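   For example (just my guess at the intent, feel free to reword):
   ```java
   // Register the restored sink's pending segment against its string segment id
   // so that upgraded versions can be resolved back to it after a restart.
   idToPendingSegment.put(identifier.asSegmentId().toString(), identifier);
   ```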



##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java:
##########
@@ -166,8 +168,13 @@ public class StreamAppenderator implements Appenderator
 
   private final AtomicBoolean closed = new AtomicBoolean(false);
 
-  private final ConcurrentHashMap<SegmentId, Set<SegmentIdWithShardSpec>>
-      baseSegmentToUpgradedVersions = new ConcurrentHashMap<>();
+  private final ConcurrentMap<SegmentIdWithShardSpec, Set<SegmentIdWithShardSpec>> baseSegmentToUpgradedSegments
+      = new ConcurrentHashMap<>();
+  private final ConcurrentMap<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradedSegmentToBaseSegment
+      = new ConcurrentHashMap<>();
+  private final ConcurrentHashMap.KeySetView<SegmentIdWithShardSpec, Boolean> abandonedSegments

Review Comment:
   This needs a javadoc.
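   For example, something along these lines (my reading of how the fields are used, so please correct if it is off):
   ```java
   /**
    * Maps the base segment of each sink to the set of all its announced
    * versions, including the base segment itself.
    */
   private final ConcurrentMap<SegmentIdWithShardSpec, Set<SegmentIdWithShardSpec>> baseSegmentToUpgradedSegments
       = new ConcurrentHashMap<>();

   /**
    * Maps each upgraded version of a segment back to its base segment.
    */
   private final ConcurrentMap<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradedSegmentToBaseSegment
       = new ConcurrentHashMap<>();
   ```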



##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java:
##########
@@ -1098,16 +1104,16 @@ SinkSchemaAnnouncer getSinkSchemaAnnouncer()
   /**
    * Unannounces the given base segment and all its upgraded versions.
    */
-  private void unannounceAllVersionsOfSegment(DataSegment baseSegment) throws IOException
+  private void unannounceAllVersionsOfSegment(DataSegment baseSegment)
   {
-    segmentAnnouncer.unannounceSegment(baseSegment);
-
-    final Set<SegmentIdWithShardSpec> upgradedVersionsOfSegment
-        = baseSegmentToUpgradedVersions.remove(baseSegment.getId());
-    if (upgradedVersionsOfSegment == null || upgradedVersionsOfSegment.isEmpty()) {
+    final SegmentIdWithShardSpec baseId = SegmentIdWithShardSpec.fromDataSegment(baseSegment);
+    if (!baseSegmentToUpgradedSegments.containsKey(baseId)) {
       return;
     }
 
+    final List<SegmentIdWithShardSpec> upgradedVersionsOfSegment
+        = ImmutableList.copyOf(baseSegmentToUpgradedSegments.get(baseId));

Review Comment:
   Why do we need to make a copy? Why can't we use the original set itself?



##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java:
##########
@@ -528,6 +535,10 @@ private Sink getOrCreateSink(final SegmentIdWithShardSpec identifier)
       }
 
       sinks.put(identifier, retVal);
+      idToPendingSegment.put(identifier.asSegmentId().toString(), identifier);
+      baseSegmentToUpgradedSegments.put(identifier, new HashSet<>());
+      baseSegmentToUpgradedSegments.get(identifier).add(identifier);

Review Comment:
   Do we want to clear the set even if this base segment was already mapped to some other segments?
   Does the set of upgraded segments need to contain self too?
   
   ```suggestion
         baseSegmentToUpgradedSegments.computeIfAbsent(identifier, i -> new HashSet<>()).add(identifier);
   ```



##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java:
##########
@@ -366,7 +373,11 @@ public void onSuccess(Object result)
                     {
                       if (numRemainingHandoffSegments.decrementAndGet() == 0) {
                         List<DataSegment> segments = segmentsAndCommitMetadata.getSegments();
+                        Set<DataSegment> upgradedSegments = segmentsAndCommitMetadata.getUpgradedSegments();
                         log.info("Successfully handed off [%d] segments.", segments.size());
+                        if (upgradedSegments != null) {
+                          log.info("Successfully handed off [%d] upgraded segments.", upgradedSegments.size());
+                        }

Review Comment:
   Maybe just use `segmentsToBeHandedOff.size()` and remove the unnecessary references to `segmentsAndCommitMetadata`.
   
   ```suggestion
                           log.info("Successfully handed off [%d] segments.", segmentsToBeHandedOff.size());
   ```



##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java:
##########
@@ -1098,16 +1104,16 @@ SinkSchemaAnnouncer getSinkSchemaAnnouncer()
   /**
    * Unannounces the given base segment and all its upgraded versions.
    */
-  private void unannounceAllVersionsOfSegment(DataSegment baseSegment) throws IOException
+  private void unannounceAllVersionsOfSegment(DataSegment baseSegment)
   {
-    segmentAnnouncer.unannounceSegment(baseSegment);
-
-    final Set<SegmentIdWithShardSpec> upgradedVersionsOfSegment
-        = baseSegmentToUpgradedVersions.remove(baseSegment.getId());

Review Comment:
   Why don't we need to `remove` from the map anymore?



##########
server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java:
##########
@@ -201,7 +201,7 @@ public void testFailDuringDrop() throws IOException, InterruptedException, Timeo
     expectedException.expect(ExecutionException.class);
     expectedException.expectCause(CoreMatchers.instanceOf(ISE.class));
     expectedException.expectMessage(
-        "Fail test while dropping segment[foo_2000-01-01T00:00:00.000Z_2000-01-01T01:00:00.000Z_abc123]"

Review Comment:
   Why don't we have the segment ID in the message anymore?



##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java:
##########
@@ -1402,6 +1437,17 @@ private ListenableFuture<?> abandonSegment(
       final boolean removeOnDiskData
   )
   {
+    abandonedSegments.add(identifier);
+    final SegmentIdWithShardSpec baseIdentifier = upgradedSegmentToBaseSegment.getOrDefault(identifier, identifier);
+    synchronized (sink) {
+      if (baseSegmentToUpgradedSegments.containsKey(baseIdentifier)) {

Review Comment:
   Need a comment here.
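   For example (a guess at the intent, adjust as needed):
   ```java
   // If this identifier is an upgraded version, resolve it back to its base
   // segment, since sinks and upgrade mappings are keyed on the base identifier.
   final SegmentIdWithShardSpec baseIdentifier = upgradedSegmentToBaseSegment.getOrDefault(identifier, identifier);
   ```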



##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java:
##########
@@ -1138,21 +1164,26 @@ public void registerNewVersionOfPendingSegment(
 
     // Announce segments
     final DataSegment baseSegment = sinks.get(basePendingSegment).getSegment();
+    final DataSegment newSegment = getUpgradedSegment(baseSegment, newSegmentVersion);
 
-    final DataSegment newSegment = new DataSegment(
-        newSegmentVersion.getDataSource(),
-        newSegmentVersion.getInterval(),
-        newSegmentVersion.getVersion(),
+    segmentAnnouncer.announceSegment(newSegment);
+    baseSegmentToUpgradedSegments.get(basePendingSegment).add(newSegmentVersion);
+    upgradedSegmentToBaseSegment.put(newSegmentVersion, basePendingSegment);

Review Comment:
   These two operations can be put inside a method. Also, I guess it is better to use `computeIfAbsent` for `baseSegmentToUpgradedSegments`.
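   Roughly along these lines (the method name is just a placeholder):
   ```java
   private void trackUpgradedSegment(SegmentIdWithShardSpec basePendingSegment, SegmentIdWithShardSpec newSegmentVersion)
   {
     // computeIfAbsent avoids clobbering an existing set of upgraded versions
     baseSegmentToUpgradedSegments.computeIfAbsent(basePendingSegment, id -> new HashSet<>())
                                  .add(newSegmentVersion);
     upgradedSegmentToBaseSegment.put(newSegmentVersion, basePendingSegment);
   }
   ```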



##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java:
##########
@@ -528,6 +535,10 @@ private Sink getOrCreateSink(final SegmentIdWithShardSpec identifier)
       }
 
       sinks.put(identifier, retVal);
+      idToPendingSegment.put(identifier.asSegmentId().toString(), identifier);

Review Comment:
   Need some comments here.
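   For instance (wording is only a guess):
   ```java
   // Track the pending segment for this sink by its string segment id so it can
   // be looked up when upgraded versions are registered or handed off.
   idToPendingSegment.put(identifier.asSegmentId().toString(), identifier);
   ```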


