This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 061885e KAFKA-7192: Wipe out if EOS is turned on and checkpoint file does not exist (#5421)
061885e is described below
commit 061885e9f1675221f31ce47cfb0e59eb5748b9c7
Author: Guozhang Wang <[email protected]>
AuthorDate: Thu Jul 26 17:58:29 2018 -0700
KAFKA-7192: Wipe out if EOS is turned on and checkpoint file does not exist (#5421)
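1. As titled: when EOS is turned on and no checkpoint exists for a store's changelog partition (i.e. the task was not shut down gracefully), wipe out and re-initialize the affected state stores, then restore them from the beginning of the changelog, as described in the code comments.
2. Modified the tests slightly; in particular, the EOS integration test now inserts records for new keys into the uncommitted data, to expose this issue.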
Reviewers: Bill Bejeck <[email protected]>, Matthias J. Sax <[email protected]>
---
.../streams/processor/internals/AbstractTask.java | 4 ++
.../streams/processor/internals/StateRestorer.java | 4 ++
.../processor/internals/StoreChangelogReader.java | 24 ++++++++--
.../streams/integration/EosIntegrationTest.java | 26 +++++++----
.../internals/StoreChangelogReaderTest.java | 51 +++++++++++++++-------
5 files changed, 81 insertions(+), 28 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 188ff47..94e4c71 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -137,6 +137,10 @@ public abstract class AbstractTask implements Task {
return toString("");
}
+ public boolean isEosEnabled() {
+ return eosEnabled;
+ }
+
/**
* Produces a string representation containing useful information about a
Task starting with the given indent.
* This is useful in debugging scenarios.
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
index 33dce9e..c1a41ce 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
@@ -55,6 +55,10 @@ public class StateRestorer {
return partition;
}
+ public String storeName() {
+ return storeName;
+ }
+
long checkpoint() {
return checkpoint == null ? NO_CHECKPOINT : checkpoint;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 07af801..1927b5a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -71,7 +71,7 @@ public class StoreChangelogReader implements ChangelogReader {
public Collection<TopicPartition> restore(final RestoringTasks active) {
if (!needsInitializing.isEmpty()) {
- initialize();
+ initialize(active);
}
if (needsRestoring.isEmpty()) {
@@ -111,7 +111,7 @@ public class StoreChangelogReader implements ChangelogReader {
return completed();
}
- private void initialize() {
+ private void initialize(final RestoringTasks active) {
if (!restoreConsumer.subscription().isEmpty()) {
throw new StreamsException("Restore consumer should not be
subscribed to any topics (" + restoreConsumer.subscription() + ")");
}
@@ -165,11 +165,12 @@ public class StoreChangelogReader implements ChangelogReader {
// set up restorer for those initializable
if (!initializable.isEmpty()) {
- startRestoration(initializable);
+ startRestoration(initializable, active);
}
}
- private void startRestoration(final Map<TopicPartition, StateRestorer>
initialized) {
+ private void startRestoration(final Map<TopicPartition, StateRestorer>
initialized,
+ final RestoringTasks active) {
log.debug("Start restoring state stores from changelog topics {}",
initialized.keySet());
final Set<TopicPartition> assignment = new
HashSet<>(restoreConsumer.assignment());
@@ -186,6 +187,18 @@ public class StoreChangelogReader implements ChangelogReader {
restorer.setStartingOffset(restoreConsumer.position(restorer.partition()));
restorer.restoreStarted();
} else {
+ final StreamTask task =
active.restoringTaskFor(restorer.partition());
+
+ // If checkpoint does not exist it means the task was not
shutdown gracefully before;
+ // and in this case if EOS is turned on we should wipe out the
state and re-initialize the task
+ if (task.isEosEnabled()) {
+ log.info("No checkpoint found for task {} state store {}
changelog {} with EOS turned on. " +
+ "Reinitializing the task and restore its state
from the beginning.", task.id, restorer.storeName(), restorer.partition());
+
task.reinitializeStateStoresForPartitions(Collections.singleton(restorer.partition()));
+ } else {
+ log.info("Restoring task {}'s state store {} from
beginning of the changelog {} ", task.id, restorer.storeName(),
restorer.partition());
+ }
+
restoreConsumer.seekToBeginning(Collections.singletonList(restorer.partition()));
needsPositionUpdate.add(restorer);
}
@@ -280,6 +293,9 @@ public class StoreChangelogReader implements ChangelogReader {
if (!restoreRecords.isEmpty()) {
restorer.restore(restoreRecords);
restorer.restoreBatchCompleted(lastRestoredOffset, records.size());
+
+ log.trace("Restored from {} to {} with {} records, ending offset
is {}, next starting position is {}",
+ restorer.partition(), restorer.storeName(),
records.size(), lastRestoredOffset, nextPosition);
}
return nextPosition;
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index 30c90c2..770f579 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -391,8 +391,9 @@ public class EosIntegrationTest {
// the app is supposed to emit all 40 update records into the output
topic
// the app commits after each 10 records per partition, and thus will
have 2*5 uncommitted writes
// and store updates (ie, another 5 uncommitted writes to a changelog
topic per partition)
+ // in the uncommitted batch sending some data for the new key to
validate that upon resuming they will not be shown up in the store
//
- // the failure gets inject after 20 committed and 30 uncommitted
records got received
+ // the failure gets inject after 20 committed and 10 uncommitted
records got received
// -> the failure only kills one thread
// after fail over, we should read 40 committed records and the state
stores should contain the correct sums
// per key (even if some records got processed twice)
@@ -402,7 +403,7 @@ public class EosIntegrationTest {
streams.start();
final List<KeyValue<Long, Long>> committedDataBeforeFailure =
prepareData(0L, 10L, 0L, 1L);
- final List<KeyValue<Long, Long>> uncommittedDataBeforeFailure =
prepareData(10L, 15L, 0L, 1L);
+ final List<KeyValue<Long, Long>> uncommittedDataBeforeFailure =
prepareData(10L, 15L, 0L, 1L, 2L, 3L);
final List<KeyValue<Long, Long>> dataBeforeFailure = new
ArrayList<>();
dataBeforeFailure.addAll(committedDataBeforeFailure);
@@ -610,10 +611,6 @@ public class EosIntegrationTest {
@Override
public KeyValue<Long, Long> transform(final Long key,
final Long value) {
- if (errorInjected.compareAndSet(true, false)) {
- // only tries to fail once on one of the task
- throw new RuntimeException("Injected test
exception.");
- }
if (gcInjected.compareAndSet(true, false)) {
while (doGC) {
try {
@@ -631,16 +628,27 @@ public class EosIntegrationTest {
if (state != null) {
Long sum = state.get(key);
+
if (sum == null) {
sum = value;
} else {
sum += value;
}
state.put(key, sum);
- context.forward(key, sum);
- return null;
+ state.flush();
+ }
+
+
+ if (errorInjected.compareAndSet(true, false)) {
+ // only tries to fail once on one of the task
+ throw new RuntimeException("Injected test
exception.");
+ }
+
+ if (state != null) {
+ return new KeyValue<>(key, state.get(key));
+ } else {
+ return new KeyValue<>(key, value);
}
- return new KeyValue<>(key, value);
}
@Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index 90abf32..1e74d47 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -119,7 +119,10 @@ public class StoreChangelogReaderTest {
final int messages = 10;
setupConsumer(messages, topicPartition);
changelogReader.register(new StateRestorer(topicPartition,
restoreListener, null, Long.MAX_VALUE, true, "storeName"));
+ expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+ replay(active, task);
changelogReader.restore(active);
+
assertThat(callback.restored.size(), equalTo(messages));
}
@@ -136,8 +139,8 @@ public class StoreChangelogReaderTest {
changelogReader.register(new StateRestorer(topicPartition,
restoreListener, null, Long.MAX_VALUE, true,
"storeName"));
-
EasyMock.expect(active.restoringTaskFor(topicPartition)).andReturn(task);
- EasyMock.replay(active);
+
EasyMock.expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+ EasyMock.replay(active, task);
// first restore call "fails" but we should not die with an exception
assertEquals(0, changelogReader.restore(active).size());
@@ -164,7 +167,8 @@ public class StoreChangelogReaderTest {
setupConsumer(messages, topicPartition);
changelogReader.register(new StateRestorer(topicPartition,
restoreListener, null, Long.MAX_VALUE, true,
"storeName"));
-
+ expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+ replay(active, task);
changelogReader.restore(active);
assertThat(consumer.assignment(),
equalTo(Collections.<TopicPartition>emptySet()));
}
@@ -175,6 +179,8 @@ public class StoreChangelogReaderTest {
final StateRestorer restorer = new StateRestorer(topicPartition,
restoreListener, null, 3, true,
"storeName");
changelogReader.register(restorer);
+ expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+ replay(active, task);
changelogReader.restore(active);
assertThat(callback.restored.size(), equalTo(3));
assertThat(restorer.restoredOffset(), equalTo(3L));
@@ -192,14 +198,14 @@ public class StoreChangelogReaderTest {
setupConsumer(5, one);
setupConsumer(3, two);
- changelogReader
- .register(new StateRestorer(topicPartition, restoreListener, null,
Long.MAX_VALUE, true, "storeName1"));
+ changelogReader.register(new StateRestorer(topicPartition,
restoreListener, null, Long.MAX_VALUE, true, "storeName1"));
changelogReader.register(new StateRestorer(one, restoreListener1,
null, Long.MAX_VALUE, true, "storeName2"));
changelogReader.register(new StateRestorer(two, restoreListener2,
null, Long.MAX_VALUE, true, "storeName3"));
- expect(active.restoringTaskFor(one)).andReturn(null);
- expect(active.restoringTaskFor(two)).andReturn(null);
- replay(active);
+ expect(active.restoringTaskFor(one)).andStubReturn(task);
+ expect(active.restoringTaskFor(two)).andStubReturn(task);
+ expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+ replay(active, task);
changelogReader.restore(active);
assertThat(callback.restored.size(), equalTo(10));
@@ -224,9 +230,13 @@ public class StoreChangelogReaderTest {
changelogReader.register(new StateRestorer(one, restoreListener1,
null, Long.MAX_VALUE, true, "storeName2"));
changelogReader.register(new StateRestorer(two, restoreListener2,
null, Long.MAX_VALUE, true, "storeName3"));
- expect(active.restoringTaskFor(one)).andReturn(null);
- expect(active.restoringTaskFor(two)).andReturn(null);
- replay(active);
+ expect(active.restoringTaskFor(one)).andReturn(task);
+ expect(active.restoringTaskFor(two)).andReturn(task);
+ expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+ expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+ replay(active, task);
+ changelogReader.restore(active);
+
changelogReader.restore(active);
assertThat(callback.restored.size(), equalTo(10));
@@ -248,6 +258,8 @@ public class StoreChangelogReaderTest {
setupConsumer(10, topicPartition);
changelogReader
.register(new StateRestorer(topicPartition, restoreListener, null,
5, true, "storeName1"));
+ expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+ replay(active, task);
changelogReader.restore(active);
assertThat(callback.restored.size(), equalTo(5));
@@ -306,7 +318,10 @@ public class StoreChangelogReaderTest {
public void shouldReturnRestoredOffsetsForPersistentStores() {
setupConsumer(10, topicPartition);
changelogReader.register(new StateRestorer(topicPartition,
restoreListener, null, Long.MAX_VALUE, true, "storeName"));
+ expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+ replay(active, task);
changelogReader.restore(active);
+
final Map<TopicPartition, Long> restoredOffsets =
changelogReader.restoredOffsets();
assertThat(restoredOffsets,
equalTo(Collections.singletonMap(topicPartition, 10L)));
}
@@ -315,6 +330,8 @@ public class StoreChangelogReaderTest {
public void shouldNotReturnRestoredOffsetsForNonPersistentStore() {
setupConsumer(10, topicPartition);
changelogReader.register(new StateRestorer(topicPartition,
restoreListener, null, Long.MAX_VALUE, false, "storeName"));
+ expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+ replay(active, task);
changelogReader.restore(active);
final Map<TopicPartition, Long> restoredOffsets =
changelogReader.restoredOffsets();
assertThat(restoredOffsets, equalTo(Collections.<TopicPartition,
Long>emptyMap()));
@@ -330,6 +347,8 @@ public class StoreChangelogReaderTest {
consumer.assign(Collections.singletonList(topicPartition));
changelogReader.register(new StateRestorer(topicPartition,
restoreListener, null, Long.MAX_VALUE, false,
"storeName"));
+ expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+ replay(active, task);
changelogReader.restore(active);
assertThat(callback.restored,
CoreMatchers.equalTo(Utils.mkList(KeyValue.pair(bytes, bytes),
KeyValue.pair(bytes, bytes))));
@@ -340,6 +359,9 @@ public class StoreChangelogReaderTest {
final Collection<TopicPartition> expected =
Collections.singleton(topicPartition);
setupConsumer(0, topicPartition);
changelogReader.register(new StateRestorer(topicPartition,
restoreListener, null, Long.MAX_VALUE, true, "store"));
+ expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+ replay(active, task);
+
final Collection<TopicPartition> restored =
changelogReader.restore(active);
assertThat(restored, equalTo(expected));
}
@@ -354,10 +376,9 @@ public class StoreChangelogReaderTest {
changelogReader.register(new StateRestorer(topicPartition,
restoreListener, null, Long.MAX_VALUE, false, "storeName"));
final TopicPartition postInitialization = new TopicPartition("other",
0);
- expect(active.restoringTaskFor(topicPartition)).andReturn(null);
- expect(active.restoringTaskFor(topicPartition)).andReturn(null);
- expect(active.restoringTaskFor(postInitialization)).andReturn(null);
- replay(active);
+ expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+
expect(active.restoringTaskFor(postInitialization)).andStubReturn(task);
+ replay(active, task);
assertTrue(changelogReader.restore(active).isEmpty());