vamossagar12 commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1216312002


##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##########
@@ -279,30 +280,47 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callb
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }
 
+        boolean containsTombstones = values.entrySet()
+                .stream()
+                .anyMatch(offset -> offset.getValue() == null);
+
         return primaryStore.set(values, (primaryWriteError, ignored) -> {
             if (secondaryStore != null) {
                 if (primaryWriteError != null) {
                     log.trace("Skipping offsets write to secondary store because primary write has failed", primaryWriteError);
+                    try (LoggingContext context = loggingContext()) {
+                        callback.onCompletion(primaryWriteError, ignored);
+                    }
                 } else {
                     try {
                         // Invoke OffsetBackingStore::set but ignore the resulting future; we don't block on writes to this
-                        // backing store.
+                        // backing store. The only exception is when a batch containing tombstone records fails to be
+                        // written to the secondary store after having been written successfully to the primary store.
+                        // In that case the error is propagated back to the caller, since otherwise a deleted source
+                        // partition would be reported as present because the two stores are out of sync.
                         secondaryStore.set(values, (secondaryWriteError, ignored2) -> {
                             try (LoggingContext context = loggingContext()) {
-                                if (secondaryWriteError != null) {
+                                if (secondaryWriteError != null && containsTombstones) {
+                                    log.warn("Failed to write offsets with tombstone records to secondary backing store", secondaryWriteError);
+                                    callback.onCompletion(secondaryWriteError, ignored);
+                                    return;
+                                } else if (secondaryWriteError != null) {
                                     log.warn("Failed to write offsets to secondary backing store", secondaryWriteError);
                                 } else {
                                     log.debug("Successfully flushed offsets to secondary backing store");
                                 }
+                                // primaryWriteError is null at this point, and we don't care about secondaryWriteError
+                                callback.onCompletion(null, ignored);
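
The control flow in the hunk above can be modeled in isolation. The following is a minimal, hypothetical Java sketch — `Store`, its `set` signature, and the `String`-keyed maps are illustrative stand-ins, not the actual Connect `OffsetBackingStore` API — showing why a tombstone batch is the one case where a secondary-store failure is surfaced to the caller:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical model of the dual-store write path; not Connect code.
public class DualStoreSketch {

    interface Store {
        // Completes the callback with a non-null Throwable on failure.
        void set(Map<String, String> values, BiConsumer<Throwable, Void> callback);
    }

    // Mirrors the containsTombstones check from the diff: a null value marks
    // a tombstone, i.e. a source partition whose offsets are being deleted.
    static boolean containsTombstones(Map<String, String> values) {
        return values.values().stream().anyMatch(v -> v == null);
    }

    // Write to the primary store first; on success, fire a write to the
    // secondary store. A secondary failure is surfaced to the caller only
    // for tombstone batches, because then the two stores disagree about
    // whether a partition's offsets still exist.
    static void set(Store primary, Store secondary, Map<String, String> values,
                    BiConsumer<Throwable, Void> callback) {
        boolean tombstones = containsTombstones(values);
        primary.set(values, (primaryError, ignored) -> {
            if (primaryError != null) {
                callback.accept(primaryError, null); // primary failures always surface
                return;
            }
            secondary.set(values, (secondaryError, ignored2) -> {
                if (secondaryError != null && tombstones) {
                    callback.accept(secondaryError, null); // stores now disagree on a deletion
                } else {
                    callback.accept(null, null); // other secondary failures are logged and ignored
                }
            });
        });
    }
}
```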

Review Comment:
   I have removed this. I needed to tweak the callback logic a little bit because it eventually needs to be invoked only once, so that `OffsetStorageWriter#handleFinishWrite` doesn't ignore the flush responses. Let me know how the changes look now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
