Ruiqi Dong created KAFKA-20666:
----------------------------------
Summary: Connect offset backing stores serialize
ByteBuffer.array() instead of the buffer's remaining bytes
Key: KAFKA-20666
URL: https://issues.apache.org/jira/browse/KAFKA-20666
Project: Kafka
Issue Type: Bug
Components: connect
Reporter: Ruiqi Dong
*Summary*
Kafka Connect offset storage APIs use `ByteBuffer` keys and values. A
`ByteBuffer`'s logical contents are its remaining bytes, not necessarily the
entire backing array. `KafkaOffsetBackingStore` and `FileOffsetBackingStore`
serialize offsets with `buffer.array()`. This ignores `position()`, `limit()`,
and `arrayOffset()`, and throws `UnsupportedOperationException` for buffers
without an accessible backing array, such as direct buffers. As a result, a
sliced or offset buffer can cause the store to write different offset
keys/values than the caller supplied.
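To make the mismatch concrete, here is a minimal standalone sketch. It is not taken from Kafka, and the class and method names are hypothetical. It shows that `array()` exposes the full backing array both for a buffer created with `wrap(array, offset, length)` and for a slice:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class, for illustration only.
class ByteBufferArrayDemo {

    /** A 3-byte window over the 5-byte array "xkeyx", built with wrap(array, offset, length). */
    public static ByteBuffer wrappedWindow() {
        byte[] backing = "xkeyx".getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.wrap(backing, 1, 3);
    }

    /** The same logical window, built with slice(), so arrayOffset() is non-zero. */
    public static ByteBuffer slicedWindow() {
        ByteBuffer whole = ByteBuffer.wrap("xkeyx".getBytes(StandardCharsets.UTF_8));
        whole.position(1);
        whole.limit(4);
        return whole.slice();
    }

    public static void main(String[] args) {
        ByteBuffer wrapped = wrappedWindow();
        System.out.println(wrapped.remaining());     // 3 (logical contents: "key")
        System.out.println(wrapped.array().length);  // 5 (whole backing array: "xkeyx")

        ByteBuffer sliced = slicedWindow();
        System.out.println(sliced.position());       // 0
        System.out.println(sliced.arrayOffset());    // 1
        System.out.println(sliced.array().length);   // 5
    }
}
```

In both cases a caller reading the buffer sees `key`, while code that calls `array()` sees `xkeyx`.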
*Affected code*
File:
`connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java`
{code:java}
for (Map.Entry<ByteBuffer, ByteBuffer> entry : values.entrySet()) {
    ByteBuffer key = entry.getKey();
    ByteBuffer value = entry.getValue();
    offsetLog.send(key == null ? null : key.array(),
            value == null ? null : value.array(), producerCallback);
} {code}
File:
`connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java`
{code:java}
for (Map.Entry<ByteBuffer, ByteBuffer> mapEntry : data.entrySet()) {
    byte[] key = (mapEntry.getKey() != null) ? mapEntry.getKey().array() : null;
    byte[] value = (mapEntry.getValue() != null) ? mapEntry.getValue().array() : null;
    raw.put(key, value);
    OffsetUtils.processPartitionKey(key, value, keyConverter, connectorPartitions);
} {code}
`OffsetStorageReaderImpl` has the same pattern when deserializing returned
values:
{code:java}
valueConverter.toConnectData(namespace,
        rawEntry.getValue() != null ? rawEntry.getValue().array() : null); {code}
{*}Reproducer 1{*}: distributed offset store sends the wrong bytes
Add this test to
`connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java`:
{code:java}
@Test
public void testSetUsesByteBufferRemainingBytes() {
    setup(false);
    store.configure(mockConfig(props));
    store.start();
    verify(storeLog).start();

    // Windows over larger arrays: the logical contents are "key" and "value".
    Map<ByteBuffer, ByteBuffer> offsets = Map.of(
        ByteBuffer.wrap("xkeyx".getBytes(), 1, 3),
        ByteBuffer.wrap("xvaluex".getBytes(), 1, 5)
    );
    Future<Void> setFuture = store.set(offsets, (error, result) -> { });
    assertFalse(setFuture.isDone());

    // Only the remaining bytes should reach the offset log.
    ArgumentCaptor<org.apache.kafka.clients.producer.Callback> callback =
        ArgumentCaptor.forClass(org.apache.kafka.clients.producer.Callback.class);
    verify(storeLog).send(aryEq(buffer("key").array()),
        aryEq(buffer("value").array()), callback.capture());

    store.stop();
    verify(storeLog).stop();
} {code}
Run:
{code:bash}
./gradlew -q :connect:runtime:test \
--tests KafkaOffsetBackingStoreTest.testSetUsesByteBufferRemainingBytes {code}
Observed behavior:
The test fails because `KafkaOffsetBackingStore` sends the whole backing arrays:
{code:java}
Wanted: key=[0x6B, 0x65, 0x79], value=[0x76, 0x61, 0x6C, 0x75, 0x65]
Actual: key=[0x78, 0x6B, 0x65, 0x79, 0x78], value=[0x78, 0x76, 0x61, 0x6C, 0x75, 0x65, 0x78] {code}
{*}Reproducer 2{*}: file offset store cannot restore a sliced-buffer offset by
its logical key
Add this test to
`connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java`:
{code:java}
@Test
public void testSaveRestoreUsesByteBufferRemainingBytes() throws Exception {
    @SuppressWarnings("unchecked")
    Callback<Void> setCallback = mock(Callback.class);

    // Windows over larger arrays: the logical contents are "key" and "value".
    Map<ByteBuffer, ByteBuffer> offsets = Map.of(
        ByteBuffer.wrap("xkeyx".getBytes(), 1, 3),
        ByteBuffer.wrap("xvaluex".getBytes(), 1, 5)
    );
    store.set(offsets, setCallback).get();
    store.stop();

    // A fresh store reading the same file should find the offset under "key".
    FileOffsetBackingStore restore = new FileOffsetBackingStore(converter);
    restore.configure(config);
    restore.start();
    Map<ByteBuffer, ByteBuffer> values =
        restore.get(List.of(buffer("key"))).get();
    assertEquals(buffer("value"), values.get(buffer("key")));
    verify(setCallback).onCompletion(isNull(), isNull());
} {code}
Run:
{code:bash}
./gradlew -q :connect:runtime:test \
--tests FileOffsetBackingStoreTest.testSaveRestoreUsesByteBufferRemainingBytes {code}
Observed behavior:
The test fails. The file-backed store persists the physical arrays `xkeyx` and
`xvaluex`, so the restored store has no offset under the logical key `key`.
*Expected behavior*
Offset backing stores should serialize exactly the remaining bytes of each
`ByteBuffer`, respecting `position()`, `limit()`, and `arrayOffset()`. They
should also work for direct buffers.
Kafka Connect offset keys determine source partition identity and offset
recovery. Persisting a different byte sequence from the logical `ByteBuffer`
contents can make a task fail to find its previous offsets after restart,
duplicate work, skip records, or write tombstones under the wrong key. A
possible fix is to duplicate each buffer and copy its `remaining()` bytes into
a new array instead of calling `array()` directly.
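The fix described above could look roughly like the following sketch. The class name `ByteBufferBytes` and method name `toBytes` are hypothetical and not part of Kafka; the actual patch may well use an existing utility instead. Kafka's `Utils.toArray(ByteBuffer)` in `org.apache.kafka.common.utils` looks like it performs the same copy, so reusing it with a null check may be preferable to adding a helper.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, for illustration only; not an existing Kafka class.
final class ByteBufferBytes {
    private ByteBufferBytes() { }

    /**
     * Copies the bytes between position() and limit() into a new array.
     * Reads through a duplicate so the caller's position is left untouched.
     * Does not call array(), so it also works for direct and read-only buffers.
     * A null buffer maps to null, matching the null handling in the stores.
     */
    public static byte[] toBytes(ByteBuffer buffer) {
        if (buffer == null) {
            return null;
        }
        ByteBuffer view = buffer.duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return bytes;
    }
}
```

With such a helper, the call sites shown under "Affected code" would pass `ByteBufferBytes.toBytes(key)` in place of `key.array()`, and likewise for values.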