This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c8c3a7d KAFKA-7192 Follow-up: update checkpoint to the reset
beginning offset (#5430)
c8c3a7d is described below
commit c8c3a7dc48dc1ec6220798294da123bb617a6cd7
Author: Guozhang Wang <[email protected]>
AuthorDate: Fri Jul 27 22:11:55 2018 -0700
KAFKA-7192 Follow-up: update checkpoint to the reset beginning offset
(#5430)
1. When we reinitialize the state store due to no CHECKPOINT with EOS
turned on, we should update the checkpoint to consumer.seekToBeginning() /
consumer.position() to avoid falling into endless iterations.
2. Fixed a few other logic bugs around needsInitializing and needsRestoring.
Reviewers: Jason Gustafson <[email protected]>, Bill Bejeck
<[email protected]>
---
.../streams/processor/internals/StateRestorer.java | 10 +-
.../processor/internals/StoreChangelogReader.java | 117 ++++++++++++---------
.../streams/processor/internals/StreamThread.java | 2 +-
.../internals/StoreChangelogReaderTest.java | 17 ++-
4 files changed, 87 insertions(+), 59 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
index c1a41ce..3bbf42e 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
@@ -26,13 +26,13 @@ public class StateRestorer {
static final int NO_CHECKPOINT = -1;
- private final Long checkpoint;
private final long offsetLimit;
private final boolean persistent;
private final String storeName;
private final TopicPartition partition;
private final CompositeRestoreListener compositeRestoreListener;
+ private long checkpointOffset;
private long restoredOffset;
private long startingOffset;
private long endingOffset;
@@ -45,7 +45,7 @@ public class StateRestorer {
final String storeName) {
this.partition = partition;
this.compositeRestoreListener = compositeRestoreListener;
- this.checkpoint = checkpoint;
+ this.checkpointOffset = checkpoint == null ? NO_CHECKPOINT :
checkpoint;
this.offsetLimit = offsetLimit;
this.persistent = persistent;
this.storeName = storeName;
@@ -60,7 +60,11 @@ public class StateRestorer {
}
long checkpoint() {
- return checkpoint == null ? NO_CHECKPOINT : checkpoint;
+ return checkpointOffset;
+ }
+
+ void setCheckpointOffset(final long checkpointOffset) {
+ this.checkpointOffset = checkpointOffset;
}
void restoreStarted() {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 1927b5a..9185920 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -48,8 +48,9 @@ public class StoreChangelogReader implements ChangelogReader {
private final Map<TopicPartition, Long> endOffsets = new HashMap<>();
private final Map<String, List<PartitionInfo>> partitionInfo = new
HashMap<>();
private final Map<TopicPartition, StateRestorer> stateRestorers = new
HashMap<>();
- private final Map<TopicPartition, StateRestorer> needsRestoring = new
HashMap<>();
- private final Map<TopicPartition, StateRestorer> needsInitializing = new
HashMap<>();
+ private final Set<TopicPartition> needsRestoring = new HashSet<>();
+ private final Set<TopicPartition> needsInitializing = new HashSet<>();
+ private final Set<TopicPartition> completedRestorers = new HashSet<>();
private final Duration pollTime;
public StoreChangelogReader(final Consumer<byte[], byte[]> restoreConsumer,
@@ -64,9 +65,14 @@ public class StoreChangelogReader implements ChangelogReader
{
@Override
public void register(final StateRestorer restorer) {
- restorer.setUserRestoreListener(userStateRestoreListener);
- stateRestorers.put(restorer.partition(), restorer);
- needsInitializing.put(restorer.partition(), restorer);
+ if (!stateRestorers.containsKey(restorer.partition())) {
+ restorer.setUserRestoreListener(userStateRestoreListener);
+ stateRestorers.put(restorer.partition(), restorer);
+
+ log.trace("Added restorer for changelog {}", restorer.partition());
+ }
+
+ needsInitializing.add(restorer.partition());
}
public Collection<TopicPartition> restore(final RestoringTasks active) {
@@ -81,16 +87,15 @@ public class StoreChangelogReader implements
ChangelogReader {
try {
final ConsumerRecords<byte[], byte[]> records =
restoreConsumer.poll(pollTime);
- final Iterator<TopicPartition> iterator =
needsRestoring.keySet().iterator();
- while (iterator.hasNext()) {
- final TopicPartition partition = iterator.next();
+
+ for (final TopicPartition partition : needsRestoring) {
final StateRestorer restorer = stateRestorers.get(partition);
final long pos = processNext(records.records(partition),
restorer, endOffsets.get(partition));
restorer.setRestoredOffset(pos);
if (restorer.hasCompleted(pos, endOffsets.get(partition))) {
restorer.restoreDone();
endOffsets.remove(partition);
- iterator.remove();
+ completedRestorers.add(partition);
}
}
} catch (final InvalidOffsetException recoverableException) {
@@ -98,12 +103,18 @@ public class StoreChangelogReader implements
ChangelogReader {
final Set<TopicPartition> partitions =
recoverableException.partitions();
for (final TopicPartition partition : partitions) {
final StreamTask task = active.restoringTaskFor(partition);
- log.info("Reinitializing StreamTask {}", task);
+ log.info("Reinitializing StreamTask {} for changelog {}",
task, partition);
+
+ needsInitializing.remove(partition);
+ needsRestoring.remove(partition);
+
task.reinitializeStateStoresForPartitions(recoverableException.partitions());
}
restoreConsumer.seekToBeginning(partitions);
}
+ needsRestoring.removeAll(completedRestorers);
+
if (needsRestoring.isEmpty()) {
restoreConsumer.unsubscribe();
}
@@ -120,25 +131,24 @@ public class StoreChangelogReader implements
ChangelogReader {
// the needsInitializing map is not empty, meaning we do not know the
metadata for some of them yet
refreshChangelogInfo();
- final Map<TopicPartition, StateRestorer> initializable = new
HashMap<>();
- for (final Map.Entry<TopicPartition, StateRestorer> entry :
needsInitializing.entrySet()) {
- final TopicPartition topicPartition = entry.getKey();
+ final Set<TopicPartition> initializable = new HashSet<>();
+ for (final TopicPartition topicPartition : needsInitializing) {
if (hasPartition(topicPartition)) {
- initializable.put(entry.getKey(), entry.getValue());
+ initializable.add(topicPartition);
}
}
// try to fetch end offsets for the initializable restorers and remove
any partitions
// where we already have all of the data
try {
-
endOffsets.putAll(restoreConsumer.endOffsets(initializable.keySet()));
+ endOffsets.putAll(restoreConsumer.endOffsets(initializable));
} catch (final TimeoutException e) {
// if timeout exception gets thrown we just give up this time and
retry in the next run loop
log.debug("Could not fetch end offset for {}; will fall back to
partition by partition fetching", initializable);
return;
}
- final Iterator<TopicPartition> iter =
initializable.keySet().iterator();
+ final Iterator<TopicPartition> iter = initializable.iterator();
while (iter.hasNext()) {
final TopicPartition topicPartition = iter.next();
final Long endOffset = endOffsets.get(topicPartition);
@@ -146,13 +156,15 @@ public class StoreChangelogReader implements
ChangelogReader {
// offset should not be null; but since the consumer API does not
guarantee it
// we add this check just in case
if (endOffset != null) {
- final StateRestorer restorer =
needsInitializing.get(topicPartition);
+ final StateRestorer restorer =
stateRestorers.get(topicPartition);
if (restorer.checkpoint() >= endOffset) {
restorer.setRestoredOffset(restorer.checkpoint());
iter.remove();
+ completedRestorers.add(topicPartition);
} else if (restorer.offsetLimit() == 0 || endOffset == 0) {
restorer.setRestoredOffset(0);
iter.remove();
+ completedRestorers.add(topicPartition);
} else {
restorer.setEndingOffset(endOffset);
}
@@ -169,51 +181,59 @@ public class StoreChangelogReader implements
ChangelogReader {
}
}
- private void startRestoration(final Map<TopicPartition, StateRestorer>
initialized,
+ private void startRestoration(final Set<TopicPartition> initialized,
final RestoringTasks active) {
- log.debug("Start restoring state stores from changelog topics {}",
initialized.keySet());
+ log.debug("Start restoring state stores from changelog topics {}",
initialized);
final Set<TopicPartition> assignment = new
HashSet<>(restoreConsumer.assignment());
- assignment.addAll(initialized.keySet());
+ assignment.addAll(initialized);
restoreConsumer.assign(assignment);
final List<StateRestorer> needsPositionUpdate = new ArrayList<>();
- for (final StateRestorer restorer : initialized.values()) {
+
+ for (final TopicPartition partition : initialized) {
+ final StateRestorer restorer = stateRestorers.get(partition);
if (restorer.checkpoint() != StateRestorer.NO_CHECKPOINT) {
- restoreConsumer.seek(restorer.partition(),
restorer.checkpoint());
- logRestoreOffsets(restorer.partition(),
- restorer.checkpoint(),
- endOffsets.get(restorer.partition()));
-
restorer.setStartingOffset(restoreConsumer.position(restorer.partition()));
+ restoreConsumer.seek(partition, restorer.checkpoint());
+ logRestoreOffsets(partition,
+ restorer.checkpoint(),
+ endOffsets.get(partition));
+
restorer.setStartingOffset(restoreConsumer.position(partition));
restorer.restoreStarted();
} else {
- final StreamTask task =
active.restoringTaskFor(restorer.partition());
-
- // If checkpoint does not exist it means the task was not
shutdown gracefully before;
- // and in this case if EOS is turned on we should wipe out the
state and re-initialize the task
- if (task.isEosEnabled()) {
- log.info("No checkpoint found for task {} state store {}
changelog {} with EOS turned on. " +
- "Reinitializing the task and restore its state
from the beginning.", task.id, restorer.storeName(), restorer.partition());
-
task.reinitializeStateStoresForPartitions(Collections.singleton(restorer.partition()));
- } else {
- log.info("Restoring task {}'s state store {} from
beginning of the changelog {} ", task.id, restorer.storeName(),
restorer.partition());
- }
-
-
restoreConsumer.seekToBeginning(Collections.singletonList(restorer.partition()));
+
restoreConsumer.seekToBeginning(Collections.singletonList(partition));
needsPositionUpdate.add(restorer);
}
}
for (final StateRestorer restorer : needsPositionUpdate) {
- final long position =
restoreConsumer.position(restorer.partition());
- logRestoreOffsets(restorer.partition(),
- position,
- endOffsets.get(restorer.partition()));
- restorer.setStartingOffset(position);
- restorer.restoreStarted();
+ final TopicPartition partition = restorer.partition();
+
+ // If checkpoint does not exist it means the task was not shutdown
gracefully before;
+ // and in this case if EOS is turned on we should wipe out the
state and re-initialize the task
+ final StreamTask task = active.restoringTaskFor(partition);
+ if (task.isEosEnabled()) {
+ log.info("No checkpoint found for task {} state store {}
changelog {} with EOS turned on. " +
+ "Reinitializing the task and restore its state from
the beginning.", task.id, restorer.storeName(), partition);
+
+ needsInitializing.remove(partition);
+ initialized.remove(partition);
+
restorer.setCheckpointOffset(restoreConsumer.position(partition));
+
+
task.reinitializeStateStoresForPartitions(Collections.singleton(partition));
+ } else {
+ log.info("Restoring task {}'s state store {} from beginning of
the changelog {} ", task.id, restorer.storeName(), partition);
+
+ final long position =
restoreConsumer.position(restorer.partition());
+ logRestoreOffsets(restorer.partition(),
+ position,
+ endOffsets.get(restorer.partition()));
+ restorer.setStartingOffset(position);
+ restorer.restoreStarted();
+ }
}
- needsRestoring.putAll(initialized);
+ needsRestoring.addAll(initialized);
}
private void logRestoreOffsets(final TopicPartition partition,
@@ -226,10 +246,7 @@ public class StoreChangelogReader implements
ChangelogReader {
}
private Collection<TopicPartition> completed() {
- final Set<TopicPartition> completed = new
HashSet<>(stateRestorers.keySet());
- completed.removeAll(needsRestoring.keySet());
- log.trace("The set of restoration completed partitions so far: {}",
completed);
- return completed;
+ return completedRestorers;
}
private void refreshChangelogInfo() {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 31de839..428aa1d 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -1129,7 +1129,7 @@ public class StreamThread extends Thread {
throw new TaskMigratedException(task);
}
- log.info("Reinitializing StandbyTask {}", task);
+ log.info("Reinitializing StandbyTask {} from changelogs
{}", task, recoverableException.partitions());
task.reinitializeStateStoresForPartitions(recoverableException.partitions());
}
restoreConsumer.seekToBeginning(partitions);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index 1e74d47..ae48f57 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -100,7 +100,11 @@ public class StoreChangelogReaderTest {
public void shouldThrowExceptionIfConsumerHasCurrentSubscription() {
final StateRestorer mockRestorer = EasyMock.mock(StateRestorer.class);
mockRestorer.setUserRestoreListener(stateRestoreListener);
- expect(mockRestorer.partition()).andReturn(new
TopicPartition("sometopic", 0)).andReturn(new TopicPartition("sometopic", 0));
+ expect(mockRestorer.partition())
+ .andReturn(new TopicPartition("sometopic", 0))
+ .andReturn(new TopicPartition("sometopic", 0))
+ .andReturn(new TopicPartition("sometopic", 0))
+ .andReturn(new TopicPartition("sometopic", 0));
EasyMock.replay(mockRestorer);
changelogReader.register(mockRestorer);
@@ -144,6 +148,9 @@ public class StoreChangelogReaderTest {
// first restore call "fails" but we should not die with an exception
assertEquals(0, changelogReader.restore(active).size());
+
+ changelogReader.register(new StateRestorer(topicPartition,
restoreListener, null, Long.MAX_VALUE, true,
+ "storeName"));
// retry restore should succeed
assertEquals(1, changelogReader.restore(active).size());
assertThat(callback.restored.size(), equalTo(messages));
@@ -226,9 +233,9 @@ public class StoreChangelogReaderTest {
setupConsumer(3, two);
changelogReader
- .register(new StateRestorer(topicPartition, restoreListener, null,
Long.MAX_VALUE, true, "storeName1"));
- changelogReader.register(new StateRestorer(one, restoreListener1,
null, Long.MAX_VALUE, true, "storeName2"));
- changelogReader.register(new StateRestorer(two, restoreListener2,
null, Long.MAX_VALUE, true, "storeName3"));
+ .register(new StateRestorer(topicPartition, restoreListener, 0L,
Long.MAX_VALUE, true, "storeName1"));
+ changelogReader.register(new StateRestorer(one, restoreListener1, 0L,
Long.MAX_VALUE, true, "storeName2"));
+ changelogReader.register(new StateRestorer(two, restoreListener2, 0L,
Long.MAX_VALUE, true, "storeName3"));
expect(active.restoringTaskFor(one)).andReturn(task);
expect(active.restoringTaskFor(two)).andReturn(task);
@@ -257,7 +264,7 @@ public class StoreChangelogReaderTest {
public void shouldOnlyReportTheLastRestoredOffset() {
setupConsumer(10, topicPartition);
changelogReader
- .register(new StateRestorer(topicPartition, restoreListener, null,
5, true, "storeName1"));
+ .register(new StateRestorer(topicPartition, restoreListener, 0L,
5, true, "storeName1"));
expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
replay(active, task);
changelogReader.restore(active);