C0urante commented on code in PR #13429:
URL: https://github.com/apache/kafka/pull/13429#discussion_r1151130958
##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java:
##########

@@ -57,52 +58,120 @@ public void testOffsetTranslation() {
             // Emit synced downstream offset without dead-reckoning
             store.sync(tp, 100, 200);
-            assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 150));
+            assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 150));
 
             // Translate exact offsets
             store.sync(tp, 150, 251);
-            assertEquals(OptionalLong.of(251), store.translateDownstream(tp, 150));
+            assertEquals(OptionalLong.of(251), store.translateDownstream(null, tp, 150));
 
             // Use old offset (5) prior to any sync -> can't translate
-            assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 5));
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 5));
 
             // Downstream offsets reset
             store.sync(tp, 200, 10);
-            assertEquals(OptionalLong.of(10), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.of(10), store.translateDownstream(null, tp, 200));
 
             // Upstream offsets reset
             store.sync(tp, 20, 20);
-            assertEquals(OptionalLong.of(20), store.translateDownstream(tp, 20));
+            assertEquals(OptionalLong.of(20), store.translateDownstream(null, tp, 20));
         }
     }
 
     @Test
     public void testNoTranslationIfStoreNotStarted() {
         try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
             // no offsets exist and store is not started
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));
 
             // read a sync during startup
             store.sync(tp, 100, 200);
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));
 
             // After the store is started all offsets are visible
             store.start();
-            assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 0));
-            assertEquals(OptionalLong.of(200), store.translateDownstream(tp, 100));
-            assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 0));
+            assertEquals(OptionalLong.of(200), store.translateDownstream(null, tp, 100));
+            assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 200));
         }
     }
 
     @Test
     public void testNoTranslationIfNoOffsetSync() {
         try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
             store.start();
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
+        }
+    }
+
+    @Test
+    public void testPastOffsetTranslation() {
+        try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
+            long maxOffsetLag = 10;
+            int offset = 0;
+            for (; offset <= 1000; offset += maxOffsetLag) {
+                store.sync(tp, offset, offset);
+                assertSparseSyncInvariant(store, tp);
+            }
+            store.start();
+
+            // After starting but before seeing new offsets, only the latest startup offset can be translated
+            assertSparseSync(store, 1000, -1);
+
+            for (; offset <= 2000; offset += maxOffsetLag) {
+                store.sync(tp, offset, offset);
+                assertSparseSyncInvariant(store, tp);
+            }
+
+            // After seeing new offsets, we still cannot translate earlier than the latest startup offset
+            assertSparseSync(store, 1000, -1);
+
+            // We can translate offsets between the latest startup offset and the latest offset with variable precision
+            // Older offsets are less precise and translation ends up farther apart
+            assertSparseSync(store, 1030, 1000);
+            assertSparseSync(store, 1540, 1030);
+            assertSparseSync(store, 1800, 1540);
+            assertSparseSync(store, 1920, 1800);
+            assertSparseSync(store, 1990, 1920);
+            assertSparseSync(store, 2000, 1990);
+
+            // Rewind upstream offsets should clear all historical syncs
+            store.sync(tp, 1500, 2500);

Review Comment:
   Worth adding a call to `assertSparseSyncInvariant` after this sync as well?
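   For concreteness, the suggestion would amount to something like the following at the end of `testPastOffsetTranslation()`, reusing the test's own `store`, `tp`, and `assertSparseSyncInvariant` names (a sketch, not part of the PR):

   ```java
   // Rewind upstream offsets should clear all historical syncs
   store.sync(tp, 1500, 2500);
   // Suggested: re-check the sparse-sync invariant after the rewinding sync, too
   assertSparseSyncInvariant(store, tp);
   ```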
##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java:
##########

@@ -16,12 +16,12 @@
  */
 package org.apache.kafka.connect.mirror;
 
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Collections;
 import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.concurrent.ExecutionException;

Review Comment:
   Unused import (causes Checkstyle to fail)

##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########

@@ -139,10 +171,103 @@ public void close() {
     protected void handleRecord(ConsumerRecord<byte[], byte[]> record) {
         OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
         TopicPartition sourceTopicPartition = offsetSync.topicPartition();
-        offsetSyncs.put(sourceTopicPartition, offsetSync);
+        offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> createInitialSyncs(offsetSync));
+        offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> updateExistingSyncs(syncs, offsetSync));
+    }
+
+    private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) {
+        // Make a copy of the array before mutating it, so that readers do not see inconsistent data
+        // TODO: batch updates so that this copy can be performed less often for high-volume sync topics.
+        OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE);
+        updateSyncArray(mutableSyncs, offsetSync);
+        if (log.isTraceEnabled()) {
+            StringBuilder stateString = new StringBuilder();
+            stateString.append("[");
+            for (int i = 0; i < Long.SIZE; i++) {
+                if (i != 0) {
+                    stateString.append(",");
+                }
+                if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != mutableSyncs[i - 1]) {
+                    // Print only if the sync is interesting, a series of repeated syncs will appear as ,,,,,
+                    stateString.append(mutableSyncs[i].upstreamOffset());
+                    stateString.append(":");
+                    stateString.append(mutableSyncs[i].downstreamOffset());
+                }
+            }
+            stateString.append("]");
+            log.trace("New sync {} applied, new state is {}", offsetSync, stateString);
+        }
+        return mutableSyncs;
+    }
+
+    private OffsetSync[] createInitialSyncs(OffsetSync firstSync) {
+        OffsetSync[] syncs = new OffsetSync[Long.SIZE];
+        clearSyncArray(syncs, firstSync);
+        return syncs;
+    }
+
+    private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        for (int i = 0; i < Long.SIZE; i++) {
+            syncs[i] = offsetSync;
+        }
+    }
+
+    private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        long upstreamOffset = offsetSync.upstreamOffset();
+        // Old offsets are invalid, so overwrite them all.
+        if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) {

Review Comment:
   Hmmm... it feels like we're sacrificing a fair amount here in the name of correctness. The first naive thought that comes to mind is that we could list the offsets for each to-be-synced group on the target cluster before doing the first sync for it, and refuse to translate backwards.
   
   Of course, this doesn't handle cases where 1) the upstream offsets went backwards for some reason, or 2) the downstream consumer group has been active and committed later offsets between now and when the last `MirrorCheckpointTask` instance was running. I don't know if 2) should really be a concern (if you've committed offsets in that group, it's debatable whether we should be overwriting them). But I think 1) is worth taking into account, and I'm not sure how we can address it: re-reading the offset syncs topic on startup and checking for a decrease in upstream offsets between two consecutive syncs doesn't take compaction into account.
   
   I'd love it if we could leverage some of the information that's in the offset syncs topic on startup so that we can narrow the likelihood of inactive or slow-moving upstream consumer groups failing to be synced, but I agree that if the scenario you outlined is possible (and without having thought too hard about it, it does seem that way), we can't sacrifice correctness by ignoring it.
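   A minimal sketch of that "refuse to translate backwards" idea, assuming a hypothetical guard class with an `Admin` client pointed at the target cluster (`BackwardsTranslationGuard`, `targetAdmin`, and `wouldMoveBackwards` are illustrative names, not from the PR):

   ```java
   import java.util.Map;
   import java.util.concurrent.ExecutionException;

   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.consumer.OffsetAndMetadata;
   import org.apache.kafka.common.TopicPartition;

   public class BackwardsTranslationGuard {
       private final Admin targetAdmin;

       public BackwardsTranslationGuard(Admin targetAdmin) {
           this.targetAdmin = targetAdmin;
       }

       // Returns true if committing translatedOffset for this group/partition would
       // rewind an offset already committed on the target cluster. Deliberately
       // ignores cases 1) and 2) above: upstream rewinds and racing downstream commits.
       public boolean wouldMoveBackwards(String group, TopicPartition tp, long translatedOffset)
               throws ExecutionException, InterruptedException {
           Map<TopicPartition, OffsetAndMetadata> committed = targetAdmin
                   .listConsumerGroupOffsets(group)
                   .partitionsToOffsetAndMetadata()
                   .get();
           OffsetAndMetadata existing = committed.get(tp);
           return existing != null && translatedOffset < existing.offset();
       }
   }
   ```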
##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########

@@ -139,10 +171,103 @@ public void close() {
     protected void handleRecord(ConsumerRecord<byte[], byte[]> record) {
         OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
         TopicPartition sourceTopicPartition = offsetSync.topicPartition();
-        offsetSyncs.put(sourceTopicPartition, offsetSync);
+        offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> createInitialSyncs(offsetSync));
+        offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> updateExistingSyncs(syncs, offsetSync));
+    }
+
+    private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) {
+        // Make a copy of the array before mutating it, so that readers do not see inconsistent data
+        // TODO: batch updates so that this copy can be performed less often for high-volume sync topics.
+        OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE);
+        updateSyncArray(mutableSyncs, offsetSync);
+        if (log.isTraceEnabled()) {
+            StringBuilder stateString = new StringBuilder();
+            stateString.append("[");
+            for (int i = 0; i < Long.SIZE; i++) {
+                if (i != 0) {
+                    stateString.append(",");
+                }
+                if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != mutableSyncs[i - 1]) {
+                    // Print only if the sync is interesting, a series of repeated syncs will appear as ,,,,,
+                    stateString.append(mutableSyncs[i].upstreamOffset());
+                    stateString.append(":");
+                    stateString.append(mutableSyncs[i].downstreamOffset());
+                }
+            }
+            stateString.append("]");
+            log.trace("New sync {} applied, new state is {}", offsetSync, stateString);
+        }
+        return mutableSyncs;
+    }
+
+    private OffsetSync[] createInitialSyncs(OffsetSync firstSync) {
+        OffsetSync[] syncs = new OffsetSync[Long.SIZE];
+        clearSyncArray(syncs, firstSync);
+        return syncs;
+    }
+
+    private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        for (int i = 0; i < Long.SIZE; i++) {
+            syncs[i] = offsetSync;
+        }
+    }
+
+    private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        long upstreamOffset = offsetSync.upstreamOffset();
+        // Old offsets are invalid, so overwrite them all.
+        if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) {
+            clearSyncArray(syncs, offsetSync);
+            return;
+        }
+        syncs[0] = offsetSync;
+        for (int i = 1; i < Long.SIZE; i++) {
+            OffsetSync oldValue = syncs[i];
+            long mask = Long.MAX_VALUE << i;
+            // If the old value is too stale: at least one of the 64-i high bits of the offset value have changed
+            // This produces buckets quantized at boundaries of powers of 2
+            // Syncs:     a          b          c   d   e
+            // Bucket 0: |a|        |b|        |c| |d| |e|   (size 1)
+            // Bucket 1: | a|       | b|       |c | |d | e|  (size 2)
+            // Bucket 2: |  a |     |  b|      | c | | d | | (size 4)
+            // Bucket 3: |    a   | |   b  |   |  c    d |   (size 8)
+            // Bucket 4: |        a |        b |    c      | (size 16)
+            // Bucket 5: |          a          |    c      | (size 32)
+            // Bucket 6: |          a                      | (size 64)
+            // ...                  a                   ...
+            // Bucket63:            a                       (size 2^63)
+            // State after a: [a,,,,,,, ... ,a] (all buckets written)
+            // State after b: [b,,,,,a,, ... ,a] (buckets 0-4 written)
+            // State after c: [c,,,,,,a, ... ,a] (buckets 0-5 written, b expired completely)
+            // State after d: [d,,,,c,,a, ... ,a] (buckets 0-3 written)
+            // State after e: [e,,d,,c,,a, ... ,a] (buckets 0-1 written)

Review Comment:
   The new comments are succinct and helpful, thanks!
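   To make the bucket quantization concrete: two upstream offsets land in the same size-2^i bucket exactly when their high 64-i bits agree, which the `Long.MAX_VALUE << i` mask expresses directly. A standalone illustration (not PR code):

   ```java
   public class BucketDemo {
       // Two offsets share bucket i when masking off the low i bits leaves them equal;
       // the mask keeps the high 64-i bits, so each bucket spans 2^i consecutive offsets.
       static boolean sameBucket(long offsetA, long offsetB, int i) {
           long mask = Long.MAX_VALUE << i;
           return (offsetA & mask) == (offsetB & mask);
       }

       public static void main(String[] args) {
           System.out.println(sameBucket(100, 101, 1)); // true: both in the size-2 bucket [100, 101]
           System.out.println(sameBucket(100, 102, 1)); // false: 102 starts the next size-2 bucket
           System.out.println(sameBucket(100, 102, 2)); // true: both in the size-4 bucket [100, 103]
       }
   }
   ```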
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org