showuon commented on code in PR #11781:
URL: https://github.com/apache/kafka/pull/11781#discussion_r896805607
##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java:
##########
@@ -63,8 +67,62 @@
public class KafkaOffsetBackingStore implements OffsetBackingStore {
private static final Logger log =
LoggerFactory.getLogger(KafkaOffsetBackingStore.class);
- private KafkaBasedLog<byte[], byte[]> offsetLog;
- private HashMap<ByteBuffer, ByteBuffer> data;
+ /**
+ * Build a connector-specific offset store with read and write support.
+ * @param topic the name of the offsets topic to use
+ * @param producer the producer to use for writing to the offsets topic
+ * @param consumer the consumer to use for reading from the offsets topic
+ * @param topicAdmin the topic admin to use for creating and querying
metadata for the offsets topic
+ * @return an offset store backed by the given topic and Kafka clients
+ */
+ public static KafkaOffsetBackingStore forTask(String topic,
+ Producer<byte[],
byte[]> producer,
+ Consumer<byte[],
byte[]> consumer,
+ TopicAdmin topicAdmin) {
+ return new KafkaOffsetBackingStore(() -> topicAdmin) {
+ @Override
+ public void configure(final WorkerConfig config) {
+ offsetLog = KafkaBasedLog.withExistingClients(
+ topic,
+ consumer,
+ producer,
+ topicAdmin,
+ consumedCallback,
+ Time.SYSTEM,
+ initialize(topic, topicDescription(topic, config))
+ );
+ }
+ };
+ }
+
+ /**
+ * Build a connector-specific offset store with read-only support.
+ * @param topic the name of the offsets topic to use
+ * @param consumer the consumer to use for reading from the offsets topic
+ * @param topicAdmin the topic admin to use for creating and querying
metadata for the offsets topic
+ * @return a read-only offset store backed by the given topic and Kafka
clients
+ */
+ public static KafkaOffsetBackingStore forConnector(String topic,
+ Consumer<byte[], byte[]>
consumer,
+ TopicAdmin topicAdmin) {
Review Comment:
nit: indent
##########
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java:
##########
@@ -381,6 +435,10 @@ Map<TopicPartition, Long>
readEndOffsets(Set<TopicPartition> assignment, boolean
} catch (UnsupportedVersionException e) {
// This may happen with really old brokers that don't support
the auto topic creation
// field in metadata requests
+ if (requireAdminForOffsets) {
+ // Should be handled by the caller during log startup
+ throw e;
Review Comment:
Should we also set `admin = null` in this case, as the existing comment says:
`// Forget the reference to the admin so that we won't even try to use the
admin the next time this method is called` ?
##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java:
##########
@@ -63,8 +67,62 @@
public class KafkaOffsetBackingStore implements OffsetBackingStore {
private static final Logger log =
LoggerFactory.getLogger(KafkaOffsetBackingStore.class);
- private KafkaBasedLog<byte[], byte[]> offsetLog;
- private HashMap<ByteBuffer, ByteBuffer> data;
+ /**
+ * Build a connector-specific offset store with read and write support.
+ * @param topic the name of the offsets topic to use
+ * @param producer the producer to use for writing to the offsets topic
+ * @param consumer the consumer to use for reading from the offsets topic
+ * @param topicAdmin the topic admin to use for creating and querying
metadata for the offsets topic
+ * @return an offset store backed by the given topic and Kafka clients
+ */
+ public static KafkaOffsetBackingStore forTask(String topic,
+ Producer<byte[],
byte[]> producer,
+ Consumer<byte[],
byte[]> consumer,
+ TopicAdmin topicAdmin) {
Review Comment:
nit: indent
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java:
##########
@@ -1279,6 +1288,455 @@ public void
testAdminConfigsClientOverridesWithNonePolicy() {
verify(connectorConfig).originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX);
}
+ @Test
+ public void testRegularSourceOffsetsConsumerConfigs() {
+ final Map<String, Object> connectorConsumerOverrides = new HashMap<>();
+
when(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX)).thenReturn(connectorConsumerOverrides);
+
+ Map<String, String> workerProps = new HashMap<>(this.workerProps);
+ workerProps.put("exactly.once.source.support", "enabled");
+ workerProps.put("bootstrap.servers", "localhost:4761");
+ workerProps.put("group.id", "connect-cluster");
+ workerProps.put("config.storage.topic", "connect-configs");
+ workerProps.put("offset.storage.topic", "connect-offsets");
+ workerProps.put("status.storage.topic", "connect-statuses");
+ config = new DistributedConfig(workerProps);
+
+ Map<String, Object> consumerConfigs =
Worker.regularSourceOffsetsConsumerConfigs(
+ "test", "", config, connectorConfig, null,
allConnectorClientConfigOverridePolicy, CLUSTER_ID);
+ assertEquals("localhost:4761",
consumerConfigs.get(BOOTSTRAP_SERVERS_CONFIG));
+ assertEquals("read_committed",
consumerConfigs.get(ISOLATION_LEVEL_CONFIG));
+
+ workerProps.put("consumer." + BOOTSTRAP_SERVERS_CONFIG,
"localhost:9021");
+ workerProps.put("consumer." + ISOLATION_LEVEL_CONFIG,
"read_uncommitted");
+ config = new DistributedConfig(workerProps);
+ consumerConfigs = Worker.regularSourceOffsetsConsumerConfigs(
+ "test", "", config, connectorConfig, null,
allConnectorClientConfigOverridePolicy, CLUSTER_ID);
+ assertEquals("localhost:9021",
consumerConfigs.get(BOOTSTRAP_SERVERS_CONFIG));
+ // User is allowed to override the isolation level for regular
(non-exactly-once) source connectors and their tasks
+ assertEquals("read_uncommitted",
consumerConfigs.get(ISOLATION_LEVEL_CONFIG));
+
+ workerProps.remove("consumer." + ISOLATION_LEVEL_CONFIG);
+ connectorConsumerOverrides.put(BOOTSTRAP_SERVERS_CONFIG,
"localhost:489");
+ connectorConsumerOverrides.put(ISOLATION_LEVEL_CONFIG,
"read_uncommitted");
+ config = new DistributedConfig(workerProps);
+ consumerConfigs = Worker.regularSourceOffsetsConsumerConfigs(
+ "test", "", config, connectorConfig, null,
allConnectorClientConfigOverridePolicy, CLUSTER_ID);
+ assertEquals("localhost:489",
consumerConfigs.get(BOOTSTRAP_SERVERS_CONFIG));
+ // User is allowed to override the isolation level for regular
(non-exactly-once) source connectors and their tasks
+ assertEquals("read_uncommitted",
consumerConfigs.get(ISOLATION_LEVEL_CONFIG));
+ }
+
+ @Test
+ public void testExactlyOnceSourceOffsetsConsumerConfigs() {
+ final Map<String, Object> connectorConsumerOverrides = new HashMap<>();
+
when(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX)).thenReturn(connectorConsumerOverrides);
+
+ Map<String, String> workerProps = new HashMap<>(this.workerProps);
+ workerProps.put("exactly.once.source.support", "enabled");
+ workerProps.put("bootstrap.servers", "localhost:4761");
+ workerProps.put("group.id", "connect-cluster");
+ workerProps.put("config.storage.topic", "connect-configs");
+ workerProps.put("offset.storage.topic", "connect-offsets");
+ workerProps.put("status.storage.topic", "connect-statuses");
+ config = new DistributedConfig(workerProps);
+
+ Map<String, Object> consumerConfigs =
Worker.exactlyOnceSourceOffsetsConsumerConfigs(
+ "test", "", config, connectorConfig, null,
allConnectorClientConfigOverridePolicy, CLUSTER_ID);
+ assertEquals("localhost:4761",
consumerConfigs.get(BOOTSTRAP_SERVERS_CONFIG));
+ assertEquals("read_committed",
consumerConfigs.get(ISOLATION_LEVEL_CONFIG));
+
+ workerProps.put("consumer." + BOOTSTRAP_SERVERS_CONFIG,
"localhost:9021");
+ workerProps.put("consumer." + ISOLATION_LEVEL_CONFIG,
"read_uncommitted");
+ config = new DistributedConfig(workerProps);
+ consumerConfigs = Worker.exactlyOnceSourceOffsetsConsumerConfigs(
+ "test", "", config, connectorConfig, null,
allConnectorClientConfigOverridePolicy, CLUSTER_ID);
+ assertEquals("localhost:9021",
consumerConfigs.get(BOOTSTRAP_SERVERS_CONFIG));
+ // User is not allowed to override isolation level when exactly-once
support is enabled
+ assertEquals("read_committed",
consumerConfigs.get(ISOLATION_LEVEL_CONFIG));
+
+ workerProps.remove("consumer." + ISOLATION_LEVEL_CONFIG);
+ connectorConsumerOverrides.put(BOOTSTRAP_SERVERS_CONFIG,
"localhost:489");
+ connectorConsumerOverrides.put(ISOLATION_LEVEL_CONFIG,
"read_uncommitted");
+ config = new DistributedConfig(workerProps);
+ consumerConfigs = Worker.exactlyOnceSourceOffsetsConsumerConfigs(
+ "test", "", config, connectorConfig, null,
allConnectorClientConfigOverridePolicy, CLUSTER_ID);
+ assertEquals("localhost:489",
consumerConfigs.get(BOOTSTRAP_SERVERS_CONFIG));
+ // User is not allowed to override isolation level when exactly-once
support is enabled
+ assertEquals("read_committed",
consumerConfigs.get(ISOLATION_LEVEL_CONFIG));
+ }
+
+ @Test
+ public void testExactlyOnceSourceTaskProducerConfigs() {
+ final Map<String, Object> connectorProducerOverrides = new HashMap<>();
+
when(connectorConfig.originalsWithPrefix(CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX)).thenReturn(connectorProducerOverrides);
+
+ final String groupId = "connect-cluster";
+ final String transactionalId = Worker.taskTransactionalId(groupId,
TASK_ID.connector(), TASK_ID.task());
+
+ Map<String, String> workerProps = new HashMap<>(this.workerProps);
+ workerProps.put("exactly.once.source.support", "enabled");
+ workerProps.put("bootstrap.servers", "localhost:4761");
+ workerProps.put("group.id", groupId);
+ workerProps.put("config.storage.topic", "connect-configs");
+ workerProps.put("offset.storage.topic", "connect-offsets");
+ workerProps.put("status.storage.topic", "connect-statuses");
+ config = new DistributedConfig(workerProps);
+
+ Map<String, Object> producerConfigs =
Worker.exactlyOnceSourceTaskProducerConfigs(
+ TASK_ID, config, connectorConfig, null,
allConnectorClientConfigOverridePolicy, CLUSTER_ID);
+ assertEquals("localhost:4761",
producerConfigs.get(BOOTSTRAP_SERVERS_CONFIG));
+ assertEquals("true", producerConfigs.get(ENABLE_IDEMPOTENCE_CONFIG));
+ assertEquals(transactionalId,
producerConfigs.get(TRANSACTIONAL_ID_CONFIG));
+
+ workerProps.put("producer." + BOOTSTRAP_SERVERS_CONFIG,
"localhost:9021");
+ workerProps.put("producer." + ENABLE_IDEMPOTENCE_CONFIG, "false");
+ workerProps.put("producer." + TRANSACTIONAL_ID_CONFIG,
"some-other-transactional-id");
+ config = new DistributedConfig(workerProps);
+ producerConfigs = Worker.exactlyOnceSourceTaskProducerConfigs(
+ TASK_ID, config, connectorConfig, null,
allConnectorClientConfigOverridePolicy, CLUSTER_ID);
+ assertEquals("localhost:9021",
producerConfigs.get(BOOTSTRAP_SERVERS_CONFIG));
+ // User is not allowed to override idempotence or transactional ID for
exactly-once source tasks
+ assertEquals("true", producerConfigs.get(ENABLE_IDEMPOTENCE_CONFIG));
+ assertEquals(transactionalId,
producerConfigs.get(TRANSACTIONAL_ID_CONFIG));
+
+ workerProps.remove("producer." + ENABLE_IDEMPOTENCE_CONFIG);
+ workerProps.remove("producer." + TRANSACTIONAL_ID_CONFIG);
+ connectorProducerOverrides.put(BOOTSTRAP_SERVERS_CONFIG,
"localhost:489");
+ connectorProducerOverrides.put(ENABLE_IDEMPOTENCE_CONFIG, "false");
+ connectorProducerOverrides.put(TRANSACTIONAL_ID_CONFIG,
"yet-another-transactional-id");
Review Comment:
Could a user override `transactional.id` to `null`? Should we add a test case
for that?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]