C0urante commented on code in PR #12800:
URL: https://github.com/apache/kafka/pull/12800#discussion_r1025529528


##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java:
##########
@@ -325,11 +325,12 @@ public Future<Void> set(final Map<ByteBuffer, ByteBuffer> 
values, final Callback
         return producerCallback;
     }
 
-    protected final Callback<ConsumerRecord<byte[], byte[]>> consumedCallback 
= new Callback<ConsumerRecord<byte[], byte[]>>() {
-        @Override
-        public void onCompletion(Throwable error, ConsumerRecord<byte[], 
byte[]> record) {
-            ByteBuffer key = record.key() != null ? 
ByteBuffer.wrap(record.key()) : null;
-            ByteBuffer value = record.value() != null ? 
ByteBuffer.wrap(record.value()) : null;
+    protected final Callback<ConsumerRecord<byte[], byte[]>> consumedCallback 
= (error, record) -> {
+        ByteBuffer key = record.key() != null ? ByteBuffer.wrap(record.key()) 
: null;
+        ByteBuffer value = record.value() != null ? 
ByteBuffer.wrap(record.value()) : null;
+        if (value == null) {
+            data.remove(key);

Review Comment:
   It's a little strange to keep the `value` initializer the way it was before. 
IMO this would be more readable:
   ```java
           if (record.value() == null)
               data.remove(key);
           else
               data.put(key, ByteBuffer.wrap(record.value()));
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to