jihoonson closed pull request #5973: [Backport] Fix ConcurrentModificationException in IncrementalPublishingKafkaIndexTaskRunner
URL: https://github.com/apache/incubator-druid/pull/5973
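For context: java.util.HashMap is not thread-safe, and its iterators are fail-fast, so iterating a HashMap while another thread structurally modifies it typically throws ConcurrentModificationException. The comment added by this patch notes that setEndOffsets() can be called by both the main thread and the HTTP thread, which is exactly this kind of unsynchronized reader/writer mix. The sketch below is a standalone illustration of the failure mode only; the class name, thread roles, and loop bounds are mine and not taken from the Druid code.

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentModificationException;

// Standalone illustration (not Druid code): one thread keeps adding entries to a plain
// HashMap while another thread iterates it. This is undefined behavior and usually
// surfaces as a ConcurrentModificationException thrown by the fail-fast iterator.
public class ConcurrentModificationDemo
{
  public static void main(String[] args) throws InterruptedException
  {
    final Map<Integer, Long> endOffsets = new HashMap<>();

    // Writer: structurally modifies the map, analogous to setEndOffsets() calling putAll().
    final Thread writer = new Thread(() -> {
      for (int partition = 0; partition < 1_000_000; partition++) {
        endOffsets.put(partition, (long) partition);
      }
    });

    // Reader: iterates the same map, analogous to toString() or JSON serialization.
    final Thread reader = new Thread(() -> {
      try {
        while (writer.isAlive()) {
          for (Integer partition : endOffsets.keySet()) {
            // next() on the fail-fast iterator is where the exception is thrown
          }
        }
        System.out.println("Reader finished without an exception (timing dependent).");
      }
      catch (ConcurrentModificationException e) {
        System.out.println("Reader failed with: " + e);
      }
    });

    writer.start();
    reader.start();
    writer.join();
    reader.join();
  }
}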
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
index d7bffedea17..62aca366e48 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
@@ -44,7 +44,6 @@
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
-import io.druid.java.util.emitter.EmittingLogger;
 import io.druid.data.input.Committer;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.impl.InputRowParser;
@@ -72,6 +71,7 @@
 import io.druid.java.util.common.concurrent.Execs;
 import io.druid.java.util.common.guava.Sequence;
 import io.druid.java.util.common.parsers.ParseException;
+import io.druid.java.util.emitter.EmittingLogger;
 import io.druid.query.DruidMetrics;
 import io.druid.query.NoopQueryRunner;
 import io.druid.query.Query;
@@ -85,9 +85,9 @@
 import io.druid.segment.realtime.appenderator.Appenderator;
 import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
 import io.druid.segment.realtime.appenderator.Appenderators;
-import io.druid.segment.realtime.appenderator.StreamAppenderatorDriver;
 import io.druid.segment.realtime.appenderator.SegmentIdentifier;
 import io.druid.segment.realtime.appenderator.SegmentsAndMetadata;
+import io.druid.segment.realtime.appenderator.StreamAppenderatorDriver;
 import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
 import io.druid.segment.realtime.firehose.ChatHandler;
 import io.druid.segment.realtime.firehose.ChatHandlerProvider;
@@ -2059,13 +2059,19 @@ private boolean withinMinMaxRecordTime(final InputRow row)
 
   private static class SequenceMetadata
   {
+    /**
+     * Lock for accessing {@link #endOffsets} and {@link #checkpointed}. This lock is required because
+     * {@link #setEndOffsets} can be called by both the main thread and the HTTP thread.
+     */
+    private final ReentrantLock lock = new ReentrantLock();
+
     private final int sequenceId;
     private final String sequenceName;
     private final Map<Integer, Long> startOffsets;
     private final Map<Integer, Long> endOffsets;
     private final Set<Integer> assignments;
     private final boolean sentinel;
-    private volatile boolean checkpointed;
+    private boolean checkpointed;
 
     @JsonCreator
     public SequenceMetadata(
@@ -2082,8 +2088,8 @@ public SequenceMetadata(
       this.sequenceId = sequenceId;
       this.sequenceName = sequenceName;
       this.startOffsets = ImmutableMap.copyOf(startOffsets);
-      this.endOffsets = Maps.newHashMap(endOffsets);
-      this.assignments = Sets.newHashSet(startOffsets.keySet());
+      this.endOffsets = new HashMap<>(endOffsets);
+      this.assignments = new HashSet<>(startOffsets.keySet());
       this.checkpointed = checkpointed;
       this.sentinel = false;
     }
@@ -2097,7 +2103,13 @@ public int getSequenceId()
     @JsonProperty
     public boolean isCheckpointed()
     {
-      return checkpointed;
+      lock.lock();
+      try {
+        return checkpointed;
+      }
+      finally {
+        lock.unlock();
+      }
     }
 
     @JsonProperty
@@ -2115,7 +2127,13 @@ public String getSequenceName()
     @JsonProperty
     public Map<Integer, Long> getEndOffsets()
     {
-      return endOffsets;
+      lock.lock();
+      try {
+        return endOffsets;
+      }
+      finally {
+        lock.unlock();
+      }
     }
 
     @JsonProperty
@@ -2126,19 +2144,31 @@ public boolean isSentinel()
 
     public void setEndOffsets(Map<Integer, Long> newEndOffsets)
     {
-      endOffsets.putAll(newEndOffsets);
-      checkpointed = true;
+      lock.lock();
+      try {
+        endOffsets.putAll(newEndOffsets);
+        checkpointed = true;
+      }
+      finally {
+        lock.unlock();
+      }
     }
 
     public void updateAssignments(Map<Integer, Long> nextPartitionOffset)
     {
-      assignments.clear();
-      nextPartitionOffset.entrySet().forEach(partitionOffset -> {
-        if (Longs.compare(endOffsets.get(partitionOffset.getKey()), nextPartitionOffset.get(partitionOffset.getKey()))
-            > 0) {
-          assignments.add(partitionOffset.getKey());
-        }
-      });
+      lock.lock();
+      try {
+        assignments.clear();
+        nextPartitionOffset.entrySet().forEach(partitionOffset -> {
+          if (Longs.compare(endOffsets.get(partitionOffset.getKey()), nextPartitionOffset.get(partitionOffset.getKey()))
+              > 0) {
+            assignments.add(partitionOffset.getKey());
+          }
+        });
+      }
+      finally {
+        lock.unlock();
+      }
     }
 
     public boolean isOpen()
@@ -2148,10 +2178,16 @@ public boolean isOpen()
 
     boolean canHandle(ConsumerRecord<byte[], byte[]> record)
     {
-      return isOpen()
-             && endOffsets.get(record.partition()) != null
-             && record.offset() >= startOffsets.get(record.partition())
-             && record.offset() < endOffsets.get(record.partition());
+      lock.lock();
+      try {
+        return isOpen()
+               && endOffsets.get(record.partition()) != null
+               && record.offset() >= startOffsets.get(record.partition())
+               && record.offset() < endOffsets.get(record.partition());
+      }
+      finally {
+        lock.unlock();
+      }
     }
 
     private SequenceMetadata()
@@ -2173,15 +2209,21 @@ public static SequenceMetadata getSentinelSequenceMetadata()
 
     @Override
     public String toString()
     {
-      return "SequenceMetadata{" +
-             "sequenceName='" + sequenceName + '\'' +
-             ", sequenceId=" + sequenceId +
-             ", startOffsets=" + startOffsets +
-             ", endOffsets=" + endOffsets +
-             ", assignments=" + assignments +
-             ", sentinel=" + sentinel +
-             ", checkpointed=" + checkpointed +
-             '}';
+      lock.lock();
+      try {
+        return "SequenceMetadata{" +
+               "sequenceName='" + sequenceName + '\'' +
+               ", sequenceId=" + sequenceId +
+               ", startOffsets=" + startOffsets +
+               ", endOffsets=" + endOffsets +
+               ", assignments=" + assignments +
+               ", sentinel=" + sentinel +
+               ", checkpointed=" + checkpointed +
+               '}';
+      }
+      finally {
+        lock.unlock();
+      }
     }
 
@@ -2194,28 +2236,40 @@ public String toString()
     @Override
     public Object getMetadata()
     {
-      Preconditions.checkState(
-          assignments.isEmpty(),
-          "This committer can be used only once all the records till offsets [%s] have been consumed, also make sure to call updateAssignments before using this committer",
-          endOffsets
-      );
+      lock.lock();
 
-      // merge endOffsets for this sequence with globally lastPersistedOffsets
-      // This is done because this committer would be persisting only sub set of segments
-      // corresponding to the current sequence. Generally, lastPersistedOffsets should already
-      // cover endOffsets but just to be sure take max of offsets and persist that
-      for (Map.Entry<Integer, Long> partitionOffset : endOffsets.entrySet()) {
-        lastPersistedOffsets.put(partitionOffset.getKey(), Math.max(
-            partitionOffset.getValue(),
-            lastPersistedOffsets.getOrDefault(partitionOffset.getKey(), 0L)
-        ));
-      }
+      try {
+        Preconditions.checkState(
+            assignments.isEmpty(),
+            "This committer can be used only once all the records till offsets [%s] have been consumed, also make"
+            + " sure to call updateAssignments before using this committer",
+            endOffsets
+        );
 
-      // Publish metadata can be different from persist metadata as we are going to publish only
-      // subset of segments
-      return ImmutableMap.of(METADATA_NEXT_PARTITIONS, new KafkaPartitions(topic, lastPersistedOffsets),
-          METADATA_PUBLISH_PARTITIONS, new KafkaPartitions(topic, endOffsets)
-      );
+        // merge endOffsets for this sequence with globally lastPersistedOffsets
+        // This is done because this committer would be persisting only sub set of segments
+        // corresponding to the current sequence. Generally, lastPersistedOffsets should already
+        // cover endOffsets but just to be sure take max of offsets and persist that
+        for (Map.Entry<Integer, Long> partitionOffset : endOffsets.entrySet()) {
+          lastPersistedOffsets.put(
+              partitionOffset.getKey(),
+              Math.max(
+                  partitionOffset.getValue(),
+                  lastPersistedOffsets.getOrDefault(partitionOffset.getKey(), 0L)
+              )
+          );
+        }
+
+        // Publish metadata can be different from persist metadata as we are going to publish only
+        // subset of segments
+        return ImmutableMap.of(
+            METADATA_NEXT_PARTITIONS, new KafkaPartitions(topic, lastPersistedOffsets),
+            METADATA_PUBLISH_PARTITIONS, new KafkaPartitions(topic, endOffsets)
+        );
+      }
+      finally {
+        lock.unlock();
+      }
     }
 
     @Override
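Distilled, the fix routes every read and write of the shared SequenceMetadata state through one ReentrantLock, always releasing it in a finally block. The sketch below shows just that idiom; it is a simplified stand-in with assumed names (GuardedOffsets, and only two of the guarded members), not the actual Druid class.

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

// Minimal sketch of the locking idiom applied by the patch: reads and writes of the
// shared fields acquire the same lock, and unlock() sits in a finally block so the
// lock is released even if the guarded code throws.
class GuardedOffsets
{
  private final ReentrantLock lock = new ReentrantLock();
  private final Map<Integer, Long> endOffsets = new HashMap<>();
  private boolean checkpointed;  // the lock provides visibility, so volatile is unnecessary

  // Analogous to setEndOffsets(): mutation happens only while the lock is held.
  void setEndOffsets(Map<Integer, Long> newEndOffsets)
  {
    lock.lock();
    try {
      endOffsets.putAll(newEndOffsets);
      checkpointed = true;
    }
    finally {
      lock.unlock();
    }
  }

  // Analogous to isCheckpointed(): readers take the same lock as writers.
  boolean isCheckpointed()
  {
    lock.lock();
    try {
      return checkpointed;
    }
    finally {
      lock.unlock();
    }
  }
}

Because readers and writers now synchronize on one lock, the lock also supplies the memory visibility that the removed volatile modifier on checkpointed used to provide, and endOffsets can no longer be structurally modified while a lock-holding caller is reading it.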