This is an automated email from the ASF dual-hosted git repository.

amatya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new cdc192d38d6 Prevent multiple attempts to publish segments for the same sequence (#14995)
cdc192d38d6 is described below

commit cdc192d38d625c6042c445ad9588dcb4723c8843
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Thu Nov 16 14:21:26 2023 +0530

    Prevent multiple attempts to publish segments for the same sequence (#14995)
    
    * Prevent a race that may cause multiple attempts to publish segments for the same sequence
---
 .../indexing/seekablestream/SeekableStreamIndexTaskRunner.java   | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index 769413d6ffc..9fc743bd23b 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -213,6 +213,7 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
   private final String stream;
 
   private final Set<String> publishingSequences = Sets.newConcurrentHashSet();
+  private final Set<String> publishedSequences = Sets.newConcurrentHashSet();
   private final List<ListenableFuture<SegmentsAndCommitMetadata>> 
publishWaitList = new ArrayList<>();
   private final List<ListenableFuture<SegmentsAndCommitMetadata>> 
handOffWaitList = new ArrayList<>();
 
@@ -806,7 +807,8 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
       List<SequenceMetadata<PartitionIdType, SequenceOffsetType>> 
sequencesSnapshot = new ArrayList<>(sequences);
       for (int i = 0; i < sequencesSnapshot.size(); i++) {
         final SequenceMetadata<PartitionIdType, SequenceOffsetType> 
sequenceMetadata = sequencesSnapshot.get(i);
-        if (!publishingSequences.contains(sequenceMetadata.getSequenceName())) 
{
+        if (!publishingSequences.contains(sequenceMetadata.getSequenceName())
+            && 
!publishedSequences.contains(sequenceMetadata.getSequenceName())) {
           final boolean isLast = i == (sequencesSnapshot.size() - 1);
           if (isLast) {
             // Shorten endOffsets of the last sequence to match currOffsets.
@@ -1009,6 +1011,7 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
             );
             log.infoSegments(publishedSegmentsAndCommitMetadata.getSegments(), 
"Published segments");
 
+            publishedSequences.add(sequenceMetadata.getSequenceName());
             sequences.remove(sequenceMetadata);
             publishingSequences.remove(sequenceMetadata.getSequenceName());
 
@@ -1157,7 +1160,9 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
   {
     for (SequenceMetadata<PartitionIdType, SequenceOffsetType> 
sequenceMetadata : sequences) {
       sequenceMetadata.updateAssignments(currOffsets, 
this::isMoreToReadBeforeReadingRecord);
-      if (!sequenceMetadata.isOpen() && 
!publishingSequences.contains(sequenceMetadata.getSequenceName())) {
+      if (!sequenceMetadata.isOpen()
+          && !publishingSequences.contains(sequenceMetadata.getSequenceName())
+          && !publishedSequences.contains(sequenceMetadata.getSequenceName())) 
{
         publishingSequences.add(sequenceMetadata.getSequenceName());
         try {
           final Object result = driver.persist(committerSupplier.get());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to