C0urante commented on code in PR #13429:
URL: https://github.com/apache/kafka/pull/13429#discussion_r1174698662


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -25,17 +25,46 @@
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.util.KafkaBasedLog;
 import org.apache.kafka.connect.util.TopicAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.concurrent.ConcurrentHashMap;
 
-/** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */
+/**
+ * Used internally by MirrorMaker. Stores offset syncs and performs offset translation.
+ * <p>A limited number of offset syncs can be stored per TopicPartition, in a way which provides better translation
+ * later in the topic, closer to the live end of the topic.
+ * This maintains the following invariants for each topic-partition in the in-memory sync storage:
+ * <ul>
+ *     <li>Invariant A: syncs[0] is the latest offset sync from the syncs topic</li>
+ *     <li>Invariant B: For each i,j, i < j, syncs[i] != syncs[j]: syncs[i].upstream <= syncs[j].upstream + 2^j - 2^i</li>
+ *     <li>Invariant C: For each i,j, i < j, syncs[i] != syncs[j]: syncs[j].upstream + 2^(i-2) <= syncs[i].upstream</li>

Review Comment:
   I found this easier to grok when invariants B and C both established clear bounds on the value of `syncs[i]`:
   
   ```suggestion
    *     <li>Invariant C: For each i,j, i < j, syncs[i] != syncs[j]: syncs[i].upstream >= syncs[j].upstream + 2^(i-2)</li>
   ```
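   
   Here is a minimal sketch of the two invariants, with C written in the suggested `>=` form so that each one reads as a bound on `syncs[i].upstream`. `SyncInvariantSketch` is a hypothetical helper and not part of the PR. It makes several simplifications:
   
   - It works on upstream offsets only and treats equal offsets as the same sync.
   - It assumes the fractional bound 2^(i-2) for i < 2 means a gap of at least 1. The javadoc does not say how this case is handled.
   - It checks every pair rather than only adjacent indexes.
   
   ```java
   // Hypothetical helper, not part of the PR: checks invariants B and C over upstream offsets only.
   final class SyncInvariantSketch {
       private SyncInvariantSketch() {}
   
       // Invariant B: syncs[i].upstream <= syncs[j].upstream + 2^j - 2^i, compared as a difference.
       static boolean invariantB(long iUpstream, long jUpstream, int i, int j) {
           // (1L << j) - (1L << i) equals 2^j - 2^i for 0 <= i < j <= 63; when j == 63 the shift
           // yields Long.MIN_VALUE and the subtraction wraps back to the correct non-negative value.
           long span = (1L << j) - (1L << i);
           return iUpstream - jUpstream <= span; // cannot overflow for non-negative offsets
       }
   
       // Invariant C in the suggested form: syncs[i].upstream >= syncs[j].upstream + 2^(i-2).
       // Assumption: for integer offsets, the fractional bound 2^(i-2) when i < 2 rounds up to 1.
       static boolean invariantC(long iUpstream, long jUpstream, int i) {
           long minGap = i < 2 ? 1L : 1L << (i - 2);
           return iUpstream - jUpstream >= minGap;
       }
   
       // Checks every pair i < j whose entries differ; equal upstream offsets count as the same sync.
       static boolean holds(long[] upstream) {
           for (int i = 0; i < upstream.length; i++) {
               for (int j = i + 1; j < upstream.length; j++) {
                   if (upstream[i] == upstream[j]) {
                       continue;
                   }
                   if (!invariantB(upstream[i], upstream[j], i, j) || !invariantC(upstream[i], upstream[j], i)) {
                       return false;
                   }
               }
           }
           return true;
       }
   }
   ```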



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -25,17 +25,46 @@
+/**
+ * Used internally by MirrorMaker. Stores offset syncs and performs offset translation.
+ * <p>A limited number of offset syncs can be stored per TopicPartition, in a way which provides better translation
+ * later in the topic, closer to the live end of the topic.
+ * This maintains the following invariants for each topic-partition in the in-memory sync storage:
+ * <ul>
+ *     <li>Invariant A: syncs[0] is the latest offset sync from the syncs topic</li>
+ *     <li>Invariant B: For each i,j, i < j, syncs[i] != syncs[j]: syncs[i].upstream <= syncs[j].upstream + 2^j - 2^i</li>
+ *     <li>Invariant C: For each i,j, i < j, syncs[i] != syncs[j]: syncs[j].upstream + 2^(i-2) <= syncs[i].upstream</li>
+ *     <li>Invariant D: syncs[63] is the earliest offset sync from the syncs topic which was not eligible for compaction</li>
+ * </ul>
+ * <p>The above invariants ensure that the store is kept updated upon receipt of each sync, and that distinct
+ * offset syncs are separated by approximately exponential space. They can be checked locally (by comparing all adjacent
+ * indexes) but hold globally (for all pairs of any distance). This allows updates to the store in linear time.
+ * <p>Offset translation uses the syncs[i] which most closely precedes the upstream consumer group's current offset.
+ * For a fixed in-memory state, translation of variable upstream offsets will be monotonic.
+ * For variable in-memory state, translation of a fixed upstream offset will not be monotonic.
+ * <p>Translation will be unavailable for all topic-partitions before an initial read-to-end of the offset syncs topic
+ * is complete. Translation will be unavailable after that if no syncs are present for a topic-partition, if replication
+ * started after the position of the consumer group, or if relevant offset syncs for the topic were eligible for
+ * compaction at the time of the initial read-to-end.

Review Comment:
   I'm still not a huge fan of the "eligible for compaction" phrase here and above. It does seem to cover the exact scenarios it intends to, but it's a bit misleading. The fears we have about reusing offset syncs for which a new sync on the same topic partition is available don't have to do with compaction; they have to do with compression of in-memory state and that compression not being applied identically across restarts.
   
   I'd like it if we could a) use language that doesn't imply that compaction is the issue and b) add the rationale for not reusing older offset syncs in a comment somewhere (possibly where we do the `!readToEnd` check?).
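   
   The javadoc's translation rule uses the stored sync that most closely precedes the consumer group's offset. It can be sketched as follows. `TranslationSketch` is hypothetical and simplified, not the PR's `translateDownstream`. It assumes one partition's syncs are held newest first with descending upstream offsets. It returns -1 when the consumer is behind every stored sync, and it does not model the cases where translation is unavailable. For a non-exact match it steps one past the sync, which mirrors the expectations in `testOffsetTranslation`: a sync of upstream 100 / downstream 200 translates 150 to 201.
   
   ```java
   // Hypothetical simplification of offset translation for a single partition; not the PR's code.
   final class TranslationSketch {
       private TranslationSketch() {}
   
       // upstream[k] and downstream[k] describe the k-th stored sync, newest first (descending upstream).
       static long translate(long[] upstream, long[] downstream, long consumerOffset) {
           for (int k = 0; k < upstream.length; k++) {
               // The first sync at or below the consumer's offset is the one that most closely precedes it.
               if (upstream[k] <= consumerOffset) {
                   // Exact match: use the synced downstream offset; otherwise step one past it
                   // instead of dead-reckoning the full distance (matching the test's expectations).
                   return upstream[k] == consumerOffset ? downstream[k] : downstream[k] + 1;
               }
           }
           return -1; // consumer is behind every stored sync
       }
   }
   ```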



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -139,10 +180,141 @@ public void close() {
     protected void handleRecord(ConsumerRecord<byte[], byte[]> record) {
         OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
         TopicPartition sourceTopicPartition = offsetSync.topicPartition();
-        offsetSyncs.put(sourceTopicPartition, offsetSync);
+        offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) ->
+                syncs == null ? createInitialSyncs(offsetSync) : updateExistingSyncs(syncs, offsetSync)
+        );
+    }
+
+    private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) {
+        // Make a copy of the array before mutating it, so that readers do not see inconsistent data
+        // TODO: consider batching updates so that this copy can be performed less often for high-volume sync topics.
+        OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, SYNCS_PER_PARTITION);
+        updateSyncArray(mutableSyncs, offsetSync);
+        if (log.isTraceEnabled()) {
+            log.trace("New sync {} applied, new state is {}", offsetSync, offsetArrayToString(mutableSyncs));
+        }
+        return mutableSyncs;
+    }
+
+    private String offsetArrayToString(OffsetSync[] syncs) {
+        StringBuilder stateString = new StringBuilder();
+        stateString.append("[");
+        for (int i = 0; i < SYNCS_PER_PARTITION; i++) {
+            if (i == 0 || syncs[i] != syncs[i - 1]) {
+                if (i != 0) {
+                    stateString.append(",");
+                }
+                // Print only if the sync is interesting, a series of repeated syncs will be elided
+                stateString.append(syncs[i].upstreamOffset());
+                stateString.append(":");
+                stateString.append(syncs[i].downstreamOffset());
+            }
+        }
+        stateString.append("]");
+        return stateString.toString();
+    }
+
+    private OffsetSync[] createInitialSyncs(OffsetSync firstSync) {
+        OffsetSync[] syncs = new OffsetSync[SYNCS_PER_PARTITION];
+        clearSyncArray(syncs, firstSync);
+        return syncs;
+    }
+
+    private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        // If every element of the store is the same, then it satisfies invariants B and C trivially.
+        for (int i = 0; i < SYNCS_PER_PARTITION; i++) {
+            syncs[i] = offsetSync;
+        }
+    }
+
+    private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        long upstreamOffset = offsetSync.upstreamOffset();
+        // Old offsets are invalid, so overwrite them all.
+        if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) {
+            clearSyncArray(syncs, offsetSync);
+            return;
+        }
+        OffsetSync replacement = offsetSync;
+        OffsetSync oldValue = syncs[0];
+        // Invariant A is always violated once a new sync appears.
+        // Repair Invariant A: the latest sync must always be updated
+        syncs[0] = replacement;
+        for (int current = 1; current < SYNCS_PER_PARTITION; current++) {
+            int previous = current - 1;
+
+            // Consider using oldValue instead of replacement, which allows us to keep more distinct values stored
+            // If oldValue is not recent, it should be expired from the store
+            boolean isRecent = invariantB(syncs[previous], oldValue, previous, current);
+            // Ensure that this value is sufficiently separated from the previous value
+            // We prefer to keep more recent syncs of similar precision (i.e. the value in replacement)
+            boolean separatedFromPrevious = invariantC(syncs[previous], oldValue, previous, current);
+            // Ensure that this value is sufficiently separated from the next value
+            // We prefer to keep existing syncs of lower precision (i.e. the value in syncs[next])
+            int next = current + 1;
+            boolean separatedFromNext = next >= SYNCS_PER_PARTITION || invariantC(oldValue, syncs[next], current, next);
+            // If this condition is false, oldValue will be expired from the store and lost forever.
+            if (isRecent && separatedFromPrevious && separatedFromNext) {
+                replacement = oldValue;
+            }
+
+            // The replacement variable always contains a value which satisfies the invariants for this index.
+            assert invariantB(syncs[previous], replacement, previous, current);
+            assert invariantC(syncs[previous], replacement, previous, current);
+
+            // Test if changes to the previous index affected the invariant for this index
+            if (invariantB(syncs[previous], syncs[current], previous, current)) {
+                // Invariant B holds for syncs[current]: it must also hold for all later values
+                break;
+            } else {
+                // Invariant B violated for syncs[current]: sync is now too old and must be updated
+                // Repair Invariant B: swap in replacement, and save the old value for the next iteration
+                oldValue = syncs[current];
+                syncs[current] = replacement;
+
+                assert invariantB(syncs[previous], syncs[current], previous, current);
+                assert invariantC(syncs[previous], syncs[current], previous, current);
+            }
+        }
+    }
+
+    private boolean invariantB(OffsetSync iSync, OffsetSync jSync, int i, int j) {
+        long bound = jSync.upstreamOffset() + (1L << j) - (1L << i);

Review Comment:
   Is my understanding correct that the right-hand expression here is at risk of overflow and, to account for that, we include the `bound < 0` condition in the return statement below?
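   
   One way to check this reading starts from the assumption that upstream offsets are non-negative. The exact value upstream + 2^j - 2^i then lies below 2^64. So whenever it exceeds `Long.MAX_VALUE`, the wrapped `long` is negative, and `bound < 0` would flag exactly the overflow case. `1L << 63` is `Long.MIN_VALUE`, yet `(1L << j) - (1L << i)` still wraps back to the exact 2^j - 2^i. This means the sign of the final sum is what carries the overflow signal. The return statement itself isn't shown here, so this is a reading, not a confirmation. The sketch below uses hypothetical names that are not from the PR. It contrasts the expression as written with a saturating variant built on `Math.addExact`.
   
   ```java
   // Hypothetical illustration of the overflow question; not the PR's code.
   final class BoundOverflowSketch {
       private BoundOverflowSketch() {}
   
       // The expression under review: upstream + 2^j - 2^i with Java's wrapping long arithmetic.
       static long naiveBound(long upstream, int i, int j) {
           return upstream + (1L << j) - (1L << i);
       }
   
       // A saturating alternative: clamp to Long.MAX_VALUE instead of wrapping to a negative value.
       static long saturatingBound(long upstream, int i, int j) {
           long span = (1L << j) - (1L << i); // exact 2^j - 2^i for 0 <= i < j <= 63
           try {
               return Math.addExact(upstream, span);
           } catch (ArithmeticException overflow) {
               return Long.MAX_VALUE;
           }
       }
   }
   ```
   
   With the saturating form, the invariant check could compare directly against the bound without a separate sign test, at the cost of an exception on the overflow path.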



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java:
##########
@@ -57,52 +58,170 @@ public void testOffsetTranslation() {
 
             // Emit synced downstream offset without dead-reckoning
             store.sync(tp, 100, 200);
-            assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 150));
+            assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 150));
 
             // Translate exact offsets
             store.sync(tp, 150, 251);
-            assertEquals(OptionalLong.of(251), store.translateDownstream(tp, 150));
+            assertEquals(OptionalLong.of(251), store.translateDownstream(null, tp, 150));
 
             // Use old offset (5) prior to any sync -> can't translate
-            assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 5));
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 5));
 
             // Downstream offsets reset
             store.sync(tp, 200, 10);
-            assertEquals(OptionalLong.of(10), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.of(10), store.translateDownstream(null, tp, 200));
 
             // Upstream offsets reset
             store.sync(tp, 20, 20);
-            assertEquals(OptionalLong.of(20), store.translateDownstream(tp, 20));
+            assertEquals(OptionalLong.of(20), store.translateDownstream(null, tp, 20));
         }
     }
 
     @Test
     public void testNoTranslationIfStoreNotStarted() {
         try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
             // no offsets exist and store is not started
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));
 
             // read a sync during startup
             store.sync(tp, 100, 200);
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));
 
             // After the store is started all offsets are visible
             store.start();
-            assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 0));
-            assertEquals(OptionalLong.of(200), store.translateDownstream(tp, 100));
-            assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 0));
+            assertEquals(OptionalLong.of(200), store.translateDownstream(null, tp, 100));
+            assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 200));
         }
     }
 
     @Test
     public void testNoTranslationIfNoOffsetSync() {
         try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
             store.start();
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
+        }
+    }
+
+    @Test
+    public void testPastOffsetTranslation() {
+        try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
+            long maxOffsetLag = 10;
+            int offset = 0;
+            for (; offset <= 1000; offset += maxOffsetLag) {
+                store.sync(tp, offset, offset);
+                assertSparseSyncInvariant(store, tp);
+            }
+            store.start();
+
+            // After starting but before seeing new offsets, only the latest startup offset can be translated
+            assertSparseSync(store, 1000, -1);
+
+            for (; offset <= 10000; offset += maxOffsetLag) {
+                store.sync(tp, offset, offset);
+                assertSparseSyncInvariant(store, tp);
+            }
+
+            // After seeing new offsets, we still cannot translate earlier than the latest startup offset
+            // Invariant D: the last sync from the initial read-to-end is still stored
+            assertSparseSync(store, 1000, -1);
+
+            // We can translate offsets between the latest startup offset and the latest offset with variable precision
+            // Older offsets are less precise and translation ends up farther apart
+            assertSparseSync(store, 4840, 1000);
+            assertSparseSync(store, 6760, 4840);
+            assertSparseSync(store, 8680, 6760);
+            assertSparseSync(store, 9160, 8680);
+            assertSparseSync(store, 9640, 9160);
+            assertSparseSync(store, 9880, 9640);
+            assertSparseSync(store, 9940, 9880);
+            assertSparseSync(store, 9970, 9940);
+            assertSparseSync(store, 9990, 9970);
+            assertSparseSync(store, 10000, 9990);
+
+            // Rewind upstream offsets should clear all historical syncs

Review Comment:
   ```suggestion
               // Rewinding upstream offsets should clear all historical syncs
   ```



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java:
##########
@@ -57,52 +58,170 @@ public void testOffsetTranslation() {
 
+            // Rewind upstream offsets should clear all historical syncs
+            store.sync(tp, 1500, 11000);
+            assertSparseSyncInvariant(store, tp);
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 1499));
+            assertEquals(OptionalLong.of(11000), store.translateDownstream(null, tp, 1500));
+            assertEquals(OptionalLong.of(11001), store.translateDownstream(null, tp, 2000));
+        }
+    }
+
+    @Test
+    public void testKeepMostDistinctSyncs() {
+        // We should not expire more syncs from the store than necessary;
+        // Each new sync should expire at most two other syncs from the cache
+        long iterations = 1000000;
+        long maxStep = Long.MAX_VALUE / iterations;
+        // Test a variety of steps (corresponding to the offset.lag.max configuration)
+        for (long step = 1; step < maxStep; step = (step << 1) + 1)  {

Review Comment:
   IMO vanilla multiplication is more readable here:
   ```suggestion
           for (long step = 1; step < maxStep; step = (step * 2) + 1)  {
   ```
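   
   The two forms are interchangeable here. For a `long`, `step << 1` and `step * 2` produce the same bits, so the loop visits 1, 3, 7, 15, ... (2^k - 1) either way. The hypothetical helper below is not in the PR. It reproduces only the loop bounds, to show which steps get exercised. With `iterations = 1000000` it visits 43 step values, the largest being 2^43 - 1. Dividing `Long.MAX_VALUE` by `iterations` presumably keeps `step * iterations` within a `long`.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   // Hypothetical helper, not part of the PR: replays the loop header from testKeepMostDistinctSyncs.
   final class StepSequenceSketch {
       private StepSequenceSketch() {}
   
       static List<Long> steps(long iterations) {
           long maxStep = Long.MAX_VALUE / iterations;
           List<Long> visited = new ArrayList<>();
           // (step * 2) + 1 and (step << 1) + 1 yield identical values for every long step.
           for (long step = 1; step < maxStep; step = (step * 2) + 1) {
               visited.add(step);
           }
           return visited;
       }
   }
   ```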



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java:
##########
@@ -57,52 +58,170 @@ public void testOffsetTranslation() {
 
+    @Test
+    public void testKeepMostDistinctSyncs() {
+        // We should not expire more syncs from the store than necessary;
+        // Each new sync should expire at most two other syncs from the cache

Review Comment:
   ```suggestion
           // Each new sync should expire at most two other syncs from the cache (in addition to always adding a single sync to the cache)
   ```
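   
   The suggested wording can also be phrased as a count. If each update adds one sync and expires at most two, the number of distinct syncs can drop by at most one per update. This ignores rewinds, which clear the whole array. Below is a hypothetical sketch of that property; it is not the PR's test code. It counts distinct entries the same way `offsetArrayToString` elides repeats, by reference comparison with the previous element. This assumes equal syncs occupy contiguous slots.
   
   ```java
   // Hypothetical sketch, not the PR's test code. Syncs are compared by reference, as in offsetArrayToString.
   final class DistinctSyncCounter {
       private DistinctSyncCounter() {}
   
       // Counts runs of identical entries; assumes equal syncs occupy contiguous slots.
       static int countDistinct(Object[] syncs) {
           int distinct = 0;
           for (int i = 0; i < syncs.length; i++) {
               if (i == 0 || syncs[i] != syncs[i - 1]) {
                   distinct++;
               }
           }
           return distinct;
       }
   
       // One sync is always added, so expired = before + 1 - after; the property is expired <= 2.
       static boolean expiredAtMostTwo(Object[] before, Object[] after) {
           int expired = countDistinct(before) + 1 - countDistinct(after);
           return expired <= 2;
       }
   }
   ```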



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -139,10 +180,141 @@ public void close() {
+    private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        long upstreamOffset = offsetSync.upstreamOffset();
+        // Old offsets are invalid, so overwrite them all.
+        if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) {
+            clearSyncArray(syncs, offsetSync);
+            return;
+        }
+        OffsetSync replacement = offsetSync;
+        OffsetSync oldValue = syncs[0];
+        // Invariant A is always violated once a new sync appears.
+        // Repair Invariant A: the latest sync must always be updated
+        syncs[0] = replacement;
+        for (int current = 1; current < SYNCS_PER_PARTITION; current++) {
+            int previous = current - 1;
+
+            // Consider using oldValue instead of replacement, which allows us to keep more distinct values stored

Review Comment:
   I read this as a TODO at first; the "Consider ..." language threw me. Do you think this might be clearer?
   ```suggestion
               // We can potentially use oldValue instead of replacement, which allows us to keep more distinct values stored
               // We check invariants B and C with oldValue to see if it can be used
   ```
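For illustration only, here is a hypothetical, self-contained sketch of the check the suggested comment describes: whether `oldValue` may be kept at index `current` in place of `replacement`. It rests on several assumptions. It uses raw `long` upstream offsets instead of `OffsetSync` objects. It approximates the diff's reference equality (`iSync == jSync`) with equal offsets. It drops the unused `j` parameter from invariant C. It treats an overflowing bound as satisfied. The class and method names are made up.

```java
// Hypothetical sketch of the "can oldValue be kept at index current?" decision.
// Assumptions: upstream offsets are plain longs, and equal offsets stand in for
// the reference equality (iSync == jSync) used in the diff.
final class OldValueReuseSketch {

    // Invariant B: syncs[i].upstream <= syncs[j].upstream + 2^j - 2^i (for i < j)
    static boolean invariantB(long iUpstream, long jUpstream, int i, int j) {
        long bound = jUpstream + (1L << j) - (1L << i);
        return iUpstream == jUpstream || bound < 0 || iUpstream <= bound;
    }

    // Invariant C: syncs[i].upstream >= syncs[j].upstream + 2^max(i-2, 0) (for i < j)
    static boolean invariantC(long iUpstream, long jUpstream, int i) {
        long bound = jUpstream + (1L << Math.max(i - 2, 0));
        return iUpstream == jUpstream || bound < 0 || iUpstream >= bound;
    }

    // syncs[current - 1] is assumed to already hold its updated value, as in the loop.
    static boolean canReuseOldValue(long[] syncs, long oldValue, int current) {
        int previous = current - 1;
        // oldValue must still be recent enough for this index...
        boolean isRecent = invariantB(syncs[previous], oldValue, previous, current);
        // ...far enough below the (updated) value at the previous index...
        boolean separatedFromPrevious = invariantC(syncs[previous], oldValue, previous);
        // ...and far enough above the value at the next index, if there is one.
        int next = current + 1;
        boolean separatedFromNext = next >= syncs.length || invariantC(oldValue, syncs[next], current);
        return isRecent && separatedFromPrevious && separatedFromNext;
    }

    public static void main(String[] args) {
        // 11 is within 2^1 - 2^0 = 1 of oldValue 10, so 10 can stay at index 1.
        System.out.println(canReuseOldValue(new long[]{11, 10, 10, 10}, 10, 1));
        // 12 is too far above 10 for index 1, so 10 would be expired.
        System.out.println(canReuseOldValue(new long[]{12, 10, 10, 10}, 10, 1));
    }
}
```

Running `main` prints `true` and then `false`.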



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -139,10 +180,141 @@ public void close() {
+    private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        long upstreamOffset = offsetSync.upstreamOffset();
+        // Old offsets are invalid, so overwrite them all.
+        if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) {
+            clearSyncArray(syncs, offsetSync);
+            return;
+        }
+        OffsetSync replacement = offsetSync;
+        OffsetSync oldValue = syncs[0];
+        // Invariant A is always violated once a new sync appears.
+        // Repair Invariant A: the latest sync must always be updated
+        syncs[0] = replacement;
+        for (int current = 1; current < SYNCS_PER_PARTITION; current++) {
+            int previous = current - 1;
+
+            // Consider using oldValue instead of replacement, which allows us to keep more distinct values stored
+            // If oldValue is not recent, it should be expired from the store
+            boolean isRecent = invariantB(syncs[previous], oldValue, previous, current);
+            // Ensure that this value is sufficiently separated from the previous value
+            // We prefer to keep more recent syncs of similar precision (i.e. the value in replacement)
+            boolean separatedFromPrevious = invariantC(syncs[previous], oldValue, previous, current);
+            // Ensure that this value is sufficiently separated from the next value
+            // We prefer to keep existing syncs of lower precision (i.e. the value in syncs[next])
+            int next = current + 1;
+            boolean separatedFromNext = next >= SYNCS_PER_PARTITION || invariantC(oldValue, syncs[next], current, next);
+            // If this condition is false, oldValue will be expired from the store and lost forever.
+            if (isRecent && separatedFromPrevious && separatedFromNext) {
+                replacement = oldValue;
+            }
+
+            // The replacement variable always contains a value which satisfies the invariants for this index.
+            assert invariantB(syncs[previous], replacement, previous, current);
+            assert invariantC(syncs[previous], replacement, previous, current);
+
+            // Test if changes to the previous index affected the invariant for this index
+            if (invariantB(syncs[previous], syncs[current], previous, current)) {
+                // Invariant B holds for syncs[current]: it must also hold for all later values
+                break;
+            } else {
+                // Invariant B violated for syncs[current]: sync is now too old and must be updated
+                // Repair Invariant B: swap in replacement, and save the old value for the next iteration
+                oldValue = syncs[current];
+                syncs[current] = replacement;
+
+                assert invariantB(syncs[previous], syncs[current], previous, current);
+                assert invariantC(syncs[previous], syncs[current], previous, current);
+            }
+        }
+    }
+
+    private boolean invariantB(OffsetSync iSync, OffsetSync jSync, int i, int j) {
+        long bound = jSync.upstreamOffset() + (1L << j) - (1L << i);
+        return iSync == jSync || bound < 0 || iSync.upstreamOffset() <= bound;
+    }
+
+    private boolean invariantC(OffsetSync iSync, OffsetSync jSync, int i, int j) {

Review Comment:
   We don't use `j` in this invariant:
   ```suggestion
       private boolean invariantC(OffsetSync iSync, OffsetSync jSync, int i) {
   ```



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -139,10 +180,141 @@ public void close() {
+    private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        long upstreamOffset = offsetSync.upstreamOffset();
+        // Old offsets are invalid, so overwrite them all.
+        if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) {
+            clearSyncArray(syncs, offsetSync);
+            return;
+        }
+        OffsetSync replacement = offsetSync;
+        OffsetSync oldValue = syncs[0];

Review Comment:
   This is completely optional; feel free to include/not include/modify at your discretion:
   ```suggestion
           // The most-recently-discarded offset sync
           // We track this since it may still be eligible for use in the syncs array at a later index
           OffsetSync oldValue = syncs[0];
   ```
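To see why the most-recently-discarded sync is worth tracking, here is a hypothetical port of the `updateSyncArray` loop from the diff onto a `long[]` of upstream offsets. `SparseSyncArraySketch` is an invented name. The port makes these assumptions:

- the store has already read to the end of the syncs topic, so the `readToEnd` check is omitted;
- equal offsets stand in for reference equality;
- the `assert` statements are left out;
- invariant C treats an overflowing bound as satisfied, as the test helper does.

It is a sketch for experimenting with the invariants, not the PR's implementation.

```java
import java.util.Arrays;

// Hypothetical port of the diff's updateSyncArray loop onto raw upstream offsets.
// Assumptions: readToEnd is already true, equal offsets stand in for reference
// equality, the asserts are omitted, and an overflowing invariant C bound counts
// as satisfied (mirroring the test helper).
final class SparseSyncArraySketch {
    static final int SYNCS_PER_PARTITION = 64;

    // Invariant B: syncs[i] <= syncs[j] + 2^j - 2^i for i < j
    static boolean invariantB(long iUpstream, long jUpstream, int i, int j) {
        long bound = jUpstream + (1L << j) - (1L << i);
        return iUpstream == jUpstream || bound < 0 || iUpstream <= bound;
    }

    // Invariant C: syncs[i] >= syncs[j] + 2^max(i - 2, 0) for i < j
    static boolean invariantC(long iUpstream, long jUpstream, int i) {
        long bound = jUpstream + (1L << Math.max(i - 2, 0));
        return iUpstream == jUpstream || bound < 0 || iUpstream >= bound;
    }

    static long[] initial(long firstUpstream) {
        long[] syncs = new long[SYNCS_PER_PARTITION];
        Arrays.fill(syncs, firstUpstream); // identical entries satisfy B and C trivially
        return syncs;
    }

    static void update(long[] syncs, long upstream) {
        if (syncs[0] > upstream) {
            // Upstream offsets rewound: everything stored is now invalid.
            Arrays.fill(syncs, upstream);
            return;
        }
        long replacement = upstream;
        long oldValue = syncs[0]; // the most-recently-discarded sync
        syncs[0] = replacement;   // invariant A: the latest sync is always at index 0
        for (int current = 1; current < SYNCS_PER_PARTITION; current++) {
            int previous = current - 1;
            int next = current + 1;
            boolean isRecent = invariantB(syncs[previous], oldValue, previous, current);
            boolean separatedFromPrevious = invariantC(syncs[previous], oldValue, previous);
            boolean separatedFromNext = next >= SYNCS_PER_PARTITION
                    || invariantC(oldValue, syncs[next], current);
            if (isRecent && separatedFromPrevious && separatedFromNext) {
                replacement = oldValue; // oldValue becomes the candidate for this index
            }
            if (invariantB(syncs[previous], syncs[current], previous, current)) {
                break; // syncs[current] is still valid, so the tail needs no changes
            }
            oldValue = syncs[current];
            syncs[current] = replacement;
        }
    }

    public static void main(String[] args) {
        long[] syncs = initial(0);
        for (long offset = 10; offset <= 2000; offset += 10) {
            update(syncs, offset);
        }
        // Distinct stored offsets, newest first.
        System.out.println(Arrays.toString(Arrays.stream(syncs).distinct().toArray()));
    }
}
```

Feeding it a sync every 10 offsets and printing the distinct entries should show the stored offsets thinning out toward older values. The initial sync at offset 0 should remain in the last slot, because with 64 slots the invariant B check at index 63 passes for offsets this small.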



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -139,10 +180,141 @@ public void close() {
+    private boolean invariantB(OffsetSync iSync, OffsetSync jSync, int i, int j) {
+        long bound = jSync.upstreamOffset() + (1L << j) - (1L << i);
+        return iSync == jSync || bound < 0 || iSync.upstreamOffset() <= bound;
+    }
+
+    private boolean invariantC(OffsetSync iSync, OffsetSync jSync, int i, int j) {
+        long bound = jSync.upstreamOffset() + (1L << Math.max(i - 2, 0));

Review Comment:
   Same question RE overflow susceptibility and handling
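One possible way to make the overflow handling explicit, rather than relying on `bound < 0` after wraparound, is to compute the bounds with `Math.addExact` and decide explicitly what an unrepresentable bound means. The sketch below is hypothetical; the names and the `long`-based signatures are assumptions.

- For invariant B, an upper bound above `Long.MAX_VALUE` is trivially satisfied. The case `j >= 63` is special-cased because `1L << 63` is `Long.MIN_VALUE`.
- For invariant C, a lower bound above `Long.MAX_VALUE` cannot be met, so the check returns `false`. This differs from the test helper in this diff, which skips pairs whose lower bound overflows.

```java
// Hypothetical overflow-aware versions of the invariant checks. Offsets are raw
// longs, and equal offsets stand in for reference equality (both assumptions).
final class OverflowAwareInvariants {

    // Invariant B upper bound: jUpstream + 2^j - 2^i, for 0 <= i < j <= 63.
    static boolean invariantB(long iUpstream, long jUpstream, int i, int j) {
        if (iUpstream == jUpstream || j >= 63) {
            // 2^63 does not fit in a long, so the bound exceeds Long.MAX_VALUE.
            return true;
        }
        long slack = (1L << j) - (1L << i); // positive and representable for i < j < 63
        try {
            return iUpstream <= Math.addExact(jUpstream, slack);
        } catch (ArithmeticException overflow) {
            // The bound is above Long.MAX_VALUE, so every long satisfies it.
            return true;
        }
    }

    // Invariant C lower bound: jUpstream + 2^max(i - 2, 0).
    static boolean invariantC(long iUpstream, long jUpstream, int i) {
        if (iUpstream == jUpstream) {
            return true;
        }
        try {
            return iUpstream >= Math.addExact(jUpstream, 1L << Math.max(i - 2, 0));
        } catch (ArithmeticException overflow) {
            // The bound is above Long.MAX_VALUE, so no long can reach it.
            return false;
        }
    }

    public static void main(String[] args) {
        long max = Long.MAX_VALUE;
        System.out.println(invariantB(max, max - 1, 0, 1)); // bound is exactly MAX_VALUE
        System.out.println(invariantC(max, max, 10));       // same sync
        System.out.println(invariantC(max, max - 1, 10));   // bound overflows
    }
}
```

Whether invariant C should fail or be skipped on overflow is a design choice. Failing is the stricter reading of the invariant, while skipping matches the current test.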



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java:
##########
@@ -57,52 +58,170 @@ public void testOffsetTranslation() {
 
             // Emit synced downstream offset without dead-reckoning
             store.sync(tp, 100, 200);
-            assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 150));
+            assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 150));
 
             // Translate exact offsets
             store.sync(tp, 150, 251);
-            assertEquals(OptionalLong.of(251), store.translateDownstream(tp, 150));
+            assertEquals(OptionalLong.of(251), store.translateDownstream(null, tp, 150));
 
             // Use old offset (5) prior to any sync -> can't translate
-            assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 5));
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 5));
 
             // Downstream offsets reset
             store.sync(tp, 200, 10);
-            assertEquals(OptionalLong.of(10), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.of(10), store.translateDownstream(null, tp, 200));
 
             // Upstream offsets reset
             store.sync(tp, 20, 20);
-            assertEquals(OptionalLong.of(20), store.translateDownstream(tp, 20));
+            assertEquals(OptionalLong.of(20), store.translateDownstream(null, tp, 20));
         }
     }
 
     @Test
     public void testNoTranslationIfStoreNotStarted() {
         try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
             // no offsets exist and store is not started
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));
 
             // read a sync during startup
             store.sync(tp, 100, 200);
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));
 
             // After the store is started all offsets are visible
             store.start();
-            assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 0));
-            assertEquals(OptionalLong.of(200), store.translateDownstream(tp, 100));
-            assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 0));
+            assertEquals(OptionalLong.of(200), store.translateDownstream(null, tp, 100));
+            assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 200));
         }
     }
 
     @Test
     public void testNoTranslationIfNoOffsetSync() {
         try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
             store.start();
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
+        }
+    }
+
+    @Test
+    public void testPastOffsetTranslation() {
+        try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
+            long maxOffsetLag = 10;
+            int offset = 0;
+            for (; offset <= 1000; offset += maxOffsetLag) {
+                store.sync(tp, offset, offset);
+                assertSparseSyncInvariant(store, tp);
+            }
+            store.start();
+
+            // After starting but before seeing new offsets, only the latest startup offset can be translated
+            assertSparseSync(store, 1000, -1);
+
+            for (; offset <= 10000; offset += maxOffsetLag) {
+                store.sync(tp, offset, offset);
+                assertSparseSyncInvariant(store, tp);
+            }
+
+            // After seeing new offsets, we still cannot translate earlier than the latest startup offset
+            // Invariant D: the last sync from the initial read-to-end is still stored
+            assertSparseSync(store, 1000, -1);
+
+            // We can translate offsets between the latest startup offset and the latest offset with variable precision
+            // Older offsets are less precise and translation ends up farther apart
+            assertSparseSync(store, 4840, 1000);
+            assertSparseSync(store, 6760, 4840);
+            assertSparseSync(store, 8680, 6760);
+            assertSparseSync(store, 9160, 8680);
+            assertSparseSync(store, 9640, 9160);
+            assertSparseSync(store, 9880, 9640);
+            assertSparseSync(store, 9940, 9880);
+            assertSparseSync(store, 9970, 9940);
+            assertSparseSync(store, 9990, 9970);
+            assertSparseSync(store, 10000, 9990);
+
+            // Rewind upstream offsets should clear all historical syncs
+            store.sync(tp, 1500, 11000);
+            assertSparseSyncInvariant(store, tp);
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 1499));
+            assertEquals(OptionalLong.of(11000), store.translateDownstream(null, tp, 1500));
+            assertEquals(OptionalLong.of(11001), store.translateDownstream(null, tp, 2000));
+        }
+    }
+
+    @Test
+    public void testKeepMostDistinctSyncs() {
+        // We should not expire more syncs from the store than necessary;
+        // Each new sync should expire at most two other syncs from the cache
+        long iterations = 1000000;
+        long maxStep = Long.MAX_VALUE / iterations;
+        // Test a variety of steps (corresponding to the offset.lag.max configuration)
+        for (long step = 1; step < maxStep; step = (step << 1) + 1)  {
+            try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
+                int lastCount = 1;
+                store.start();
+                for (long offset = 0; offset <= iterations; offset += step) {
+                    store.sync(tp, offset, offset);
+                    // Invariant A: the latest sync is present
+                    assertEquals(offset, store.syncFor(tp, 0).upstreamOffset());
+                    // Invariant D: the earliest sync is present
+                    assertEquals(0L, store.syncFor(tp, 63).upstreamOffset());
+                    int count = countDistinctStoredSyncs(store, tp);
+                    int diff = count - lastCount;
+                    assertTrue(diff >= -1,
+                            "Store expired too many syncs: " + diff + " after receiving offset " + offset);
+                    lastCount = count;
+                }
+            }
+        }
+    }
+
+    private void assertSparseSync(FakeOffsetSyncStore store, long syncOffset, long previousOffset) {
+        assertEquals(OptionalLong.of(previousOffset == -1 ? previousOffset : previousOffset + 1), store.translateDownstream(null, tp, syncOffset - 1));
+        assertEquals(OptionalLong.of(syncOffset), store.translateDownstream(null, tp, syncOffset));
+        assertEquals(OptionalLong.of(syncOffset + 1), store.translateDownstream(null, tp, syncOffset + 1));
+        assertEquals(OptionalLong.of(syncOffset + 1), store.translateDownstream(null, tp, syncOffset + 2));
+    }
+
+    private int countDistinctStoredSyncs(FakeOffsetSyncStore store, TopicPartition topicPartition) {
+        int count = 1;
+        for (int i = 1; i < OffsetSyncStore.SYNCS_PER_PARTITION; i++) {
+            if (store.syncFor(topicPartition, i - 1) != store.syncFor(topicPartition, i)) {
+                count++;
+            }
+        }
+        return count;
+    }
+
+    private void assertSparseSyncInvariant(FakeOffsetSyncStore store, TopicPartition topicPartition) {
+        for (int j = 0; j < OffsetSyncStore.SYNCS_PER_PARTITION; j++) {
+            for (int i = 0; i <= j; i++) {
+                long jUpstream = store.syncFor(topicPartition, j).upstreamOffset();
+                long iUpstream = store.syncFor(topicPartition, i).upstreamOffset();
+                if (jUpstream == iUpstream) {
+                    continue;
+                }
+                long iUpstreamLowerBound = jUpstream + (1L << Math.max(i - 2, 0));
+                if (iUpstreamLowerBound < 0) {
+                    continue;
+                }
+                assertTrue(
+                        iUpstreamLowerBound <= iUpstream,

Review Comment:
   If we rewrite invariant C to establish a lower bound for `syncs[i]`, we should probably also rewrite this part to reflect that:
   ```suggestion
                          iUpstream >= iUpstreamLowerBound,
   ```
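For comparison, a hypothetical standalone version of the invariant C check with the suggested lower-bound phrasing might look like this. It works on a `long[]` of upstream offsets instead of a `FakeOffsetSyncStore`. The method name and the message format are invented; only the overflow skip follows the helper in the diff.

```java
// Hypothetical standalone version of the invariant C part of assertSparseSyncInvariant,
// operating on a long[] of upstream offsets instead of a FakeOffsetSyncStore.
final class InvariantCCheckSketch {

    // Returns a description of the first violated pair, or null if invariant C holds.
    static String findInvariantCViolation(long[] upstreams) {
        for (int j = 0; j < upstreams.length; j++) {
            for (int i = 0; i < j; i++) {
                long jUpstream = upstreams[j];
                long iUpstream = upstreams[i];
                if (iUpstream == jUpstream) {
                    continue;
                }
                int exponent = Math.max(i - 2, 0);
                long iUpstreamLowerBound = jUpstream + (1L << exponent);
                if (iUpstreamLowerBound < 0) {
                    continue; // overflowed bound: skipped, as in the test helper
                }
                // Phrased as a lower bound on syncs[i], matching the rewritten invariant C.
                boolean holds = iUpstream >= iUpstreamLowerBound;
                if (!holds) {
                    return "Invariant C(" + i + "," + j + "): " + iUpstream
                            + " should be at least " + iUpstreamLowerBound
                            + " (" + jUpstream + " + 2^" + exponent + ")";
                }
            }
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(findInvariantCViolation(new long[]{20, 20, 20, 20, 20, 10, 0}));
        System.out.println(findInvariantCViolation(new long[]{20, 20, 20, 20, 11, 10, 0}));
    }
}
```

With the arrays in `main`, the first call prints `null`. The second prints `Invariant C(4,5): 11 should be at least 14 (10 + 2^2)`.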



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java:
##########
@@ -57,52 +58,170 @@ public void testOffsetTranslation() {
 
             // Emit synced downstream offset without dead-reckoning
             store.sync(tp, 100, 200);
-            assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 
150));
+            assertEquals(OptionalLong.of(201), store.translateDownstream(null, 
tp, 150));
 
             // Translate exact offsets
             store.sync(tp, 150, 251);
-            assertEquals(OptionalLong.of(251), store.translateDownstream(tp, 
150));
+            assertEquals(OptionalLong.of(251), store.translateDownstream(null, 
tp, 150));
 
             // Use old offset (5) prior to any sync -> can't translate
-            assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 
5));
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, 
tp, 5));
 
             // Downstream offsets reset
             store.sync(tp, 200, 10);
-            assertEquals(OptionalLong.of(10), store.translateDownstream(tp, 
200));
+            assertEquals(OptionalLong.of(10), store.translateDownstream(null, 
tp, 200));
 
             // Upstream offsets reset
             store.sync(tp, 20, 20);
-            assertEquals(OptionalLong.of(20), store.translateDownstream(tp, 
20));
+            assertEquals(OptionalLong.of(20), store.translateDownstream(null, 
tp, 20));
         }
     }
 
     @Test
     public void testNoTranslationIfStoreNotStarted() {
         try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
             // no offsets exist and store is not started
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 
0));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 
100));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 
200));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 100));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 200));
 
             // read a sync during startup
             store.sync(tp, 100, 200);
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 
0));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 
100));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 
200));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 100));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 200));
 
             // After the store is started all offsets are visible
             store.start();
-            assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 
0));
-            assertEquals(OptionalLong.of(200), store.translateDownstream(tp, 
100));
-            assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 
200));
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, 
tp, 0));
+            assertEquals(OptionalLong.of(200), store.translateDownstream(null, 
tp, 100));
+            assertEquals(OptionalLong.of(201), store.translateDownstream(null, 
tp, 200));
         }
     }
 
     @Test
     public void testNoTranslationIfNoOffsetSync() {
         try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
             store.start();
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 
0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 0));
+        }
+    }
+
+    @Test
+    public void testPastOffsetTranslation() {
+        try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
+            long maxOffsetLag = 10;
+            int offset = 0;
+            for (; offset <= 1000; offset += maxOffsetLag) {
+                store.sync(tp, offset, offset);
+                assertSparseSyncInvariant(store, tp);
+            }
+            store.start();
+
+            // After starting but before seeing new offsets, only the latest 
startup offset can be translated
+            assertSparseSync(store, 1000, -1);
+
+            for (; offset <= 10000; offset += maxOffsetLag) {
+                store.sync(tp, offset, offset);
+                assertSparseSyncInvariant(store, tp);
+            }
+
+            // After seeing new offsets, we still cannot translate earlier 
than the latest startup offset
+            // Invariant D: the last sync from the initial read-to-end is 
still stored
+            assertSparseSync(store, 1000, -1);
+
+            // We can translate offsets between the latest startup offset and 
the latest offset with variable precision
+            // Older offsets are less precise and translation ends up farther 
apart
+            assertSparseSync(store, 4840, 1000);
+            assertSparseSync(store, 6760, 4840);
+            assertSparseSync(store, 8680, 6760);
+            assertSparseSync(store, 9160, 8680);
+            assertSparseSync(store, 9640, 9160);
+            assertSparseSync(store, 9880, 9640);
+            assertSparseSync(store, 9940, 9880);
+            assertSparseSync(store, 9970, 9940);
+            assertSparseSync(store, 9990, 9970);
+            assertSparseSync(store, 10000, 9990);
+
+            // Rewind upstream offsets should clear all historical syncs
+            store.sync(tp, 1500, 11000);
+            assertSparseSyncInvariant(store, tp);
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 1499));
+            assertEquals(OptionalLong.of(11000), store.translateDownstream(null, tp, 1500));
+            assertEquals(OptionalLong.of(11001), store.translateDownstream(null, tp, 2000));
+        }
+    }
+
+    @Test
+    public void testKeepMostDistinctSyncs() {
+        // We should not expire more syncs from the store than necessary;
+        // Each new sync should expire at most two other syncs from the cache
+        long iterations = 1000000;
+        long maxStep = Long.MAX_VALUE / iterations;
+        // Test a variety of steps (corresponding to the offset.lag.max configuration)
+        for (long step = 1; step < maxStep; step = (step << 1) + 1)  {
+            try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
+                int lastCount = 1;
+                store.start();
+                for (long offset = 0; offset <= iterations; offset += step) {
+                    store.sync(tp, offset, offset);
+                    // Invariant A: the latest sync is present
+                    assertEquals(offset, store.syncFor(tp, 0).upstreamOffset());
+                    // Invariant D: the earliest sync is present
+                    assertEquals(0L, store.syncFor(tp, 63).upstreamOffset());
+                    int count = countDistinctStoredSyncs(store, tp);
+                    int diff = count - lastCount;
+                    assertTrue(diff >= -1,
+                            "Store expired too many syncs: " + diff + " after receiving offset " + offset);
+                    lastCount = count;
+                }
+            }
+        }
+    }
+
+    private void assertSparseSync(FakeOffsetSyncStore store, long syncOffset, long previousOffset) {
+        assertEquals(OptionalLong.of(previousOffset == -1 ? previousOffset : previousOffset + 1), store.translateDownstream(null, tp, syncOffset - 1));
+        assertEquals(OptionalLong.of(syncOffset), store.translateDownstream(null, tp, syncOffset));
+        assertEquals(OptionalLong.of(syncOffset + 1), store.translateDownstream(null, tp, syncOffset + 1));
+        assertEquals(OptionalLong.of(syncOffset + 1), store.translateDownstream(null, tp, syncOffset + 2));
+    }
+
+    private int countDistinctStoredSyncs(FakeOffsetSyncStore store, TopicPartition topicPartition) {
+        int count = 1;
+        for (int i = 1; i < OffsetSyncStore.SYNCS_PER_PARTITION; i++) {
+            if (store.syncFor(topicPartition, i - 1) != store.syncFor(topicPartition, i)) {
+                count++;
+            }
+        }
+        return count;
+    }
+
+    private void assertSparseSyncInvariant(FakeOffsetSyncStore store, TopicPartition topicPartition) {
+        for (int j = 0; j < OffsetSyncStore.SYNCS_PER_PARTITION; j++) {
+            for (int i = 0; i <= j; i++) {

Review Comment:
   Invariants B and C deal exclusively with values of `i` and `j` where `i < j`; shouldn't this be reflected here (especially since the `jUpstream == iUpstream` branch below makes it unnecessary to test the case when `i == j`)?
   ```suggestion
               for (int i = 0; i < j; i++) {
   ```
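   A minimal stand-alone sketch of the suggested loop shape. It assumes the stored syncs can be reduced to a `long[]` of upstream offsets indexed as in `syncFor(tp, i)`. The class and method names (`SparseSyncInvariantCheck`, `firstViolation`) are hypothetical. The bound expressions and negative-bound guards roughly follow the quoted test, with the upper bound named `iUpstreamUpperBound`:

```java
import java.util.Arrays;

// Hypothetical helper, not part of OffsetSyncStore or OffsetSyncStoreTest.
public final class SparseSyncInvariantCheck {

    // Returns null if invariants B and C hold for every pair i < j of distinct
    // upstream offsets, otherwise a short description of the first violation found.
    static String firstViolation(long[] upstream) {
        for (int j = 0; j < upstream.length; j++) {
            // Both invariants are stated for i < j, so i == j is never visited.
            for (int i = 0; i < j; i++) {
                long iUpstream = upstream[i];
                long jUpstream = upstream[j];
                if (iUpstream == jUpstream) {
                    continue; // repeated syncs are not constrained by B or C
                }
                // Invariant C: iUpstream >= jUpstream + 2^max(i - 2, 0)
                long iUpstreamLowerBound = jUpstream + (1L << Math.max(i - 2, 0));
                // A negative bound is treated as overflow and skipped, as the quoted test does
                if (iUpstreamLowerBound >= 0 && iUpstream < iUpstreamLowerBound) {
                    return "Invariant C(" + i + "," + j + ")";
                }
                // Invariant B: iUpstream <= jUpstream + 2^j - 2^i
                long iUpstreamUpperBound = jUpstream + (1L << j) - (1L << i);
                if (iUpstreamUpperBound >= 0 && iUpstream > iUpstreamUpperBound) {
                    return "Invariant B(" + i + "," + j + ")";
                }
            }
        }
        return null;
    }

    public static void main(String[] args) {
        long[] uniform = new long[64];
        Arrays.fill(uniform, 42L);
        System.out.println(firstViolation(uniform));            // every entry equal
        System.out.println(firstViolation(new long[] {10, 0})); // index 0 too far ahead of index 1
        System.out.println(firstViolation(new long[] {0, 5}));  // index 0 older than index 1
    }
}
```

   Running `main` prints `null`, then `Invariant B(0,1)`, then `Invariant C(0,1)`. A uniform array passes because every pair is skipped, which matches the comment on `clearSyncArray` that a store with identical elements satisfies B and C trivially.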



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java:
##########
@@ -57,52 +58,170 @@ public void testOffsetTranslation() {
 
             // Emit synced downstream offset without dead-reckoning
             store.sync(tp, 100, 200);
-            assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 150));
+            assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 150));
 
             // Translate exact offsets
             store.sync(tp, 150, 251);
-            assertEquals(OptionalLong.of(251), store.translateDownstream(tp, 150));
+            assertEquals(OptionalLong.of(251), store.translateDownstream(null, tp, 150));
 
             // Use old offset (5) prior to any sync -> can't translate
-            assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 5));
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 5));
 
             // Downstream offsets reset
             store.sync(tp, 200, 10);
-            assertEquals(OptionalLong.of(10), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.of(10), store.translateDownstream(null, tp, 200));
 
             // Upstream offsets reset
             store.sync(tp, 20, 20);
-            assertEquals(OptionalLong.of(20), store.translateDownstream(tp, 20));
+            assertEquals(OptionalLong.of(20), store.translateDownstream(null, tp, 20));
         }
     }
 
     @Test
     public void testNoTranslationIfStoreNotStarted() {
         try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
             // no offsets exist and store is not started
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));
 
             // read a sync during startup
             store.sync(tp, 100, 200);
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));
 
             // After the store is started all offsets are visible
             store.start();
-            assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 0));
-            assertEquals(OptionalLong.of(200), store.translateDownstream(tp, 100));
-            assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 0));
+            assertEquals(OptionalLong.of(200), store.translateDownstream(null, tp, 100));
+            assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 200));
         }
     }
 
     @Test
     public void testNoTranslationIfNoOffsetSync() {
         try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
             store.start();
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
+        }
+    }
+
+    @Test
+    public void testPastOffsetTranslation() {
+        try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
+            long maxOffsetLag = 10;
+            int offset = 0;
+            for (; offset <= 1000; offset += maxOffsetLag) {
+                store.sync(tp, offset, offset);
+                assertSparseSyncInvariant(store, tp);
+            }
+            store.start();
+
+            // After starting but before seeing new offsets, only the latest startup offset can be translated
+            assertSparseSync(store, 1000, -1);
+
+            for (; offset <= 10000; offset += maxOffsetLag) {
+                store.sync(tp, offset, offset);
+                assertSparseSyncInvariant(store, tp);
+            }
+
+            // After seeing new offsets, we still cannot translate earlier than the latest startup offset
+            // Invariant D: the last sync from the initial read-to-end is still stored
+            assertSparseSync(store, 1000, -1);
+
+            // We can translate offsets between the latest startup offset and the latest offset with variable precision
+            // Older offsets are less precise and translation ends up farther apart
+            assertSparseSync(store, 4840, 1000);
+            assertSparseSync(store, 6760, 4840);
+            assertSparseSync(store, 8680, 6760);
+            assertSparseSync(store, 9160, 8680);
+            assertSparseSync(store, 9640, 9160);
+            assertSparseSync(store, 9880, 9640);
+            assertSparseSync(store, 9940, 9880);
+            assertSparseSync(store, 9970, 9940);
+            assertSparseSync(store, 9990, 9970);
+            assertSparseSync(store, 10000, 9990);
+
+            // Rewind upstream offsets should clear all historical syncs
+            store.sync(tp, 1500, 11000);
+            assertSparseSyncInvariant(store, tp);
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 1499));
+            assertEquals(OptionalLong.of(11000), store.translateDownstream(null, tp, 1500));
+            assertEquals(OptionalLong.of(11001), store.translateDownstream(null, tp, 2000));
+        }
+    }
+
+    @Test
+    public void testKeepMostDistinctSyncs() {
+        // We should not expire more syncs from the store than necessary;
+        // Each new sync should expire at most two other syncs from the cache
+        long iterations = 1000000;
+        long maxStep = Long.MAX_VALUE / iterations;
+        // Test a variety of steps (corresponding to the offset.lag.max configuration)
+        for (long step = 1; step < maxStep; step = (step << 1) + 1)  {
+            try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
+                int lastCount = 1;
+                store.start();
+                for (long offset = 0; offset <= iterations; offset += step) {
+                    store.sync(tp, offset, offset);
+                    // Invariant A: the latest sync is present
+                    assertEquals(offset, store.syncFor(tp, 0).upstreamOffset());
+                    // Invariant D: the earliest sync is present
+                    assertEquals(0L, store.syncFor(tp, 63).upstreamOffset());
+                    int count = countDistinctStoredSyncs(store, tp);
+                    int diff = count - lastCount;
+                    assertTrue(diff >= -1,
+                            "Store expired too many syncs: " + diff + " after receiving offset " + offset);
+                    lastCount = count;
+                }
+            }
+        }
+    }
+
+    private void assertSparseSync(FakeOffsetSyncStore store, long syncOffset, long previousOffset) {
+        assertEquals(OptionalLong.of(previousOffset == -1 ? previousOffset : previousOffset + 1), store.translateDownstream(null, tp, syncOffset - 1));
+        assertEquals(OptionalLong.of(syncOffset), store.translateDownstream(null, tp, syncOffset));
+        assertEquals(OptionalLong.of(syncOffset + 1), store.translateDownstream(null, tp, syncOffset + 1));
+        assertEquals(OptionalLong.of(syncOffset + 1), store.translateDownstream(null, tp, syncOffset + 2));
+    }
+
+    private int countDistinctStoredSyncs(FakeOffsetSyncStore store, TopicPartition topicPartition) {
+        int count = 1;
+        for (int i = 1; i < OffsetSyncStore.SYNCS_PER_PARTITION; i++) {
+            if (store.syncFor(topicPartition, i - 1) != store.syncFor(topicPartition, i)) {
+                count++;
+            }
+        }
+        return count;
+    }
+
+    private void assertSparseSyncInvariant(FakeOffsetSyncStore store, TopicPartition topicPartition) {
+        for (int j = 0; j < OffsetSyncStore.SYNCS_PER_PARTITION; j++) {
+            for (int i = 0; i <= j; i++) {
+                long jUpstream = store.syncFor(topicPartition, j).upstreamOffset();
+                long iUpstream = store.syncFor(topicPartition, i).upstreamOffset();
+                if (jUpstream == iUpstream) {
+                    continue;
+                }
+                long iUpstreamLowerBound = jUpstream + (1L << Math.max(i - 2, 0));
+                if (iUpstreamLowerBound < 0) {
+                    continue;
+                }
+                assertTrue(
+                        iUpstreamLowerBound <= iUpstream,
+                        "Invariant C(" + i + "," + j + "): Upstream offset " + iUpstream + " at position " + i
+                                + " should be at least " + iUpstreamLowerBound + " (" + jUpstream + " + 2^" + Math.max(i - 1, 0) + ")"
+                );
+                long iUpstreamBound = jUpstream + (1L << j) - (1L << i);

Review Comment:
   Nit: `iUpstreamUpperBound` is clearer, especially since we already have `iUpstreamLowerBound` declared in the same scope.



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java:
##########
@@ -57,52 +58,170 @@ public void testOffsetTranslation() {
 
             // Emit synced downstream offset without dead-reckoning
             store.sync(tp, 100, 200);
-            assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 150));
+            assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 150));
 
             // Translate exact offsets
             store.sync(tp, 150, 251);
-            assertEquals(OptionalLong.of(251), store.translateDownstream(tp, 150));
+            assertEquals(OptionalLong.of(251), store.translateDownstream(null, tp, 150));
 
             // Use old offset (5) prior to any sync -> can't translate
-            assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 5));
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 5));
 
             // Downstream offsets reset
             store.sync(tp, 200, 10);
-            assertEquals(OptionalLong.of(10), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.of(10), store.translateDownstream(null, tp, 200));
 
             // Upstream offsets reset
             store.sync(tp, 20, 20);
-            assertEquals(OptionalLong.of(20), store.translateDownstream(tp, 20));
+            assertEquals(OptionalLong.of(20), store.translateDownstream(null, tp, 20));
         }
     }
 
     @Test
     public void testNoTranslationIfStoreNotStarted() {
         try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
             // no offsets exist and store is not started
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));
 
             // read a sync during startup
             store.sync(tp, 100, 200);
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));
 
             // After the store is started all offsets are visible
             store.start();
-            assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 0));
-            assertEquals(OptionalLong.of(200), store.translateDownstream(tp, 100));
-            assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 0));
+            assertEquals(OptionalLong.of(200), store.translateDownstream(null, tp, 100));
+            assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 200));
         }
     }
 
     @Test
     public void testNoTranslationIfNoOffsetSync() {
         try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
             store.start();
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
+        }
+    }
+
+    @Test
+    public void testPastOffsetTranslation() {
+        try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
+            long maxOffsetLag = 10;
+            int offset = 0;
+            for (; offset <= 1000; offset += maxOffsetLag) {
+                store.sync(tp, offset, offset);
+                assertSparseSyncInvariant(store, tp);
+            }
+            store.start();
+
+            // After starting but before seeing new offsets, only the latest startup offset can be translated
+            assertSparseSync(store, 1000, -1);
+
+            for (; offset <= 10000; offset += maxOffsetLag) {
+                store.sync(tp, offset, offset);
+                assertSparseSyncInvariant(store, tp);
+            }
+
+            // After seeing new offsets, we still cannot translate earlier than the latest startup offset
+            // Invariant D: the last sync from the initial read-to-end is still stored
+            assertSparseSync(store, 1000, -1);
+
+            // We can translate offsets between the latest startup offset and the latest offset with variable precision
+            // Older offsets are less precise and translation ends up farther apart
+            assertSparseSync(store, 4840, 1000);
+            assertSparseSync(store, 6760, 4840);
+            assertSparseSync(store, 8680, 6760);
+            assertSparseSync(store, 9160, 8680);
+            assertSparseSync(store, 9640, 9160);
+            assertSparseSync(store, 9880, 9640);
+            assertSparseSync(store, 9940, 9880);
+            assertSparseSync(store, 9970, 9940);
+            assertSparseSync(store, 9990, 9970);
+            assertSparseSync(store, 10000, 9990);
+
+            // Rewind upstream offsets should clear all historical syncs
+            store.sync(tp, 1500, 11000);
+            assertSparseSyncInvariant(store, tp);
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 1499));
+            assertEquals(OptionalLong.of(11000), store.translateDownstream(null, tp, 1500));
+            assertEquals(OptionalLong.of(11001), store.translateDownstream(null, tp, 2000));
+        }
+    }
+
+    @Test
+    public void testKeepMostDistinctSyncs() {
+        // We should not expire more syncs from the store than necessary;
+        // Each new sync should expire at most two other syncs from the cache
+        long iterations = 1000000;
+        long maxStep = Long.MAX_VALUE / iterations;
+        // Test a variety of steps (corresponding to the offset.lag.max configuration)
+        for (long step = 1; step < maxStep; step = (step << 1) + 1)  {
+            try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
+                int lastCount = 1;
+                store.start();
+                for (long offset = 0; offset <= iterations; offset += step) {
+                    store.sync(tp, offset, offset);
+                    // Invariant A: the latest sync is present
+                    assertEquals(offset, store.syncFor(tp, 0).upstreamOffset());
+                    // Invariant D: the earliest sync is present
+                    assertEquals(0L, store.syncFor(tp, 63).upstreamOffset());
+                    int count = countDistinctStoredSyncs(store, tp);
+                    int diff = count - lastCount;
+                    assertTrue(diff >= -1,
+                            "Store expired too many syncs: " + diff + " after receiving offset " + offset);
+                    lastCount = count;
+                }
+            }
+        }
+    }
+
+    private void assertSparseSync(FakeOffsetSyncStore store, long syncOffset, long previousOffset) {
+        assertEquals(OptionalLong.of(previousOffset == -1 ? previousOffset : previousOffset + 1), store.translateDownstream(null, tp, syncOffset - 1));
+        assertEquals(OptionalLong.of(syncOffset), store.translateDownstream(null, tp, syncOffset));
+        assertEquals(OptionalLong.of(syncOffset + 1), store.translateDownstream(null, tp, syncOffset + 1));
+        assertEquals(OptionalLong.of(syncOffset + 1), store.translateDownstream(null, tp, syncOffset + 2));
+    }
+
+    private int countDistinctStoredSyncs(FakeOffsetSyncStore store, TopicPartition topicPartition) {
+        int count = 1;
+        for (int i = 1; i < OffsetSyncStore.SYNCS_PER_PARTITION; i++) {
+            if (store.syncFor(topicPartition, i - 1) != store.syncFor(topicPartition, i)) {
+                count++;
+            }
+        }
+        return count;
+    }
+
+    private void assertSparseSyncInvariant(FakeOffsetSyncStore store, TopicPartition topicPartition) {
+        for (int j = 0; j < OffsetSyncStore.SYNCS_PER_PARTITION; j++) {
+            for (int i = 0; i <= j; i++) {
+                long jUpstream = store.syncFor(topicPartition, j).upstreamOffset();
+                long iUpstream = store.syncFor(topicPartition, i).upstreamOffset();
+                if (jUpstream == iUpstream) {
+                    continue;
+                }
+                long iUpstreamLowerBound = jUpstream + (1L << Math.max(i - 2, 0));
+                if (iUpstreamLowerBound < 0) {
+                    continue;
+                }
+                assertTrue(
+                        iUpstreamLowerBound <= iUpstream,
+                        "Invariant C(" + i + "," + j + "): Upstream offset " + iUpstream + " at position " + i
+                                + " should be at least " + iUpstreamLowerBound + " (" + jUpstream + " + 2^" + Math.max(i - 1, 0) + ")"
+                );
+                long iUpstreamBound = jUpstream + (1L << j) - (1L << i);
+                if (iUpstreamBound < 0)
+                    continue;
+                assertTrue(
+                        iUpstream <= iUpstreamBound,
+                        "Invariant B(" + i + "," + j + "): Upstream offset " + iUpstream + " at position " + i
+                                + " should be no greater than " + iUpstreamBound + " (" + jUpstream + " + 2^" + j + ")"

Review Comment:
   This doesn't match the invariant (or the right-hand expression for the declaration of `iUpstreamBound`):
   ```suggestion
                                   + " should be no greater than " + iUpstreamBound + " (" + jUpstream + " + 2^" + j + " - 2^" + i + ")"
   ```
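   A small sketch of one way to keep each message in step with its bound: compute the bound and the printed expression from the same exponents. The helper names (`InvariantMessages`, `invariantBMessage`, `invariantCMessage`) are hypothetical. Note also that the quoted invariant C message appears to print `Math.max(i - 1, 0)`, while the bound itself uses `Math.max(i - 2, 0)`. Computing the exponent once avoids that drift:

```java
// Hypothetical helpers, not part of OffsetSyncStoreTest: each message is built
// from the same expressions used to compute the bound it reports.
public final class InvariantMessages {

    static String invariantBMessage(int i, int j, long iUpstream, long jUpstream) {
        // Invariant B: iUpstream <= jUpstream + 2^j - 2^i
        long iUpstreamUpperBound = jUpstream + (1L << j) - (1L << i);
        return "Invariant B(" + i + "," + j + "): Upstream offset " + iUpstream + " at position " + i
                + " should be no greater than " + iUpstreamUpperBound
                + " (" + jUpstream + " + 2^" + j + " - 2^" + i + ")";
    }

    static String invariantCMessage(int i, int j, long iUpstream, long jUpstream) {
        // One variable feeds both the shift and the message, so they cannot disagree
        int exponent = Math.max(i - 2, 0);
        long iUpstreamLowerBound = jUpstream + (1L << exponent);
        return "Invariant C(" + i + "," + j + "): Upstream offset " + iUpstream + " at position " + i
                + " should be at least " + iUpstreamLowerBound
                + " (" + jUpstream + " + 2^" + exponent + ")";
    }

    public static void main(String[] args) {
        System.out.println(invariantBMessage(1, 3, 20, 10));
        System.out.println(invariantCMessage(3, 4, 10, 9));
    }
}
```

   For example, `invariantBMessage(1, 3, 20, 10)` reports an upper bound of 16, printed as `(10 + 2^3 - 2^1)`.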



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -139,10 +180,141 @@ public void close() {
     protected void handleRecord(ConsumerRecord<byte[], byte[]> record) {
         OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
         TopicPartition sourceTopicPartition = offsetSync.topicPartition();
-        offsetSyncs.put(sourceTopicPartition, offsetSync);
+        offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) ->
+                syncs == null ? createInitialSyncs(offsetSync) : updateExistingSyncs(syncs, offsetSync)
+        );
+    }
+
+    private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) {
+        // Make a copy of the array before mutating it, so that readers do not see inconsistent data
+        // TODO: consider batching updates so that this copy can be performed less often for high-volume sync topics.
+        OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, SYNCS_PER_PARTITION);
+        updateSyncArray(mutableSyncs, offsetSync);
+        if (log.isTraceEnabled()) {
+            log.trace("New sync {} applied, new state is {}", offsetSync, offsetArrayToString(mutableSyncs));
+        }
+        return mutableSyncs;
+    }
+
+    private String offsetArrayToString(OffsetSync[] syncs) {
+        StringBuilder stateString = new StringBuilder();
+        stateString.append("[");
+        for (int i = 0; i < SYNCS_PER_PARTITION; i++) {
+            if (i == 0 || syncs[i] != syncs[i - 1]) {
+                if (i != 0) {
+                    stateString.append(",");
+                }
+                // Print only if the sync is interesting, a series of repeated syncs will be elided
+                stateString.append(syncs[i].upstreamOffset());
+                stateString.append(":");
+                stateString.append(syncs[i].downstreamOffset());
+            }
+        }
+        stateString.append("]");
+        return stateString.toString();
+    }
+
+    private OffsetSync[] createInitialSyncs(OffsetSync firstSync) {
+        OffsetSync[] syncs = new OffsetSync[SYNCS_PER_PARTITION];
+        clearSyncArray(syncs, firstSync);
+        return syncs;
+    }
+
+    private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        // If every element of the store is the same, then it satisfies invariants B and C trivially.
+        for (int i = 0; i < SYNCS_PER_PARTITION; i++) {
+            syncs[i] = offsetSync;
+        }
+    }
+
+    private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        long upstreamOffset = offsetSync.upstreamOffset();
+        // Old offsets are invalid, so overwrite them all.
+        if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) {
+            clearSyncArray(syncs, offsetSync);
+            return;
+        }
+        OffsetSync replacement = offsetSync;
+        OffsetSync oldValue = syncs[0];
+        // Invariant A is always violated once a new sync appears.
+        // Repair Invariant A: the latest sync must always be updated
+        syncs[0] = replacement;
+        for (int current = 1; current < SYNCS_PER_PARTITION; current++) {
+            int previous = current - 1;
+
+            // Consider using oldValue instead of replacement, which allows us to keep more distinct values stored
+            // If oldValue is not recent, it should be expired from the store
+            boolean isRecent = invariantB(syncs[previous], oldValue, previous, current);
+            // Ensure that this value is sufficiently separated from the previous value
+            // We prefer to keep more recent syncs of similar precision (i.e. the value in replacement)
+            boolean separatedFromPrevious = invariantC(syncs[previous], oldValue, previous, current);
+            // Ensure that this value is sufficiently separated from the next value
+            // We prefer to keep existing syncs of lower precision (i.e. the value in syncs[next])
+            int next = current + 1;
+            boolean separatedFromNext = next >= SYNCS_PER_PARTITION || invariantC(oldValue, syncs[next], current, next);
+            // If this condition is false, oldValue will be expired from the store and lost forever.
+            if (isRecent && separatedFromPrevious && separatedFromNext) {
+                replacement = oldValue;
+            }
+
+            // The replacement variable always contains a value which satisfies the invariants for this index.
+            assert invariantB(syncs[previous], replacement, previous, current);
+            assert invariantC(syncs[previous], replacement, previous, current);
+
+            // Test if changes to the previous index affected the invariant for this index
+            if (invariantB(syncs[previous], syncs[current], previous, current)) {
+                // Invariant B holds for syncs[current]: it must also hold for all later values

Review Comment:
   Why? (Feel free to explain via GH discussion thread instead of code comments if a succinct explanation is difficult.)
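   One possible line of reasoning assumes that `invariantB(a, b, i, j)` checks Invariant B exactly as written in the class Javadoc. Writing $s_k$ for `syncs[k].upstreamOffset()`, Invariant B is then transitive along the array:

```math
\begin{aligned}
s_i &\le s_j + 2^j - 2^i, \qquad s_j \le s_k + 2^k - 2^j \qquad (i < j < k) \\
\Rightarrow\; s_i &\le \left(s_k + 2^k - 2^j\right) + 2^j - 2^i = s_k + 2^k - 2^i
\end{aligned}
```

   Suppose the entries from `current` onward are untouched from a state where B already held. Once (`previous`, `current`) satisfies B, chaining through `current` then extends B to (`previous`, k) for every later index k without re-checking each pair. Repeated entries never break the chain, since $2^j - 2^i > 0$ for $i < j$.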



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -139,10 +180,141 @@ public void close() {
     protected void handleRecord(ConsumerRecord<byte[], byte[]> record) {
         OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
         TopicPartition sourceTopicPartition = offsetSync.topicPartition();
-        offsetSyncs.put(sourceTopicPartition, offsetSync);
+        offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) ->
+                syncs == null ? createInitialSyncs(offsetSync) : updateExistingSyncs(syncs, offsetSync)
+        );
+    }
+
+    private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) {
+        // Make a copy of the array before mutating it, so that readers do not see inconsistent data
+        // TODO: consider batching updates so that this copy can be performed less often for high-volume sync topics.
+        OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, SYNCS_PER_PARTITION);
+        updateSyncArray(mutableSyncs, offsetSync);
+        if (log.isTraceEnabled()) {
+            log.trace("New sync {} applied, new state is {}", offsetSync, offsetArrayToString(mutableSyncs));
+        }
+        return mutableSyncs;
+    }
+
+    private String offsetArrayToString(OffsetSync[] syncs) {
+        StringBuilder stateString = new StringBuilder();
+        stateString.append("[");
+        for (int i = 0; i < SYNCS_PER_PARTITION; i++) {
+            if (i == 0 || syncs[i] != syncs[i - 1]) {
+                if (i != 0) {
+                    stateString.append(",");
+                }
+                // Print only if the sync is interesting, a series of repeated syncs will be elided
+                stateString.append(syncs[i].upstreamOffset());
+                stateString.append(":");
+                stateString.append(syncs[i].downstreamOffset());
+            }
+        }
+        stateString.append("]");
+        return stateString.toString();
+    }
+
+    private OffsetSync[] createInitialSyncs(OffsetSync firstSync) {
+        OffsetSync[] syncs = new OffsetSync[SYNCS_PER_PARTITION];
+        clearSyncArray(syncs, firstSync);
+        return syncs;
+    }
+
+    private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        // If every element of the store is the same, then it satisfies invariants B and C trivially.
+        for (int i = 0; i < SYNCS_PER_PARTITION; i++) {
+            syncs[i] = offsetSync;
+        }
+    }
+
+    private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        long upstreamOffset = offsetSync.upstreamOffset();
+        // Old offsets are invalid, so overwrite them all.
+        if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) {
+            clearSyncArray(syncs, offsetSync);
+            return;
+        }
+        OffsetSync replacement = offsetSync;
+        OffsetSync oldValue = syncs[0];
+        // Invariant A is always violated once a new sync appears.
+        // Repair Invariant A: the latest sync must always be updated
+        syncs[0] = replacement;
+        for (int current = 1; current < SYNCS_PER_PARTITION; current++) {
+            int previous = current - 1;
+
+            // Consider using oldValue instead of replacement, which allows us to keep more distinct values stored
+            // If oldValue is not recent, it should be expired from the store
+            boolean isRecent = invariantB(syncs[previous], oldValue, previous, current);
+            // Ensure that this value is sufficiently separated from the previous value
+            // We prefer to keep more recent syncs of similar precision (i.e. the value in replacement)
+            boolean separatedFromPrevious = invariantC(syncs[previous], oldValue, previous, current);
+            // Ensure that this value is sufficiently separated from the next value
+            // We prefer to keep existing syncs of lower precision (i.e. the value in syncs[next])
+            int next = current + 1;
+            boolean separatedFromNext = next >= SYNCS_PER_PARTITION || invariantC(oldValue, syncs[next], current, next);
+            // If this condition is false, oldValue will be expired from the store and lost forever.
+            if (isRecent && separatedFromPrevious && separatedFromNext) {
+                replacement = oldValue;
+            }
+
+            // The replacement variable always contains a value which 
satisfies the invariants for this index.

Review Comment:
   ```suggestion
               // The replacement variable always contains a value which satisfies the invariants for this index.
               // Note that this does not mean that we have to use the replacement (since the invariants could already be satisfied without overwriting the current sync)
   ```
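   A minimal sketch of the point in this suggestion, using plain `long` upstream offsets in place of `OffsetSync` objects. The helpers `invariantB`, `invariantC`, and `chooseForIndex` are hypothetical; the formulas follow Invariants B and C from the class Javadoc (with the `2^max(i-2, 0)` exponent used in `assertSparseSyncInvariant`), and treating an overflowed bound as satisfied is an assumption. It shows that a valid replacement does not have to be used when the existing entry already satisfies both invariants against the previous index.

```java
// Hypothetical sketch: plain long upstream offsets stand in for OffsetSync objects.
public class InvariantSketch {

    // Invariant B for i < j: upstream[i] <= upstream[j] + 2^j - 2^i.
    // Equal entries satisfy it trivially; a negative (overflowed) bound is treated as satisfied.
    static boolean invariantB(long iUpstream, long jUpstream, int i, int j) {
        long bound = jUpstream + (1L << j) - (1L << i);
        return iUpstream == jUpstream || bound < 0 || iUpstream <= bound;
    }

    // Invariant C for i < j: upstream[i] >= upstream[j] + 2^max(i - 2, 0).
    // Equal entries and overflowed bounds are skipped, as in assertSparseSyncInvariant.
    static boolean invariantC(long iUpstream, long jUpstream, int i) {
        long bound = jUpstream + (1L << Math.max(i - 2, 0));
        return iUpstream == jUpstream || bound < 0 || iUpstream >= bound;
    }

    // Keep the existing entry at `current` if it already satisfies both invariants
    // against the previous index; otherwise overwrite it with the replacement,
    // which the caller is assumed to have validated.
    static long chooseForIndex(long previous, long existing, long replacement, int current) {
        int prev = current - 1;
        boolean alreadyValid = invariantB(previous, existing, prev, current)
                && invariantC(previous, existing, prev);
        return alreadyValid ? existing : replacement;
    }

    public static void main(String[] args) {
        // syncs[0] was just updated to 100; 95 at index 1 violates B (100 > 95 + 2 - 1), so it is replaced
        System.out.println(chooseForIndex(100, 95, 100, 1)); // prints 100
        // 99 at index 1 satisfies B (100 <= 100) and C (100 >= 99 + 1), so it is kept
        System.out.println(chooseForIndex(100, 99, 100, 1)); // prints 99
    }
}
```

   In the real store the replacement is tracked across loop iterations; this sketch only isolates the keep-or-overwrite decision for a single index.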



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java:
##########
@@ -57,52 +58,170 @@ public void testOffsetTranslation() {
 
             // Emit synced downstream offset without dead-reckoning
             store.sync(tp, 100, 200);
-            assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 150));
+            assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 150));
 
             // Translate exact offsets
             store.sync(tp, 150, 251);
-            assertEquals(OptionalLong.of(251), store.translateDownstream(tp, 150));
+            assertEquals(OptionalLong.of(251), store.translateDownstream(null, tp, 150));
 
             // Use old offset (5) prior to any sync -> can't translate
-            assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 5));
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 5));
 
             // Downstream offsets reset
             store.sync(tp, 200, 10);
-            assertEquals(OptionalLong.of(10), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.of(10), store.translateDownstream(null, tp, 200));
 
             // Upstream offsets reset
             store.sync(tp, 20, 20);
-            assertEquals(OptionalLong.of(20), store.translateDownstream(tp, 20));
+            assertEquals(OptionalLong.of(20), store.translateDownstream(null, tp, 20));
         }
     }
 
     @Test
     public void testNoTranslationIfStoreNotStarted() {
         try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
             // no offsets exist and store is not started
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));
 
             // read a sync during startup
             store.sync(tp, 100, 200);
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));
 
             // After the store is started all offsets are visible
             store.start();
-            assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 0));
-            assertEquals(OptionalLong.of(200), store.translateDownstream(tp, 100));
-            assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 0));
+            assertEquals(OptionalLong.of(200), store.translateDownstream(null, tp, 100));
+            assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 200));
         }
     }
 
     @Test
     public void testNoTranslationIfNoOffsetSync() {
         try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
             store.start();
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
+        }
+    }
+
+    @Test
+    public void testPastOffsetTranslation() {
+        try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
+            long maxOffsetLag = 10;
+            int offset = 0;
+            for (; offset <= 1000; offset += maxOffsetLag) {
+                store.sync(tp, offset, offset);
+                assertSparseSyncInvariant(store, tp);
+            }
+            store.start();
+
+            // After starting but before seeing new offsets, only the latest startup offset can be translated
+            assertSparseSync(store, 1000, -1);
+
+            for (; offset <= 10000; offset += maxOffsetLag) {
+                store.sync(tp, offset, offset);
+                assertSparseSyncInvariant(store, tp);
+            }
+
+            // After seeing new offsets, we still cannot translate earlier than the latest startup offset
+            // Invariant D: the last sync from the initial read-to-end is still stored
+            assertSparseSync(store, 1000, -1);
+
+            // We can translate offsets between the latest startup offset and the latest offset with variable precision
+            // Older offsets are less precise and translation ends up farther apart
+            assertSparseSync(store, 4840, 1000);
+            assertSparseSync(store, 6760, 4840);
+            assertSparseSync(store, 8680, 6760);
+            assertSparseSync(store, 9160, 8680);
+            assertSparseSync(store, 9640, 9160);
+            assertSparseSync(store, 9880, 9640);
+            assertSparseSync(store, 9940, 9880);
+            assertSparseSync(store, 9970, 9940);
+            assertSparseSync(store, 9990, 9970);
+            assertSparseSync(store, 10000, 9990);
+
+            // Rewind upstream offsets should clear all historical syncs
+            store.sync(tp, 1500, 11000);
+            assertSparseSyncInvariant(store, tp);
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 1499));
+            assertEquals(OptionalLong.of(11000), store.translateDownstream(null, tp, 1500));
+            assertEquals(OptionalLong.of(11001), store.translateDownstream(null, tp, 2000));
+        }
+    }
+
+    @Test
+    public void testKeepMostDistinctSyncs() {
+        // We should not expire more syncs from the store than necessary;
+        // Each new sync should expire at most two other syncs from the cache

Review Comment:
   Also, it'd be nice to understand why this holds given our invariants. If we 
can fit that into a comment, great; if not, a GitHub discussion thread is 
better than nothing.
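   One way to read the threshold in the test: when upstream offsets strictly increase (as they do here), each `sync()` adds exactly one new distinct value at index 0, so "expire at most two other syncs" corresponds to a net change of at least `1 - 2 = -1` in the distinct count, which is what `diff >= -1` checks. The sketch below is a hypothetical, value-based simplification of `countDistinctStoredSyncs` (it compares `long` offsets instead of object references, and the arrays are made up). It illustrates the arithmetic only and does not prove that the invariants guarantee the bound.

```java
// Hypothetical simplification of countDistinctStoredSyncs using long offsets.
public class DistinctSyncCount {

    // Counts changes between adjacent entries. For a newest-first array in which
    // equal entries are contiguous, this equals the number of distinct values.
    static int countDistinct(long[] upstream) {
        int count = 1;
        for (int i = 1; i < upstream.length; i++) {
            if (upstream[i - 1] != upstream[i]) {
                count++;
            }
        }
        return count;
    }

    // One new sync adds one distinct value; expiring at most two older syncs
    // therefore changes the distinct count by at least 1 - 2 = -1.
    static boolean expiredAtMostTwo(long[] before, long[] after) {
        return countDistinct(after) - countDistinct(before) >= -1;
    }

    public static void main(String[] args) {
        long[] before = {40, 30, 30, 20, 0};
        long[] after = {50, 40, 20, 0, 0}; // made-up update: 50 added, 30 expired
        System.out.println(countDistinct(before) + " -> " + countDistinct(after)); // prints 4 -> 4
        System.out.println(expiredAtMostTwo(before, after)); // prints true
    }
}
```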



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java:
##########
@@ -57,52 +58,170 @@ public void testOffsetTranslation() {
 
+    @Test
+    public void testKeepMostDistinctSyncs() {
+        // We should not expire more syncs from the store than necessary;
+        // Each new sync should expire at most two other syncs from the cache
+        long iterations = 1000000;
+        long maxStep = Long.MAX_VALUE / iterations;
+        // Test a variety of steps (corresponding to the offset.lag.max configuration)
+        for (long step = 1; step < maxStep; step = (step << 1) + 1)  {
+            try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
+                int lastCount = 1;
+                store.start();
+                for (long offset = 0; offset <= iterations; offset += step) {
+                    store.sync(tp, offset, offset);
+                    // Invariant A: the latest sync is present
+                    assertEquals(offset, store.syncFor(tp, 0).upstreamOffset());
+                    // Invariant D: the earliest sync is present
+                    assertEquals(0L, store.syncFor(tp, 63).upstreamOffset());
+                    int count = countDistinctStoredSyncs(store, tp);
+                    int diff = count - lastCount;
+                    assertTrue(diff >= -1,
+                            "Store expired too many syncs: " + diff + " after receiving offset " + offset);
+                    lastCount = count;
+                }
+            }
+        }
+    }
+
+    private void assertSparseSync(FakeOffsetSyncStore store, long syncOffset, long previousOffset) {
+        assertEquals(OptionalLong.of(previousOffset == -1 ? previousOffset : previousOffset + 1), store.translateDownstream(null, tp, syncOffset - 1));
+        assertEquals(OptionalLong.of(syncOffset), store.translateDownstream(null, tp, syncOffset));
+        assertEquals(OptionalLong.of(syncOffset + 1), store.translateDownstream(null, tp, syncOffset + 1));
+        assertEquals(OptionalLong.of(syncOffset + 1), store.translateDownstream(null, tp, syncOffset + 2));
+    }
+
+    private int countDistinctStoredSyncs(FakeOffsetSyncStore store, TopicPartition topicPartition) {
+        int count = 1;
+        for (int i = 1; i < OffsetSyncStore.SYNCS_PER_PARTITION; i++) {
+            if (store.syncFor(topicPartition, i - 1) != store.syncFor(topicPartition, i)) {
+                count++;
+            }
+        }
+        return count;
+    }
+
+    private void assertSparseSyncInvariant(FakeOffsetSyncStore store, TopicPartition topicPartition) {
+        for (int j = 0; j < OffsetSyncStore.SYNCS_PER_PARTITION; j++) {
+            for (int i = 0; i <= j; i++) {
+                long jUpstream = store.syncFor(topicPartition, j).upstreamOffset();
+                long iUpstream = store.syncFor(topicPartition, i).upstreamOffset();
+                if (jUpstream == iUpstream) {
+                    continue;
+                }
+                long iUpstreamLowerBound = jUpstream + (1L << Math.max(i - 2, 0));
+                if (iUpstreamLowerBound < 0) {
+                    continue;
+                }
+                assertTrue(
+                        iUpstreamLowerBound <= iUpstream,
+                        "Invariant C(" + i + "," + j + "): Upstream offset " + iUpstream + " at position " + i
+                                + " should be at least " + iUpstreamLowerBound + " (" + jUpstream + " + 2^" + Math.max(i - 1, 0) + ")"

Review Comment:
   This doesn't match the invariant, should we change it?
   ```suggestion
                                   + " should be at least " + iUpstreamLowerBound + " (" + jUpstream + " + 2^" + Math.max(i - 2, 0) + ")"
   ```
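   A sketch of one way to keep the bound and the message from drifting apart: compute the exponent once and use it for both. `checkInvariantC` is a hypothetical standalone helper that mirrors the check in `assertSparseSyncInvariant` (skipping equal entries and overflowed bounds), but returns the failure message instead of calling `assertTrue`.

```java
// Hypothetical helper mirroring the Invariant C check in assertSparseSyncInvariant.
public class InvariantCMessage {

    // Returns null when Invariant C(i, j) holds or is skipped, otherwise a failure message.
    static String checkInvariantC(long iUpstream, long jUpstream, int i, int j) {
        if (iUpstream == jUpstream) {
            return null; // identical entries satisfy the invariant trivially
        }
        int exponent = Math.max(i - 2, 0); // single source for both the bound and the message
        long lowerBound = jUpstream + (1L << exponent);
        if (lowerBound < 0 || lowerBound <= iUpstream) {
            return null; // overflowed bound is skipped, as in the test; otherwise the invariant holds
        }
        return "Invariant C(" + i + "," + j + "): Upstream offset " + iUpstream
                + " at position " + i + " should be at least " + lowerBound
                + " (" + jUpstream + " + 2^" + exponent + ")";
    }

    public static void main(String[] args) {
        System.out.println(checkInvariantC(10, 0, 4, 5)); // prints null (10 >= 0 + 2^2)
        System.out.println(checkInvariantC(5, 4, 4, 5));
    }
}
```

   The second call prints `Invariant C(4,5): Upstream offset 5 at position 4 should be at least 8 (4 + 2^2)`, where the exponent in the message matches the one used for the bound.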



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
