C0urante commented on code in PR #13429:
URL: https://github.com/apache/kafka/pull/13429#discussion_r1151130958


##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java:
##########
@@ -57,52 +58,120 @@ public void testOffsetTranslation() {
 
             // Emit synced downstream offset without dead-reckoning
             store.sync(tp, 100, 200);
-            assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 150));
+            assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 150));
 
             // Translate exact offsets
             store.sync(tp, 150, 251);
-            assertEquals(OptionalLong.of(251), store.translateDownstream(tp, 150));
+            assertEquals(OptionalLong.of(251), store.translateDownstream(null, tp, 150));
 
             // Use old offset (5) prior to any sync -> can't translate
-            assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 5));
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 5));
 
             // Downstream offsets reset
             store.sync(tp, 200, 10);
-            assertEquals(OptionalLong.of(10), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.of(10), store.translateDownstream(null, tp, 200));
 
             // Upstream offsets reset
             store.sync(tp, 20, 20);
-            assertEquals(OptionalLong.of(20), store.translateDownstream(tp, 20));
+            assertEquals(OptionalLong.of(20), store.translateDownstream(null, tp, 20));
         }
     }
 
     @Test
     public void testNoTranslationIfStoreNotStarted() {
         try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
             // no offsets exist and store is not started
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));
 
             // read a sync during startup
             store.sync(tp, 100, 200);
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));
 
             // After the store is started all offsets are visible
             store.start();
-            assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 0));
-            assertEquals(OptionalLong.of(200), store.translateDownstream(tp, 100));
-            assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 0));
+            assertEquals(OptionalLong.of(200), store.translateDownstream(null, tp, 100));
+            assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 200));
         }
     }
 
     @Test
     public void testNoTranslationIfNoOffsetSync() {
         try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
             store.start();
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
+        }
+    }
+
+    @Test
+    public void testPastOffsetTranslation() {
+        try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
+            long maxOffsetLag = 10;
+            int offset = 0;
+            for (; offset <= 1000; offset += maxOffsetLag) {
+                store.sync(tp, offset, offset);
+                assertSparseSyncInvariant(store, tp);
+            }
+            store.start();
+
+            // After starting but before seeing new offsets, only the latest startup offset can be translated
+            assertSparseSync(store, 1000, -1);
+
+            for (; offset <= 2000; offset += maxOffsetLag) {
+                store.sync(tp, offset, offset);
+                assertSparseSyncInvariant(store, tp);
+            }
+
+            // After seeing new offsets, we still cannot translate earlier than the latest startup offset
+            assertSparseSync(store, 1000, -1);
+
+            // We can translate offsets between the latest startup offset and the latest offset with variable precision
+            // Older offsets are less precise and translation ends up farther apart
+            assertSparseSync(store, 1030, 1000);
+            assertSparseSync(store, 1540, 1030);
+            assertSparseSync(store, 1800, 1540);
+            assertSparseSync(store, 1920, 1800);
+            assertSparseSync(store, 1990, 1920);
+            assertSparseSync(store, 2000, 1990);
+
+            // Rewind upstream offsets should clear all historical syncs
+            store.sync(tp, 1500, 2500);

Review Comment:
   Worth adding a call to `assertSparseSyncInvariant` after this sync as well?
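
   Something along these lines, just as a sketch (reusing the invariant helper already called earlier in this test; whether an exact translation should also be asserted after the rewind is up to you):
   ```java
   // Rewind upstream offsets should clear all historical syncs
   store.sync(tp, 1500, 2500);
   assertSparseSyncInvariant(store, tp);
   ```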



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java:
##########
@@ -16,12 +16,12 @@
  */
 package org.apache.kafka.connect.mirror;
 
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Collections;
 import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.concurrent.ExecutionException;

Review Comment:
   Unused import (causes Checkstyle to fail)



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -139,10 +171,103 @@ public void close() {
     protected void handleRecord(ConsumerRecord<byte[], byte[]> record) {
         OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
         TopicPartition sourceTopicPartition = offsetSync.topicPartition();
-        offsetSyncs.put(sourceTopicPartition, offsetSync);
+        offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> createInitialSyncs(offsetSync));
+        offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> updateExistingSyncs(syncs, offsetSync));
+    }
+
+    private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) {
+        // Make a copy of the array before mutating it, so that readers do not see inconsistent data
+        // TODO: batch updates so that this copy can be performed less often for high-volume sync topics.
+        OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE);
+        updateSyncArray(mutableSyncs, offsetSync);
+        if (log.isTraceEnabled()) {
+            StringBuilder stateString = new StringBuilder();
+            stateString.append("[");
+            for (int i = 0; i < Long.SIZE; i++) {
+                if (i != 0) {
+                    stateString.append(",");
+                }
+                if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != mutableSyncs[i - 1]) {
+                    // Print only if the sync is interesting, a series of repeated syncs will appear as ,,,,,
+                    stateString.append(mutableSyncs[i].upstreamOffset());
+                    stateString.append(":");
+                    stateString.append(mutableSyncs[i].downstreamOffset());
+                }
+            }
+            stateString.append("]");
+            log.trace("New sync {} applied, new state is {}", offsetSync, stateString);
+        }
+        return mutableSyncs;
+    }
+
+    private OffsetSync[] createInitialSyncs(OffsetSync firstSync) {
+        OffsetSync[] syncs = new OffsetSync[Long.SIZE];
+        clearSyncArray(syncs, firstSync);
+        return syncs;
+    }
+
+    private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        for (int i = 0; i < Long.SIZE; i++) {
+            syncs[i] = offsetSync;
+        }
+    }
+
+    private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        long upstreamOffset = offsetSync.upstreamOffset();
+        // Old offsets are invalid, so overwrite them all.
+        if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) {

Review Comment:
   Hmmm... it feels like we're sacrificing a fair amount here in the name of 
correctness.
   
   The first naive thought that comes to mind is that we could list the offsets 
for each to-be-synced group on the target cluster before doing the first sync 
for it, and refuse to translate backwards.
   
   Of course, this doesn't handle cases where 1) the upstream offsets went 
backwards for some reason, or 2) the downstream consumer group has been active 
and committed later offsets between now and when the last 
`MirrorCheckpointTask` instance was running.
   
   I don't know whether 2) should really be a concern (if you've committed 
offsets in that group, it's debatable whether we should be overwriting those). 
But I think 1) is worth taking into account, and I'm not sure how we can 
address it. Re-reading the offset syncs topic on startup and checking for a 
decrease in upstream offsets between two consecutive syncs doesn't take 
compaction into account.
   
   I'd love it if we could leverage some of the information that's in the 
offset syncs topic on startup to narrow the likelihood of inactive or 
slow-moving upstream consumer groups failing to be synced, but I agree that if 
the scenario you outlined is possible (and without having thought too hard 
about it, it does seem that way), we can't sacrifice correctness by ignoring it.
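
   For reference, the naive version of that first thought would look something 
like the sketch below (illustrative only, not code from this PR; 
`targetAdminClient` and the surrounding class are made up, and this still 
doesn't address cases 1) and 2) above):
   ```java
   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.consumer.OffsetAndMetadata;
   import org.apache.kafka.common.TopicPartition;

   import java.util.Map;
   import java.util.concurrent.ExecutionException;

   class BackwardsCheckpointGuard {
       private final Admin targetAdminClient;

       BackwardsCheckpointGuard(Admin targetAdminClient) {
           this.targetAdminClient = targetAdminClient;
       }

       /**
        * Lists the group's committed offsets on the target cluster and reports whether
        * committing the translated offset would move the group backwards there.
        */
       boolean wouldTranslateBackwards(String group, TopicPartition tp, long translatedOffset)
               throws ExecutionException, InterruptedException {
           Map<TopicPartition, OffsetAndMetadata> committed = targetAdminClient
                   .listConsumerGroupOffsets(group)
                   .partitionsToOffsetAndMetadata()
                   .get();
           OffsetAndMetadata existing = committed.get(tp);
           return existing != null && translatedOffset < existing.offset();
       }
   }
   ```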



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -139,10 +171,103 @@ public void close() {
     protected void handleRecord(ConsumerRecord<byte[], byte[]> record) {
         OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
         TopicPartition sourceTopicPartition = offsetSync.topicPartition();
-        offsetSyncs.put(sourceTopicPartition, offsetSync);
+        offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> createInitialSyncs(offsetSync));
+        offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> updateExistingSyncs(syncs, offsetSync));
+    }
+
+    private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) {
+        // Make a copy of the array before mutating it, so that readers do not see inconsistent data
+        // TODO: batch updates so that this copy can be performed less often for high-volume sync topics.
+        OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE);
+        updateSyncArray(mutableSyncs, offsetSync);
+        if (log.isTraceEnabled()) {
+            StringBuilder stateString = new StringBuilder();
+            stateString.append("[");
+            for (int i = 0; i < Long.SIZE; i++) {
+                if (i != 0) {
+                    stateString.append(",");
+                }
+                if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != mutableSyncs[i - 1]) {
+                    // Print only if the sync is interesting, a series of repeated syncs will appear as ,,,,,
+                    stateString.append(mutableSyncs[i].upstreamOffset());
+                    stateString.append(":");
+                    stateString.append(mutableSyncs[i].downstreamOffset());
+                }
+            }
+            stateString.append("]");
+            log.trace("New sync {} applied, new state is {}", offsetSync, stateString);
+        }
+        return mutableSyncs;
+    }
+
+    private OffsetSync[] createInitialSyncs(OffsetSync firstSync) {
+        OffsetSync[] syncs = new OffsetSync[Long.SIZE];
+        clearSyncArray(syncs, firstSync);
+        return syncs;
+    }
+
+    private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        for (int i = 0; i < Long.SIZE; i++) {
+            syncs[i] = offsetSync;
+        }
+    }
+
+    private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        long upstreamOffset = offsetSync.upstreamOffset();
+        // Old offsets are invalid, so overwrite them all.
+        if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) {
+            clearSyncArray(syncs, offsetSync);
+            return;
+        }
+        syncs[0] = offsetSync;
+        for (int i = 1; i < Long.SIZE; i++) {
+            OffsetSync oldValue = syncs[i];
+            long mask = Long.MAX_VALUE << i;
+            // If the old value is too stale: at least one of the 64-i high bits of the offset value have changed
+            // This produces buckets quantized at boundaries of powers of 2
+            // Syncs:     a                  b             c       d   e
+            // Bucket 0  |a|                |b|           |c|     |d| |e|                   (size    1)
+            // Bucket 1 | a|               | b|           |c |    |d | e|                   (size    2)
+            // Bucket 2 | a  |           |   b|         |  c |  |  d |   |                  (size    4)
+            // Bucket 3 | a      |       |   b   |      |  c    |  d     |                  (size    8)
+            // Bucket 4 | a              |   b          |  c             |                  (size   16)
+            // Bucket 5 | a                             |  c                             |  (size   32)
+            // Bucket 6 | a                                                              |  (size   64)
+            // ...        a                                                                 ...
+            // Bucket63   a                                                                 (size 2^63)
+            // State after a: [a,,,,,,, ... ,a] (all buckets written)
+            // State after b: [b,,,,,a,, ... ,a] (buckets 0-4 written)
+            // State after c: [c,,,,,,a, ... ,a] (buckets 0-5 written, b expired completely)
+            // State after d: [d,,,,c,,a, ... ,a] (buckets 0-3 written)
+            // State after e: [e,,d,,c,,a, ... ,a] (buckets 0-1 written)

Review Comment:
   The new comments are succinct and helpful, thanks!
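
   For anyone following along, here's a tiny standalone illustration of the quantization those buckets perform (my own sketch, not code from this diff):
   ```java
   public class BucketQuantizationDemo {
       public static void main(String[] args) {
           long a = 1000, b = 1005, c = 1010;
           for (int i = 0; i < 8; i++) {
               // Bucket i keeps the high 64-i bits of the upstream offset, so two offsets
               // share a bucket iff they fall in the same window of size 2^i (1, 2, 4, 8, ...),
               // which is exactly the widening pattern the diagram shows.
               long mask = Long.MAX_VALUE << i;
               System.out.printf("bucket %d (size %d): a,b together=%b  a,c together=%b%n",
                       i, 1L << i, (a & mask) == (b & mask), (a & mask) == (c & mask));
           }
       }
   }
   ```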


