gharris1727 commented on code in PR #13429:
URL: https://github.com/apache/kafka/pull/13429#discussion_r1173998391


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -139,10 +171,103 @@ public void close() {
     protected void handleRecord(ConsumerRecord<byte[], byte[]> record) {
         OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
         TopicPartition sourceTopicPartition = offsetSync.topicPartition();
-        offsetSyncs.put(sourceTopicPartition, offsetSync);
+        offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> createInitialSyncs(offsetSync));
+        offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> updateExistingSyncs(syncs, offsetSync));
+    }
+
+    private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) {
+        // Make a copy of the array before mutating it, so that readers do not see inconsistent data
+        // TODO: batch updates so that this copy can be performed less often for high-volume sync topics.
+        OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE);
+        updateSyncArray(mutableSyncs, offsetSync);
+        if (log.isTraceEnabled()) {
+            StringBuilder stateString = new StringBuilder();
+            stateString.append("[");
+            for (int i = 0; i < Long.SIZE; i++) {
+                if (i != 0) {
+                    stateString.append(",");
+                }
+                if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != mutableSyncs[i - 1]) {
+                    // Print only if the sync is interesting, a series of repeated syncs will appear as ,,,,,
+                    stateString.append(mutableSyncs[i].upstreamOffset());
+                    stateString.append(":");
+                    stateString.append(mutableSyncs[i].downstreamOffset());
+                }
+            }
+            stateString.append("]");
+            log.trace("New sync {} applied, new state is {}", offsetSync, stateString);
+        }
+        return mutableSyncs;
+    }
+
+    private OffsetSync[] createInitialSyncs(OffsetSync firstSync) {
+        OffsetSync[] syncs = new OffsetSync[Long.SIZE];
+        clearSyncArray(syncs, firstSync);
+        return syncs;
+    }
+
+    private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        for (int i = 0; i < Long.SIZE; i++) {
+            syncs[i] = offsetSync;
+        }
+    }
+
+    private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        long upstreamOffset = offsetSync.upstreamOffset();
+        // Old offsets are invalid, so overwrite them all.
+        if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) {

Review Comment:
   @urbandan 
   
   > Monotonicity is an optimization - it tries to minimize re-processing after 
a failover. (Please correct me if I'm wrong, but I don't really see any other 
user stories behind it.)
   
   Yes, this is correct, but I think it misses some nuance. The overall feature 
(offset translation, checkpoints, syncing offsets to the consumer group) is itself 
an optimization: users could just replicate their data and start reading the 
replica from the beginning. And, relevant to your earlier simplicity argument: it 
would be simpler not to offer offset translation at all, but that would require 
users to re-deliver more data.
   
   In the same way, monotonicity is an optimization that avoids rewinding 
checkpoints when we have already translated better ones: we could choose not to 
offer monotonicity for the sake of simplicity, but (with this specific in-memory 
store implementation) that would mean that if you are N offsets behind the 
replication flow, your offset could fall backward by another N offsets, doubling 
the data re-delivery.
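
   To make that concrete, here is a minimal sketch (hypothetical names, not the 
actual MirrorCheckpointTask code) of the kind of monotonicity guard being 
discussed: a checkpoint is only emitted if its translated downstream offset does 
not fall behind the last one emitted for the same group and partition.

   ```java
   // Hypothetical illustration of a monotonicity guard for emitted checkpoints.
   import java.util.HashMap;
   import java.util.Map;

   class CheckpointGuard {
       // Keyed by consumer group + source topic-partition (hypothetical key format)
       private final Map<String, Long> lastDownstreamOffset = new HashMap<>();

       boolean shouldEmit(String groupAndPartition, long translatedDownstreamOffset) {
           Long previous = lastDownstreamOffset.get(groupAndPartition);
           if (previous != null && translatedDownstreamOffset < previous) {
               // The translation got worse (e.g. the in-memory sync store was rebuilt after
               // a restart); skip the checkpoint rather than move the consumer group backwards.
               return false;
           }
           lastDownstreamOffset.put(groupAndPartition, translatedDownstreamOffset);
           return true;
       }
   }
   ```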
   
   I would also consider the user story of someone monitoring their cluster and 
noticing that, despite consumers and replication moving forward, the checkpoints 
topic is moving _backwards_. Though no error has occurred, the result of the 
offset translation is getting worse. To someone unfamiliar with the internal 
workings of this algorithm, it looks like we're just generating arbitrary offsets. 
In fact, we were already doing that before KAFKA-13659, and someone went through 
the effort of figuring out why their offsets went backwards each time the 
connector restarted and opened a ticket about it. Additionally, non-monotonic 
checkpoints increase the difference in semantics between reading the raw 
checkpoints topic and using the synced consumer offsets, which I think is more 
likely to lead to confusion.
   
   > Being able to checkpoint old offsets after a restart is a feature, and 
probably a good one. If cluster restarts/rebalances are frequent enough, and 
some consumers are lagging behind consistently, they might never get their 
checkpoints translated, ever.
   
   I completely agree. I think that offset translation _must_ be extended to offer 
translation of very old offsets, and the change in this PR is just one part of 
that overall solution. The limitation you pointed out is a very important one, as 
it's the core limitation that this PR is trying to address. Before this PR, 
consumers lagging behind the latest offset sync couldn't be translated. After this 
PR, consumers lagging behind the latest checkpoint task restart can't be 
translated. I think that is a strict improvement, and even if this PR doesn't 
address the issue completely, it is worth merging to get a release out the door.
   
   > Based on our discussion so far, it seems that monotonicity will not allow 
us to implement "old offset checkpointing after restart", which is not ideal. 
Worst case of breaking monotonicity is sub-optimal failover and extra 
re-processing. Worst case for not translating old offsets after restart is 
never checkpointing a group - then, with the default consumer config 
(auto.offset.reset=latest), failover results in data loss.
   
   I don't think the auto.offset.reset=latest situation is a fair criticism of 
this PR, as that failure mode can happen with or without this change. In fact, 
it's more likely to happen without this change, since the translation window is 
smaller without it.
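
   For reference, the reset behavior in that scenario is controlled on the 
downstream consumer, not by MM2; the snippet below is only an illustration of the 
config mentioned in the quote (the class name is hypothetical), not a change 
proposed by this PR.

   ```java
   import java.util.Properties;
   import org.apache.kafka.clients.consumer.ConsumerConfig;

   public class FailoverConsumerConfig {
       public static Properties baseProps() {
           Properties props = new Properties();
           // Default policy: with no translated checkpoint for the group, a failover consumer
           // skips to the log end, i.e. the data-loss scenario described in the quote above.
           props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
           // Alternative: re-read from the beginning instead (re-delivery rather than loss).
           // props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
           return props;
       }
   }
   ```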
   
   You are welcome to help design and implement the full back-translation 
algorithm and get it landed in 3.5.1/3.6. This PR is a tactical change to get us 
over the line for 3.5.0, and is necessarily going to leave some things on the 
table. I'm glad that you are interested in MM2 offset translation, as I think full 
back-translation may be more than I can take on alone. Are you interested in 
exploring these ideas yourself and opening a follow-up PR?


