This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 263a510 KAFKA-6367: StateRestoreListener use actual last restored
offset for restored batch (#4507)
263a510 is described below
commit 263a510676ee0173cbaa11b5de1de4c19d2f5202
Author: Bill Bejeck <[email protected]>
AuthorDate: Wed Feb 7 14:07:32 2018 -0500
KAFKA-6367: StateRestoreListener use actual last restored offset for
restored batch (#4507)
Author: Bill Bejeck <[email protected]>
Reviewers: Guozhang Wang <[email protected]>, Matthias J. Sax
<[email protected]>
---
.../streams/processor/StateRestoreListener.java | 4 ++--
.../processor/internals/StoreChangelogReader.java | 5 +++--
.../internals/StoreChangelogReaderTest.java | 24 +++++++++++++++++-----
3 files changed, 24 insertions(+), 9 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java
index c80a736..ea1c288 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java
@@ -43,7 +43,7 @@ public interface StateRestoreListener {
* @param topicPartition the TopicPartition containing the values to restore
* @param storeName the name of the store undergoing restoration
* @param startingOffset the starting offset of the entire restoration process for this TopicPartition
- * @param endingOffset the ending offset of the entire restoration process for this TopicPartition
+ * @param endingOffset the exclusive ending offset of the entire restoration process for this TopicPartition
*/
void onRestoreStart(final TopicPartition topicPartition,
final String storeName,
@@ -62,7 +62,7 @@ public interface StateRestoreListener {
*
* @param topicPartition the TopicPartition containing the values to restore
* @param storeName the name of the store undergoing restoration
- * @param batchEndOffset the ending offset for the current restored batch for this TopicPartition
+ * @param batchEndOffset the inclusive ending offset for the current restored batch for this TopicPartition
* @param numRestored the total number of records restored in this batch for this TopicPartition
*/
void onBatchRestored(final TopicPartition topicPartition,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index ba17ce9..b11c45b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -273,12 +273,14 @@ public class StoreChangelogReader implements ChangelogReader {
long nextPosition = -1;
int numberRecords = records.size();
int numberRestored = 0;
+ long lastRestoredOffset = -1;
for (final ConsumerRecord<byte[], byte[]> record : records) {
final long offset = record.offset();
if (restorer.hasCompleted(offset, endOffset)) {
nextPosition = record.offset();
break;
}
+ lastRestoredOffset = offset;
numberRestored++;
if (record.key() != null) {
restoreRecords.add(KeyValue.pair(record.key(),
record.value()));
@@ -295,8 +297,7 @@ public class StoreChangelogReader implements ChangelogReader {
if (!restoreRecords.isEmpty()) {
restorer.restore(restoreRecords);
- restorer.restoreBatchCompleted(nextPosition, records.size());
-
+ restorer.restoreBatchCompleted(lastRestoredOffset, records.size());
}
return nextPosition;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index ee96451..e69cede 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -234,15 +234,28 @@ public class StoreChangelogReaderTest {
assertThat(callbackTwo.restored.size(), equalTo(3));
assertAllCallbackStatesExecuted(callback, "storeName1");
- assertCorrectOffsetsReportedByListener(callback, 0L, 10L, 10L);
+ assertCorrectOffsetsReportedByListener(callback, 0L, 9L, 10L);
assertAllCallbackStatesExecuted(callbackOne, "storeName2");
- assertCorrectOffsetsReportedByListener(callbackOne, 0L, 5L, 5L);
+ assertCorrectOffsetsReportedByListener(callbackOne, 0L, 4L, 5L);
assertAllCallbackStatesExecuted(callbackTwo, "storeName3");
- assertCorrectOffsetsReportedByListener(callbackTwo, 0L, 3L, 3L);
+ assertCorrectOffsetsReportedByListener(callbackTwo, 0L, 2L, 3L);
}
+ @Test
+ public void shouldOnlyReportTheLastRestoredOffset() {
+ setupConsumer(10, topicPartition);
+ changelogReader
+ .register(new StateRestorer(topicPartition, restoreListener, null,
5, true, "storeName1"));
+ changelogReader.restore(active);
+
+ assertThat(callback.restored.size(), equalTo(5));
+ assertAllCallbackStatesExecuted(callback, "storeName1");
+ assertCorrectOffsetsReportedByListener(callback, 0L, 4L, 5L);
+ }
+
+
private void assertAllCallbackStatesExecuted(final
MockStateRestoreListener restoreListener,
final String storeName) {
assertThat(restoreListener.storeNameCalledStates.get(RESTORE_START),
equalTo(storeName));
@@ -253,11 +266,12 @@ public class StoreChangelogReaderTest {
private void assertCorrectOffsetsReportedByListener(final
MockStateRestoreListener restoreListener,
final long startOffset,
- final long
batchOffset, final long endOffset) {
+ final long batchOffset,
+ final long
totalRestored) {
assertThat(restoreListener.restoreStartOffset, equalTo(startOffset));
assertThat(restoreListener.restoredBatchOffset, equalTo(batchOffset));
- assertThat(restoreListener.restoreEndOffset, equalTo(endOffset));
+ assertThat(restoreListener.totalNumRestored, equalTo(totalRestored));
}
@Test
--
To stop receiving notification emails like this one, please contact
[email protected].