C0urante commented on code in PR #12800:
URL: https://github.com/apache/kafka/pull/12800#discussion_r1039745954


##########
connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java:
##########
@@ -329,6 +329,60 @@ public void testGetSetNull() throws Exception {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testTombstoneOffset() throws Exception {
+        expectConfigure();
+        expectStart(Collections.singletonList(new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE.array(),
+                new RecordHeaders(), Optional.empty())));
+
+        Capture<org.apache.kafka.clients.producer.Callback> producerCallback = EasyMock.newCapture();
+        storeLog.send(EasyMock.aryEq(TP0_KEY.array()), EasyMock.isNull(byte[].class), EasyMock.capture(producerCallback));
+        PowerMock.expectLastCall();
+
+        final Capture<Callback<Void>> readToEndCallback = EasyMock.newCapture();
+        storeLog.readToEnd(EasyMock.capture(readToEndCallback));
+        PowerMock.expectLastCall().andAnswer(() -> {
+            capturedConsumedCallback.getValue().onCompletion(null,
+                    new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), null,
+                            new RecordHeaders(), Optional.empty()));
+            readToEndCallback.getValue().onCompletion(null, null);
+            return null;
+        });
+
+        expectStop();
+        expectClusterId();
+
+        PowerMock.replayAll();
+
+        store.configure(DEFAULT_DISTRIBUTED_CONFIG);
+        store.start();
+
+        // Write tombstone offset
+        Map<ByteBuffer, ByteBuffer> toSet = new HashMap<>();
+        toSet.put(TP0_KEY, null);
+
+        final AtomicBoolean invoked = new AtomicBoolean(false);
+        Future<Void> setFuture = store.set(toSet, (error, result) -> invoked.set(true));
+        assertFalse(setFuture.isDone());
+        producerCallback.getValue().onCompletion(null, null);
+        setFuture.get(10000, TimeUnit.MILLISECONDS);
+        assertTrue(invoked.get());
+
+        // Getting data should read to end of our published data and return it
+        Map<ByteBuffer, ByteBuffer> offsets = store.get(Collections.singletonList(TP0_KEY)).get(10000, TimeUnit.MILLISECONDS);
+        assertNull(offsets.get(TP0_KEY));
+
+        // Just verifying that KafkaOffsetBackingStore::get returns null isn't enough; we also need to verify that the mapping for the source partition key is removed.
+        // This is because KafkaOffsetBackingStore::get returns null if either there is no existing offset for the source partition or if there is an offset with a null value.
+        // We need to make sure that tombstoned offsets are removed completely (i.e. that the mapping for the corresponding source partition is removed).
+        HashMap<ByteBuffer, ByteBuffer> data = Whitebox.getInternalState(store, "data");
+        assertFalse(data.containsKey(TP0_KEY));

Review Comment:
   Sorry, what exactly is clearer about writing a new test case instead of modifying the existing one?
   
   The logic we're testing here is directly related to handling `null` values, which the existing test already exercises, and I don't think it warrants the cognitive overhead of another test case. Plus, we're going to have to add logic for writing a non-null value with a given key no matter what, but with a new test case we'd also have to duplicate the existing logic for writing a null value for that same key. Finally, considering how much overhead is involved in the setup of every test case in this class, we should be trying to minimize the number of test cases where possible.
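   
   To make that concrete, here's a rough sketch (my illustration, not code from the PR) of how the tombstone checks could be folded into the existing flow: write a real value for `TP0_KEY`, then tombstone the same key, and assert both the `null` lookup result and the removal of the key from the store's internal map. The EasyMock expectations and producer-callback completion steps from the test above are elided.
   ```java
   // Sketch only; assumes the same fields/helpers as the test above
   Map<ByteBuffer, ByteBuffer> toSet = new HashMap<>();
   toSet.put(TP0_KEY, TP0_VALUE);            // write a non-null value first
   store.set(toSet, (error, result) -> { });
   
   toSet = new HashMap<>();
   toSet.put(TP0_KEY, null);                 // then tombstone the same key
   store.set(toSet, (error, result) -> { });
   
   Map<ByteBuffer, ByteBuffer> offsets =
           store.get(Collections.singletonList(TP0_KEY)).get(10000, TimeUnit.MILLISECONDS);
   assertNull(offsets.get(TP0_KEY));
   
   // The mapping itself should be gone, not just mapped to null
   HashMap<ByteBuffer, ByteBuffer> data = Whitebox.getInternalState(store, "data");
   assertFalse(data.containsKey(TP0_KEY));
   ```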



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java:
##########
@@ -325,11 +325,12 @@ public Future<Void> set(final Map<ByteBuffer, ByteBuffer> values, final Callback
         return producerCallback;
     }
 
-    protected final Callback<ConsumerRecord<byte[], byte[]>> consumedCallback = new Callback<ConsumerRecord<byte[], byte[]>>() {
-        @Override
-        public void onCompletion(Throwable error, ConsumerRecord<byte[], byte[]> record) {
-            ByteBuffer key = record.key() != null ? ByteBuffer.wrap(record.key()) : null;
-            ByteBuffer value = record.value() != null ? ByteBuffer.wrap(record.value()) : null;
+    protected final Callback<ConsumerRecord<byte[], byte[]>> consumedCallback = (error, record) -> {
+        ByteBuffer key = record.key() != null ? ByteBuffer.wrap(record.key()) : null;

Review Comment:
   Yes, I noticed that too; still, if the API we're using (even if it's internal-only) has a signature that can accept an error, we should handle those errors. An alternative is to refactor things to use a `java.util.function.Consumer` instead of a `Callback`, but that refactor isn't worth the impact it would have on the `KafkaBasedLog` class.
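   
   For example, here's a sketch of the shape I have in mind (not necessarily the exact handling we'd want; throwing a `ConnectException` is just one option):
   ```java
   protected final Callback<ConsumerRecord<byte[], byte[]>> consumedCallback = (error, record) -> {
       // Sketch only: surface the error instead of silently dropping it; whether to
       // throw, log, or propagate it some other way is an open question
       if (error != null) {
           throw new ConnectException("Error reading from the offsets topic", error);
       }
       ByteBuffer key = record.key() != null ? ByteBuffer.wrap(record.key()) : null;
       ByteBuffer value = record.value() != null ? ByteBuffer.wrap(record.value()) : null;
       // ... rest of the existing callback body unchanged
   };
   ```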



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
