This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new ba0ebca [KAFKA-6730] Simplify State Store Recovery (#5013) ba0ebca is described below commit ba0ebca7a516d4179b6327ddc60b0b49b1265347 Author: ConcurrencyPractitioner <yohan.richard...@gmail.com> AuthorDate: Tue Jun 5 13:35:47 2018 -0700 [KAFKA-6730] Simplify State Store Recovery (#5013) Reviewer: Matthias J. Sax <matth...@confluent.io>, Guozhang Wang <guozh...@confluent.io>, Bill Bejeck <b...@confluent.io> --- .../processor/internals/StoreChangelogReader.java | 60 ++++++-------------- .../streams/processor/internals/TaskManager.java | 1 - .../internals/StoreChangelogReaderTest.java | 65 ---------------------- 3 files changed, 16 insertions(+), 110 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java index 5fcba76..af5ff47 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java @@ -26,7 +26,6 @@ import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.StreamsException; -import org.apache.kafka.streams.errors.TaskMigratedException; import org.apache.kafka.streams.processor.StateRestoreListener; import org.slf4j.Logger; @@ -50,6 +49,7 @@ public class StoreChangelogReader implements ChangelogReader { private final Map<TopicPartition, StateRestorer> stateRestorers = new HashMap<>(); private final Map<TopicPartition, StateRestorer> needsRestoring = new HashMap<>(); private final Map<TopicPartition, StateRestorer> needsInitializing = new HashMap<>(); + private Map<TopicPartition, Long> updatedEndOffsets = new HashMap<>(); public StoreChangelogReader(final Consumer<byte[], byte[]> restoreConsumer, final StateRestoreListener userStateRestoreListener, @@ -66,12 +66,12 @@ public class StoreChangelogReader implements ChangelogReader { needsInitializing.put(restorer.partition(), restorer); } - /** - * @throws TaskMigratedException if another thread wrote to the changelog topic that is currently restored - */ public Collection<TopicPartition> restore(final RestoringTasks active) { if (!needsInitializing.isEmpty()) { initialize(); + final Set<TopicPartition> remainingPartitions = new HashSet<>(needsRestoring.keySet()); + remainingPartitions.removeAll(updatedEndOffsets.keySet()); + updatedEndOffsets.putAll(restoreConsumer.endOffsets(remainingPartitions)); } if (needsRestoring.isEmpty()) { @@ -79,11 +79,19 @@ public class StoreChangelogReader implements ChangelogReader { return completed(); } - final Set<TopicPartition> restoringPartitions = new HashSet<>(needsRestoring.keySet()); try { - final ConsumerRecords<byte[], byte[]> allRecords = restoreConsumer.poll(10); - for (final TopicPartition partition : restoringPartitions) { - restorePartition(allRecords, partition, active.restoringTaskFor(partition)); + final ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(10); + final Iterator<TopicPartition> iterator = needsRestoring.keySet().iterator(); + while (iterator.hasNext()) { + final TopicPartition partition = iterator.next(); + final StateRestorer restorer = stateRestorers.get(partition); + final long pos = processNext(records.records(partition), restorer, updatedEndOffsets.get(partition)); + restorer.setRestoredOffset(pos); + if (restorer.hasCompleted(pos, updatedEndOffsets.get(partition))) { + restorer.restoreDone(); + updatedEndOffsets.remove(partition); + iterator.remove(); + } } } catch (final InvalidOffsetException recoverableException) { log.warn("Restoring StreamTasks failed. Deleting StreamTasks stores to recreate from scratch.", recoverableException); @@ -240,41 +248,6 @@ public class StoreChangelogReader implements ChangelogReader { needsInitializing.clear(); } - /** - * @throws TaskMigratedException if another thread wrote to the changelog topic that is currently restored - */ - private void restorePartition(final ConsumerRecords<byte[], byte[]> allRecords, - final TopicPartition topicPartition, - final Task task) { - final StateRestorer restorer = stateRestorers.get(topicPartition); - final Long endOffset = endOffsets.get(topicPartition); - final long pos = processNext(allRecords.records(topicPartition), restorer, endOffset); - restorer.setRestoredOffset(pos); - if (restorer.hasCompleted(pos, endOffset)) { - if (pos > endOffset) { - throw new TaskMigratedException(task, topicPartition, endOffset, pos); - } - - // need to check for changelog topic - if (restorer.offsetLimit() == Long.MAX_VALUE) { - final Long updatedEndOffset = restoreConsumer.endOffsets(Collections.singletonList(topicPartition)).get(topicPartition); - if (!restorer.hasCompleted(pos, updatedEndOffset)) { - throw new TaskMigratedException(task, topicPartition, updatedEndOffset, pos); - } - } - - - log.debug("Completed restoring state from changelog {} with {} records ranging from offset {} to {}", - topicPartition, - restorer.restoredNumRecords(), - restorer.startingOffset(), - restorer.restoredOffset()); - - restorer.restoreDone(); - needsRestoring.remove(topicPartition); - } - } - private long processNext(final List<ConsumerRecord<byte[], byte[]>> records, final StateRestorer restorer, final Long endOffset) { @@ -326,7 +299,6 @@ public class StoreChangelogReader implements ChangelogReader { return true; } } - return false; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 6e6e4ca..44db70d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -309,7 +309,6 @@ public class TaskManager { /** * @throws IllegalStateException If store gets registered after initialized is already finished * @throws StreamsException if the store's change log does not contain the partition - * @throws TaskMigratedException if the task producer got fenced or consumer discovered changelog offset changes (EOS only) */ boolean updateNewAndRestoringTasks() { active.initializeNewTasks(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java index c65d4ef..aabe7ff 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java @@ -27,7 +27,6 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.StreamsException; -import org.apache.kafka.streams.errors.TaskMigratedException; import org.apache.kafka.streams.processor.StateRestoreListener; import org.apache.kafka.test.MockRestoreCallback; import org.apache.kafka.test.MockStateRestoreListener; @@ -377,46 +376,6 @@ public class StoreChangelogReaderTest { assertThat(callbackTwo.restored.size(), equalTo(3)); } - @Test - public void shouldThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestoreForChangelogTopic() { - final int messages = 10; - setupConsumer(messages, topicPartition); - consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 5L)); - changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName")); - - expect(active.restoringTaskFor(topicPartition)).andReturn(task); - replay(active); - - try { - changelogReader.restore(active); - fail("Should have thrown TaskMigratedException"); - } catch (final TaskMigratedException expected) { - /* ignore */ - } - } - - - @Test - public void shouldThrowTaskMigratedExceptionIfChangelogTopicUpdatedDuringRestoreProcessFoundInSecondCheck() { - final int messages = 10; - setupConsumer(messages, topicPartition); - // in this case first call to endOffsets returns correct value, but a second thread has updated the changelog topic - // so a subsequent call to endOffsets returns a value exceeding the expected end value - consumer.addEndOffsets(Collections.singletonMap(topicPartition, 15L)); - changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName")); - - expect(active.restoringTaskFor(topicPartition)).andReturn(task); - replay(active); - - try { - changelogReader.restore(active); - fail("Should have thrown TaskMigratedException"); - } catch (final TaskMigratedException expected) { - // verifies second block threw exception with updated end offset - assertTrue(expected.getMessage().contains("end offset 15, current offset 10")); - } - } - @Test public void shouldNotThrowTaskMigratedExceptionIfSourceTopicUpdatedDuringRestoreProcess() { @@ -435,30 +394,6 @@ public class StoreChangelogReaderTest { @Test - public void shouldThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestoreForChangelogTopicEOSEnabled() { - final int totalMessages = 10; - assignPartition(totalMessages, topicPartition); - // records 0..4 - addRecords(5, topicPartition, 0); - //EOS enabled commit marker at offset 5 so rest of records 6..10 - addRecords(5, topicPartition, 6); - consumer.assign(Collections.<TopicPartition>emptyList()); - - // end offsets should start after commit marker of 5 from above - consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 6L)); - changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName")); - - expect(active.restoringTaskFor(topicPartition)).andReturn(task); - replay(active); - try { - changelogReader.restore(active); - fail("Should have thrown task migrated exception"); - } catch (final TaskMigratedException expected) { - /* ignore */ - } - } - - @Test public void shouldNotThrowTaskMigratedExceptionDuringRestoreForChangelogTopicWhenEndOffsetNotExceededEOSEnabled() { final int totalMessages = 10; setupConsumer(totalMessages, topicPartition); -- To stop receiving notification emails like this one, please contact mj...@apache.org.