jihoonson closed pull request #5973: [Backport] Fix ConcurrentModificationException in IncrementalPublishingKafkaIndexTaskRunner
URL: https://github.com/apache/incubator-druid/pull/5973
 
 
   

This is a PR merged from a forked repository (a foreign pull request).
Because GitHub hides the original diff of such a pull request once it is
merged, the diff is reproduced below for the sake of provenance:

diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
index d7bffedea17..62aca366e48 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
@@ -44,7 +44,6 @@
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
-import io.druid.java.util.emitter.EmittingLogger;
 import io.druid.data.input.Committer;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.impl.InputRowParser;
@@ -72,6 +71,7 @@
 import io.druid.java.util.common.concurrent.Execs;
 import io.druid.java.util.common.guava.Sequence;
 import io.druid.java.util.common.parsers.ParseException;
+import io.druid.java.util.emitter.EmittingLogger;
 import io.druid.query.DruidMetrics;
 import io.druid.query.NoopQueryRunner;
 import io.druid.query.Query;
@@ -85,9 +85,9 @@
 import io.druid.segment.realtime.appenderator.Appenderator;
 import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
 import io.druid.segment.realtime.appenderator.Appenderators;
-import io.druid.segment.realtime.appenderator.StreamAppenderatorDriver;
 import io.druid.segment.realtime.appenderator.SegmentIdentifier;
 import io.druid.segment.realtime.appenderator.SegmentsAndMetadata;
+import io.druid.segment.realtime.appenderator.StreamAppenderatorDriver;
 import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
 import io.druid.segment.realtime.firehose.ChatHandler;
 import io.druid.segment.realtime.firehose.ChatHandlerProvider;
