This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 2.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 2a63c5113902c71a41b9ef48b6fd3cd29b5892da Author: Guozhang Wang <[email protected]> AuthorDate: Thu Jul 26 17:58:29 2018 -0700 KAFKA-7192: Wipe out if EOS is turned on and checkpoint file does not exist (#5421) 1. As titled and as described in comments. 2. Modified the unit test slightly to insert new keys in the uncommitted data to expose this issue. Reviewers: Bill Bejeck <[email protected]>, Matthias J. Sax <[email protected]> --- .../streams/processor/internals/AbstractTask.java | 4 ++ .../streams/processor/internals/StateRestorer.java | 4 ++ .../processor/internals/StoreChangelogReader.java | 24 ++++++++-- .../streams/integration/EosIntegrationTest.java | 26 +++++++---- .../internals/StoreChangelogReaderTest.java | 51 +++++++++++++++------- 5 files changed, 81 insertions(+), 28 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index 188ff47..94e4c71 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -137,6 +137,10 @@ public abstract class AbstractTask implements Task { return toString(""); } + public boolean isEosEnabled() { + return eosEnabled; + } + /** * Produces a string representation containing useful information about a Task starting with the given indent. * This is useful in debugging scenarios. 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java index 33dce9e..c1a41ce 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java @@ -55,6 +55,10 @@ public class StateRestorer { return partition; } + public String storeName() { + return storeName; + } + long checkpoint() { return checkpoint == null ? NO_CHECKPOINT : checkpoint; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java index 07af801..1927b5a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java @@ -71,7 +71,7 @@ public class StoreChangelogReader implements ChangelogReader { public Collection<TopicPartition> restore(final RestoringTasks active) { if (!needsInitializing.isEmpty()) { - initialize(); + initialize(active); } if (needsRestoring.isEmpty()) { @@ -111,7 +111,7 @@ public class StoreChangelogReader implements ChangelogReader { return completed(); } - private void initialize() { + private void initialize(final RestoringTasks active) { if (!restoreConsumer.subscription().isEmpty()) { throw new StreamsException("Restore consumer should not be subscribed to any topics (" + restoreConsumer.subscription() + ")"); } @@ -165,11 +165,12 @@ public class StoreChangelogReader implements ChangelogReader { // set up restorer for those initializable if (!initializable.isEmpty()) { - startRestoration(initializable); + startRestoration(initializable, active); } } - private void startRestoration(final Map<TopicPartition, StateRestorer> initialized) { + 
private void startRestoration(final Map<TopicPartition, StateRestorer> initialized, + final RestoringTasks active) { log.debug("Start restoring state stores from changelog topics {}", initialized.keySet()); final Set<TopicPartition> assignment = new HashSet<>(restoreConsumer.assignment()); @@ -186,6 +187,18 @@ public class StoreChangelogReader implements ChangelogReader { restorer.setStartingOffset(restoreConsumer.position(restorer.partition())); restorer.restoreStarted(); } else { + final StreamTask task = active.restoringTaskFor(restorer.partition()); + + // If checkpoint does not exist it means the task was not shutdown gracefully before; + // and in this case if EOS is turned on we should wipe out the state and re-initialize the task + if (task.isEosEnabled()) { + log.info("No checkpoint found for task {} state store {} changelog {} with EOS turned on. " + + "Reinitializing the task and restore its state from the beginning.", task.id, restorer.storeName(), restorer.partition()); + task.reinitializeStateStoresForPartitions(Collections.singleton(restorer.partition())); + } else { + log.info("Restoring task {}'s state store {} from beginning of the changelog {} ", task.id, restorer.storeName(), restorer.partition()); + } + restoreConsumer.seekToBeginning(Collections.singletonList(restorer.partition())); needsPositionUpdate.add(restorer); } @@ -280,6 +293,9 @@ public class StoreChangelogReader implements ChangelogReader { if (!restoreRecords.isEmpty()) { restorer.restore(restoreRecords); restorer.restoreBatchCompleted(lastRestoredOffset, records.size()); + + log.trace("Restored from {} to {} with {} records, ending offset is {}, next starting position is {}", + restorer.partition(), restorer.storeName(), records.size(), lastRestoredOffset, nextPosition); } return nextPosition; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java index 
30c90c2..770f579 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java @@ -391,8 +391,9 @@ public class EosIntegrationTest { // the app is supposed to emit all 40 update records into the output topic // the app commits after each 10 records per partition, and thus will have 2*5 uncommitted writes // and store updates (ie, another 5 uncommitted writes to a changelog topic per partition) + // in the uncommitted batch sending some data for the new key to validate that upon resuming they will not be shown up in the store // - // the failure gets inject after 20 committed and 30 uncommitted records got received + // the failure gets inject after 20 committed and 10 uncommitted records got received // -> the failure only kills one thread // after fail over, we should read 40 committed records and the state stores should contain the correct sums // per key (even if some records got processed twice) @@ -402,7 +403,7 @@ public class EosIntegrationTest { streams.start(); final List<KeyValue<Long, Long>> committedDataBeforeFailure = prepareData(0L, 10L, 0L, 1L); - final List<KeyValue<Long, Long>> uncommittedDataBeforeFailure = prepareData(10L, 15L, 0L, 1L); + final List<KeyValue<Long, Long>> uncommittedDataBeforeFailure = prepareData(10L, 15L, 0L, 1L, 2L, 3L); final List<KeyValue<Long, Long>> dataBeforeFailure = new ArrayList<>(); dataBeforeFailure.addAll(committedDataBeforeFailure); @@ -610,10 +611,6 @@ public class EosIntegrationTest { @Override public KeyValue<Long, Long> transform(final Long key, final Long value) { - if (errorInjected.compareAndSet(true, false)) { - // only tries to fail once on one of the task - throw new RuntimeException("Injected test exception."); - } if (gcInjected.compareAndSet(true, false)) { while (doGC) { try { @@ -631,16 +628,27 @@ public class EosIntegrationTest { if (state != null) { Long sum = state.get(key); + 
if (sum == null) { sum = value; } else { sum += value; } state.put(key, sum); - context.forward(key, sum); - return null; + state.flush(); + } + + + if (errorInjected.compareAndSet(true, false)) { + // only tries to fail once on one of the task + throw new RuntimeException("Injected test exception."); + } + + if (state != null) { + return new KeyValue<>(key, state.get(key)); + } else { + return new KeyValue<>(key, value); } - return new KeyValue<>(key, value); } @Override diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java index 90abf32..1e74d47 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java @@ -119,7 +119,10 @@ public class StoreChangelogReaderTest { final int messages = 10; setupConsumer(messages, topicPartition); changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName")); + expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); + replay(active, task); changelogReader.restore(active); + assertThat(callback.restored.size(), equalTo(messages)); } @@ -136,8 +139,8 @@ public class StoreChangelogReaderTest { changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName")); - EasyMock.expect(active.restoringTaskFor(topicPartition)).andReturn(task); - EasyMock.replay(active); + EasyMock.expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); + EasyMock.replay(active, task); // first restore call "fails" but we should not die with an exception assertEquals(0, changelogReader.restore(active).size()); @@ -164,7 +167,8 @@ public class StoreChangelogReaderTest { setupConsumer(messages, topicPartition); 
changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName")); - + expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); + replay(active, task); changelogReader.restore(active); assertThat(consumer.assignment(), equalTo(Collections.<TopicPartition>emptySet())); } @@ -175,6 +179,8 @@ public class StoreChangelogReaderTest { final StateRestorer restorer = new StateRestorer(topicPartition, restoreListener, null, 3, true, "storeName"); changelogReader.register(restorer); + expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); + replay(active, task); changelogReader.restore(active); assertThat(callback.restored.size(), equalTo(3)); assertThat(restorer.restoredOffset(), equalTo(3L)); @@ -192,14 +198,14 @@ public class StoreChangelogReaderTest { setupConsumer(5, one); setupConsumer(3, two); - changelogReader - .register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName1")); + changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName1")); changelogReader.register(new StateRestorer(one, restoreListener1, null, Long.MAX_VALUE, true, "storeName2")); changelogReader.register(new StateRestorer(two, restoreListener2, null, Long.MAX_VALUE, true, "storeName3")); - expect(active.restoringTaskFor(one)).andReturn(null); - expect(active.restoringTaskFor(two)).andReturn(null); - replay(active); + expect(active.restoringTaskFor(one)).andStubReturn(task); + expect(active.restoringTaskFor(two)).andStubReturn(task); + expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); + replay(active, task); changelogReader.restore(active); assertThat(callback.restored.size(), equalTo(10)); @@ -224,9 +230,13 @@ public class StoreChangelogReaderTest { changelogReader.register(new StateRestorer(one, restoreListener1, null, Long.MAX_VALUE, true, "storeName2")); changelogReader.register(new StateRestorer(two, 
restoreListener2, null, Long.MAX_VALUE, true, "storeName3")); - expect(active.restoringTaskFor(one)).andReturn(null); - expect(active.restoringTaskFor(two)).andReturn(null); - replay(active); + expect(active.restoringTaskFor(one)).andReturn(task); + expect(active.restoringTaskFor(two)).andReturn(task); + expect(active.restoringTaskFor(topicPartition)).andReturn(task); + expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); + replay(active, task); + changelogReader.restore(active); + changelogReader.restore(active); assertThat(callback.restored.size(), equalTo(10)); @@ -248,6 +258,8 @@ public class StoreChangelogReaderTest { setupConsumer(10, topicPartition); changelogReader .register(new StateRestorer(topicPartition, restoreListener, null, 5, true, "storeName1")); + expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); + replay(active, task); changelogReader.restore(active); assertThat(callback.restored.size(), equalTo(5)); @@ -306,7 +318,10 @@ public class StoreChangelogReaderTest { public void shouldReturnRestoredOffsetsForPersistentStores() { setupConsumer(10, topicPartition); changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName")); + expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); + replay(active, task); changelogReader.restore(active); + final Map<TopicPartition, Long> restoredOffsets = changelogReader.restoredOffsets(); assertThat(restoredOffsets, equalTo(Collections.singletonMap(topicPartition, 10L))); } @@ -315,6 +330,8 @@ public class StoreChangelogReaderTest { public void shouldNotReturnRestoredOffsetsForNonPersistentStore() { setupConsumer(10, topicPartition); changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, false, "storeName")); + expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); + replay(active, task); changelogReader.restore(active); final Map<TopicPartition, Long> 
restoredOffsets = changelogReader.restoredOffsets(); assertThat(restoredOffsets, equalTo(Collections.<TopicPartition, Long>emptyMap())); @@ -330,6 +347,8 @@ public class StoreChangelogReaderTest { consumer.assign(Collections.singletonList(topicPartition)); changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, false, "storeName")); + expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); + replay(active, task); changelogReader.restore(active); assertThat(callback.restored, CoreMatchers.equalTo(Utils.mkList(KeyValue.pair(bytes, bytes), KeyValue.pair(bytes, bytes)))); @@ -340,6 +359,9 @@ public class StoreChangelogReaderTest { final Collection<TopicPartition> expected = Collections.singleton(topicPartition); setupConsumer(0, topicPartition); changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "store")); + expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); + replay(active, task); + final Collection<TopicPartition> restored = changelogReader.restore(active); assertThat(restored, equalTo(expected)); } @@ -354,10 +376,9 @@ public class StoreChangelogReaderTest { changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, false, "storeName")); final TopicPartition postInitialization = new TopicPartition("other", 0); - expect(active.restoringTaskFor(topicPartition)).andReturn(null); - expect(active.restoringTaskFor(topicPartition)).andReturn(null); - expect(active.restoringTaskFor(postInitialization)).andReturn(null); - replay(active); + expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); + expect(active.restoringTaskFor(postInitialization)).andStubReturn(task); + replay(active, task); assertTrue(changelogReader.restore(active).isEmpty());
