AmatyaAvadhanula commented on code in PR #15141:
URL: https://github.com/apache/druid/pull/15141#discussion_r1445621990


##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java:
##########
@@ -675,12 +676,19 @@ ListenableFuture<SegmentsAndCommitMetadata> 
publishInBackground(
                 segmentsAndCommitMetadata.getSegments(),
                 "Failed publish, not removing segments"
             );
-            Throwables.propagateIfPossible(e);
-            throw new RuntimeException(e);
+            if (e.getMessage() != null && e.getMessage().contains("Failed to 
update the metadata Store. The new start metadata is ahead of last commited end 
state.")) {
+              // we can recover from this error.
+              throw new ExecutionException(e);

Review Comment:
   Why do we need to throw an ExecutionException instead of checking the 
message itself below?



##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java:
##########
@@ -652,19 +656,16 @@ ListenableFuture<SegmentsAndCommitMetadata> 
publishInBackground(
                   
segmentsAndCommitMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
                 }
               } else {
+                log.errorSegments(ourSegments, "Failed to publish segments");
+                if (publishResult.getErrorMsg() != null && 
publishResult.getErrorMsg().contains(("Failed to update the metadata Store. The 
new start metadata is ahead of last commited end state."))) {
+                  throw new ISE(publishResult.getErrorMsg());
+                }
                 // Our segments aren't active. Publish failed for some reason. 
Clean them up and then throw an error.
                 
segmentsAndCommitMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);

Review Comment:
   Segments should be cleaned up if the retries are exhausted or an 
unrecoverable exception is thrown.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java:
##########
@@ -147,6 +148,38 @@ public SeekableStreamSequenceNumbers<PartitionIdType, 
SequenceOffsetType> plus(
     }
   }
 
+  @Override
+  public int compareTo(SeekableStreamSequenceNumbers<PartitionIdType, 
SequenceOffsetType> other, Comparator<SequenceOffsetType> comparator)
+  {
+    if (this.getClass() != other.getClass()) {
+      throw new IAE(
+          "Expected instance of %s, got %s",
+          this.getClass().getName(),
+          other.getClass().getName()
+      );
+    }
+
+    final SeekableStreamEndSequenceNumbers<PartitionIdType, 
SequenceOffsetType> otherStart =
+        (SeekableStreamEndSequenceNumbers<PartitionIdType, 
SequenceOffsetType>) other;
+
+    if (stream.equals(otherStart.stream)) {
+      //Same stream, compare the offset
+      boolean res = false;
+      for (Map.Entry<PartitionIdType, SequenceOffsetType> entry : 
partitionSequenceNumberMap.entrySet()) {
+        PartitionIdType partitionId = entry.getKey();
+        SequenceOffsetType sequenceOffset = entry.getValue();
+        if (otherStart.partitionSequenceNumberMap.get(partitionId) != null && 
comparator.compare(sequenceOffset, 
otherStart.partitionSequenceNumberMap.get(partitionId)) > 0) {

Review Comment:
   If otherStart for partitionId is null and sequenceOffset is not null, should 
the comparator return 1?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to