C0urante commented on code in PR #13429:
URL: https://github.com/apache/kafka/pull/13429#discussion_r1149500829
##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -25,17 +25,37 @@
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.connect.util.TopicAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentHashMap;
-/** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */
+/**
+ * Used internally by MirrorMaker. Stores offset syncs and performs offset translation.
+ * <p>A limited number of offset syncs can be stored per TopicPartition, in a way which provides better translation
+ * later in the topic, closer to the live end of the topic.
+ * This maintains the following invariants for each topic-partition in the in-memory sync storage:
+ * <ul>
+ * <li>syncs[0] is the latest offset sync from the syncs topic</li>
+ * <li>For each i,j, i <= j: syncs[j].upstream <= syncs[i].upstream < syncs[j].upstream + 2^j</li>
+ * </ul>
+ * <p>Offset translation uses the syncs[i] which most closely precedes the upstream consumer group's current offset.
+ * For a fixed in-memory state, translation of variable upstream offsets will be monotonic.
+ * For variable in-memory state, translation of a fixed upstream offset will not be monotonic.
+ * <p>Translation will be unavailable for all topic-partitions before an initial read-to-end of the offset syncs topic
+ * is complete. Translation will be unavailable after that if no syncs are present for a topic-partition, or if relevant
+ * offset syncs for the topic were eligible for compaction at the time of the initial read-to-end.
Review Comment:
I'm unclear on what exactly this part refers to. Does it cover the case where syncs for earlier offsets have been wiped due to compaction? If so, we may want to say that offset syncs were "lost due to compaction" instead of "eligible for compaction". It's also possible that replication began from a later point than the consumer group is reading from, which is another reason that translation may be unavailable. And there's the case where replication began but was unable to produce offset syncs for whatever reason.
If I'm on the right track here, maybe it'd be simplest to just say "or if no relevant offset syncs for the topic were available at the time of the initial read-to-end"?
##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -139,10 +171,103 @@ public void close() {
protected void handleRecord(ConsumerRecord<byte[], byte[]> record) {
OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
TopicPartition sourceTopicPartition = offsetSync.topicPartition();
- offsetSyncs.put(sourceTopicPartition, offsetSync);
+ offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> createInitialSyncs(offsetSync));
+ offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> updateExistingSyncs(syncs, offsetSync));
+ }
+
+ private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) {
+ // Make a copy of the array before mutating it, so that readers do not see inconsistent data
+ // TODO: batch updates so that this copy can be performed less often for high-volume sync topics.
+ OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE);
+ updateSyncArray(mutableSyncs, offsetSync);
+ if (log.isTraceEnabled()) {
+ StringBuilder stateString = new StringBuilder();
+ stateString.append("[");
+ for (int i = 0; i < Long.SIZE; i++) {
+ if (i != 0) {
+ stateString.append(",");
+ }
+ if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != mutableSyncs[i - 1]) {
+ // Print only if the sync is interesting, a series of repeated syncs will appear as ,,,,,
+ stateString.append(mutableSyncs[i].upstreamOffset());
+ stateString.append(":");
+ stateString.append(mutableSyncs[i].downstreamOffset());
+ }
+ }
+ stateString.append("]");
+ log.trace("New sync {} applied, new state is {}", offsetSync, stateString);
+ }
+ return mutableSyncs;
+ }
+
+ private OffsetSync[] createInitialSyncs(OffsetSync firstSync) {
+ OffsetSync[] syncs = new OffsetSync[Long.SIZE];
+ clearSyncArray(syncs, firstSync);
+ return syncs;
+ }
+
+ private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+ for (int i = 0; i < Long.SIZE; i++) {
+ syncs[i] = offsetSync;
+ }
+ }
+
+ private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+ long upstreamOffset = offsetSync.upstreamOffset();
+ // Old offsets are invalid, so overwrite them all.
+ if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) {
+ clearSyncArray(syncs, offsetSync);
+ return;
+ }
+ syncs[0] = offsetSync;
+ for (int i = 1; i < Long.SIZE; i++) {
+ OffsetSync oldValue = syncs[i];
+ long mask = Long.MAX_VALUE << i;
+ // If the old value is too stale: at least one of the 64-i high bits of the offset value have changed
+ // This produces buckets quantized at boundaries of powers of 2
+ // Syncs: a b c d e
+ // Bucket 0 |a| |b| |c| |d| |e| (size 1)
+ // Bucket 1 | a| | b| |c | |d | e| (size 2)
+ // Bucket 2 | a | | b| | c | | d | | (size 4)
+ // Bucket 3 | a | | b | | c | d | (size 8)
+ // Bucket 4 | a | b | c | (size 16)
+ // Bucket 5 | a | c | (size 32)
+ // Bucket 6 | a | (size 64)
+ // ... a ...
+ // Bucket63 a (size 2^63)
+ // State after a: [a,,,,,,, ... ,a] (all buckets written)
+ // State after b: [b,,,,,a,, ... ,a] (buckets 0-4 written)
+ // State after c: [c,,,,,,a, ... ,a] (buckets 0-5 written, b expired completely)
+ // State after d: [d,,,,c,,a, ... ,a] (buckets 0-3 written)
+ // State after e: [e,,d,,c,,a, ... ,a] (buckets 0-1 written)
+ if (((oldValue.upstreamOffset() ^ upstreamOffset) & mask) != 0) {
+ syncs[i] = offsetSync;
+ } else {
+ break;
+ }
+ }
}
- private Optional<OffsetSync> latestOffsetSync(TopicPartition topicPartition) {
- return Optional.ofNullable(offsetSyncs.get(topicPartition));
+ private Optional<OffsetSync> latestOffsetSync(TopicPartition topicPartition, long offset) {
+ return Optional.ofNullable(offsetSyncs.get(topicPartition))
+ .map(syncs -> lookupLatestSync(syncs, offset));
+ }
+
+
+ private OffsetSync lookupLatestSync(OffsetSync[] syncs, long offset) {
+ // optimization: skip loop if every sync in the store is ahead of the requested offset
Review Comment:
Do we think this will be necessary? IIUC this logic isn't really on a hot path performance-wise.
We could also do a binary search across the elements of the `syncs` array instead of a linear search if we really want to get fancy, but I'm inclined to try to keep things simpler for now if we can.
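For comparison, a minimal sketch of both options on a simplified model: the per-partition array is reduced to its upstream offsets (a hypothetical simplification of `OffsetSync[]`), slot 0 holds the newest sync, and each lookup returns the index of the first slot whose upstream offset is at or before the requested one, or -1 if there is none (the -1 convention is an assumption, not the PR's behavior). The binary search is only valid because the class Javadoc invariant keeps upstream offsets non-increasing from slot 0 outward.

```java
final class SyncLookup {
    /** Linear scan from slot 0 (newest): first slot at or before upstreamOffset, or -1. */
    static int linearLookup(long[] syncUpstreamOffsets, long upstreamOffset) {
        for (int i = 0; i < syncUpstreamOffsets.length; i++) {
            if (syncUpstreamOffsets[i] <= upstreamOffset) {
                return i;
            }
        }
        return -1; // every stored sync is newer than the requested offset
    }

    /** Binary search for the same slot; relies on offsets being non-increasing by index. */
    static int binaryLookup(long[] syncUpstreamOffsets, long upstreamOffset) {
        int lo = 0;
        int hi = syncUpstreamOffsets.length; // candidate window is [lo, hi)
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (syncUpstreamOffsets[mid] <= upstreamOffset) {
                hi = mid; // mid qualifies, but an earlier (newer) slot might too
            } else {
                lo = mid + 1; // mid is still newer than the requested offset
            }
        }
        return lo < syncUpstreamOffsets.length ? lo : -1;
    }
}
```

With at most 64 slots, the linear scan costs at most 64 comparisons per lookup, which supports keeping the simpler version.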
##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -139,10 +171,103 @@ public void close() {
+ private OffsetSync lookupLatestSync(OffsetSync[] syncs, long offset) {
Review Comment:
Nit: it could be a little clearer to rename the `offset` parameter to `upstreamOffset`
##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -139,10 +171,103 @@ public void close() {
+ OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE);
Review Comment:
`Long.SIZE` is used in several places in this change. Could we pull it out into a constant like `SYNCS_PER_PARTITION`?
If there's a special reason we're using `Long.SIZE` for that value, we should probably also add a comment on why we've chosen it.
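A minimal sketch of the suggested constant, with one possible justification inferred from the invariant in the class Javadoc (taking i = 0 there gives syncs[0].upstream < syncs[j].upstream + 2^j): slot j can trail the newest sync by less than 2^j, so 64 slots let the oldest slot cover any distance below 2^63, i.e. the whole non-negative `long` range. `SyncArraySizing` is a hypothetical class, and a `long[]` of upstream offsets stands in for `OffsetSync[]`; `Arrays.fill` replaces the manual loop in `clearSyncArray`.

```java
import java.util.Arrays;

final class SyncArraySizing {
    // Slot j may trail slot 0 by less than 2^j upstream offsets (class Javadoc invariant),
    // so Long.SIZE (64) slots give the oldest slot a bound of 2^63, which exceeds the
    // distance between any two non-negative long offsets.
    static final int SYNCS_PER_PARTITION = Long.SIZE;

    /** Fills every slot with the first sync, as createInitialSyncs/clearSyncArray do. */
    static long[] createInitialSyncs(long firstUpstreamOffset) {
        long[] syncs = new long[SYNCS_PER_PARTITION];
        Arrays.fill(syncs, firstUpstreamOffset);
        return syncs;
    }

    /** Copy-on-write: updates go to a fresh array so readers never see a partial update. */
    static long[] copyForUpdate(long[] syncs) {
        return Arrays.copyOf(syncs, SYNCS_PER_PARTITION);
    }
}
```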
##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -139,10 +171,103 @@ public void close() {
+ offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> createInitialSyncs(offsetSync));
+ offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> updateExistingSyncs(syncs, offsetSync));
Review Comment:
This can be a single statement:
```suggestion
offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) ->
syncs == null
? createInitialSyncs(offsetSync)
: updateExistingSyncs(syncs, offsetSync)
);
```
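Beyond readability, the single `compute` call avoids applying the first sync of each partition twice: in the two-call version, `computeIfAbsent` builds the initial array from `offsetSync`, and `compute` then immediately copies that array and applies the same sync again. It also makes create-or-update a single atomic operation on the `ConcurrentHashMap`, since `compute` runs the whole remapping function atomically. A minimal sketch of the pattern, where `CreateOrUpdate` is hypothetical, `String` keys stand in for `TopicPartition`, and a four-slot `long[]` stands in for the real `OffsetSync[]`:

```java
import java.util.Arrays;
import java.util.concurrent.ConcurrentHashMap;

final class CreateOrUpdate {
    private final ConcurrentHashMap<String, long[]> syncsByPartition = new ConcurrentHashMap<>();

    void handle(String partition, long upstreamOffset) {
        // One atomic call: build the array for a new key, otherwise replace it with an updated copy
        syncsByPartition.compute(partition, (ignored, syncs) ->
                syncs == null
                        ? createInitial(upstreamOffset)
                        : updateExisting(syncs, upstreamOffset));
    }

    long[] get(String partition) {
        return syncsByPartition.get(partition);
    }

    private static long[] createInitial(long upstreamOffset) {
        long[] syncs = new long[4];
        Arrays.fill(syncs, upstreamOffset);
        return syncs;
    }

    private static long[] updateExisting(long[] syncs, long upstreamOffset) {
        long[] copy = Arrays.copyOf(syncs, syncs.length); // never mutate the published array
        copy[0] = upstreamOffset;
        return copy;
    }
}
```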
##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -139,10 +171,103 @@ public void close() {
+ if (log.isTraceEnabled()) {
+ StringBuilder stateString = new StringBuilder();
+ stateString.append("[");
+ for (int i = 0; i < Long.SIZE; i++) {
+ if (i != 0) {
+ stateString.append(",");
+ }
+ if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != mutableSyncs[i - 1]) {
+ // Print only if the sync is interesting, a series of repeated syncs will appear as ,,,,,
+ stateString.append(mutableSyncs[i].upstreamOffset());
+ stateString.append(":");
+ stateString.append(mutableSyncs[i].downstreamOffset());
+ }
+ }
+ stateString.append("]");
+ log.trace("New sync {} applied, new state is {}", offsetSync, stateString);
Review Comment:
Could we pull this out into a separate method? Maybe something like `logUpdatedSyncs`.
##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -139,10 +171,103 @@ public void close() {
+ if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) {
Review Comment:
Doesn't the `!readToEnd` part of this condition cause us to drop older offset syncs? Don't we want to hold onto those in order to be able to translate older upstream offsets on a best-effort basis (as long as there are relevant syncs available that haven't been compacted yet)?
##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -139,10 +171,103 @@ public void close() {
+ // Make a copy of the array before mutating it, so that readers do not see inconsistent data
+ // TODO: batch updates so that this copy can be performed less often for high-volume sync topics.
Review Comment:
If we know with certainty that this change will yield significant improvement, we should file a follow-up ticket for it.
On the other hand, if this is more of a "we may want to look into it" sort of TODO, then we should tweak the language to something like "consider batching updates" to make it clear that this change might be useful, but we don't know for sure without more information.
(I'm reluctant to file follow-up tickets for things that may or may not be necessary, since people may self-assign those tickets without understanding what information we need before deciding whether to implement the change or not.)
##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -139,10 +171,103 @@ public void close() {
+ for (int i = 1; i < Long.SIZE; i++) {
+ OffsetSync oldValue = syncs[i];
+ long mask = Long.MAX_VALUE << i;
+ // If the old value is too stale: at least one of the 64-i high bits of the offset value have changed
+ // This produces buckets quantized at boundaries of powers of 2
+ // Syncs: a b c d e
+ // Bucket 0 |a| |b| |c| |d| |e| (size 1)
+ // Bucket 1 | a| | b| |c | |d | e| (size 2)
+ // Bucket 2 | a | | b| | c | | d | | (size 4)
+ // Bucket 3 | a | | b | | c | d | (size 8)
+ // Bucket 4 | a | b | c | (size 16)
+ // Bucket 5 | a | c | (size 32)
+ // Bucket 6 | a | (size 64)
+ // ... a ...
+ // Bucket63 a (size 2^63)
+ // State after a: [a,,,,,,, ... ,a] (all buckets written)
+ // State after b: [b,,,,,a,, ... ,a] (buckets 0-4 written)
+ // State after c: [c,,,,,,a, ... ,a] (buckets 0-5 written, b expired completely)
+ // State after d: [d,,,,c,,a, ... ,a] (buckets 0-3 written)
+ // State after e: [e,,d,,c,,a, ... ,a] (buckets 0-1 written)
Review Comment:
I've been trying to read this part for the last 20 minutes and still can't really understand the concept it's trying to convey.
Could it help to refer back to the invariants mentioned in the Javadoc for this class and explain how the logic here maintains them?
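One way to tie the loop to the invariants: after every update, each pair of adjacent slots i-1 and i holds offsets in the same 2^i-aligned bucket. Slots below the break point all hold the new offset, the break condition says the new offset in slot i-1 shares slot i's 2^i bucket, and slots above the break are untouched. Because 2^i buckets nest inside 2^j buckets for i < j, slots i and j then share a 2^j bucket, which gives syncs[i].upstream < syncs[j].upstream + 2^j; syncs[j].upstream <= syncs[i].upstream holds because offsets only grow between rewinds. The sketch below is a hypothetical model (`SparseSyncModel`), not the PR's code: it tracks bare upstream offsets, assumes the initial read-to-end has finished, and checks the Javadoc invariant directly.

```java
import java.util.Arrays;

final class SparseSyncModel {
    static final int SLOTS = Long.SIZE;

    /** Applies one new upstream offset, mirroring the PR's updateSyncArray loop. */
    static void update(long[] syncs, long upstreamOffset) {
        if (syncs[0] > upstreamOffset) {
            // Upstream offsets rewound, so every stored sync is invalid
            Arrays.fill(syncs, upstreamOffset);
            return;
        }
        syncs[0] = upstreamOffset;
        for (int i = 1; i < SLOTS; i++) {
            // Same mask as the PR; for i >= 1 it selects bits i..63, so the test below asks
            // whether the old value and the new offset fall in different 2^i-aligned buckets
            long mask = Long.MAX_VALUE << i;
            if (((syncs[i] ^ upstreamOffset) & mask) != 0) {
                syncs[i] = upstreamOffset;
            } else {
                // Slot i keeps its value; slot i-1 now holds an offset from the same 2^i bucket
                break;
            }
        }
    }

    /** Class Javadoc invariant: for i <= j, syncs[j] <= syncs[i] < syncs[j] + 2^j. */
    static boolean invariantHolds(long[] syncs) {
        for (int j = 0; j < SLOTS; j++) {
            for (int i = 0; i <= j; i++) {
                if (syncs[j] > syncs[i]) {
                    return false;
                }
                // Compared as a difference to avoid overflow; for j == 63 the bound 2^63
                // exceeds any difference between non-negative longs, so it always holds
                if (j < 63 && syncs[i] - syncs[j] >= (1L << j)) {
                    return false;
                }
            }
        }
        return true;
    }
}
```

Calling `invariantHolds` after every `update` in a loop over increasing offsets exercises the same property that a test helper such as `assertSparseSyncInvariant` would check against the real store.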
##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##########
@@ -178,13 +182,44 @@ private List<SourceRecord> sourceRecordsForGroup(String group) throws Interrupte
}
}
- private List<Checkpoint> checkpointsForGroup(String group) throws ExecutionException, InterruptedException {
- return listConsumerGroupOffsets(group).entrySet().stream()
+ // for testing
+ protected Map<TopicPartition, Checkpoint> checkpointsForGroup(Map<TopicPartition, OffsetAndMetadata> upstreamGroupOffsets, String group) throws ExecutionException, InterruptedException {
Review Comment:
Should be package-private, not protected
##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java:
##########
@@ -57,52 +57,96 @@ public void testOffsetTranslation() {
// Emit synced downstream offset without dead-reckoning
store.sync(tp, 100, 200);
- assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 150));
+ assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 150));
// Translate exact offsets
store.sync(tp, 150, 251);
- assertEquals(OptionalLong.of(251), store.translateDownstream(tp, 150));
+ assertEquals(OptionalLong.of(251), store.translateDownstream(null, tp, 150));
// Use old offset (5) prior to any sync -> can't translate
- assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 5));
+ assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 5));
// Downstream offsets reset
store.sync(tp, 200, 10);
- assertEquals(OptionalLong.of(10), store.translateDownstream(tp, 200));
+ assertEquals(OptionalLong.of(10), store.translateDownstream(null, tp, 200));
// Upstream offsets reset
store.sync(tp, 20, 20);
- assertEquals(OptionalLong.of(20), store.translateDownstream(tp, 20));
+ assertEquals(OptionalLong.of(20), store.translateDownstream(null, tp, 20));
}
}
@Test
public void testNoTranslationIfStoreNotStarted() {
try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
// no offsets exist and store is not started
- assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
- assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100));
- assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200));
+ assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
+ assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
+ assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));
// read a sync during startup
store.sync(tp, 100, 200);
- assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
- assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100));
- assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200));
+ assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
+ assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
+ assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));
// After the store is started all offsets are visible
store.start();
- assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 0));
- assertEquals(OptionalLong.of(200), store.translateDownstream(tp, 100));
- assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 200));
+ assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 0));
+ assertEquals(OptionalLong.of(200), store.translateDownstream(null, tp, 100));
+ assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 200));
}
}
@Test
public void testNoTranslationIfNoOffsetSync() {
try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
store.start();
- assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
+ assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
}
}
+
+ @Test
+ public void testPastOffsetTranslation() {
+ try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
+ long maxOffsetLag = 10;
+ int offset = 0;
+ for (; offset <= 1000; offset += maxOffsetLag) {
+ store.sync(tp, offset, offset);
+ }
+ store.start();
+
+ // After starting but before seeing new offsets, only the latest startup offset can be translated
+ assertSparseSync(store, 1000, -1);
+
+ for (; offset <= 2000; offset += maxOffsetLag) {
+ store.sync(tp, offset, offset);
+ }
+
+ // After seeing new offsets, we still cannot translate earlier than the latest startup offset
+ assertSparseSync(store, 1000, -1);
+
+ // We can translate offsets between the latest startup offset and the latest offset with variable precision
+ // Older offsets are less precise and translation ends up farther apart
+ assertSparseSync(store, 1030, 1000);
+ assertSparseSync(store, 1540, 1030);
+ assertSparseSync(store, 1800, 1540);
+ assertSparseSync(store, 1920, 1800);
+ assertSparseSync(store, 1990, 1920);
+ assertSparseSync(store, 2000, 1990);
Review Comment:
Where do the argument values for these calls come from?
Naively, it looks like this test might have been written by observing the behavior of the `OffsetSyncStore` class (however arbitrary or even incorrect it may be) and then adding assertions that match the observed behavior, rather than identifying what correct values would look like and then ensuring that the `OffsetSyncStore` class's behavior gives those values.
Separately but possibly related, would it help to have a method that ensures that the invariants for the syncs stored by the `OffsetSyncStore` hold? I drafted something like this and it seems to work nicely:
In `OffsetSyncStoreTest`:
```java
private void assertSparseSyncInvariant(FakeOffsetSyncStore store, TopicPartition topicPartition) {
    for (int j = 0; j < Long.SIZE; j++) {
        for (int i = 0; i <= j; i++) {
            long jUpstream = store.syncFor(topicPartition, j).upstreamOffset();
            long iUpstream = store.syncFor(topicPartition, i).upstreamOffset();
            assertTrue(
                    jUpstream <= iUpstream,
                    "Upstream offset " + iUpstream + " at position " + i
                            + " should be at least as recent as offset " + jUpstream + " at position " + j
            );
            long iUpstreamBound = jUpstream + (1L << j);
            // Skip the upper-bound check when jUpstream + 2^j overflows
            if (iUpstreamBound < 0)
                continue;
            assertTrue(
                    iUpstream < iUpstreamBound,
                    "Upstream offset " + iUpstream + " at position " + i
                            + " should be less than " + iUpstreamBound + " ("
                            + jUpstream + " + 2^" + j + ")"
            );
        }
    }
}
```
(this method can be invoked after every call to `OffsetSyncStore::sync`)
In `OffsetSyncStore`:
```java
// For testing
OffsetSync syncFor(TopicPartition topicPartition, int syncIdx) {
    OffsetSync[] syncs = offsetSyncs.get(topicPartition);
    if (syncs == null)
        throw new IllegalArgumentException("No syncs present for " + topicPartition);
    if (syncIdx >= syncs.length)
        throw new IllegalArgumentException(
                "Requested sync " + (syncIdx + 1) + " for " + topicPartition
                        + " but there are only " + syncs.length + " syncs available for that topic partition"
        );
    return syncs[syncIdx];
}
```
##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -139,10 +171,103 @@ public void close() {
+ if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != mutableSyncs[i - 1]) {
+ // Print only if the sync is interesting, a series of repeated syncs will appear as ,,,,,
Review Comment:
I think this might confuse some users, since you'd basically have to look at this exact line in the source code to know what a series of consecutive commas represents.
IMO we should either skip identical consecutive elements and the commas before them, or include both them and their commas.
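A minimal sketch of the first option, as a hypothetical helper (`SyncStateFormatter`) that could back an extracted method such as the suggested `logUpdatedSyncs`. Parallel `long[]` arrays of upstream and downstream offsets stand in for `OffsetSync[]`, and slots are compared by value rather than by reference as the PR does:

```java
final class SyncStateFormatter {
    /**
     * Formats the sync array as "[up:down,up:down,...]", skipping any slot identical to the
     * slot before it together with its comma, so a run of repeated syncs appears once.
     */
    static String format(long[] upstream, long[] downstream) {
        StringBuilder sb = new StringBuilder("[");
        for (int i = 0; i < upstream.length; i++) {
            boolean repeat = i > 0
                    && upstream[i] == upstream[i - 1]
                    && downstream[i] == downstream[i - 1];
            if (repeat) {
                continue; // drop the duplicate and its separator
            }
            if (i > 0) {
                sb.append(',');
            }
            sb.append(upstream[i]).append(':').append(downstream[i]);
        }
        return sb.append(']').toString();
    }
}
```

For example, upstream offsets `{10, 10, 5, 5, 0}` with downstream offsets `{20, 20, 15, 15, 0}` format as `[10:20,5:15,0:0]`.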
--
This is an automated message from the Apache Git Service.