@@ -2059,13 +2059,19 @@ private boolean withinMinMaxRecordTime(final InputRow row)
 
   private static class SequenceMetadata
   {
+    /**
+     * Lock for accessing {@link #endOffsets} and {@link #checkpointed}. This lock is required because
+     * {@link #setEndOffsets} can be called by both the main thread and the HTTP thread.
+     */
+    private final ReentrantLock lock = new ReentrantLock();
+
     private final int sequenceId;
     private final String sequenceName;
     private final Map<Integer, Long> startOffsets;
     private final Map<Integer, Long> endOffsets;
     private final Set<Integer> assignments;
     private final boolean sentinel;
-    private volatile boolean checkpointed;
+    private boolean checkpointed;
 
     @JsonCreator
     public SequenceMetadata(
@@ -2082,8 +2088,8 @@ public SequenceMetadata(
       this.sequenceId = sequenceId;
       this.sequenceName = sequenceName;
       this.startOffsets = ImmutableMap.copyOf(startOffsets);
-      this.endOffsets = Maps.newHashMap(endOffsets);
-      this.assignments = Sets.newHashSet(startOffsets.keySet());
+      this.endOffsets = new HashMap<>(endOffsets);
+      this.assignments = new HashSet<>(startOffsets.keySet());
       this.checkpointed = checkpointed;
       this.sentinel = false;
     }
@@ -2097,7 +2103,13 @@ public int getSequenceId()
     @JsonProperty
     public boolean isCheckpointed()
     {
-      return checkpointed;
+      lock.lock();
+      try {
+        return checkpointed;
+      }
+      finally {
+        lock.unlock();
+      }
     }
 
     @JsonProperty
@@ -2115,7 +2127,13 @@ public String getSequenceName()
     @JsonProperty
     public Map<Integer, Long> getEndOffsets()
     {
-      return endOffsets;
+      lock.lock();
+      try {
+        return endOffsets;
+      }
+      finally {
+        lock.unlock();
+      }
     }
 
     @JsonProperty
@@ -2126,19 +2144,31 @@ public boolean isSentinel()
 
     public void setEndOffsets(Map<Integer, Long> newEndOffsets)
     {
-      endOffsets.putAll(newEndOffsets);
-      checkpointed = true;
+      lock.lock();
+      try {
+        endOffsets.putAll(newEndOffsets);
+        checkpointed = true;
+      }
+      finally {
+        lock.unlock();
+      }
     }
 
     public void updateAssignments(Map<Integer, Long> nextPartitionOffset)
     {
-      assignments.clear();
-      nextPartitionOffset.entrySet().forEach(partitionOffset -> {
-        if (Longs.compare(endOffsets.get(partitionOffset.getKey()), 
nextPartitionOffset.get(partitionOffset.getKey()))
-            > 0) {
-          assignments.add(partitionOffset.getKey());
-        }
-      });
+      lock.lock();
+      try {
+        assignments.clear();
+        nextPartitionOffset.entrySet().forEach(partitionOffset -> {
+          if (Longs.compare(endOffsets.get(partitionOffset.getKey()), 
nextPartitionOffset.get(partitionOffset.getKey()))
+              > 0) {
+            assignments.add(partitionOffset.getKey());
+          }
+        });
+      }
+      finally {
+        lock.unlock();
+      }
     }
 
     public boolean isOpen()
@@ -2148,10 +2178,16 @@ public boolean isOpen()
 
     boolean canHandle(ConsumerRecord<byte[], byte[]> record)
     {
-      return isOpen()
-             && endOffsets.get(record.partition()) != null
-             && record.offset() >= startOffsets.get(record.partition())
-             && record.offset() < endOffsets.get(record.partition());
+      lock.lock();
+      try {
+        return isOpen()
+               && endOffsets.get(record.partition()) != null
+               && record.offset() >= startOffsets.get(record.partition())
+               && record.offset() < endOffsets.get(record.partition());
+      }
+      finally {
+        lock.unlock();
+      }
     }
 
     private SequenceMetadata()
@@ -2173,15 +2209,21 @@ public static SequenceMetadata getSentinelSequenceMetadata()
     @Override
     public String toString()
     {
-      return "SequenceMetadata{" +
-             "sequenceName='" + sequenceName + '\'' +
-             ", sequenceId=" + sequenceId +
-             ", startOffsets=" + startOffsets +
-             ", endOffsets=" + endOffsets +
-             ", assignments=" + assignments +
-             ", sentinel=" + sentinel +
-             ", checkpointed=" + checkpointed +
-             '}';
+      lock.lock();
+      try {
+        return "SequenceMetadata{" +
+               "sequenceName='" + sequenceName + '\'' +
+               ", sequenceId=" + sequenceId +
+               ", startOffsets=" + startOffsets +
+               ", endOffsets=" + endOffsets +
+               ", assignments=" + assignments +
+               ", sentinel=" + sentinel +
+               ", checkpointed=" + checkpointed +
+               '}';
+      }
+      finally {
+        lock.unlock();
+      }
     }
 
 
@@ -2194,28 +2236,40 @@ public String toString()
             @Override
             public Object getMetadata()
             {
-              Preconditions.checkState(
-                  assignments.isEmpty(),
-                  "This committer can be used only once all the records till 
offsets [%s] have been consumed, also make sure to call updateAssignments 
before using this committer",
-                  endOffsets
-              );
+              lock.lock();
 
-              // merge endOffsets for this sequence with globally 
lastPersistedOffsets
-              // This is done because this committer would be persisting only 
sub set of segments
-              // corresponding to the current sequence. Generally, 
lastPersistedOffsets should already
-              // cover endOffsets but just to be sure take max of offsets and 
persist that
-              for (Map.Entry<Integer, Long> partitionOffset : 
endOffsets.entrySet()) {
-                lastPersistedOffsets.put(partitionOffset.getKey(), Math.max(
-                    partitionOffset.getValue(),
-                    
lastPersistedOffsets.getOrDefault(partitionOffset.getKey(), 0L)
-                ));
-              }
+              try {
+                Preconditions.checkState(
+                    assignments.isEmpty(),
+                    "This committer can be used only once all the records till 
offsets [%s] have been consumed, also make"
+                    + " sure to call updateAssignments before using this 
committer",
+                    endOffsets
+                );
 
-              // Publish metadata can be different from persist metadata as we 
are going to publish only
-              // subset of segments
-              return ImmutableMap.of(METADATA_NEXT_PARTITIONS, new 
KafkaPartitions(topic, lastPersistedOffsets),
-                                     METADATA_PUBLISH_PARTITIONS, new 
KafkaPartitions(topic, endOffsets)
-              );
+                // merge endOffsets for this sequence with globally 
lastPersistedOffsets
+                // This is done because this committer would be persisting 
only sub set of segments
+                // corresponding to the current sequence. Generally, 
lastPersistedOffsets should already
+                // cover endOffsets but just to be sure take max of offsets 
and persist that
+                for (Map.Entry<Integer, Long> partitionOffset : 
endOffsets.entrySet()) {
+                  lastPersistedOffsets.put(
+                      partitionOffset.getKey(),
+                      Math.max(
+                          partitionOffset.getValue(),
+                          
lastPersistedOffsets.getOrDefault(partitionOffset.getKey(), 0L)
+                      )
+                  );
+                }
+
+                // Publish metadata can be different from persist metadata as 
we are going to publish only
+                // subset of segments
+                return ImmutableMap.of(
+                    METADATA_NEXT_PARTITIONS, new KafkaPartitions(topic, 
lastPersistedOffsets),
+                    METADATA_PUBLISH_PARTITIONS, new KafkaPartitions(topic, 
endOffsets)
+                );
+              }
+              finally {
+                lock.unlock();
+              }
             }
 
             @Override


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@druid.apache.org
For additional commands, e-mail: dev-h...@druid.apache.org

Reply via email to