C0urante commented on code in PR #13429: URL: https://github.com/apache/kafka/pull/13429#discussion_r1149500829
########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java: ########## @@ -25,17 +25,37 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.util.KafkaBasedLog; import org.apache.kafka.connect.util.TopicAdmin; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.Arrays; import java.util.Collections; import java.util.Map; import java.util.Optional; import java.util.OptionalLong; import java.util.concurrent.ConcurrentHashMap; -/** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */ +/** + * Used internally by MirrorMaker. Stores offset syncs and performs offset translation. + * <p>A limited number of offset syncs can be stored per TopicPartition, in a way which provides better translation + * later in the topic, closer to the live end of the topic. + * This maintains the following invariants for each topic-partition in the in-memory sync storage: + * <ul> + * <li>syncs[0] is the latest offset sync from the syncs topic</li> + * <li>For each i,j, i <= j: syncs[j].upstream <= syncs[i].upstream < syncs[j].upstream + 2^j</li> + * </ul> + * <p>Offset translation uses the syncs[i] which most closely precedes the upstream consumer group's current offset. + * For a fixed in-memory state, translation of variable upstream offsets will be monotonic. + * For variable in-memory state, translation of a fixed upstream offset will not be monotonic. + * <p>Translation will be unavailable for all topic-partitions before an initial read-to-end of the offset syncs topic + * is complete. Translation will be unavailable after that if no syncs are present for a topic-partition, or if relevant + * offset syncs for the topic were eligible for compaction at the time of the initial read-to-end. Review Comment: I'm unclear on what exactly this part refers to--does it cover the case where syncs for earlier offsets have been wiped due to compaction? If so, we may want to clarify that offset syncs were "lost due to compaction" instead of "eligible for compaction". And it's also possible that replication began from a later point than the consumer group is reading from, which is also a possible reason that translation may be unavailable. Oh, and I guess there's the case where replication began, but was unable to produce offset syncs for whatever reason. If I'm on the right track here, maybe it'd be simplest to just say "or if no relevant offset syncs for the topic were available at the time of the initial read-to-end"? ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java: ########## @@ -139,10 +171,103 @@ public void close() { protected void handleRecord(ConsumerRecord<byte[], byte[]> record) { OffsetSync offsetSync = OffsetSync.deserializeRecord(record); TopicPartition sourceTopicPartition = offsetSync.topicPartition(); - offsetSyncs.put(sourceTopicPartition, offsetSync); + offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> createInitialSyncs(offsetSync)); + offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> updateExistingSyncs(syncs, offsetSync)); + } + + private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) { + // Make a copy of the array before mutating it, so that readers do not see inconsistent data + // TODO: batch updates so that this copy can be performed less often for high-volume sync topics. 
+ OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE); + updateSyncArray(mutableSyncs, offsetSync); + if (log.isTraceEnabled()) { + StringBuilder stateString = new StringBuilder(); + stateString.append("["); + for (int i = 0; i < Long.SIZE; i++) { + if (i != 0) { + stateString.append(","); + } + if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != mutableSyncs[i - 1]) { + // Print only if the sync is interesting, a series of repeated syncs will appear as ,,,,, + stateString.append(mutableSyncs[i].upstreamOffset()); + stateString.append(":"); + stateString.append(mutableSyncs[i].downstreamOffset()); + } + } + stateString.append("]"); + log.trace("New sync {} applied, new state is {}", offsetSync, stateString); + } + return mutableSyncs; + } + + private OffsetSync[] createInitialSyncs(OffsetSync firstSync) { + OffsetSync[] syncs = new OffsetSync[Long.SIZE]; + clearSyncArray(syncs, firstSync); + return syncs; + } + + private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) { + for (int i = 0; i < Long.SIZE; i++) { + syncs[i] = offsetSync; + } + } + + private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) { + long upstreamOffset = offsetSync.upstreamOffset(); + // Old offsets are invalid, so overwrite them all. + if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) { + clearSyncArray(syncs, offsetSync); + return; + } + syncs[0] = offsetSync; + for (int i = 1; i < Long.SIZE; i++) { + OffsetSync oldValue = syncs[i]; + long mask = Long.MAX_VALUE << i; + // If the old value is too stale: at least one of the 64-i high bits of the offset value have changed + // This produces buckets quantized at boundaries of powers of 2 + // Syncs: a b c d e + // Bucket 0 |a| |b| |c| |d| |e| (size 1) + // Bucket 1 | a| | b| |c | |d | e| (size 2) + // Bucket 2 | a | | b| | c | | d | | (size 4) + // Bucket 3 | a | | b | | c | d | (size 8) + // Bucket 4 | a | b | c | (size 16) + // Bucket 5 | a | c | (size 32) + // Bucket 6 | a | (size 64) + // ... a ... + // Bucket63 a (size 2^63) + // State after a: [a,,,,,,, ... ,a] (all buckets written) + // State after b: [b,,,,,a,, ... ,a] (buckets 0-4 written) + // State after c: [c,,,,,,a, ... ,a] (buckets 0-5 written, b expired completely) + // State after d: [d,,,,c,,a, ... ,a] (buckets 0-3 written) + // State after e: [e,,d,,c,,a, ... ,a] (buckets 0-1 written) + if (((oldValue.upstreamOffset() ^ upstreamOffset) & mask) != 0) { + syncs[i] = offsetSync; + } else { + break; + } + } } - private Optional<OffsetSync> latestOffsetSync(TopicPartition topicPartition) { - return Optional.ofNullable(offsetSyncs.get(topicPartition)); + private Optional<OffsetSync> latestOffsetSync(TopicPartition topicPartition, long offset) { + return Optional.ofNullable(offsetSyncs.get(topicPartition)) + .map(syncs -> lookupLatestSync(syncs, offset)); + } + + + private OffsetSync lookupLatestSync(OffsetSync[] syncs, long offset) { + // optimization: skip loop if every sync in the store is ahead of the requested offset Review Comment: Do we think this will be necessary? IIUC this logic isn't really on a hot path performance-wise. We could also do a binary search across the elements of the `syncs` array instead of a linear search if we really want to get fancy, but I'm inclined to try to keep things simpler for now if we can. 
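For illustration, the binary-search alternative mentioned here could look roughly like the sketch below. This is not the PR's implementation: it assumes the `syncs` array is ordered newest-first by upstream offset (which the Javadoc invariants above imply) with every slot populated, and it simply falls back to the oldest stored sync when nothing precedes the requested offset, leaving it to the caller to decide whether that sync is usable for translation.

```java
// Illustrative only -- not code from PR #13429. A lower-bound binary search over a
// newest-first (non-increasing upstreamOffset) syncs array, assuming every slot is populated.
private OffsetSync lookupLatestSync(OffsetSync[] syncs, long upstreamOffset) {
    // If even the oldest stored sync is ahead of the requested offset, return it and let
    // the caller decide how to treat an offset that cannot be translated.
    if (syncs[syncs.length - 1].upstreamOffset() > upstreamOffset) {
        return syncs[syncs.length - 1];
    }
    int low = 0;
    int high = syncs.length - 1;
    // Loop invariant: the lowest index whose sync does not exceed the requested offset lies in [low, high]
    while (low < high) {
        int mid = (low + high) >>> 1;
        if (syncs[mid].upstreamOffset() <= upstreamOffset) {
            high = mid;      // mid qualifies; an even more recent qualifying sync may exist at a lower index
        } else {
            low = mid + 1;   // syncs[0..mid] are all newer than the requested offset
        }
    }
    return syncs[low];       // most recent sync whose upstream offset does not exceed the requested one
}
```
Given that the array only ever holds `Long.SIZE` elements, the linear scan is probably fine, which seems to be the point being made here.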
########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java: ########## @@ -139,10 +171,103 @@ public void close() { protected void handleRecord(ConsumerRecord<byte[], byte[]> record) { OffsetSync offsetSync = OffsetSync.deserializeRecord(record); TopicPartition sourceTopicPartition = offsetSync.topicPartition(); - offsetSyncs.put(sourceTopicPartition, offsetSync); + offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> createInitialSyncs(offsetSync)); + offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> updateExistingSyncs(syncs, offsetSync)); + } + + private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) { + // Make a copy of the array before mutating it, so that readers do not see inconsistent data + // TODO: batch updates so that this copy can be performed less often for high-volume sync topics. + OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE); + updateSyncArray(mutableSyncs, offsetSync); + if (log.isTraceEnabled()) { + StringBuilder stateString = new StringBuilder(); + stateString.append("["); + for (int i = 0; i < Long.SIZE; i++) { + if (i != 0) { + stateString.append(","); + } + if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != mutableSyncs[i - 1]) { + // Print only if the sync is interesting, a series of repeated syncs will appear as ,,,,, + stateString.append(mutableSyncs[i].upstreamOffset()); + stateString.append(":"); + stateString.append(mutableSyncs[i].downstreamOffset()); + } + } + stateString.append("]"); + log.trace("New sync {} applied, new state is {}", offsetSync, stateString); + } + return mutableSyncs; + } + + private OffsetSync[] createInitialSyncs(OffsetSync firstSync) { + OffsetSync[] syncs = new OffsetSync[Long.SIZE]; + clearSyncArray(syncs, firstSync); + return syncs; + } + + private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) { + for (int i = 0; i < Long.SIZE; i++) { + syncs[i] = offsetSync; + } + } + + private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) { + long upstreamOffset = offsetSync.upstreamOffset(); + // Old offsets are invalid, so overwrite them all. + if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) { + clearSyncArray(syncs, offsetSync); + return; + } + syncs[0] = offsetSync; + for (int i = 1; i < Long.SIZE; i++) { + OffsetSync oldValue = syncs[i]; + long mask = Long.MAX_VALUE << i; + // If the old value is too stale: at least one of the 64-i high bits of the offset value have changed + // This produces buckets quantized at boundaries of powers of 2 + // Syncs: a b c d e + // Bucket 0 |a| |b| |c| |d| |e| (size 1) + // Bucket 1 | a| | b| |c | |d | e| (size 2) + // Bucket 2 | a | | b| | c | | d | | (size 4) + // Bucket 3 | a | | b | | c | d | (size 8) + // Bucket 4 | a | b | c | (size 16) + // Bucket 5 | a | c | (size 32) + // Bucket 6 | a | (size 64) + // ... a ... + // Bucket63 a (size 2^63) + // State after a: [a,,,,,,, ... ,a] (all buckets written) + // State after b: [b,,,,,a,, ... ,a] (buckets 0-4 written) + // State after c: [c,,,,,,a, ... ,a] (buckets 0-5 written, b expired completely) + // State after d: [d,,,,c,,a, ... ,a] (buckets 0-3 written) + // State after e: [e,,d,,c,,a, ... 
,a] (buckets 0-1 written) + if (((oldValue.upstreamOffset() ^ upstreamOffset) & mask) != 0) { + syncs[i] = offsetSync; + } else { + break; + } + } } - private Optional<OffsetSync> latestOffsetSync(TopicPartition topicPartition) { - return Optional.ofNullable(offsetSyncs.get(topicPartition)); + private Optional<OffsetSync> latestOffsetSync(TopicPartition topicPartition, long offset) { + return Optional.ofNullable(offsetSyncs.get(topicPartition)) + .map(syncs -> lookupLatestSync(syncs, offset)); + } + + + private OffsetSync lookupLatestSync(OffsetSync[] syncs, long offset) { Review Comment: Nit: it could be a little clearer to rename the `offset` parameter to `upstreamOffset` ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java: ########## @@ -139,10 +171,103 @@ public void close() { protected void handleRecord(ConsumerRecord<byte[], byte[]> record) { OffsetSync offsetSync = OffsetSync.deserializeRecord(record); TopicPartition sourceTopicPartition = offsetSync.topicPartition(); - offsetSyncs.put(sourceTopicPartition, offsetSync); + offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> createInitialSyncs(offsetSync)); + offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> updateExistingSyncs(syncs, offsetSync)); + } + + private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) { + // Make a copy of the array before mutating it, so that readers do not see inconsistent data + // TODO: batch updates so that this copy can be performed less often for high-volume sync topics. + OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE); Review Comment: `Long.SIZE` is used in several places in this change. Could we pull it out into a constant like `SYNCS_PER_PARTITION`? If there's a special reason we're using `Long.SIZE` for that value, we should probably also add a comment on why we've chosen it. ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java: ########## @@ -139,10 +171,103 @@ public void close() { protected void handleRecord(ConsumerRecord<byte[], byte[]> record) { OffsetSync offsetSync = OffsetSync.deserializeRecord(record); TopicPartition sourceTopicPartition = offsetSync.topicPartition(); - offsetSyncs.put(sourceTopicPartition, offsetSync); + offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> createInitialSyncs(offsetSync)); + offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> updateExistingSyncs(syncs, offsetSync)); Review Comment: This can be a single statement: ```suggestion offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> syncs == null ? 
createInitialSyncs(offsetSync) : updateExistingSyncs(syncs, offsetSync) ); ``` ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java: ########## @@ -139,10 +171,103 @@ public void close() { protected void handleRecord(ConsumerRecord<byte[], byte[]> record) { OffsetSync offsetSync = OffsetSync.deserializeRecord(record); TopicPartition sourceTopicPartition = offsetSync.topicPartition(); - offsetSyncs.put(sourceTopicPartition, offsetSync); + offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> createInitialSyncs(offsetSync)); + offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> updateExistingSyncs(syncs, offsetSync)); + } + + private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) { + // Make a copy of the array before mutating it, so that readers do not see inconsistent data + // TODO: batch updates so that this copy can be performed less often for high-volume sync topics. + OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE); + updateSyncArray(mutableSyncs, offsetSync); + if (log.isTraceEnabled()) { + StringBuilder stateString = new StringBuilder(); + stateString.append("["); + for (int i = 0; i < Long.SIZE; i++) { + if (i != 0) { + stateString.append(","); + } + if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != mutableSyncs[i - 1]) { + // Print only if the sync is interesting, a series of repeated syncs will appear as ,,,,, + stateString.append(mutableSyncs[i].upstreamOffset()); + stateString.append(":"); + stateString.append(mutableSyncs[i].downstreamOffset()); + } + } + stateString.append("]"); + log.trace("New sync {} applied, new state is {}", offsetSync, stateString); Review Comment: Could we pull this out into a separate method? Maybe something like `logUpdatedSyncs`. ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java: ########## @@ -139,10 +171,103 @@ public void close() { protected void handleRecord(ConsumerRecord<byte[], byte[]> record) { OffsetSync offsetSync = OffsetSync.deserializeRecord(record); TopicPartition sourceTopicPartition = offsetSync.topicPartition(); - offsetSyncs.put(sourceTopicPartition, offsetSync); + offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> createInitialSyncs(offsetSync)); + offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> updateExistingSyncs(syncs, offsetSync)); + } + + private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) { + // Make a copy of the array before mutating it, so that readers do not see inconsistent data + // TODO: batch updates so that this copy can be performed less often for high-volume sync topics. 
+ OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE); + updateSyncArray(mutableSyncs, offsetSync); + if (log.isTraceEnabled()) { + StringBuilder stateString = new StringBuilder(); + stateString.append("["); + for (int i = 0; i < Long.SIZE; i++) { + if (i != 0) { + stateString.append(","); + } + if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != mutableSyncs[i - 1]) { + // Print only if the sync is interesting, a series of repeated syncs will appear as ,,,,, + stateString.append(mutableSyncs[i].upstreamOffset()); + stateString.append(":"); + stateString.append(mutableSyncs[i].downstreamOffset()); + } + } + stateString.append("]"); + log.trace("New sync {} applied, new state is {}", offsetSync, stateString); + } + return mutableSyncs; + } + + private OffsetSync[] createInitialSyncs(OffsetSync firstSync) { + OffsetSync[] syncs = new OffsetSync[Long.SIZE]; + clearSyncArray(syncs, firstSync); + return syncs; + } + + private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) { + for (int i = 0; i < Long.SIZE; i++) { + syncs[i] = offsetSync; + } + } + + private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) { + long upstreamOffset = offsetSync.upstreamOffset(); + // Old offsets are invalid, so overwrite them all. + if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) { Review Comment: Doesn't the `!readToEnd` part of this condition cause us to drop older offset syncs? Don't we want to hold onto those in order to be able to translate older upstream offsets on a best-effort basis (as long as there are relevant syncs available that haven't been compacted yet)? ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java: ########## @@ -139,10 +171,103 @@ public void close() { protected void handleRecord(ConsumerRecord<byte[], byte[]> record) { OffsetSync offsetSync = OffsetSync.deserializeRecord(record); TopicPartition sourceTopicPartition = offsetSync.topicPartition(); - offsetSyncs.put(sourceTopicPartition, offsetSync); + offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> createInitialSyncs(offsetSync)); + offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> updateExistingSyncs(syncs, offsetSync)); + } + + private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) { + // Make a copy of the array before mutating it, so that readers do not see inconsistent data + // TODO: batch updates so that this copy can be performed less often for high-volume sync topics. Review Comment: If we know with certainty that this change will yield significant improvement, we should file a follow-up ticket for it. On the other hand, if this is more of a "we may want to look into it" sort of TODO, then we should tweak the language to something like "consider batching updates" to make it clear that this change might be useful, but we don't know for sure without more information. (I'm reluctant to file follow-up tickets for things that may or may not be necessary since people may self-assign those tickets without understanding what information we need before deciding whether to implement the change or not.) 
########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java: ########## @@ -139,10 +171,103 @@ public void close() { protected void handleRecord(ConsumerRecord<byte[], byte[]> record) { OffsetSync offsetSync = OffsetSync.deserializeRecord(record); TopicPartition sourceTopicPartition = offsetSync.topicPartition(); - offsetSyncs.put(sourceTopicPartition, offsetSync); + offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> createInitialSyncs(offsetSync)); + offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> updateExistingSyncs(syncs, offsetSync)); + } + + private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) { + // Make a copy of the array before mutating it, so that readers do not see inconsistent data + // TODO: batch updates so that this copy can be performed less often for high-volume sync topics. + OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE); + updateSyncArray(mutableSyncs, offsetSync); + if (log.isTraceEnabled()) { + StringBuilder stateString = new StringBuilder(); + stateString.append("["); + for (int i = 0; i < Long.SIZE; i++) { + if (i != 0) { + stateString.append(","); + } + if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != mutableSyncs[i - 1]) { + // Print only if the sync is interesting, a series of repeated syncs will appear as ,,,,, + stateString.append(mutableSyncs[i].upstreamOffset()); + stateString.append(":"); + stateString.append(mutableSyncs[i].downstreamOffset()); + } + } + stateString.append("]"); + log.trace("New sync {} applied, new state is {}", offsetSync, stateString); + } + return mutableSyncs; + } + + private OffsetSync[] createInitialSyncs(OffsetSync firstSync) { + OffsetSync[] syncs = new OffsetSync[Long.SIZE]; + clearSyncArray(syncs, firstSync); + return syncs; + } + + private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) { + for (int i = 0; i < Long.SIZE; i++) { + syncs[i] = offsetSync; + } + } + + private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) { + long upstreamOffset = offsetSync.upstreamOffset(); + // Old offsets are invalid, so overwrite them all. + if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) { + clearSyncArray(syncs, offsetSync); + return; + } + syncs[0] = offsetSync; + for (int i = 1; i < Long.SIZE; i++) { + OffsetSync oldValue = syncs[i]; + long mask = Long.MAX_VALUE << i; + // If the old value is too stale: at least one of the 64-i high bits of the offset value have changed + // This produces buckets quantized at boundaries of powers of 2 + // Syncs: a b c d e + // Bucket 0 |a| |b| |c| |d| |e| (size 1) + // Bucket 1 | a| | b| |c | |d | e| (size 2) + // Bucket 2 | a | | b| | c | | d | | (size 4) + // Bucket 3 | a | | b | | c | d | (size 8) + // Bucket 4 | a | b | c | (size 16) + // Bucket 5 | a | c | (size 32) + // Bucket 6 | a | (size 64) + // ... a ... + // Bucket63 a (size 2^63) + // State after a: [a,,,,,,, ... ,a] (all buckets written) + // State after b: [b,,,,,a,, ... ,a] (buckets 0-4 written) + // State after c: [c,,,,,,a, ... ,a] (buckets 0-5 written, b expired completely) + // State after d: [d,,,,c,,a, ... ,a] (buckets 0-3 written) + // State after e: [e,,d,,c,,a, ... ,a] (buckets 0-1 written) Review Comment: I've been trying to read this part for the last 20 minutes and still can't really understand the concept it's trying to relay. Could it help to refer back to the invariants mentioned in the Javadoc for this class and how the logic here implements those correctly? 
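For what it's worth, a toy example (offsets made up, not from the PR) may make the mask check above easier to follow. Bucket `i` keeps its existing sync only while the incoming upstream offset still falls inside the same 2^i-aligned window; once the offset crosses into the next window, the bucket is overwritten, which, as I read it, is what enforces the `syncs[i].upstream < syncs[j].upstream + 2^j` part of the Javadoc invariant.

```java
public class BucketMaskExample {
    public static void main(String[] args) {
        int i = 3;
        long mask = Long.MAX_VALUE << i;     // bits 0..i-1 are clear, so the check ignores the i low bits
        long oldUpstream = 40L;              // 0b101000, the sync currently held in syncs[3]

        // 47 (0b101111) agrees with 40 on every bit >= 3: both sit in the aligned window [40, 48),
        // so the check is false and bucket 3 keeps its existing sync.
        System.out.println(((oldUpstream ^ 47L) & mask) != 0);  // false -> keep

        // 48 (0b110000) differs from 40 in bit 3 or above: it has crossed into the next 2^3-wide
        // window, so the check is true and bucket 3 is overwritten with the newer sync.
        System.out.println(((oldUpstream ^ 48L) & mask) != 0);  // true -> overwrite
    }
}
```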
########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java: ########## @@ -178,13 +182,44 @@ private List<SourceRecord> sourceRecordsForGroup(String group) throws Interrupte } } - private List<Checkpoint> checkpointsForGroup(String group) throws ExecutionException, InterruptedException { - return listConsumerGroupOffsets(group).entrySet().stream() + // for testing + protected Map<TopicPartition, Checkpoint> checkpointsForGroup(Map<TopicPartition, OffsetAndMetadata> upstreamGroupOffsets, String group) throws ExecutionException, InterruptedException { Review Comment: Should be package-private, not protected ########## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java: ########## @@ -57,52 +57,96 @@ public void testOffsetTranslation() { // Emit synced downstream offset without dead-reckoning store.sync(tp, 100, 200); - assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 150)); + assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 150)); // Translate exact offsets store.sync(tp, 150, 251); - assertEquals(OptionalLong.of(251), store.translateDownstream(tp, 150)); + assertEquals(OptionalLong.of(251), store.translateDownstream(null, tp, 150)); // Use old offset (5) prior to any sync -> can't translate - assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 5)); + assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 5)); // Downstream offsets reset store.sync(tp, 200, 10); - assertEquals(OptionalLong.of(10), store.translateDownstream(tp, 200)); + assertEquals(OptionalLong.of(10), store.translateDownstream(null, tp, 200)); // Upstream offsets reset store.sync(tp, 20, 20); - assertEquals(OptionalLong.of(20), store.translateDownstream(tp, 20)); + assertEquals(OptionalLong.of(20), store.translateDownstream(null, tp, 20)); } } @Test public void testNoTranslationIfStoreNotStarted() { try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) { // no offsets exist and store is not started - assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0)); - assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100)); - assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200)); + assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0)); + assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100)); + assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200)); // read a sync during startup store.sync(tp, 100, 200); - assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0)); - assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100)); - assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200)); + assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0)); + assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100)); + assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200)); // After the store is started all offsets are visible store.start(); - assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 0)); - assertEquals(OptionalLong.of(200), store.translateDownstream(tp, 100)); - assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 200)); + assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 0)); + assertEquals(OptionalLong.of(200), store.translateDownstream(null, tp, 100)); + assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 200)); } } @Test 
public void testNoTranslationIfNoOffsetSync() { try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) { store.start(); - assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0)); + assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0)); } } + + @Test + public void testPastOffsetTranslation() { + try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) { + long maxOffsetLag = 10; + int offset = 0; + for (; offset <= 1000; offset += maxOffsetLag) { + store.sync(tp, offset, offset); + } + store.start(); + + // After starting but before seeing new offsets, only the latest startup offset can be translated + assertSparseSync(store, 1000, -1); + + for (; offset <= 2000; offset += maxOffsetLag) { + store.sync(tp, offset, offset); + } + + // After seeing new offsets, we still cannot translate earlier than the latest startup offset + assertSparseSync(store, 1000, -1); + + // We can translate offsets between the latest startup offset and the latest offset with variable precision + // Older offsets are less precise and translation ends up farther apart + assertSparseSync(store, 1030, 1000); + assertSparseSync(store, 1540, 1030); + assertSparseSync(store, 1800, 1540); + assertSparseSync(store, 1920, 1800); + assertSparseSync(store, 1990, 1920); + assertSparseSync(store, 2000, 1990); Review Comment: Where do the argument values for these calls come from? Naively, it looks like this test might have been written by observing the behavior of the `OffsetSyncStore` class (however arbitrary or even incorrect it may be) and then adding assertions that match the observed behavior, rather than identifying what correct values would look like and then ensuring that the `OffsetSyncStore` class's behavior gives those values. Separately but possibly related, would it help to have a method that ensures that the invariants for the syncs stored by the `OffsetSyncStore` hold? 
I drafted something like this and it seems to work nicely:

In `OffsetSyncStoreTest`:
```java
private void assertSparseSyncInvariant(FakeOffsetSyncStore store, TopicPartition topicPartition) {
    for (int j = 0; j < Long.SIZE; j++) {
        for (int i = 0; i <= j; i++) {
            long jUpstream = store.syncFor(topicPartition, j).upstreamOffset();
            long iUpstream = store.syncFor(topicPartition, i).upstreamOffset();
            assertTrue(
                    jUpstream <= iUpstream,
                    "Upstream offset " + iUpstream + " at position " + i
                            + " should be at least as recent as offset " + jUpstream + " at position " + j
            );
            long iUpstreamBound = jUpstream + (1L << j);
            if (iUpstreamBound < 0)
                continue;
            assertTrue(
                    iUpstream < iUpstreamBound,
                    "Upstream offset " + iUpstream + " at position " + i
                            + " should be no greater than " + iUpstreamBound + " (" + jUpstream + " + 2^" + j + ")"
            );
        }
    }
}
```
(this method can be invoked after every call to `OffsetSyncStore::sync`)

In `OffsetSyncStore`:
```java
// For testing
OffsetSync syncFor(TopicPartition topicPartition, int syncIdx) {
    OffsetSync[] syncs = offsetSyncs.get(topicPartition);
    if (syncs == null)
        throw new IllegalArgumentException("No syncs present for " + topicPartition);
    if (syncIdx >= syncs.length)
        throw new IllegalArgumentException(
                "Requested sync " + (syncIdx + 1) + " for " + topicPartition
                        + " but there are only " + syncs.length + " syncs available for that topic partition"
        );
    return syncs[syncIdx];
}
```

########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java: ########## @@ -139,10 +171,103 @@ public void close() { protected void handleRecord(ConsumerRecord<byte[], byte[]> record) { OffsetSync offsetSync = OffsetSync.deserializeRecord(record); TopicPartition sourceTopicPartition = offsetSync.topicPartition(); - offsetSyncs.put(sourceTopicPartition, offsetSync); + offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> createInitialSyncs(offsetSync)); + offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> updateExistingSyncs(syncs, offsetSync)); + } + + private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) { + // Make a copy of the array before mutating it, so that readers do not see inconsistent data + // TODO: batch updates so that this copy can be performed less often for high-volume sync topics. + OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE); + updateSyncArray(mutableSyncs, offsetSync); + if (log.isTraceEnabled()) { + StringBuilder stateString = new StringBuilder(); + stateString.append("["); + for (int i = 0; i < Long.SIZE; i++) { + if (i != 0) { + stateString.append(","); + } + if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != mutableSyncs[i - 1]) { + // Print only if the sync is interesting, a series of repeated syncs will appear as ,,,,, Review Comment: I think this might confuse some users, since you'd basically have to look at this exact line in the source code to know what a series of consecutive commas represents. IMO we should either skip identical consecutive elements and the commas before them, or include both them and their commas.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org