[ https://issues.apache.org/jira/browse/KAFKA-6364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16364752#comment-16364752 ]

ASF GitHub Bot commented on KAFKA-6364:
---------------------------------------

guozhangwang closed pull request #4511: KAFKA-6364: second check for ensuring changelog topic not changed during restore
URL: https://github.com/apache/kafka/pull/4511
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index c4929959211..ceb7024b97b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -51,7 +51,7 @@
     private final Map<String, List<PartitionInfo>> partitions;
     private final SubscriptionState subscriptions;
     private final Map<TopicPartition, Long> beginningOffsets;
-    private final Map<TopicPartition, Long> endOffsets;
+    private final Map<TopicPartition, List<Long>> endOffsets;
     private final Map<TopicPartition, OffsetAndMetadata> committed;
     private final Queue<Runnable> pollTasks;
     private final Set<TopicPartition> paused;
@@ -290,8 +290,26 @@ public synchronized void seekToEnd(Collection<TopicPartition> partitions) {
             subscriptions.requestOffsetReset(tp, OffsetResetStrategy.LATEST);
     }
 
-    public synchronized void updateEndOffsets(Map<TopicPartition, Long> newOffsets) {
-        endOffsets.putAll(newOffsets);
+    // needed for cases where you make a second call to endOffsets
+    public synchronized void addEndOffsets(final Map<TopicPartition, Long> newOffsets) {
+        innerUpdateEndOffsets(newOffsets, false);
+    }
+
+    public synchronized void updateEndOffsets(final Map<TopicPartition, Long> newOffsets) {
+        innerUpdateEndOffsets(newOffsets, true);
+    }
+
+    private void innerUpdateEndOffsets(final Map<TopicPartition, Long> newOffsets,
+                                       final boolean replace) {
+
+        for (final Map.Entry<TopicPartition, Long> entry : newOffsets.entrySet()) {
+            List<Long> offsets = endOffsets.get(entry.getKey());
+            if (replace || offsets == null) {
+                offsets = new ArrayList<>();
+            }
+            offsets.add(entry.getValue());
+            endOffsets.put(entry.getKey(), offsets);
+        }
     }
 
     @Override
@@ -354,7 +372,7 @@ public synchronized void resume(Collection<TopicPartition> partitions) {
    public synchronized Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
         Map<TopicPartition, Long> result = new HashMap<>();
         for (TopicPartition tp : partitions) {
-            Long endOffset = endOffsets.get(tp);
+            Long endOffset = getEndOffset(endOffsets.get(tp));
             if (endOffset == null)
                throw new IllegalStateException("The partition " + tp + " does not have an end offset.");
             result.put(tp, endOffset);
@@ -430,7 +448,7 @@ private void resetOffsetPosition(TopicPartition tp) {
             if (offset == null)
                throw new IllegalStateException("MockConsumer didn't have beginning offset specified, but tried to seek to beginning");
         } else if (strategy == OffsetResetStrategy.LATEST) {
-            offset = endOffsets.get(tp);
+            offset = getEndOffset(endOffsets.get(tp));
             if (offset == null)
                throw new IllegalStateException("MockConsumer didn't have end offset specified, but tried to seek to end");
         } else {
@@ -438,4 +456,11 @@ private void resetOffsetPosition(TopicPartition tp) {
         }
         seek(tp, offset);
     }
+
+    private Long getEndOffset(List<Long> offsets) {
+        if (offsets == null || offsets.isEmpty()) {
+            return null;
+        }
+        return offsets.size() > 1 ? offsets.remove(0) : offsets.get(0);
+    }
 }
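
To make the MockConsumer change above concrete, here is a minimal usage
sketch (not part of the patch; the topic name and partition are made up):
updateEndOffsets() replaces a partition's queued end offsets, addEndOffsets()
appends to the queue, and getEndOffset() pops the head while more than one
value is buffered, so two successive endOffsets() calls can observe a moving
log end:

    import java.util.Collections;

    import org.apache.kafka.clients.consumer.MockConsumer;
    import org.apache.kafka.clients.consumer.OffsetResetStrategy;
    import org.apache.kafka.common.TopicPartition;

    public class MovingEndOffsetSketch {
        public static void main(final String[] args) {
            final MockConsumer<byte[], byte[]> consumer =
                    new MockConsumer<>(OffsetResetStrategy.EARLIEST);
            final TopicPartition tp = new TopicPartition("store-changelog", 0);

            consumer.updateEndOffsets(Collections.singletonMap(tp, 10L)); // queue: [10]
            consumer.addEndOffsets(Collections.singletonMap(tp, 15L));    // queue: [10, 15]

            // The first call pops 10 (more than one value buffered), the second
            // returns 15, simulating a changelog that grew during the restore.
            System.out.println(consumer.endOffsets(Collections.singletonList(tp)).get(tp)); // 10
            System.out.println(consumer.endOffsets(Collections.singletonList(tp)).get(tp)); // 15
        }
    }

This mirrors what the new tests below do: seed 10 records, queue an end
offset of 15 via addEndOffsets, and assert whether the second check fires.
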
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index b11c45ba313..5fcba76570e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -255,6 +255,15 @@ private void restorePartition(final ConsumerRecords<byte[], byte[]> allRecords,
                throw new TaskMigratedException(task, topicPartition, endOffset, pos);
             }
 
+            // need to check for changelog topic
+            if (restorer.offsetLimit() == Long.MAX_VALUE) {
+                final Long updatedEndOffset = restoreConsumer.endOffsets(Collections.singletonList(topicPartition)).get(topicPartition);
+                if (!restorer.hasCompleted(pos, updatedEndOffset)) {
+                    throw new TaskMigratedException(task, topicPartition, updatedEndOffset, pos);
+                }
+            }
+
+
            log.debug("Completed restoring state from changelog {} with {} records ranging from offset {} to {}",
                       topicPartition,
                       restorer.restoredNumRecords(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index e69cede23fd..c65d4efadb1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -390,9 +390,50 @@ public void shouldThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestore
         try {
             changelogReader.restore(active);
             fail("Should have thrown TaskMigratedException");
-        } catch (final TaskMigratedException expected) { /* ignore */ }
+        } catch (final TaskMigratedException expected) {
+            /* ignore */
+        }
     }
 
+
+    @Test
+    public void shouldThrowTaskMigratedExceptionIfChangelogTopicUpdatedDuringRestoreProcessFoundInSecondCheck() {
+        final int messages = 10;
+        setupConsumer(messages, topicPartition);
+        // in this case first call to endOffsets returns correct value, but a second thread has updated the changelog topic
+        // so a subsequent call to endOffsets returns a value exceeding the expected end value
+        consumer.addEndOffsets(Collections.singletonMap(topicPartition, 15L));
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
+
+        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+        replay(active);
+
+        try {
+            changelogReader.restore(active);
+            fail("Should have thrown TaskMigratedException");
+        } catch (final TaskMigratedException expected) {
+            // verifies second block threw exception with updated end offset
+            assertTrue(expected.getMessage().contains("end offset 15, current offset 10"));
+        }
+    }
+
+
+    @Test
+    public void shouldNotThrowTaskMigratedExceptionIfSourceTopicUpdatedDuringRestoreProcess() {
+        final int messages = 10;
+        setupConsumer(messages, topicPartition);
+        // in this case first call to endOffsets returns correct value, but a second thread has updated the source topic
+        // but since it's a source topic, the second check should not fire hence no exception
+        consumer.addEndOffsets(Collections.singletonMap(topicPartition, 15L));
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, 9L, true, "storeName"));
+
+        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+        replay(active);
+
+        changelogReader.restore(active);
+    }
+
+
     @Test
    public void shouldThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestoreForChangelogTopicEOSEnabled() {
         final int totalMessages = 10;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add Second Check for End Offset During Restore
> ----------------------------------------------
>
>                 Key: KAFKA-6364
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6364
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Bill Bejeck
>            Assignee: Bill Bejeck
>            Priority: Major
>             Fix For: 1.0.2
>
>
> We need to re-check the end offset when restoring a changelog topic, to 
> guard against the race condition where an additional record is appended to 
> the log immediately after the restore starts. We also need to check whether 
> the partition is a KTable source topic, i.e. whether an offset limit is 
> set, in which case the second check must not fire.
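
For illustration, here is a minimal sketch of the guard the patch adds
(the identifiers restorer, restoreConsumer, task, and topicPartition mirror
the diff above, but the method name and shape are illustrative, not the
actual StoreChangelogReader code):

    // Illustrative sketch only; mirrors the hunk added to restorePartition() above.
    private void throwIfChangelogGrewDuringRestore(final long pos) {
        // A KTable source topic restores only up to its committed offset
        // (offsetLimit() < Long.MAX_VALUE), so records appended after the
        // restore began are expected there and must not trigger the check.
        if (restorer.offsetLimit() == Long.MAX_VALUE) {
            // Re-read the end offset: if it moved past the restored position,
            // another instance may already own this task and be writing to
            // the changelog, so surface a TaskMigratedException.
            final Long updatedEndOffset = restoreConsumer
                    .endOffsets(Collections.singletonList(topicPartition))
                    .get(topicPartition);
            if (!restorer.hasCompleted(pos, updatedEndOffset)) {
                throw new TaskMigratedException(task, topicPartition, updatedEndOffset, pos);
            }
        }
    }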



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
