C0urante commented on code in PR #13429:
URL: https://github.com/apache/kafka/pull/13429#discussion_r1149500829


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -25,17 +25,37 @@
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.util.KafkaBasedLog;
 import org.apache.kafka.connect.util.TopicAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.concurrent.ConcurrentHashMap;
 
-/** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */
+/**
+ * Used internally by MirrorMaker. Stores offset syncs and performs offset translation.
+ * <p>A limited number of offset syncs can be stored per TopicPartition, in a way which provides better translation
+ * later in the topic, closer to the live end of the topic.
+ * This maintains the following invariants for each topic-partition in the in-memory sync storage:
+ * <ul>
+ *     <li>syncs[0] is the latest offset sync from the syncs topic</li>
+ *     <li>For each i,j, i <= j: syncs[j].upstream <= syncs[i].upstream < syncs[j].upstream + 2^j</li>
+ * </ul>
+ * <p>Offset translation uses the syncs[i] which most closely precedes the upstream consumer group's current offset.
+ * For a fixed in-memory state, translation of variable upstream offsets will be monotonic.
+ * For variable in-memory state, translation of a fixed upstream offset will not be monotonic.
+ * <p>Translation will be unavailable for all topic-partitions before an initial read-to-end of the offset syncs topic
+ * is complete. Translation will be unavailable after that if no syncs are present for a topic-partition, or if relevant
+ * offset syncs for the topic were eligible for compaction at the time of the initial read-to-end.

Review Comment:
   I'm unclear on what exactly this part refers to--does it cover the case where syncs for earlier offsets have been wiped due to compaction? If so, we may want to clarify that offset syncs were "lost due to compaction" instead of "eligible for compaction". It's also possible that replication began from a later point than the consumer group is reading from, which is another reason that translation may be unavailable. And there's also the case where replication began but was unable to produce offset syncs for whatever reason.

   If I'm on the right track here, maybe it'd be simplest to just say "or if no relevant offset syncs for the topic were available at the time of the initial read-to-end"?



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -139,10 +171,103 @@ public void close() {
     protected void handleRecord(ConsumerRecord<byte[], byte[]> record) {
         OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
         TopicPartition sourceTopicPartition = offsetSync.topicPartition();
-        offsetSyncs.put(sourceTopicPartition, offsetSync);
+        offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> createInitialSyncs(offsetSync));
+        offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> updateExistingSyncs(syncs, offsetSync));
+    }
+
+    private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) {
+        // Make a copy of the array before mutating it, so that readers do not see inconsistent data
+        // TODO: batch updates so that this copy can be performed less often for high-volume sync topics.
+        OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE);
+        updateSyncArray(mutableSyncs, offsetSync);
+        if (log.isTraceEnabled()) {
+            StringBuilder stateString = new StringBuilder();
+            stateString.append("[");
+            for (int i = 0; i < Long.SIZE; i++) {
+                if (i != 0) {
+                    stateString.append(",");
+                }
+                if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != mutableSyncs[i - 1]) {
+                    // Print only if the sync is interesting, a series of repeated syncs will appear as ,,,,,
+                    stateString.append(mutableSyncs[i].upstreamOffset());
+                    stateString.append(":");
+                    stateString.append(mutableSyncs[i].downstreamOffset());
+                }
+            }
+            stateString.append("]");
+            log.trace("New sync {} applied, new state is {}", offsetSync, stateString);
+        }
+        return mutableSyncs;
+    }
+
+    private OffsetSync[] createInitialSyncs(OffsetSync firstSync) {
+        OffsetSync[] syncs = new OffsetSync[Long.SIZE];
+        clearSyncArray(syncs, firstSync);
+        return syncs;
+    }
+
+    private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        for (int i = 0; i < Long.SIZE; i++) {
+            syncs[i] = offsetSync;
+        }
+    }
+
+    private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        long upstreamOffset = offsetSync.upstreamOffset();
+        // Old offsets are invalid, so overwrite them all.
+        if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) {
+            clearSyncArray(syncs, offsetSync);
+            return;
+        }
+        syncs[0] = offsetSync;
+        for (int i = 1; i < Long.SIZE; i++) {
+            OffsetSync oldValue = syncs[i];
+            long mask = Long.MAX_VALUE << i;
+            // If the old value is too stale: at least one of the 64-i high bits of the offset value have changed
+            // This produces buckets quantized at boundaries of powers of 2
+            // Syncs:     a                  b             c       d   e
+            // Bucket 0  |a|                |b|           |c|     |d| |e|                    (size    1)
+            // Bucket 1 | a|               | b|           |c |    |d | e|                    (size    2)
+            // Bucket 2 | a  |           |   b|         |  c |  |  d |   |                   (size    4)
+            // Bucket 3 | a      |       |   b   |      |  c    |  d     |                   (size    8)
+            // Bucket 4 | a              |   b          |  c             |                   (size   16)
+            // Bucket 5 | a                             |  c                              |  (size   32)
+            // Bucket 6 | a                                                               |  (size   64)
+            // ...        a                                                                  ...
+            // Bucket63   a                                                                  (size 2^63)
+            // State after a: [a,,,,,,, ... ,a] (all buckets written)
+            // State after b: [b,,,,,a,, ... ,a] (buckets 0-4 written)
+            // State after c: [c,,,,,,a, ... ,a] (buckets 0-5 written, b expired completely)
+            // State after d: [d,,,,c,,a, ... ,a] (buckets 0-3 written)
+            // State after e: [e,,d,,c,,a, ... ,a] (buckets 0-1 written)
+            if (((oldValue.upstreamOffset() ^ upstreamOffset) & mask) != 0) {
+                syncs[i] = offsetSync;
+            } else {
+                break;
+            }
+        }
     }
 
-    private Optional<OffsetSync> latestOffsetSync(TopicPartition topicPartition) {
-        return Optional.ofNullable(offsetSyncs.get(topicPartition));
+    private Optional<OffsetSync> latestOffsetSync(TopicPartition topicPartition, long offset) {
+        return Optional.ofNullable(offsetSyncs.get(topicPartition))
+                .map(syncs -> lookupLatestSync(syncs, offset));
+    }
+
+
+    private OffsetSync lookupLatestSync(OffsetSync[] syncs, long offset) {
+        // optimization: skip loop if every sync in the store is ahead of the requested offset

Review Comment:
   Do we think this will be necessary? IIUC this logic isn't really on a hot path performance-wise.

   We could also do a binary search across the elements of the `syncs` array instead of a linear search if we really want to get fancy, but I'm inclined to try to keep things simpler for now if we can.
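   For reference, a rough sketch of what that binary search could look like (purely illustrative and untested; it leans on the invariant from the class Javadoc that the array is non-increasing in upstream offset, and the method name is made up):
   ```java
    // Find the most recent sync whose upstream offset is at most the requested offset.
    // The array is non-increasing in upstream offset (syncs[0] is the latest), so the
    // predicate "upstreamOffset() <= offset" is false for a prefix of the array and true
    // for the rest, and we can binary-search for the first index where it becomes true.
    private OffsetSync binarySearchLatestSync(OffsetSync[] syncs, long offset) {
        int lo = 0;
        int hi = syncs.length - 1;
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (syncs[mid].upstreamOffset() <= offset) {
                hi = mid;      // mid precedes the offset; see if a more recent sync does too
            } else {
                lo = mid + 1;  // mid and everything more recent are ahead of the offset
            }
        }
        return syncs[lo];      // degenerates to the oldest sync if nothing precedes the offset
    }
   ```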



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -139,10 +171,103 @@ public void close() {
     protected void handleRecord(ConsumerRecord<byte[], byte[]> record) {
         OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
         TopicPartition sourceTopicPartition = offsetSync.topicPartition();
-        offsetSyncs.put(sourceTopicPartition, offsetSync);
+        offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> createInitialSyncs(offsetSync));
+        offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> updateExistingSyncs(syncs, offsetSync));
+    }
+
+    private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) {
+        // Make a copy of the array before mutating it, so that readers do not see inconsistent data
+        // TODO: batch updates so that this copy can be performed less often for high-volume sync topics.
+        OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE);
+        updateSyncArray(mutableSyncs, offsetSync);
+        if (log.isTraceEnabled()) {
+            StringBuilder stateString = new StringBuilder();
+            stateString.append("[");
+            for (int i = 0; i < Long.SIZE; i++) {
+                if (i != 0) {
+                    stateString.append(",");
+                }
+                if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != mutableSyncs[i - 1]) {
+                    // Print only if the sync is interesting, a series of repeated syncs will appear as ,,,,,
+                    stateString.append(mutableSyncs[i].upstreamOffset());
+                    stateString.append(":");
+                    stateString.append(mutableSyncs[i].downstreamOffset());
+                }
+            }
+            stateString.append("]");
+            log.trace("New sync {} applied, new state is {}", offsetSync, stateString);
+        }
+        return mutableSyncs;
+    }
+
+    private OffsetSync[] createInitialSyncs(OffsetSync firstSync) {
+        OffsetSync[] syncs = new OffsetSync[Long.SIZE];
+        clearSyncArray(syncs, firstSync);
+        return syncs;
+    }
+
+    private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        for (int i = 0; i < Long.SIZE; i++) {
+            syncs[i] = offsetSync;
+        }
+    }
+
+    private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        long upstreamOffset = offsetSync.upstreamOffset();
+        // Old offsets are invalid, so overwrite them all.
+        if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) {
+            clearSyncArray(syncs, offsetSync);
+            return;
+        }
+        syncs[0] = offsetSync;
+        for (int i = 1; i < Long.SIZE; i++) {
+            OffsetSync oldValue = syncs[i];
+            long mask = Long.MAX_VALUE << i;
+            // If the old value is too stale: at least one of the 64-i high bits of the offset value have changed
+            // This produces buckets quantized at boundaries of powers of 2
+            // Syncs:     a                  b             c       d   e
+            // Bucket 0  |a|                |b|           |c|     |d| |e|                    (size    1)
+            // Bucket 1 | a|               | b|           |c |    |d | e|                    (size    2)
+            // Bucket 2 | a  |           |   b|         |  c |  |  d |   |                   (size    4)
+            // Bucket 3 | a      |       |   b   |      |  c    |  d     |                   (size    8)
+            // Bucket 4 | a              |   b          |  c             |                   (size   16)
+            // Bucket 5 | a                             |  c                              |  (size   32)
+            // Bucket 6 | a                                                               |  (size   64)
+            // ...        a                                                                  ...
+            // Bucket63   a                                                                  (size 2^63)
+            // State after a: [a,,,,,,, ... ,a] (all buckets written)
+            // State after b: [b,,,,,a,, ... ,a] (buckets 0-4 written)
+            // State after c: [c,,,,,,a, ... ,a] (buckets 0-5 written, b expired completely)
+            // State after d: [d,,,,c,,a, ... ,a] (buckets 0-3 written)
+            // State after e: [e,,d,,c,,a, ... ,a] (buckets 0-1 written)
+            if (((oldValue.upstreamOffset() ^ upstreamOffset) & mask) != 0) {
+                syncs[i] = offsetSync;
+            } else {
+                break;
+            }
+        }
     }
 
-    private Optional<OffsetSync> latestOffsetSync(TopicPartition topicPartition) {
-        return Optional.ofNullable(offsetSyncs.get(topicPartition));
+    private Optional<OffsetSync> latestOffsetSync(TopicPartition topicPartition, long offset) {
+        return Optional.ofNullable(offsetSyncs.get(topicPartition))
+                .map(syncs -> lookupLatestSync(syncs, offset));
+    }
+
+
+    private OffsetSync lookupLatestSync(OffsetSync[] syncs, long offset) {

Review Comment:
   Nit: it could be a little clearer to rename the `offset` parameter to `upstreamOffset`



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -139,10 +171,103 @@ public void close() {
     protected void handleRecord(ConsumerRecord<byte[], byte[]> record) {
         OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
         TopicPartition sourceTopicPartition = offsetSync.topicPartition();
-        offsetSyncs.put(sourceTopicPartition, offsetSync);
+        offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> createInitialSyncs(offsetSync));
+        offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> updateExistingSyncs(syncs, offsetSync));
+    }
+
+    private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) {
+        // Make a copy of the array before mutating it, so that readers do not see inconsistent data
+        // TODO: batch updates so that this copy can be performed less often for high-volume sync topics.
+        OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE);

Review Comment:
   `Long.SIZE` is used in several places in this change. Could we pull it out into a constant like `SYNCS_PER_PARTITION`?

   If there's a special reason we're using `Long.SIZE` for that value, we should probably also add a comment on why we've chosen it.
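   For illustration, something along these lines (constant name and comment wording are just suggestions):
   ```java
    // One stored sync per bit of a long upstream offset: the sync at index i is only
    // replaced once one of the top (64 - i) bits of the upstream offset changes, which
    // is what bounds how far apart the retained syncs can drift.
    private static final int SYNCS_PER_PARTITION = Long.SIZE;
   ```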



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -139,10 +171,103 @@ public void close() {
     protected void handleRecord(ConsumerRecord<byte[], byte[]> record) {
         OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
         TopicPartition sourceTopicPartition = offsetSync.topicPartition();
-        offsetSyncs.put(sourceTopicPartition, offsetSync);
+        offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> createInitialSyncs(offsetSync));
+        offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> updateExistingSyncs(syncs, offsetSync));

Review Comment:
   This can be a single statement:
   ```suggestion
           offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) ->
                   syncs == null
                           ? createInitialSyncs(offsetSync)
                           : updateExistingSyncs(syncs, offsetSync)
           );
   ```



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -139,10 +171,103 @@ public void close() {
     protected void handleRecord(ConsumerRecord<byte[], byte[]> record) {
         OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
         TopicPartition sourceTopicPartition = offsetSync.topicPartition();
-        offsetSyncs.put(sourceTopicPartition, offsetSync);
+        offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> createInitialSyncs(offsetSync));
+        offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> updateExistingSyncs(syncs, offsetSync));
+    }
+
+    private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) {
+        // Make a copy of the array before mutating it, so that readers do not see inconsistent data
+        // TODO: batch updates so that this copy can be performed less often for high-volume sync topics.
+        OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE);
+        updateSyncArray(mutableSyncs, offsetSync);
+        if (log.isTraceEnabled()) {
+            StringBuilder stateString = new StringBuilder();
+            stateString.append("[");
+            for (int i = 0; i < Long.SIZE; i++) {
+                if (i != 0) {
+                    stateString.append(",");
+                }
+                if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != mutableSyncs[i - 1]) {
+                    // Print only if the sync is interesting, a series of repeated syncs will appear as ,,,,,
+                    stateString.append(mutableSyncs[i].upstreamOffset());
+                    stateString.append(":");
+                    stateString.append(mutableSyncs[i].downstreamOffset());
+                }
+            }
+            stateString.append("]");
+            log.trace("New sync {} applied, new state is {}", offsetSync, stateString);

Review Comment:
   Could we pull this out into a separate method? Maybe something like `logUpdatedSyncs`.
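   For illustration, roughly what that could look like (just the existing trace logic moved into a helper, untested):
   ```java
    private void logUpdatedSyncs(OffsetSync offsetSync, OffsetSync[] syncs) {
        if (!log.isTraceEnabled())
            return;
        StringBuilder stateString = new StringBuilder("[");
        for (int i = 0; i < syncs.length; i++) {
            if (i != 0)
                stateString.append(",");
            if (i == 0 || i == syncs.length - 1 || syncs[i] != syncs[i - 1]) {
                // Print only the "interesting" syncs; repeated syncs appear as consecutive commas
                stateString.append(syncs[i].upstreamOffset())
                        .append(":")
                        .append(syncs[i].downstreamOffset());
            }
        }
        stateString.append("]");
        log.trace("New sync {} applied, new state is {}", offsetSync, stateString);
    }
   ```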



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -139,10 +171,103 @@ public void close() {
     protected void handleRecord(ConsumerRecord<byte[], byte[]> record) {
         OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
         TopicPartition sourceTopicPartition = offsetSync.topicPartition();
-        offsetSyncs.put(sourceTopicPartition, offsetSync);
+        offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> createInitialSyncs(offsetSync));
+        offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> updateExistingSyncs(syncs, offsetSync));
+    }
+
+    private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) {
+        // Make a copy of the array before mutating it, so that readers do not see inconsistent data
+        // TODO: batch updates so that this copy can be performed less often for high-volume sync topics.
+        OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE);
+        updateSyncArray(mutableSyncs, offsetSync);
+        if (log.isTraceEnabled()) {
+            StringBuilder stateString = new StringBuilder();
+            stateString.append("[");
+            for (int i = 0; i < Long.SIZE; i++) {
+                if (i != 0) {
+                    stateString.append(",");
+                }
+                if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != mutableSyncs[i - 1]) {
+                    // Print only if the sync is interesting, a series of repeated syncs will appear as ,,,,,
+                    stateString.append(mutableSyncs[i].upstreamOffset());
+                    stateString.append(":");
+                    stateString.append(mutableSyncs[i].downstreamOffset());
+                }
+            }
+            stateString.append("]");
+            log.trace("New sync {} applied, new state is {}", offsetSync, stateString);
+        }
+        return mutableSyncs;
+    }
+
+    private OffsetSync[] createInitialSyncs(OffsetSync firstSync) {
+        OffsetSync[] syncs = new OffsetSync[Long.SIZE];
+        clearSyncArray(syncs, firstSync);
+        return syncs;
+    }
+
+    private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        for (int i = 0; i < Long.SIZE; i++) {
+            syncs[i] = offsetSync;
+        }
+    }
+
+    private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        long upstreamOffset = offsetSync.upstreamOffset();
+        // Old offsets are invalid, so overwrite them all.
+        if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) {

Review Comment:
   Doesn't the `!readToEnd` part of this condition cause us to drop older offset syncs? Don't we want to hold onto those in order to be able to translate older upstream offsets on a best-effort basis (as long as there are relevant syncs available that haven't been compacted yet)?



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -139,10 +171,103 @@ public void close() {
     protected void handleRecord(ConsumerRecord<byte[], byte[]> record) {
         OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
         TopicPartition sourceTopicPartition = offsetSync.topicPartition();
-        offsetSyncs.put(sourceTopicPartition, offsetSync);
+        offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> createInitialSyncs(offsetSync));
+        offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> updateExistingSyncs(syncs, offsetSync));
+    }
+
+    private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) {
+        // Make a copy of the array before mutating it, so that readers do not see inconsistent data
+        // TODO: batch updates so that this copy can be performed less often for high-volume sync topics.

Review Comment:
   If we know with certainty that this change will yield significant improvement, we should file a follow-up ticket for it.

   On the other hand, if this is more of a "we may want to look into it" sort of TODO, then we should tweak the language to something like "consider batching updates" to make it clear that this change might be useful, but we don't know for sure without more information.

   (I'm reluctant to file follow-up tickets for things that may or may not be necessary since people may self-assign those tickets without understanding what information we need before deciding whether to implement the change or not.)



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -139,10 +171,103 @@ public void close() {
     protected void handleRecord(ConsumerRecord<byte[], byte[]> record) {
         OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
         TopicPartition sourceTopicPartition = offsetSync.topicPartition();
-        offsetSyncs.put(sourceTopicPartition, offsetSync);
+        offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> createInitialSyncs(offsetSync));
+        offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> updateExistingSyncs(syncs, offsetSync));
+    }
+
+    private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) {
+        // Make a copy of the array before mutating it, so that readers do not see inconsistent data
+        // TODO: batch updates so that this copy can be performed less often for high-volume sync topics.
+        OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE);
+        updateSyncArray(mutableSyncs, offsetSync);
+        if (log.isTraceEnabled()) {
+            StringBuilder stateString = new StringBuilder();
+            stateString.append("[");
+            for (int i = 0; i < Long.SIZE; i++) {
+                if (i != 0) {
+                    stateString.append(",");
+                }
+                if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != mutableSyncs[i - 1]) {
+                    // Print only if the sync is interesting, a series of repeated syncs will appear as ,,,,,
+                    stateString.append(mutableSyncs[i].upstreamOffset());
+                    stateString.append(":");
+                    stateString.append(mutableSyncs[i].downstreamOffset());
+                }
+            }
+            stateString.append("]");
+            log.trace("New sync {} applied, new state is {}", offsetSync, stateString);
+        }
+        return mutableSyncs;
+    }
+
+    private OffsetSync[] createInitialSyncs(OffsetSync firstSync) {
+        OffsetSync[] syncs = new OffsetSync[Long.SIZE];
+        clearSyncArray(syncs, firstSync);
+        return syncs;
+    }
+
+    private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        for (int i = 0; i < Long.SIZE; i++) {
+            syncs[i] = offsetSync;
+        }
+    }
+
+    private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        long upstreamOffset = offsetSync.upstreamOffset();
+        // Old offsets are invalid, so overwrite them all.
+        if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) {
+            clearSyncArray(syncs, offsetSync);
+            return;
+        }
+        syncs[0] = offsetSync;
+        for (int i = 1; i < Long.SIZE; i++) {
+            OffsetSync oldValue = syncs[i];
+            long mask = Long.MAX_VALUE << i;
+            // If the old value is too stale: at least one of the 64-i high bits of the offset value have changed
+            // This produces buckets quantized at boundaries of powers of 2
+            // Syncs:     a                  b             c       d   e
+            // Bucket 0  |a|                |b|           |c|     |d| |e|                    (size    1)
+            // Bucket 1 | a|               | b|           |c |    |d | e|                    (size    2)
+            // Bucket 2 | a  |           |   b|         |  c |  |  d |   |                   (size    4)
+            // Bucket 3 | a      |       |   b   |      |  c    |  d     |                   (size    8)
+            // Bucket 4 | a              |   b          |  c             |                   (size   16)
+            // Bucket 5 | a                             |  c                              |  (size   32)
+            // Bucket 6 | a                                                               |  (size   64)
+            // ...        a                                                                  ...
+            // Bucket63   a                                                                  (size 2^63)
+            // State after a: [a,,,,,,, ... ,a] (all buckets written)
+            // State after b: [b,,,,,a,, ... ,a] (buckets 0-4 written)
+            // State after c: [c,,,,,,a, ... ,a] (buckets 0-5 written, b expired completely)
+            // State after d: [d,,,,c,,a, ... ,a] (buckets 0-3 written)
+            // State after e: [e,,d,,c,,a, ... ,a] (buckets 0-1 written)

Review Comment:
   I've been trying to read this part for the last 20 minutes and still can't really understand the concept it's trying to relay.

   Could it help to refer back to the invariants mentioned in the Javadoc for this class and how the logic here implements those correctly?



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##########
@@ -178,13 +182,44 @@ private List<SourceRecord> sourceRecordsForGroup(String group) throws Interrupte
         }
     }
 
-    private List<Checkpoint> checkpointsForGroup(String group) throws ExecutionException, InterruptedException {
-        return listConsumerGroupOffsets(group).entrySet().stream()
+    // for testing
+    protected Map<TopicPartition, Checkpoint> checkpointsForGroup(Map<TopicPartition, OffsetAndMetadata> upstreamGroupOffsets, String group) throws ExecutionException, InterruptedException {

Review Comment:
   Should be package-private, not protected



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java:
##########
@@ -57,52 +57,96 @@ public void testOffsetTranslation() {
 
             // Emit synced downstream offset without dead-reckoning
             store.sync(tp, 100, 200);
-            assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 150));
+            assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 150));
 
             // Translate exact offsets
             store.sync(tp, 150, 251);
-            assertEquals(OptionalLong.of(251), store.translateDownstream(tp, 150));
+            assertEquals(OptionalLong.of(251), store.translateDownstream(null, tp, 150));
 
             // Use old offset (5) prior to any sync -> can't translate
-            assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 5));
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 5));
 
             // Downstream offsets reset
             store.sync(tp, 200, 10);
-            assertEquals(OptionalLong.of(10), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.of(10), store.translateDownstream(null, tp, 200));
 
             // Upstream offsets reset
             store.sync(tp, 20, 20);
-            assertEquals(OptionalLong.of(20), store.translateDownstream(tp, 20));
+            assertEquals(OptionalLong.of(20), store.translateDownstream(null, tp, 20));
         }
     }
 
     @Test
     public void testNoTranslationIfStoreNotStarted() {
         try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
             // no offsets exist and store is not started
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));
 
             // read a sync during startup
             store.sync(tp, 100, 200);
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));
 
             // After the store is started all offsets are visible
             store.start();
-            assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 0));
-            assertEquals(OptionalLong.of(200), store.translateDownstream(tp, 100));
-            assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 0));
+            assertEquals(OptionalLong.of(200), store.translateDownstream(null, tp, 100));
+            assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 200));
         }
     }
 
     @Test
     public void testNoTranslationIfNoOffsetSync() {
         try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
             store.start();
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
         }
     }
+
+    @Test
+    public void testPastOffsetTranslation() {
+        try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
+            long maxOffsetLag = 10;
+            int offset = 0;
+            for (; offset <= 1000; offset += maxOffsetLag) {
+                store.sync(tp, offset, offset);
+            }
+            store.start();
+
+            // After starting but before seeing new offsets, only the latest startup offset can be translated
+            assertSparseSync(store, 1000, -1);
+
+            for (; offset <= 2000; offset += maxOffsetLag) {
+                store.sync(tp, offset, offset);
+            }
+
+            // After seeing new offsets, we still cannot translate earlier than the latest startup offset
+            assertSparseSync(store, 1000, -1);
+
+            // We can translate offsets between the latest startup offset and the latest offset with variable precision
+            // Older offsets are less precise and translation ends up farther apart
+            assertSparseSync(store, 1030, 1000);
+            assertSparseSync(store, 1540, 1030);
+            assertSparseSync(store, 1800, 1540);
+            assertSparseSync(store, 1920, 1800);
+            assertSparseSync(store, 1990, 1920);
+            assertSparseSync(store, 2000, 1990);

Review Comment:
   Where do the argument values for these calls come from?

   Naively, it looks like this test might have been written by observing the behavior of the `OffsetSyncStore` class (however arbitrary or even incorrect it may be) and then adding assertions that match the observed behavior, rather than identifying what correct values would look like and then ensuring that the `OffsetSyncStore` class's behavior gives those values.

   Separately but possibly related, would it help to have a method that ensures that the invariants for the syncs stored by the `OffsetSyncStore` hold? I drafted something like this and it seems to work nicely:

   In `OffsetSyncStoreTest`:
   ```java
    private void assertSparseSyncInvariant(FakeOffsetSyncStore store, TopicPartition topicPartition) {
        for (int j = 0; j < Long.SIZE; j++) {
            for (int i = 0; i <= j; i++) {
                long jUpstream = store.syncFor(topicPartition, j).upstreamOffset();
                long iUpstream = store.syncFor(topicPartition, i).upstreamOffset();
                assertTrue(
                        jUpstream <= iUpstream,
                        "Upstream offset " + iUpstream + " at position " + i
                                + " should be at least as recent as offset " + jUpstream + " at position " + j
                );
                long iUpstreamBound = jUpstream + (1L << j);
                if (iUpstreamBound < 0)
                    continue;
                assertTrue(
                        iUpstream < iUpstreamBound,
                        "Upstream offset " + iUpstream + " at position " + i
                        + " should be no greater than " + iUpstreamBound + " (" + jUpstream + " + 2^" + j + ")"
                );
            }
        }
    }
   ```

   (this method can be invoked after every call to `OffsetSyncStore::sync`)

   In `OffsetSyncStore`:
   ```java
    // For testing
    OffsetSync syncFor(TopicPartition topicPartition, int syncIdx) {
        OffsetSync[] syncs = offsetSyncs.get(topicPartition);
        if (syncs == null)
            throw new IllegalArgumentException("No syncs present for " + topicPartition);
        if (syncIdx >= syncs.length)
            throw new IllegalArgumentException(
                    "Requested sync " + (syncIdx + 1) + " for " + topicPartition
                    + " but there are only " + syncs.length + " syncs available for that topic partition"
            );
        return syncs[syncIdx];
    }
   ```



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -139,10 +171,103 @@ public void close() {
     protected void handleRecord(ConsumerRecord<byte[], byte[]> record) {
         OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
         TopicPartition sourceTopicPartition = offsetSync.topicPartition();
-        offsetSyncs.put(sourceTopicPartition, offsetSync);
+        offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> createInitialSyncs(offsetSync));
+        offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> updateExistingSyncs(syncs, offsetSync));
+    }
+
+    private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) {
+        // Make a copy of the array before mutating it, so that readers do not see inconsistent data
+        // TODO: batch updates so that this copy can be performed less often for high-volume sync topics.
+        OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE);
+        updateSyncArray(mutableSyncs, offsetSync);
+        if (log.isTraceEnabled()) {
+            StringBuilder stateString = new StringBuilder();
+            stateString.append("[");
+            for (int i = 0; i < Long.SIZE; i++) {
+                if (i != 0) {
+                    stateString.append(",");
+                }
+                if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != mutableSyncs[i - 1]) {
+                    // Print only if the sync is interesting, a series of repeated syncs will appear as ,,,,,

Review Comment:
   I think this might confuse some users, since you'd basically have to look at this exact line in the source code to know what a series of consecutive commas represents.

   IMO we should either skip identical consecutive elements and the commas before them, or include both them and their commas.
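   For example, the first option could look something like this (illustrative only, assuming `stateString` has just been initialized to `"["`):
   ```java
    // Skip identical consecutive syncs and their separators, so the trace output only
    // lists the distinct upstream:downstream pairs that are actually stored.
    for (int i = 0; i < mutableSyncs.length; i++) {
        if (i > 0 && mutableSyncs[i] == mutableSyncs[i - 1])
            continue;
        if (stateString.length() > 1)
            stateString.append(",");
        stateString.append(mutableSyncs[i].upstreamOffset())
                .append(":")
                .append(mutableSyncs[i].downstreamOffset());
    }
   ```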



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

