This is an automated email from the ASF dual-hosted git repository.
amatya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new cdc192d38d6 Prevent multiple attempts to publish segments for the same
sequence (#14995)
cdc192d38d6 is described below
commit cdc192d38d625c6042c445ad9588dcb4723c8843
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Thu Nov 16 14:21:26 2023 +0530
Prevent multiple attempts to publish segments for the same sequence (#14995)
* Prevent a race that may cause multiple attempts to publish segments for
the same sequence
---
.../indexing/seekablestream/SeekableStreamIndexTaskRunner.java | 9 +++++++--
1 file changed, 7 insertions(+), 2 deletions(-)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index 769413d6ffc..9fc743bd23b 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -213,6 +213,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
private final String stream;
private final Set<String> publishingSequences = Sets.newConcurrentHashSet();
+ private final Set<String> publishedSequences = Sets.newConcurrentHashSet();
private final List<ListenableFuture<SegmentsAndCommitMetadata>>
publishWaitList = new ArrayList<>();
private final List<ListenableFuture<SegmentsAndCommitMetadata>>
handOffWaitList = new ArrayList<>();
@@ -806,7 +807,8 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
List<SequenceMetadata<PartitionIdType, SequenceOffsetType>>
sequencesSnapshot = new ArrayList<>(sequences);
for (int i = 0; i < sequencesSnapshot.size(); i++) {
final SequenceMetadata<PartitionIdType, SequenceOffsetType>
sequenceMetadata = sequencesSnapshot.get(i);
- if (!publishingSequences.contains(sequenceMetadata.getSequenceName())) {
+ if (!publishingSequences.contains(sequenceMetadata.getSequenceName())
+ && !publishedSequences.contains(sequenceMetadata.getSequenceName())) {
final boolean isLast = i == (sequencesSnapshot.size() - 1);
if (isLast) {
// Shorten endOffsets of the last sequence to match currOffsets.
@@ -1009,6 +1011,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
);
log.infoSegments(publishedSegmentsAndCommitMetadata.getSegments(),
"Published segments");
+ publishedSequences.add(sequenceMetadata.getSequenceName());
sequences.remove(sequenceMetadata);
publishingSequences.remove(sequenceMetadata.getSequenceName());
@@ -1157,7 +1160,9 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
{
for (SequenceMetadata<PartitionIdType, SequenceOffsetType>
sequenceMetadata : sequences) {
sequenceMetadata.updateAssignments(currOffsets,
this::isMoreToReadBeforeReadingRecord);
- if (!sequenceMetadata.isOpen() && !publishingSequences.contains(sequenceMetadata.getSequenceName())) {
+ if (!sequenceMetadata.isOpen()
+ && !publishingSequences.contains(sequenceMetadata.getSequenceName())
+ && !publishedSequences.contains(sequenceMetadata.getSequenceName())) {
publishingSequences.add(sequenceMetadata.getSequenceName());
try {
final Object result = driver.persist(committerSupplier.get());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]