This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 3.4 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.4 by this push: new 721a917b444 KAFKA-14974: Restore backward compatibility in KafkaBasedLog (#13688) 721a917b444 is described below commit 721a917b444fb7d06790fe40c4a21c70d7790c97 Author: Yash Mayya <yash.ma...@gmail.com> AuthorDate: Tue May 9 17:58:45 2023 +0530 KAFKA-14974: Restore backward compatibility in KafkaBasedLog (#13688) `KafkaBasedLog` is a widely used utility class that provides a generic implementation of a shared, compacted log of records in a Kafka topic. It isn't in Connect's public API, but has been used outside of Connect and we try to preserve backward compatibility whenever possible. KAFKA-14455 modified the two overloaded void `KafkaBasedLog::send` methods to return a `Future`. While this change is source compatible, it isn't binary compatible. We can restore backward compatibility simply b [...] This refactoring changes no functionality other than restoring the older methods. Reviewers: Randall Hauch <rha...@gmail.com> --- .../connect/storage/KafkaConfigBackingStore.java | 4 +-- .../apache/kafka/connect/util/KafkaBasedLog.java | 37 +++++++++++++++++++--- .../storage/KafkaConfigBackingStoreTest.java | 10 +++--- 3 files changed, 40 insertions(+), 11 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java index e9f87697308..b5804697600 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java @@ -637,7 +637,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore { byte[] serializedTargetState = converter.fromConnectData(topic, TARGET_STATE_V0, connectTargetState); log.debug("Writing target state {} for connector {}", state, connector); try { - configLog.send(TARGET_STATE_KEY(connector), serializedTargetState).get(READ_WRITE_TOTAL_TIMEOUT_MS, TimeUnit.MILLISECONDS); + configLog.sendWithReceipt(TARGET_STATE_KEY(connector), serializedTargetState).get(READ_WRITE_TOTAL_TIMEOUT_MS, TimeUnit.MILLISECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { log.error("Failed to write target state to Kafka", e); throw new ConnectException("Error writing target state to Kafka", e); @@ -789,7 +789,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore { if (!usesFencableWriter) { List<Future<RecordMetadata>> producerFutures = new ArrayList<>(); keyValues.forEach( - keyValue -> producerFutures.add(configLog.send(keyValue.key, keyValue.value)) + keyValue -> producerFutures.add(configLog.sendWithReceipt(keyValue.key, keyValue.value)) ); timer.update(); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java index 431ae871ce9..a05a05dffe7 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java @@ -77,6 +77,10 @@ import java.util.function.Supplier; * calling class keeps track of state based on the log and only writes to it when consume callbacks are invoked * and only reads it in {@link #readToEnd(Callback)} callbacks then no additional synchronization will be required. * </p> + * <p> + * This is a useful utility that has been used outside of Connect. This isn't in Connect's public API, + * but we've tried to maintain the method signatures and backward compatibility since early Kafka versions. + * </p> */ public class KafkaBasedLog<K, V> { private static final Logger log = LoggerFactory.getLogger(KafkaBasedLog.class); @@ -351,6 +355,31 @@ public class KafkaBasedLog<K, V> { return future; } + /** + * Send a record asynchronously to the configured {@link #topic} without using a producer callback. + * <p> + * This method exists for backward compatibility reasons and delegates to the newer + * {@link #sendWithReceipt(Object, Object)} method that returns a future. + * @param key the key for the {@link ProducerRecord} + * @param value the value for the {@link ProducerRecord} + */ + public void send(K key, V value) { + sendWithReceipt(key, value); + } + + /** + * Send a record asynchronously to the configured {@link #topic}. + * <p> + * This method exists for backward compatibility reasons and delegates to the newer + * {@link #sendWithReceipt(Object, Object, org.apache.kafka.clients.producer.Callback)} method that returns a future. + * @param key the key for the {@link ProducerRecord} + * @param value the value for the {@link ProducerRecord} + * @param callback the callback to invoke after completion; can be null if no callback is desired + */ + public void send(K key, V value, org.apache.kafka.clients.producer.Callback callback) { + sendWithReceipt(key, value, callback); + } + /** * Send a record asynchronously to the configured {@link #topic} without using a producer callback. * @param key the key for the {@link ProducerRecord} @@ -359,12 +388,12 @@ public class KafkaBasedLog<K, V> { * @return the future from the call to {@link Producer#send}. {@link Future#get} can be called on this returned * future if synchronous behavior is desired. */ - public Future<RecordMetadata> send(K key, V value) { - return send(key, value, null); + public Future<RecordMetadata> sendWithReceipt(K key, V value) { + return sendWithReceipt(key, value, null); } /** - * Send a record asynchronously to the configured {@link #topic} + * Send a record asynchronously to the configured {@link #topic}. * @param key the key for the {@link ProducerRecord} * @param value the value for the {@link ProducerRecord} * @param callback the callback to invoke after completion; can be null if no callback is desired @@ -372,7 +401,7 @@ public class KafkaBasedLog<K, V> { * @return the future from the call to {@link Producer#send}. {@link Future#get} can be called on this returned * future if synchronous behavior is desired. */ - public Future<RecordMetadata> send(K key, V value, org.apache.kafka.clients.producer.Callback callback) { + public Future<RecordMetadata> sendWithReceipt(K key, V value, org.apache.kafka.clients.producer.Callback callback) { return producer.orElseThrow(() -> new IllegalStateException("This KafkaBasedLog was created in read-only mode and does not support write operations") ).send(new ProducerRecord<>(topic, key, value), callback); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java index 8fb3bf30165..20e16c2ed75 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java @@ -344,7 +344,7 @@ public class KafkaConfigBackingStoreTest { expectConvert(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, CONNECTOR_CONFIG_STRUCTS.get(0), CONFIGS_SERIALIZED.get(0)); - storeLog.send(EasyMock.anyObject(), EasyMock.anyObject()); + storeLog.sendWithReceipt(EasyMock.anyObject(), EasyMock.anyObject()); EasyMock.expectLastCall().andReturn(producerFuture); producerFuture.get(EasyMock.anyLong(), EasyMock.anyObject()); @@ -379,13 +379,13 @@ public class KafkaConfigBackingStoreTest { @SuppressWarnings("unchecked") Future<RecordMetadata> connectorConfigProducerFuture = PowerMock.createMock(Future.class); // tombstone for the connector config - storeLog.send(EasyMock.anyObject(), EasyMock.isNull()); + storeLog.sendWithReceipt(EasyMock.anyObject(), EasyMock.isNull()); EasyMock.expectLastCall().andReturn(connectorConfigProducerFuture); @SuppressWarnings("unchecked") Future<RecordMetadata> targetStateProducerFuture = PowerMock.createMock(Future.class); // tombstone for the connector target state - storeLog.send(EasyMock.anyObject(), EasyMock.isNull()); + storeLog.sendWithReceipt(EasyMock.anyObject(), EasyMock.isNull()); EasyMock.expectLastCall().andReturn(targetStateProducerFuture); connectorConfigProducerFuture.get(EasyMock.eq(READ_WRITE_TOTAL_TIMEOUT_MS), EasyMock.anyObject()); @@ -460,7 +460,7 @@ public class KafkaConfigBackingStoreTest { // In the meantime, write a target state (which doesn't require write privileges) expectConvert(KafkaConfigBackingStore.TARGET_STATE_V0, TARGET_STATE_PAUSED, CONFIGS_SERIALIZED.get(1)); - storeLog.send("target-state-" + CONNECTOR_IDS.get(1), CONFIGS_SERIALIZED.get(1)); + storeLog.sendWithReceipt("target-state-" + CONNECTOR_IDS.get(1), CONFIGS_SERIALIZED.get(1)); EasyMock.expectLastCall().andReturn(producerFuture); producerFuture.get(EasyMock.anyLong(), EasyMock.anyObject()); EasyMock.expectLastCall().andReturn(null); @@ -1601,7 +1601,7 @@ public class KafkaConfigBackingStoreTest { EasyMock.expect(converter.fromConnectData(EasyMock.eq(TOPIC), EasyMock.eq(valueSchema), EasyMock.capture(capturedRecord))) .andReturn(serialized); - storeLog.send(EasyMock.eq(configKey), EasyMock.aryEq(serialized)); + storeLog.sendWithReceipt(EasyMock.eq(configKey), EasyMock.aryEq(serialized)); EasyMock.expectLastCall().andReturn(producerFuture); producerFuture.get(EasyMock.anyLong(), EasyMock.anyObject());