KAFKA-4923: Add Exactly-Once Semantics to Streams

Author: Matthias J. Sax <[email protected]>
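For context, exactly-once processing is opt-in: an application enables it by setting processing.guarantee to "exactly_once". A minimal usage sketch follows, assuming a placeholder application id, broker list, and example class; only the StreamsConfig constants are taken from the code touched by this patch.

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class EosConfigExample {  // hypothetical helper, not part of this patch
        public static StreamsConfig buildEosConfig() {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-eos-app");        // placeholder app id
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder brokers
            // opt into exactly-once processing
            props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
            // postProcessParsedConfig() (added in this patch) then lowers the commit.interval.ms
            // default to 100ms, the embedded producer becomes idempotent/transactional, and the
            // consumers are switched to isolation.level=read_committed.
            return new StreamsConfig(props);
        }
    }

With exactly-once enabled, user overrides of isolation.level, enable.idempotence, and max.in.flight.requests.per.connection are rejected with a ConfigException, as enforced in getCommonConsumerConfigs() and getProducerConfigs() in the StreamsConfig diff below.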
Reviewers: Apurva Metha, Ismael Juma, Damian Guy, Eno Thereska, Guozhang Wang Closes #2945 from mjsax/kafka-4923-add-eos-to-streams Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ebc7f7ca Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ebc7f7ca Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ebc7f7ca Branch: refs/heads/trunk Commit: ebc7f7caaeb47c9588d79a2f3ed496daa0bd39e5 Parents: 670193f Author: Matthias J. Sax <[email protected]> Authored: Tue May 16 17:23:11 2017 -0700 Committer: Guozhang Wang <[email protected]> Committed: Tue May 16 17:23:11 2017 -0700 ---------------------------------------------------------------------- .../kafka/clients/consumer/MockConsumer.java | 78 +-- .../kafka/clients/producer/MockProducer.java | 23 + .../kafka/common/config/AbstractConfig.java | 16 + .../clients/producer/MockProducerTest.java | 122 ++++ .../org/apache/kafka/streams/StreamsConfig.java | 74 ++- .../processor/internals/AbstractTask.java | 34 +- .../processor/internals/PartitionGroup.java | 65 +- .../internals/ProcessorStateManager.java | 17 +- .../processor/internals/RecordCollector.java | 12 + .../internals/RecordCollectorImpl.java | 9 +- .../processor/internals/StandbyContextImpl.java | 22 +- .../processor/internals/StandbyTask.java | 15 +- .../streams/processor/internals/StreamTask.java | 158 +++-- .../processor/internals/StreamThread.java | 180 ++++-- .../state/internals/OffsetCheckpoint.java | 75 +-- .../apache/kafka/streams/StreamsConfigTest.java | 171 ++++- .../processor/internals/AbstractTaskTest.java | 13 +- .../processor/internals/PartitionGroupTest.java | 12 +- .../internals/ProcessorStateManagerTest.java | 272 +++++--- .../processor/internals/StreamTaskTest.java | 289 ++++++--- .../processor/internals/StreamThreadTest.java | 645 +++++++++++++------ .../StreamThreadStateStoreProviderTest.java | 69 +- .../apache/kafka/test/MockClientSupplier.java | 23 +- .../apache/kafka/test/NoOpRecordCollector.java | 9 +- .../kafka/test/ProcessorTopologyTestDriver.java | 155 ++--- 25 files changed, 1812 insertions(+), 746 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/ebc7f7ca/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java index d81270a..91cb6f1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -75,29 +75,29 @@ public class MockConsumer<K, V> implements Consumer<K, V> { } @Override - public Set<TopicPartition> assignment() { + public synchronized Set<TopicPartition> assignment() { return this.subscriptions.assignedPartitions(); } /** Simulate a rebalance event. 
*/ - public void rebalance(Collection<TopicPartition> newAssignment) { + public synchronized void rebalance(Collection<TopicPartition> newAssignment) { // TODO: Rebalance callbacks this.records.clear(); this.subscriptions.assignFromSubscribed(newAssignment); } @Override - public Set<String> subscription() { + public synchronized Set<String> subscription() { return this.subscriptions.subscription(); } @Override - public void subscribe(Collection<String> topics) { + public synchronized void subscribe(Collection<String> topics) { subscribe(topics, new NoOpConsumerRebalanceListener()); } @Override - public void subscribe(Pattern pattern, final ConsumerRebalanceListener listener) { + public synchronized void subscribe(Pattern pattern, final ConsumerRebalanceListener listener) { ensureNotClosed(); this.subscriptions.subscribe(pattern, listener); Set<String> topicsToSubscribe = new HashSet<>(); @@ -111,25 +111,25 @@ public class MockConsumer<K, V> implements Consumer<K, V> { } @Override - public void subscribe(Collection<String> topics, final ConsumerRebalanceListener listener) { + public synchronized void subscribe(Collection<String> topics, final ConsumerRebalanceListener listener) { ensureNotClosed(); this.subscriptions.subscribe(new HashSet<>(topics), listener); } @Override - public void assign(Collection<TopicPartition> partitions) { + public synchronized void assign(Collection<TopicPartition> partitions) { ensureNotClosed(); this.subscriptions.assignFromUser(new HashSet<>(partitions)); } @Override - public void unsubscribe() { + public synchronized void unsubscribe() { ensureNotClosed(); subscriptions.unsubscribe(); } @Override - public ConsumerRecords<K, V> poll(long timeout) { + public synchronized ConsumerRecords<K, V> poll(long timeout) { ensureNotClosed(); // Synchronize around the entire execution so new tasks to be triggered on subsequent poll calls can be added in @@ -176,7 +176,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> { return new ConsumerRecords<>(results); } - public void addRecord(ConsumerRecord<K, V> record) { + public synchronized void addRecord(ConsumerRecord<K, V> record) { ensureNotClosed(); TopicPartition tp = new TopicPartition(record.topic(), record.partition()); Set<TopicPartition> currentAssigned = new HashSet<>(this.subscriptions.assignedPartitions()); @@ -190,12 +190,12 @@ public class MockConsumer<K, V> implements Consumer<K, V> { recs.add(record); } - public void setException(KafkaException exception) { + public synchronized void setException(KafkaException exception) { this.exception = exception; } @Override - public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) { + public synchronized void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) { ensureNotClosed(); for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) subscriptions.committed(entry.getKey(), entry.getValue()); @@ -205,34 +205,34 @@ public class MockConsumer<K, V> implements Consumer<K, V> { } @Override - public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) { + public synchronized void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) { commitAsync(offsets, null); } @Override - public void commitAsync() { + public synchronized void commitAsync() { commitAsync(null); } @Override - public void commitAsync(OffsetCommitCallback callback) { + public synchronized void commitAsync(OffsetCommitCallback callback) { ensureNotClosed(); 
commitAsync(this.subscriptions.allConsumed(), callback); } @Override - public void commitSync() { + public synchronized void commitSync() { commitSync(this.subscriptions.allConsumed()); } @Override - public void seek(TopicPartition partition, long offset) { + public synchronized void seek(TopicPartition partition, long offset) { ensureNotClosed(); subscriptions.seek(partition, offset); } @Override - public OffsetAndMetadata committed(TopicPartition partition) { + public synchronized OffsetAndMetadata committed(TopicPartition partition) { ensureNotClosed(); if (subscriptions.isAssigned(partition)) { return subscriptions.committed(partition); @@ -241,7 +241,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> { } @Override - public long position(TopicPartition partition) { + public synchronized long position(TopicPartition partition) { ensureNotClosed(); if (!this.subscriptions.isAssigned(partition)) throw new IllegalArgumentException("You can only check the position for partitions assigned to this consumer."); @@ -254,52 +254,52 @@ public class MockConsumer<K, V> implements Consumer<K, V> { } @Override - public void seekToBeginning(Collection<TopicPartition> partitions) { + public synchronized void seekToBeginning(Collection<TopicPartition> partitions) { ensureNotClosed(); for (TopicPartition tp : partitions) subscriptions.needOffsetReset(tp, OffsetResetStrategy.EARLIEST); } - public void updateBeginningOffsets(Map<TopicPartition, Long> newOffsets) { + public synchronized void updateBeginningOffsets(Map<TopicPartition, Long> newOffsets) { beginningOffsets.putAll(newOffsets); } @Override - public void seekToEnd(Collection<TopicPartition> partitions) { + public synchronized void seekToEnd(Collection<TopicPartition> partitions) { ensureNotClosed(); for (TopicPartition tp : partitions) subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST); } - public void updateEndOffsets(Map<TopicPartition, Long> newOffsets) { + public synchronized void updateEndOffsets(Map<TopicPartition, Long> newOffsets) { endOffsets.putAll(newOffsets); } @Override - public Map<MetricName, ? extends Metric> metrics() { + public synchronized Map<MetricName, ? 
extends Metric> metrics() { ensureNotClosed(); return Collections.emptyMap(); } @Override - public List<PartitionInfo> partitionsFor(String topic) { + public synchronized List<PartitionInfo> partitionsFor(String topic) { ensureNotClosed(); return this.partitions.get(topic); } @Override - public Map<String, List<PartitionInfo>> listTopics() { + public synchronized Map<String, List<PartitionInfo>> listTopics() { ensureNotClosed(); return partitions; } - public void updatePartitions(String topic, List<PartitionInfo> partitions) { + public synchronized void updatePartitions(String topic, List<PartitionInfo> partitions) { ensureNotClosed(); this.partitions.put(topic, partitions); } @Override - public void pause(Collection<TopicPartition> partitions) { + public synchronized void pause(Collection<TopicPartition> partitions) { for (TopicPartition partition : partitions) { subscriptions.pause(partition); paused.add(partition); @@ -307,7 +307,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> { } @Override - public void resume(Collection<TopicPartition> partitions) { + public synchronized void resume(Collection<TopicPartition> partitions) { for (TopicPartition partition : partitions) { subscriptions.resume(partition); paused.remove(partition); @@ -315,12 +315,12 @@ public class MockConsumer<K, V> implements Consumer<K, V> { } @Override - public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) { + public synchronized Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) { throw new UnsupportedOperationException("Not implemented yet."); } @Override - public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) { + public synchronized Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) { Map<TopicPartition, Long> result = new HashMap<>(); for (TopicPartition tp : partitions) { Long beginningOffset = beginningOffsets.get(tp); @@ -332,7 +332,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> { } @Override - public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) { + public synchronized Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) { Map<TopicPartition, Long> result = new HashMap<>(); for (TopicPartition tp : partitions) { Long endOffset = endOffsets.get(tp); @@ -344,22 +344,22 @@ public class MockConsumer<K, V> implements Consumer<K, V> { } @Override - public void close() { + public synchronized void close() { close(KafkaConsumer.DEFAULT_CLOSE_TIMEOUT_MS, TimeUnit.MILLISECONDS); } @Override - public void close(long timeout, TimeUnit unit) { + public synchronized void close(long timeout, TimeUnit unit) { ensureNotClosed(); this.closed = true; } - public boolean closed() { + public synchronized boolean closed() { return this.closed; } @Override - public void wakeup() { + public synchronized void wakeup() { wakeup.set(true); } @@ -368,13 +368,13 @@ public class MockConsumer<K, V> implements Consumer<K, V> { * invocation. You can use this repeatedly to mock out multiple responses to poll invocations. 
* @param task the task to be executed */ - public void schedulePollTask(Runnable task) { + public synchronized void schedulePollTask(Runnable task) { synchronized (pollTasks) { pollTasks.add(task); } } - public void scheduleNopPollTask() { + public synchronized void scheduleNopPollTask() { schedulePollTask(new Runnable() { @Override public void run() { @@ -383,7 +383,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> { }); } - public Set<TopicPartition> paused() { + public synchronized Set<TopicPartition> paused() { return Collections.unmodifiableSet(new HashSet<>(paused)); } http://git-wip-us.apache.org/repos/asf/kafka/blob/ebc7f7ca/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java index 15ea454..22fa755 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java @@ -37,6 +37,7 @@ import java.util.Deque; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -65,6 +66,8 @@ public class MockProducer<K, V> implements Producer<K, V> { private boolean transactionCommitted; private boolean transactionAborted; private boolean producerFenced; + private boolean sentOffsets; + private long commitCount = 0L; /** * Create a mock producer @@ -148,6 +151,7 @@ public class MockProducer<K, V> implements Producer<K, V> { this.transactionInFlight = true; this.transactionCommitted = false; this.transactionAborted = false; + this.sentOffsets = false; } @Override @@ -156,12 +160,17 @@ public class MockProducer<K, V> implements Producer<K, V> { verifyProducerState(); verifyTransactionsInitialized(); verifyNoTransactionInFlight(); + Objects.requireNonNull(consumerGroupId); + if (offsets.size() == 0) { + return; + } Map<TopicPartition, OffsetAndMetadata> uncommittedOffsets = this.uncommittedConsumerGroupOffsets.get(consumerGroupId); if (uncommittedOffsets == null) { uncommittedOffsets = new HashMap<>(); this.uncommittedConsumerGroupOffsets.put(consumerGroupId, uncommittedOffsets); } uncommittedOffsets.putAll(offsets); + this.sentOffsets = true; } @Override @@ -182,6 +191,7 @@ public class MockProducer<K, V> implements Producer<K, V> { this.transactionAborted = false; this.transactionInFlight = false; + ++this.commitCount; } @Override @@ -276,6 +286,7 @@ public class MockProducer<K, V> implements Producer<K, V> { } public synchronized void flush() { + verifyProducerState(); while (!this.completions.isEmpty()) completeNext(); } @@ -329,6 +340,18 @@ public class MockProducer<K, V> implements Producer<K, V> { return this.transactionAborted; } + public boolean flushed() { + return this.completions.isEmpty(); + } + + public boolean sentOffsets() { + return this.sentOffsets; + } + + public long commitCount() { + return this.commitCount; + } + /** * Get the list of sent records since the last call to {@link #clear()} */ http://git-wip-us.apache.org/repos/asf/kafka/blob/ebc7f7ca/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java 
b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java index aa8cf0d..dc7fd7c 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java @@ -60,6 +60,11 @@ public class AbstractConfig { throw new ConfigException(entry.getKey().toString(), entry.getValue(), "Key must be a string."); this.originals = (Map<String, ?>) originals; this.values = definition.parse(this.originals); + Map<String, Object> configUpdates = postProcessParsedConfig(Collections.unmodifiableMap(this.values)); + for (Map.Entry<String, Object> update : configUpdates.entrySet()) { + this.values.put(update.getKey(), update.getValue()); + } + definition.parse(this.values); this.used = Collections.synchronizedSet(new HashSet<String>()); this.definition = definition; if (doLog) @@ -70,6 +75,17 @@ public class AbstractConfig { this(definition, originals, true); } + /** + * Called directly after user configs got parsed (and thus default values got set). + * This allows to change default values for "secondary defaults" if required. + * + * @param parsedValues unmodifiable map of current configuration + * @return a map of updates that should be applied to the configuration (will be validated to prevent bad updates) + */ + protected Map<String, Object> postProcessParsedConfig(Map<String, Object> parsedValues) { + return Collections.emptyMap(); + } + protected Object get(String key) { if (!values.containsKey(key)) throw new ConfigException(String.format("Unknown configuration '%s'", key)); http://git-wip-us.apache.org/repos/asf/kafka/blob/ebc7f7ca/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java index 468ea49..eeb9b5f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java @@ -171,6 +171,28 @@ public class MockProducerTest { assertFalse(producer.transactionAborted()); } + @Test + public void shouldCountCommittedTransaction() { + producer.initTransactions(); + producer.beginTransaction(); + + assertThat(producer.commitCount(), equalTo(0L)); + producer.commitTransaction(); + assertThat(producer.commitCount(), equalTo(1L)); + } + + @Test + public void shouldNotCountAbortedTransaction() { + producer.initTransactions(); + + producer.beginTransaction(); + producer.abortTransaction(); + + producer.beginTransaction(); + producer.commitTransaction(); + assertThat(producer.commitCount(), equalTo(1L)); + } + @Test(expected = IllegalStateException.class) public void shouldThrowOnAbortIfTransactionsNotInitialized() { producer.abortTransaction(); @@ -231,6 +253,16 @@ public class MockProducerTest { } @Test + public void shouldThrowOnFlushIfProducerGotFenced() { + producer.initTransactions(); + producer.fenceProducer(); + try { + producer.flush(); + fail("Should have thrown as producer is fenced off"); + } catch (ProducerFencedException e) { } + } + + @Test public void shouldThrowOnSendOffsetsToTransactionIfProducerGotFenced() { producer.initTransactions(); producer.fenceProducer(); @@ -377,6 +409,61 @@ public class MockProducerTest { } @Test + public void shouldThrowOnNullConsumerGroupIdWhenSendOffsetsToTransaction() { + 
producer.initTransactions(); + producer.beginTransaction(); + + try { + producer.sendOffsetsToTransaction(Collections.<TopicPartition, OffsetAndMetadata>emptyMap(), null); + fail("Should have thrown NullPointerException"); + } catch (NullPointerException e) { } + } + + @Test + public void shouldIgnoreEmptyOffsetsWhenSendOffsetsToTransaction() { + producer.initTransactions(); + producer.beginTransaction(); + producer.sendOffsetsToTransaction(Collections.<TopicPartition, OffsetAndMetadata>emptyMap(), "groupId"); + assertFalse(producer.sentOffsets()); + } + + @Test + public void shouldAddOffsetsWhenSendOffsetsToTransaction() { + producer.initTransactions(); + producer.beginTransaction(); + + assertFalse(producer.sentOffsets()); + + Map<TopicPartition, OffsetAndMetadata> groupCommit = new HashMap<TopicPartition, OffsetAndMetadata>() { + { + put(new TopicPartition(topic, 0), new OffsetAndMetadata(42L, null)); + } + }; + producer.sendOffsetsToTransaction(groupCommit, "groupId"); + assertTrue(producer.sentOffsets()); + } + + @Test + public void shouldResetSentOffsetsFlagOnlyWhenBeginningNewTransaction() { + producer.initTransactions(); + producer.beginTransaction(); + + assertFalse(producer.sentOffsets()); + + Map<TopicPartition, OffsetAndMetadata> groupCommit = new HashMap<TopicPartition, OffsetAndMetadata>() { + { + put(new TopicPartition(topic, 0), new OffsetAndMetadata(42L, null)); + } + }; + producer.sendOffsetsToTransaction(groupCommit, "groupId"); + producer.commitTransaction(); // commit should not reset "sentOffsets" flag + assertTrue(producer.sentOffsets()); + + producer.beginTransaction(); + assertFalse(producer.sentOffsets()); + } + + @Test public void shouldPublishLatestAndCumulativeConsumerGroupOffsetsOnlyAfterCommitIfTransactionsAreEnabled() { producer.initTransactions(); producer.beginTransaction(); @@ -528,6 +615,41 @@ public class MockProducerTest { } catch (IllegalStateException e) { } } + @Test + public void shouldThrowOnFlushProducerIfProducerIsClosed() { + producer.close(); + try { + producer.flush(); + fail("Should have thrown as producer is already closed"); + } catch (IllegalStateException e) { } + } + + @Test + public void shouldBeFlushedIfNoBufferedRecords() { + assertTrue(producer.flushed()); + } + + @Test + public void shouldBeFlushedWithAutoCompleteIfBufferedRecords() { + producer.send(record1); + assertTrue(producer.flushed()); + } + + @Test + public void shouldNotBeFlushedWithNoAutoCompleteIfBufferedRecords() { + MockProducer<byte[], byte[]> producer = new MockProducer<>(false, new MockSerializer(), new MockSerializer()); + producer.send(record1); + assertFalse(producer.flushed()); + } + + @Test + public void shouldNotBeFlushedAfterFlush() { + MockProducer<byte[], byte[]> producer = new MockProducer<>(false, new MockSerializer(), new MockSerializer()); + producer.send(record1); + producer.flush(); + assertTrue(producer.flushed()); + } + private boolean isError(Future<?> future) { try { future.get(); http://git-wip-us.apache.org/repos/asf/kafka/blob/ebc7f7ca/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index c1fd2d6..af9b8e7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -35,14 +35,18 @@ import 
org.apache.kafka.streams.processor.FailOnInvalidTimestamp; import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor; import org.apache.kafka.streams.processor.internals.StreamThread; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Collections; import java.util.HashMap; +import java.util.Locale; import java.util.Map; import java.util.Set; import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; import static org.apache.kafka.common.config.ConfigDef.ValidString.in; +import static org.apache.kafka.common.requests.IsolationLevel.READ_COMMITTED; /** * Configuration for a {@link KafkaStreams} instance. @@ -78,8 +82,14 @@ import static org.apache.kafka.common.config.ConfigDef.ValidString.in; */ public class StreamsConfig extends AbstractConfig { + private final static Logger log = LoggerFactory.getLogger(StreamsConfig.class); + private static final ConfigDef CONFIG; + private final boolean eosEnabled; + private final static long DEFAULT_COMMIT_INTERVAL_MS = 30000L; + private final static long EOS_DEFAULT_COMMIT_INTERVAL_MS = 100L; + /** * Prefix used to isolate {@link KafkaConsumer consumer} configs from {@link KafkaProducer producer} configs. * It is recommended to use {@link #consumerPrefix(String)} to add this prefix to {@link ConsumerConfig consumer @@ -129,7 +139,9 @@ public class StreamsConfig extends AbstractConfig { /** {@code commit.interval.ms} */ public static final String COMMIT_INTERVAL_MS_CONFIG = "commit.interval.ms"; - private static final String COMMIT_INTERVAL_MS_DOC = "The frequency with which to save the position of the processor."; + private static final String COMMIT_INTERVAL_MS_DOC = "The frequency with which to save the position of the processor." 
+ + " (Note, if 'processing.guarantee' is set to '" + EXACTLY_ONCE + "', the default value is " + EOS_DEFAULT_COMMIT_INTERVAL_MS + "," + + " otherwise the default value is " + DEFAULT_COMMIT_INTERVAL_MS + "."; /** {@code connections.max.idle.ms} */ public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG; @@ -321,7 +333,7 @@ public class StreamsConfig extends AbstractConfig { DEFAULT_VALUE_SERDE_CLASS_DOC) .define(COMMIT_INTERVAL_MS_CONFIG, Type.LONG, - 30000, + DEFAULT_COMMIT_INTERVAL_MS, Importance.LOW, COMMIT_INTERVAL_MS_DOC) .define(POLL_MS_CONFIG, @@ -458,6 +470,16 @@ public class StreamsConfig extends AbstractConfig { PRODUCER_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempProducerDefaultOverrides); } + private static final Map<String, Object> PRODUCER_EOS_OVERRIDES; + static { + final Map<String, Object> tempProducerDefaultOverrides = new HashMap<>(PRODUCER_DEFAULT_OVERRIDES); + tempProducerDefaultOverrides.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); + tempProducerDefaultOverrides.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); + tempProducerDefaultOverrides.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); + + PRODUCER_EOS_OVERRIDES = Collections.unmodifiableMap(tempProducerDefaultOverrides); + } + private static final Map<String, Object> CONSUMER_DEFAULT_OVERRIDES; static { final Map<String, Object> tempConsumerDefaultOverrides = new HashMap<>(); @@ -475,6 +497,13 @@ public class StreamsConfig extends AbstractConfig { CONSUMER_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempConsumerDefaultOverrides); } + private static final Map<String, Object> CONSUMER_EOS_OVERRIDES; + static { + final Map<String, Object> tempConsumerDefaultOverrides = new HashMap<>(CONSUMER_DEFAULT_OVERRIDES); + tempConsumerDefaultOverrides.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, READ_COMMITTED.name().toLowerCase(Locale.ROOT)); + CONSUMER_EOS_OVERRIDES = Collections.unmodifiableMap(tempConsumerDefaultOverrides); + } + public static class InternalConfig { public static final String STREAM_THREAD_INSTANCE = "__stream.thread.instance__"; } @@ -517,6 +546,21 @@ public class StreamsConfig extends AbstractConfig { */ public StreamsConfig(final Map<?, ?> props) { super(CONFIG, props); + eosEnabled = EXACTLY_ONCE.equals(getString(PROCESSING_GUARANTEE_CONFIG)); + } + + @Override + protected Map<String, Object> postProcessParsedConfig(final Map<String, Object> parsedValues) { + final Map<String, Object> configUpdates = new HashMap<>(); + + final boolean eosEnabled = EXACTLY_ONCE.equals(parsedValues.get(PROCESSING_GUARANTEE_CONFIG)); + if (eosEnabled && !originals().containsKey(COMMIT_INTERVAL_MS_CONFIG)) { + log.debug("Using " + COMMIT_INTERVAL_MS_CONFIG + " default value of " + + EOS_DEFAULT_COMMIT_INTERVAL_MS + " as exactly once is enabled."); + configUpdates.put(COMMIT_INTERVAL_MS_CONFIG, EOS_DEFAULT_COMMIT_INTERVAL_MS); + } + + return configUpdates; } private Map<String, Object> getCommonConsumerConfigs() throws ConfigException { @@ -528,8 +572,14 @@ public class StreamsConfig extends AbstractConfig { throw new ConfigException("Unexpected user-specified consumer config " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + ", as the streams client will always turn off auto committing."); } + if (eosEnabled) { + if (clientProvidedProps.containsKey(ConsumerConfig.ISOLATION_LEVEL_CONFIG)) { + throw new ConfigException("Unexpected user-specified consumer config " + ConsumerConfig.ISOLATION_LEVEL_CONFIG + + "; because " + 
PROCESSING_GUARANTEE_CONFIG + " is set to '" + EXACTLY_ONCE + "' consumers will always read committed data only."); + } + } - final Map<String, Object> consumerProps = new HashMap<>(CONSUMER_DEFAULT_OVERRIDES); + final Map<String, Object> consumerProps = new HashMap<>(eosEnabled ? CONSUMER_EOS_OVERRIDES : CONSUMER_DEFAULT_OVERRIDES); consumerProps.putAll(clientProvidedProps); // bootstrap.servers should be from StreamsConfig @@ -604,9 +654,23 @@ public class StreamsConfig extends AbstractConfig { * @return Map of the producer configuration. */ public Map<String, Object> getProducerConfigs(final String clientId) { + final Map<String, Object> clientProvidedProps = getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames()); + + if (eosEnabled) { + if (clientProvidedProps.containsKey(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG)) { + throw new ConfigException("Unexpected user-specified consumer config " + ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG + + "; because " + PROCESSING_GUARANTEE_CONFIG + " is set to '" + EXACTLY_ONCE + "' producer will always have idempotency enabled."); + } + + if (clientProvidedProps.containsKey(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) { + throw new ConfigException("Unexpected user-specified consumer config " + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + + "; because " + PROCESSING_GUARANTEE_CONFIG + " is set to '" + EXACTLY_ONCE + "' producer will always have only one in-flight request per connection."); + } + } + // generate producer configs from original properties and overridden maps - final Map<String, Object> props = new HashMap<>(PRODUCER_DEFAULT_OVERRIDES); - props.putAll(getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames())); + final Map<String, Object> props = new HashMap<>(eosEnabled ? 
PRODUCER_EOS_OVERRIDES : PRODUCER_DEFAULT_OVERRIDES); + props.putAll(clientProvidedProps); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, originals().get(BOOTSTRAP_SERVERS_CONFIG)); // add client id with stream client id prefix http://git-wip-us.apache.org/repos/asf/kafka/blob/ebc7f7ca/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index d546118..d97f8f9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.AuthorizationException; import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; @@ -40,15 +41,16 @@ import java.util.Set; public abstract class AbstractTask { private static final Logger log = LoggerFactory.getLogger(AbstractTask.class); - private final TaskId id; - protected final String applicationId; - protected final ProcessorTopology topology; - protected final Consumer consumer; - protected final ProcessorStateManager stateMgr; - protected final Set<TopicPartition> partitions; + final TaskId id; + final String applicationId; + final ProcessorTopology topology; + final Consumer consumer; + final ProcessorStateManager stateMgr; + final Set<TopicPartition> partitions; InternalProcessorContext processorContext; - protected final ThreadCache cache; + private final ThreadCache cache; final String logPrefix; + final boolean eosEnabled; /** * @throws ProcessorStateException if the state manager cannot be created @@ -61,28 +63,38 @@ public abstract class AbstractTask { final ChangelogReader changelogReader, final boolean isStandby, final StateDirectory stateDirectory, - final ThreadCache cache) { + final ThreadCache cache, + final StreamsConfig config) { this.id = id; this.applicationId = applicationId; this.partitions = new HashSet<>(partitions); this.topology = topology; this.consumer = consumer; this.cache = cache; + eosEnabled = StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)); logPrefix = String.format("%s [%s]", isStandby ? 
"standby-task" : "task", id()); // create the processor state manager try { - stateMgr = new ProcessorStateManager(id, partitions, isStandby, stateDirectory, topology.storeToChangelogTopic(), changelogReader); + stateMgr = new ProcessorStateManager( + id, + partitions, + isStandby, + stateDirectory, + topology.storeToChangelogTopic(), + changelogReader, + eosEnabled); } catch (final IOException e) { throw new ProcessorStateException(String.format("%s Error while creating the state manager", logPrefix), e); } } public abstract void resume(); + public abstract void commit(); public abstract void suspend(); - public abstract void close(); + public abstract void close(final boolean clean); public final TaskId id() { return id; @@ -108,7 +120,6 @@ public abstract class AbstractTask { return cache; } - public StateStore getStore(final String name) { return stateMgr.getStore(name); } @@ -200,5 +211,4 @@ public abstract class AbstractTask { stateMgr.close(writeCheckpoint ? recordCollectorOffsets() : null); } - } http://git-wip-us.apache.org/repos/asf/kafka/blob/ebc7f7ca/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java index bae6f22..1e875b5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java @@ -36,7 +36,7 @@ public class PartitionGroup { private final PriorityQueue<RecordQueue> queuesByTime; public static class RecordInfo { - public RecordQueue queue; + RecordQueue queue; public ProcessorNode node() { return queue.source(); @@ -46,7 +46,7 @@ public class PartitionGroup { return queue.partition(); } - public RecordQueue queue() { + RecordQueue queue() { return queue; } } @@ -54,23 +54,27 @@ public class PartitionGroup { // since task is thread-safe, we do not need to synchronize on local variables private int totalBuffered; - public PartitionGroup(Map<TopicPartition, RecordQueue> partitionQueues) { - this.queuesByTime = new PriorityQueue<>(partitionQueues.size(), new Comparator<RecordQueue>() { + PartitionGroup(final Map<TopicPartition, RecordQueue> partitionQueues) { + queuesByTime = new PriorityQueue<>(partitionQueues.size(), new Comparator<RecordQueue>() { @Override - public int compare(RecordQueue queue1, RecordQueue queue2) { - long time1 = queue1.timestamp(); - long time2 = queue2.timestamp(); - - if (time1 < time2) return -1; - if (time1 > time2) return 1; + public int compare(final RecordQueue queue1, final RecordQueue queue2) { + final long time1 = queue1.timestamp(); + final long time2 = queue2.timestamp(); + + if (time1 < time2) { + return -1; + } + if (time1 > time2) { + return 1; + } return 0; } }); this.partitionQueues = partitionQueues; - this.totalBuffered = 0; + totalBuffered = 0; } /** @@ -78,10 +82,10 @@ public class PartitionGroup { * * @return StampedRecord */ - public StampedRecord nextRecord(RecordInfo info) { + StampedRecord nextRecord(final RecordInfo info) { StampedRecord record = null; - RecordQueue queue = queuesByTime.poll(); + final RecordQueue queue = queuesByTime.poll(); if (queue != null) { // get the first record from this queue. 
record = queue.poll(); @@ -92,7 +96,9 @@ public class PartitionGroup { } info.queue = queue; - if (record != null) totalBuffered--; + if (record != null) { + --totalBuffered; + } return record; } @@ -104,11 +110,11 @@ public class PartitionGroup { * @param rawRecords the raw records * @return the queue size for the partition */ - public int addRawRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> rawRecords) { - RecordQueue recordQueue = partitionQueues.get(partition); + int addRawRecords(final TopicPartition partition, final Iterable<ConsumerRecord<byte[], byte[]>> rawRecords) { + final RecordQueue recordQueue = partitionQueues.get(partition); - int oldSize = recordQueue.size(); - int newSize = recordQueue.addRawRecords(rawRecords); + final int oldSize = recordQueue.size(); + final int newSize = recordQueue.addRawRecords(rawRecords); // add this record queue to be considered for processing in the future if it was empty before if (oldSize == 0 && newSize > 0) { @@ -132,9 +138,10 @@ public class PartitionGroup { // we should always return the smallest timestamp of all partitions // to avoid group partition time goes backward long timestamp = Long.MAX_VALUE; - for (RecordQueue queue : partitionQueues.values()) { - if (timestamp > queue.timestamp()) + for (final RecordQueue queue : partitionQueues.values()) { + if (timestamp > queue.timestamp()) { timestamp = queue.timestamp(); + } } return timestamp; } @@ -142,21 +149,17 @@ public class PartitionGroup { /** * @throws IllegalStateException if the record's partition does not belong to this partition group */ - public int numBuffered(TopicPartition partition) { - RecordQueue recordQueue = partitionQueues.get(partition); + int numBuffered(final TopicPartition partition) { + final RecordQueue recordQueue = partitionQueues.get(partition); - if (recordQueue == null) + if (recordQueue == null) { throw new IllegalStateException("Record's partition does not belong to this partition-group."); + } return recordQueue.size(); } - public int topQueueSize() { - RecordQueue recordQueue = queuesByTime.peek(); - return (recordQueue == null) ? 
0 : recordQueue.size(); - } - - public int numBuffered() { + int numBuffered() { return totalBuffered; } @@ -167,8 +170,8 @@ public class PartitionGroup { public void clear() { queuesByTime.clear(); - for (RecordQueue queue : partitionQueues.values()) { + for (final RecordQueue queue : partitionQueues.values()) { queue.clear(); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/kafka/blob/ebc7f7ca/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index 1a0e34a..d1bdf95 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -58,11 +58,12 @@ public class ProcessorStateManager implements StateManager { private final Map<TopicPartition, Long> checkpointedOffsets; private final Map<String, StateRestoreCallback> restoreCallbacks; // used for standby tasks, keyed by state topic name private final Map<String, String> storeToChangelogTopic; + private final boolean eosEnabled; // TODO: this map does not work with customized grouper where multiple partitions // of the same topic can be assigned to the same topic. private final Map<String, TopicPartition> partitionForTopic; - private final OffsetCheckpoint checkpoint; + private OffsetCheckpoint checkpoint; /** * @throws LockException if the state directory cannot be locked because another thread holds the lock @@ -74,7 +75,8 @@ public class ProcessorStateManager implements StateManager { final boolean isStandby, final StateDirectory stateDirectory, final Map<String, String> storeToChangelogTopic, - final ChangelogReader changelogReader) throws LockException, IOException { + final ChangelogReader changelogReader, + final boolean eosEnabled) throws LockException, IOException { this.taskId = taskId; this.stateDirectory = stateDirectory; this.changelogReader = changelogReader; @@ -91,6 +93,7 @@ public class ProcessorStateManager implements StateManager { this.isStandby = isStandby; restoreCallbacks = isStandby ? 
new HashMap<String, StateRestoreCallback>() : null; this.storeToChangelogTopic = storeToChangelogTopic; + this.eosEnabled = eosEnabled; if (!stateDirectory.lock(taskId, 5)) { throw new LockException(String.format("%s Failed to lock the state directory for task %s", @@ -110,6 +113,12 @@ public class ProcessorStateManager implements StateManager { checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME)); checkpointedOffsets = new HashMap<>(checkpoint.read()); + if (eosEnabled) { + // delete the checkpoint file after finish loading its stored offsets + checkpoint.delete(); + checkpoint = null; + } + log.info("{} Created state store manager for task {} with the acquired state dir lock", logPrefix, taskId); } @@ -325,6 +334,9 @@ public class ProcessorStateManager implements StateManager { } // write the checkpoint file before closing, to indicate clean shutdown try { + if (checkpoint == null) { + checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME)); + } checkpoint.write(checkpointedOffsets); } catch (final IOException e) { log.warn("Failed to write checkpoint file to {}", new File(baseDir, CHECKPOINT_FILE_NAME), e); @@ -333,7 +345,6 @@ public class ProcessorStateManager implements StateManager { private int getPartition(final String topic) { final TopicPartition partition = partitionForTopic.get(topic); - return partition == null ? taskId.partition : partition.partition(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/ebc7f7ca/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java index 4516f8c..b083869 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.processor.StreamPartitioner; @@ -40,10 +41,21 @@ public interface RecordCollector { final Serializer<V> valueSerializer, final StreamPartitioner<? super K, ? super V> partitioner); + /** + * Flush the internal {@link Producer}. + */ void flush(); + /** + * Close the internal {@link Producer}. + */ void close(); + /** + * The last acked offsets from the internal {@link Producer}. 
+ * + * @return the map from TopicPartition to offset + */ Map<TopicPartition, Long> offsets(); /** http://git-wip-us.apache.org/repos/asf/kafka/blob/ebc7f7ca/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java index 9d2ac03..e1a86b4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java @@ -136,20 +136,13 @@ public class RecordCollectorImpl implements RecordCollector { checkForException(); } - /** - * Closes this RecordCollector - */ @Override public void close() { + log.debug("{} Closing producer", logPrefix); producer.close(); checkForException(); } - /** - * The last ack'd offset from the producer - * - * @return the map from TopicPartition to offset - */ @Override public Map<TopicPartition, Long> offsets() { return offsets; http://git-wip-us.apache.org/repos/asf/kafka/blob/ebc7f7ca/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java index d738a19..0791c67 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java @@ -24,6 +24,7 @@ import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.state.internals.ThreadCache; + import java.util.Collections; import java.util.Map; @@ -50,14 +51,10 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle final StreamPartitioner<? super K, ? 
super V> partitioner) {} @Override - public void flush() { - - } + public void flush() {} @Override - public void close() { - - } + public void close() {} @Override public Map<TopicPartition, Long> offsets() { @@ -65,11 +62,11 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle } }; - public StandbyContextImpl(final TaskId id, - final String applicationId, - final StreamsConfig config, - final ProcessorStateManager stateMgr, - final StreamsMetrics metrics) { + StandbyContextImpl(final TaskId id, + final String applicationId, + final StreamsConfig config, + final ProcessorStateManager stateMgr, + final StreamsMetrics metrics) { super(id, applicationId, config, metrics, stateMgr, new ThreadCache("zeroCache", 0, metrics)); } @@ -163,7 +160,6 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle throw new UnsupportedOperationException("this should not happen: schedule() not supported in standby tasks."); } - /** * @throws UnsupportedOperationException on every invocation */ @@ -180,7 +176,6 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle throw new UnsupportedOperationException("this should not happen: setRecordContext not supported in standby tasks."); } - @Override public void setCurrentNode(final ProcessorNode currentNode) { // no-op. can't throw as this is called on commit when the StateStores get flushed. @@ -193,4 +188,5 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle public ProcessorNode currentNode() { throw new UnsupportedOperationException("this should not happen: currentNode not supported in standby tasks."); } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/ebc7f7ca/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index b09c8cd..b06b998 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -59,7 +59,7 @@ public class StandbyTask extends AbstractTask { final StreamsConfig config, final StreamsMetrics metrics, final StateDirectory stateDirectory) { - super(id, applicationId, partitions, topology, consumer, changelogReader, true, stateDirectory, null); + super(id, applicationId, partitions, topology, consumer, changelogReader, true, stateDirectory, null, config); // initialize the topology with its own context processorContext = new StandbyContextImpl(id, applicationId, config, stateMgr, metrics); @@ -110,8 +110,16 @@ public class StandbyTask extends AbstractTask { stateMgr.checkpoint(Collections.<TopicPartition, Long>emptyMap()); } + /** + * <pre> + * - {@link #commit()} + * - close state + * <pre> + * @param clean ignored by {@code StandbyTask} as it can always try to close cleanly + * (ie, commit, flush, and write checkpoint file) + */ @Override - public void close() { + public void close(final boolean clean) { log.debug("{} Closing", logPrefix); boolean committedSuccessfully = false; try { @@ -127,7 +135,8 @@ public class StandbyTask extends AbstractTask { * * @return a list of records not consumed */ - public List<ConsumerRecord<byte[], byte[]>> update(final TopicPartition partition, final List<ConsumerRecord<byte[], byte[]>> 
records) { + public List<ConsumerRecord<byte[], byte[]>> update(final TopicPartition partition, + final List<ConsumerRecord<byte[], byte[]>> records) { log.debug("{} Updating standby replicas of its state store for partition [{}]", logPrefix, partition); return stateMgr.updateStandbyStates(partition, records); } http://git-wip-us.apache.org/repos/asf/kafka/blob/ebc7f7ca/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 568a5b4..731030d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -20,8 +20,10 @@ import org.apache.kafka.clients.consumer.CommitFailedException; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.StreamsConfig; @@ -56,8 +58,8 @@ public class StreamTask extends AbstractTask implements Punctuator { private final Map<TopicPartition, Long> consumedOffsets; private final RecordCollector recordCollector; + private final Producer<byte[], byte[]> producer; private final int maxBufferedSize; - private final boolean exactlyOnceEnabled; private boolean commitRequested = false; private boolean commitOffsetNeeded = false; @@ -91,7 +93,7 @@ public class StreamTask extends AbstractTask implements Punctuator { * @param config the {@link StreamsConfig} specified by the user * @param metrics the {@link StreamsMetrics} created by the thread * @param stateDirectory the {@link StateDirectory} created by the thread - * @param recordCollector the instance of {@link RecordCollector} used to produce records + * @param producer the instance of {@link Producer} used to produce records */ public StreamTask(final TaskId id, final String applicationId, @@ -104,11 +106,10 @@ public class StreamTask extends AbstractTask implements Punctuator { final StateDirectory stateDirectory, final ThreadCache cache, final Time time, - final RecordCollector recordCollector) { - super(id, applicationId, partitions, topology, consumer, changelogReader, false, stateDirectory, cache); + final Producer<byte[], byte[]> producer) { + super(id, applicationId, partitions, topology, consumer, changelogReader, false, stateDirectory, cache, config); punctuationQueue = new PunctuationQueue(); maxBufferedSize = config.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG); - exactlyOnceEnabled = config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG).equals(StreamsConfig.EXACTLY_ONCE); this.metrics = new TaskMetrics(metrics); // create queues for each assigned partition and associate them @@ -128,8 +129,8 @@ public class StreamTask extends AbstractTask implements Punctuator { // initialize the consumed offset cache consumedOffsets = new HashMap<>(); - // create the record recordCollector that maintains the produced offsets - this.recordCollector = 
recordCollector; + this.producer = producer; + recordCollector = createRecordCollector(); // initialize the topology with its own context processorContext = new ProcessorContextImpl(id, this, config, recordCollector, stateMgr, metrics, cache); @@ -137,16 +138,26 @@ public class StreamTask extends AbstractTask implements Punctuator { log.debug("{} Initializing", logPrefix); initializeStateStores(); stateMgr.registerGlobalStateStores(topology.globalStateStores()); + if (eosEnabled) { + producer.initTransactions(); + producer.beginTransaction(); + } initTopology(); processorContext.initialized(); } /** - * re-initialize the task + * <pre> + * - re-initialize the task + * - if (eos) begin new transaction + * </pre> */ @Override public void resume() { log.debug("{} Resuming", logPrefix); + if (eosEnabled) { + producer.beginTransaction(); + } initTopology(); } @@ -229,13 +240,18 @@ public class StreamTask extends AbstractTask implements Punctuator { /** * <pre> - * - flush state and producer - * - write checkpoint - * - commit offsets + * - flush state and producer + * - if(!eos) write checkpoint + * - commit offsets and start new transaction * </pre> */ @Override public void commit() { + commitImpl(true); + } + + // visible for testing + void commitImpl(final boolean startNewTransaction) { log.trace("{} Committing", logPrefix); metrics.metrics.measureLatencyNs( time, @@ -243,8 +259,10 @@ public class StreamTask extends AbstractTask implements Punctuator { @Override public void run() { flushState(); - stateMgr.checkpoint(recordCollectorOffsets()); - commitOffsets(); + if (!eosEnabled) { + stateMgr.checkpoint(recordCollectorOffsets()); + } + commitOffsets(startNewTransaction); } }, metrics.taskCommitTimeSensor); @@ -262,7 +280,7 @@ public class StreamTask extends AbstractTask implements Punctuator { recordCollector.flush(); } - private void commitOffsets() { + private void commitOffsets(final boolean startNewTransaction) { if (commitOffsetNeeded) { log.debug("{} Committing offsets", logPrefix); final Map<TopicPartition, OffsetAndMetadata> consumedOffsetsAndMetadata = new HashMap<>(consumedOffsets.size()); @@ -272,11 +290,20 @@ public class StreamTask extends AbstractTask implements Punctuator { consumedOffsetsAndMetadata.put(partition, new OffsetAndMetadata(offset)); stateMgr.putOffsetLimit(partition, offset); } - try { - consumer.commitSync(consumedOffsetsAndMetadata); - } catch (final CommitFailedException cfe) { - log.warn("{} Failed offset commits: {} ", logPrefix, consumedOffsetsAndMetadata); - throw cfe; + + if (eosEnabled) { + producer.sendOffsetsToTransaction(consumedOffsetsAndMetadata, applicationId); + producer.commitTransaction(); + if (startNewTransaction) { + producer.beginTransaction(); + } + } else { + try { + consumer.commitSync(consumedOffsetsAndMetadata); + } catch (final CommitFailedException e) { + log.warn("{} Failed offset commits {} due to {}", logPrefix, consumedOffsetsAndMetadata, e.getMessage()); + throw e; + } } commitOffsetNeeded = false; } @@ -299,18 +326,33 @@ public class StreamTask extends AbstractTask implements Punctuator { /** * <pre> - * - close topology - * - {@link #commit()} - * - flush state and producer - * - write checkpoint - * - commit offsets + * - close topology + * - {@link #commit()} + * - flush state and producer + * - if (!eos) write checkpoint + * - commit offsets * </pre> */ @Override public void suspend() { + suspend(true); + } + + /** + * <pre> + * - close topology + * - if (clean) {@link #commit()} + * - flush state and producer + * - if 
(!eos) write checkpoint + * - commit offsets + * </pre> + */ + private void suspend(final boolean clean) { log.debug("{} Suspending", logPrefix); - closeTopology(); - commit(); + closeTopology(); // should we call this only on clean suspend? + if (clean) { + commitImpl(false); + } } private void closeTopology() { @@ -339,28 +381,53 @@ public class StreamTask extends AbstractTask implements Punctuator { /** * <pre> - * - {@link #suspend()} - * - close topology - * - {@link #commit()} - * - flush state and producer - * - write checkpoint - * - commit offsets - * - close state + * - {@link #suspend(boolean) suspend(clean)} + * - close topology + * - if (clean) {@link #commit()} + * - flush state and producer + * - commit offsets + * - close state + * - if (clean) write checkpoint + * - if (eos) close producer * </pre> + * @param clean shut down cleanly (ie, incl. flush and commit) if {@code true} -- + * otherwise, just close open resources */ @Override - public void close() { + public void close(boolean clean) { log.debug("{} Closing", logPrefix); + + RuntimeException firstException = null; + try { + suspend(clean); + } catch (final RuntimeException e) { + clean = false; + firstException = e; + log.error("{} Could not close task due to {}", logPrefix, e); + } + + try { + closeStateManager(clean); + } catch (final RuntimeException e) { + clean = false; + if (firstException == null) { + firstException = e; + } + log.error("{} Could not close state manager due to {}", logPrefix, e); + } + try { - suspend(); - closeStateManager(true); partitionGroup.close(); metrics.removeAllSensors(); - } catch (final RuntimeException e) { - closeStateManager(false); - throw e; } finally { - if (exactlyOnceEnabled) { + if (eosEnabled) { + if (!clean) { + try { + producer.abortTransaction(); + } catch (final ProducerFencedException e) { + // can be ignored: transaction got already aborted by brokers/transactional-coordinator if this happens + } + } try { recordCollector.close(); } catch (final Throwable e) { @@ -368,6 +435,10 @@ public class StreamTask extends AbstractTask implements Punctuator { } } } + + if (firstException != null) { + throw firstException; + } } /** @@ -450,9 +521,14 @@ public class StreamTask extends AbstractTask implements Punctuator { return processorContext; } - // for testing only + // visible for testing only RecordCollector recordCollector() { return recordCollector; } + // visible for testing only + RecordCollector createRecordCollector() { + return new RecordCollectorImpl(producer, id.toString()); + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/ebc7f7ca/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 7918196..f16e323 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -24,10 +24,12 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.InvalidOffsetException; import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.KafkaException; import 
org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Avg; @@ -305,6 +307,11 @@ public class StreamThread extends Thread { } } + interface StreamTaskAction { + String name(); + void apply(final StreamTask task); + } + /** * This class extends {@link StreamsMetricsImpl(Metrics, String, String, Map)} and * overrides one of its functions for efficiency @@ -409,7 +416,7 @@ public class StreamThread extends Thread { private long lastCleanMs; private long lastCommitMs; private Throwable rebalanceException = null; - private final boolean exactlyOnceEnabled; + private final boolean eosEnabled; private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords; private boolean processStandbyRecords = false; @@ -451,7 +458,7 @@ public class StreamThread extends Thread { log.warn("{} Negative cache size passed in thread. Reverting to cache size of 0 bytes.", logPrefix); } cache = new ThreadCache(threadClientId, cacheSizeBytes, streamsMetrics); - exactlyOnceEnabled = config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG).equals(StreamsConfig.EXACTLY_ONCE); + eosEnabled = StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)); // set the consumer clients @@ -507,8 +514,10 @@ public class StreamThread extends Thread { public void run() { log.info("{} Starting", logPrefix); + boolean cleanRun = false; try { runLoop(); + cleanRun = true; } catch (final KafkaException e) { // just re-throw the exception as it should be logged already throw e; @@ -518,7 +527,7 @@ public class StreamThread extends Thread { log.error("{} Streams application error during processing: {}", logPrefix, e); throw e; } finally { - shutdown(); + shutdown(cleanRun); } } @@ -568,7 +577,9 @@ public class StreamThread extends Thread { } if (rebalanceException != null) { - throw new StreamsException(logPrefix + " Failed to rebalance.", rebalanceException); + if (!(rebalanceException instanceof ProducerFencedException)) { + throw new StreamsException(logPrefix + " Failed to rebalance.", rebalanceException); + } } return records; @@ -651,14 +662,22 @@ public class StreamThread extends Thread { // until no task has any records left do { totalProcessedEachRound = 0; - for (final StreamTask task : tasks.values()) { - // we processed one record, - // and more are buffered waiting for the next round - if (task.process()) { - totalProcessedEachRound++; - totalProcessedSinceLastMaybeCommit++; + final Iterator<Map.Entry<TaskId, StreamTask>> it = tasks.entrySet().iterator(); + while (it.hasNext()) { + final StreamTask task = it.next().getValue(); + try { + // we processed one record, + // if more are buffered waiting for the next round + if (task.process()) { + totalProcessedEachRound++; + totalProcessedSinceLastMaybeCommit++; + } + } catch (final ProducerFencedException e) { + closeZombieTask(task); + it.remove(); } } + if (recordsProcessedBeforeCommit != UNLIMITED_RECORDS && totalProcessedSinceLastMaybeCommit >= recordsProcessedBeforeCommit) { totalProcessedSinceLastMaybeCommit = 0; @@ -670,11 +689,25 @@ public class StreamThread extends Thread { } while (totalProcessedEachRound != 0); // go over the tasks again to punctuate or commit - for (final StreamTask task : tasks.values()) { - 
maybePunctuate(task); - if (task.commitNeeded()) { - commitOne(task); + final RuntimeException e = performOnStreamTasks(new StreamTaskAction() { + private String name; + @Override + public String name() { + return name; } + + @Override + public void apply(final StreamTask task) { + name = "punctuate"; + maybePunctuate(task); + if (task.commitNeeded()) { + name = "commit"; + commitOne(task); + } + } + }); + if (e != null) { + throw e; } return totalProcessedSinceLastMaybeCommit; @@ -745,9 +778,21 @@ public class StreamThread extends Thread { * Commit the states of all its tasks */ private void commitAll() { - for (final StreamTask task : activeTasks.values()) { - commitOne(task); + final RuntimeException e = performOnStreamTasks(new StreamTaskAction() { + @Override + public String name() { + return "commit"; + } + + @Override + public void apply(final StreamTask task) { + commitOne(task); + } + }); + if (e != null) { + throw e; } + for (final StandbyTask task : standbyTasks.values()) { commitOne(task); } @@ -762,10 +807,10 @@ public class StreamThread extends Thread { task.commit(); } catch (final CommitFailedException e) { // commit failed. Just log it. - log.warn("{} Failed to commit {} {} state: {}", logPrefix, task.getClass().getSimpleName(), task.id(), e); + log.warn("{} Failed to commit {} {} state: ", logPrefix, task.getClass().getSimpleName(), task.id(), e); } catch (final KafkaException e) { // commit failed due to an unexpected exception. Log it and rethrow the exception. - log.error("{} Failed to commit {} {} state: {}", logPrefix, task.getClass().getSimpleName(), task.id(), e); + log.error("{} Failed to commit {} {} state: ", logPrefix, task.getClass().getSimpleName(), task.id(), e); throw e; } @@ -983,9 +1028,9 @@ public class StreamThread extends Thread { this.partitionAssignor = partitionAssignor; } - private void shutdown() { + private void shutdown(final boolean cleanRun) { log.info("{} Shutting down", logPrefix); - shutdownTasksAndState(); + shutdownTasksAndState(cleanRun); // close all embedded clients if (threadProducer != null) { @@ -1022,7 +1067,7 @@ public class StreamThread extends Thread { } @SuppressWarnings("ThrowableNotThrown") - private void shutdownTasksAndState() { + private void shutdownTasksAndState(final boolean cleanRun) { log.debug("{} shutdownTasksAndState: shutting down" + "active tasks {}, standby tasks {}, suspended tasks {}, and suspended standby tasks {}", logPrefix, activeTasks.keySet(), standbyTasks.keySet(), @@ -1030,7 +1075,7 @@ public class StreamThread extends Thread { for (final AbstractTask task : allTasks()) { try { - task.close(); + task.close(cleanRun); } catch (final RuntimeException e) { log.error("{} Failed while closing {} {} due to {}: ", logPrefix, @@ -1054,7 +1099,19 @@ public class StreamThread extends Thread { final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null); - for (final AbstractTask task : activeAndStandbytasks()) { + firstException.compareAndSet(null, performOnStreamTasks(new StreamTaskAction() { + @Override + public String name() { + return "suspend"; + } + + @Override + public void apply(final StreamTask task) { + task.suspend(); + } + })); + + for (final StandbyTask task : standbyTasks.values()) { try { task.suspend(); } catch (final RuntimeException e) { @@ -1131,7 +1188,7 @@ public class StreamThread extends Thread { if (!task.partitions().equals(assignedPartitionsForTask)) { log.debug("{} Closing suspended non-assigned active task {}", logPrefix, task.id()); try { - task.close(); + 
task.close(true); } catch (final Exception e) { log.error("{} Failed to remove suspended task {}: {}", logPrefix, next.getKey(), e); } finally { @@ -1150,7 +1207,7 @@ public class StreamThread extends Thread { final StandbyTask task = suspendedTask.getValue(); log.debug("{} Closing suspended non-assigned standby task {}", logPrefix, task.id()); try { - task.close(); + task.close(true); } catch (final Exception e) { log.error("{} Failed to remove suspended standby task {}: {}", logPrefix, task.id(), e); } finally { @@ -1165,27 +1222,32 @@ public class StreamThread extends Thread { streamsMetrics.taskCreatedSensor.record(); - return new StreamTask( - id, - applicationId, - partitions, - builder.build(id.topicGroupId), - consumer, - storeChangelogReader, - config, - streamsMetrics, - stateDirectory, - cache, - time, - createRecordCollector(id)); + try { + return new StreamTask( + id, + applicationId, + partitions, + builder.build(id.topicGroupId), + consumer, + storeChangelogReader, + config, + streamsMetrics, + stateDirectory, + cache, + time, + createProducer(id)); + } finally { + log.info("{} Created active task {} with assigned partitions {}", logPrefix, id, partitions); + } } - private RecordCollector createRecordCollector(final TaskId id) { + private Producer<byte[], byte[]> createProducer(final TaskId id) { final Map<String, Object> producerConfigs = config.getProducerConfigs(threadClientId); final Producer<byte[], byte[]> producer; - if (exactlyOnceEnabled) { + if (eosEnabled) { log.info("{} Creating producer client for task {}", logPrefix, id); + producerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, applicationId + "-" + id); producer = clientSupplier.getProducer(producerConfigs); } else { if (threadProducer == null) { @@ -1195,7 +1257,7 @@ public class StreamThread extends Thread { producer = threadProducer; } - return new RecordCollectorImpl(producer, id.toString()); + return producer; } private void addStreamTasks(final Collection<TopicPartition> assignment, final long start) { @@ -1354,4 +1416,40 @@ public class StreamThread extends Thread { standbyRecords.clear(); } + private void closeZombieTask(final StreamTask task) { + log.warn("{} Producer of task {} fenced; closing zombie task.", logPrefix, task.id); + try { + task.close(false); + } catch (final Exception f) { + log.warn("{} Failed to close zombie task: ", logPrefix, f); + } + activeTasks.remove(task.id); + } + + + private RuntimeException performOnStreamTasks(final StreamTaskAction action) { + RuntimeException firstException = null; + final Iterator<Map.Entry<TaskId, StreamTask>> it = activeTasks.entrySet().iterator(); + while (it.hasNext()) { + final StreamTask task = it.next().getValue(); + try { + action.apply(task); + } catch (final ProducerFencedException e) { + closeZombieTask(task); + it.remove(); + } catch (final RuntimeException t) { + log.error("{} Failed to {} stream task {} due to: {}", + logPrefix, + action.name(), + task.id(), + t); + if (firstException == null) { + firstException = t; + } + } + } + + return firstException; + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/ebc7f7ca/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java index 54d2165..8c14737 100644 --- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java @@ -57,27 +57,28 @@ public class OffsetCheckpoint { private final File file; private final Object lock; - public OffsetCheckpoint(File file) { + public OffsetCheckpoint(final File file) { this.file = file; - this.lock = new Object(); + lock = new Object(); } /** * @throws IOException if any file operation fails with an IO exception */ - public void write(Map<TopicPartition, Long> offsets) throws IOException { + public void write(final Map<TopicPartition, Long> offsets) throws IOException { synchronized (lock) { // write to temp file and then swap with the existing file - File temp = new File(file.getAbsolutePath() + ".tmp"); + final File temp = new File(file.getAbsolutePath() + ".tmp"); - FileOutputStream fileOutputStream = new FileOutputStream(temp); + final FileOutputStream fileOutputStream = new FileOutputStream(temp); try (BufferedWriter writer = new BufferedWriter( new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))) { writeIntLine(writer, VERSION); writeIntLine(writer, offsets.size()); - for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) + for (final Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) { writeEntry(writer, entry.getKey(), entry.getValue()); + } writer.flush(); fileOutputStream.getFD().sync(); @@ -90,7 +91,8 @@ public class OffsetCheckpoint { /** * @throws IOException if file write operations failed with any IO exception */ - private void writeIntLine(BufferedWriter writer, int number) throws IOException { + private void writeIntLine(final BufferedWriter writer, + final int number) throws IOException { writer.write(Integer.toString(number)); writer.newLine(); } @@ -98,7 +100,9 @@ public class OffsetCheckpoint { /** * @throws IOException if file write operations failed with any IO exception */ - private void writeEntry(BufferedWriter writer, TopicPartition part, long offset) throws IOException { + private void writeEntry(final BufferedWriter writer, + final TopicPartition part, + final long offset) throws IOException { writer.write(part.topic()); writer.write(' '); writer.write(Integer.toString(part.partition())); @@ -114,43 +118,39 @@ public class OffsetCheckpoint { */ public Map<TopicPartition, Long> read() throws IOException { synchronized (lock) { - BufferedReader reader; - try { - reader = new BufferedReader(new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8)); - } catch (FileNotFoundException e) { - return Collections.emptyMap(); - } + try (BufferedReader reader + = new BufferedReader(new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8))) { - try { - int version = readInt(reader); + final int version = readInt(reader); switch (version) { case 0: - int expectedSize = readInt(reader); - Map<TopicPartition, Long> offsets = new HashMap<>(); + final int expectedSize = readInt(reader); + final Map<TopicPartition, Long> offsets = new HashMap<>(); String line = reader.readLine(); while (line != null) { - String[] pieces = line.split("\\s+"); - if (pieces.length != 3) - throw new IOException(String.format("Malformed line in offset checkpoint file: '%s'.", - line)); - - String topic = pieces[0]; - int partition = Integer.parseInt(pieces[1]); - long offset = Long.parseLong(pieces[2]); + final String[] pieces = line.split("\\s+"); + if (pieces.length != 3) { + throw new IOException( + String.format("Malformed line in offset 
checkpoint file: '%s'.", line)); + } + + final String topic = pieces[0]; + final int partition = Integer.parseInt(pieces[1]); + final long offset = Long.parseLong(pieces[2]); offsets.put(new TopicPartition(topic, partition), offset); line = reader.readLine(); } - if (offsets.size() != expectedSize) - throw new IOException(String.format("Expected %d entries but found only %d", - expectedSize, - offsets.size())); + if (offsets.size() != expectedSize) { + throw new IOException( + String.format("Expected %d entries but found only %d", expectedSize, offsets.size())); + } return offsets; default: throw new IllegalArgumentException("Unknown offset checkpoint version: " + version); } - } finally { - reader.close(); + } catch (final FileNotFoundException e) { + return Collections.emptyMap(); } } } @@ -158,10 +158,11 @@ public class OffsetCheckpoint { /** * @throws IOException if file read ended prematurely */ - private int readInt(BufferedReader reader) throws IOException { - String line = reader.readLine(); - if (line == null) + private int readInt(final BufferedReader reader) throws IOException { + final String line = reader.readLine(); + if (line == null) { throw new EOFException("File ended prematurely."); + } return Integer.parseInt(line); } @@ -169,12 +170,12 @@ public class OffsetCheckpoint { * @throws IOException if there is any IO exception during delete */ public void delete() throws IOException { - Files.delete(file.toPath()); + Files.deleteIfExists(file.toPath()); } @Override public String toString() { - return this.file.getAbsolutePath(); + return file.getAbsolutePath(); } }
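The StreamTask changes above route commits through a producer transaction whenever exactly-once is enabled: the consumed offsets are attached to the transaction via sendOffsetsToTransaction() and the transaction is committed (and, during normal processing, immediately re-opened), while consumer.commitSync() remains the at-least-once path. A ProducerFencedException signals that another instance has claimed the same transactional id, so the thread closes the task as a zombie without committing. Below is a minimal, illustrative sketch of that flow against the plain producer API; the broker address, topic names, offsets, and the "my-app"/"0_0" application and task ids are placeholders invented for illustration, and this is not the committed Streams code.

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public final class EosTaskCommitSketch {

    public static void main(final String[] args) {
        final Properties config = new Properties();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // one producer per task; the patch derives the id as applicationId + "-" + taskId (placeholder values here)
        config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-app-0_0");

        try (Producer<byte[], byte[]> producer =
                 new KafkaProducer<>(config, new ByteArraySerializer(), new ByteArraySerializer())) {

            producer.initTransactions();   // done once when the task is initialized
            producer.beginTransaction();   // a new transaction is also begun on resume()

            try {
                // regular processing output; in Streams this goes through the task's RecordCollector
                producer.send(new ProducerRecord<>("output-topic", "some-value".getBytes()));

                // commit(): offsets become part of the transaction instead of consumer.commitSync()
                final Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(
                    new TopicPartition("input-topic", 0), new OffsetAndMetadata(42L));
                producer.sendOffsetsToTransaction(offsets, "my-app");

                producer.commitTransaction();
                producer.beginTransaction();   // start the next transaction right away
                producer.abortTransaction();   // the unclean close path aborts instead of committing
            } catch (final ProducerFencedException fenced) {
                // another instance took over this transactional.id: the task is a zombie and is closed unclean
            }
        }
    }
}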

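For reference, enabling the new guarantee from the application side only requires the processing.guarantee setting added to StreamsConfig in this patch; with it set, StreamThread.createProducer() above creates one transactional producer per task (transactional.id = applicationId + "-" + taskId) instead of the single shared thread producer. A minimal configuration sketch follows, with placeholder application id and bootstrap servers; the default guarantee remains at-least-once.

import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public final class EnableEosSketch {

    public static Properties streamsConfig() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // new config introduced with this work; without it the application keeps at-least-once semantics
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        return props;
    }
}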