C0urante commented on code in PR #11780:
URL: https://github.com/apache/kafka/pull/11780#discussion_r888026291
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -576,88 +672,42 @@ public boolean startTask(
executor.submit(workerTask);
if (workerTask instanceof WorkerSourceTask) {
-                sourceTaskOffsetCommitter.schedule(id, (WorkerSourceTask) workerTask);
+                sourceTaskOffsetCommitter.ifPresent(committer -> committer.schedule(id, (WorkerSourceTask) workerTask));
}
return true;
}
}
-    private WorkerTask buildWorkerTask(ClusterConfigState configState,
-                                       ConnectorConfig connConfig,
-                                       ConnectorTaskId id,
-                                       Task task,
-                                       TaskStatus.Listener statusListener,
-                                       TargetState initialState,
-                                       Converter keyConverter,
-                                       Converter valueConverter,
-                                       HeaderConverter headerConverter,
-                                       ClassLoader loader) {
-        ErrorHandlingMetrics errorHandlingMetrics = errorHandlingMetrics(id);
-        final Class<? extends Connector> connectorClass = plugins.connectorClass(
-            connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
-        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(connConfig.errorRetryTimeout(),
-                connConfig.errorMaxDelayInMillis(), connConfig.errorToleranceType(), Time.SYSTEM);
-        retryWithToleranceOperator.metrics(errorHandlingMetrics);
-
-        // Decide which type of worker task we need based on the type of task.
-        if (task instanceof SourceTask) {
-            SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins,
-                    connConfig.originalsStrings(), config.topicCreationEnable());
-            retryWithToleranceOperator.reporters(sourceTaskReporters(id, sourceConfig, errorHandlingMetrics));
-            TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(sourceConfig.<SourceRecord>transformations(), retryWithToleranceOperator);
-            log.info("Initializing: {}", transformationChain);
-            CloseableOffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetBackingStore, id.connector(),
-                    internalKeyConverter, internalValueConverter);
-            OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.connector(),
-                    internalKeyConverter, internalValueConverter);
-            Map<String, Object> producerProps = producerConfigs(id, "connector-producer-" + id, config, sourceConfig, connectorClass,
-                    connectorClientConfigOverridePolicy, kafkaClusterId);
-            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
-            TopicAdmin admin;
-            Map<String, TopicCreationGroup> topicCreationGroups;
-            if (config.topicCreationEnable() && sourceConfig.usesTopicCreation()) {
-                Map<String, Object> adminProps = adminConfigs(id, "connector-adminclient-" + id, config,
-                        sourceConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId);
-                admin = new TopicAdmin(adminProps);
-                topicCreationGroups = TopicCreationGroup.configuredGroups(sourceConfig);
-            } else {
-                admin = null;
-                topicCreationGroups = null;
-            }
-
-            // Note we pass the configState as it performs dynamic transformations under the covers
-            return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter, valueConverter,
-                    headerConverter, transformationChain, producer, admin, topicCreationGroups,
-                    offsetReader, offsetWriter, config, configState, metrics, loader, time, retryWithToleranceOperator, herder.statusBackingStore(), executor);
-        } else if (task instanceof SinkTask) {
-            TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connConfig.<SinkRecord>transformations(), retryWithToleranceOperator);
-            log.info("Initializing: {}", transformationChain);
-            SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, connConfig.originalsStrings());
-            retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass));
-            WorkerErrantRecordReporter workerErrantRecordReporter = createWorkerErrantRecordReporter(sinkConfig, retryWithToleranceOperator,
-                    keyConverter, valueConverter, headerConverter);
-
-            Map<String, Object> consumerProps = consumerConfigs(id, config, connConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId);
-            KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
-
-            return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, configState, metrics, keyConverter,
-                    valueConverter, headerConverter, transformationChain, consumer, loader, time,
-                    retryWithToleranceOperator, workerErrantRecordReporter, herder.statusBackingStore());
-        } else {
-            log.error("Tasks must be a subclass of either SourceTask or SinkTask and current is {}", task);
-            throw new ConnectException("Tasks must be a subclass of either SourceTask or SinkTask");
-        }
+    static Map<String, Object> exactlyOnceSourceTaskProducerConfigs(ConnectorTaskId id,
+                                                                    WorkerConfig config,
+                                                                    ConnectorConfig connConfig,
+                                                                    Class<? extends Connector> connectorClass,
+                                                                    ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
+                                                                    String clusterId) {
+        Map<String, Object> result = baseProducerConfigs(id.connector(), "connector-producer-" + id, config, connConfig, connectorClass, connectorClientConfigOverridePolicy, clusterId);
+        ConnectUtils.ensureProperty(
+                result, ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true",
+                "for connectors when exactly-once source support is enabled",
+                false
+        );
+        String transactionalId = transactionalId(config.groupId(), id.connector(), id.task());
Review Comment:
👍 added a comment and updated the method name.
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java:
##########
@@ -0,0 +1,525 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.errors.InvalidProducerEpochException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.apache.kafka.connect.source.SourceTask.TransactionBoundary;
+import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
+import org.apache.kafka.connect.storage.ConnectorOffsetBackingStore;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.apache.kafka.connect.storage.OffsetStorageWriter;
+import org.apache.kafka.connect.storage.StatusBackingStore;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.apache.kafka.connect.util.LoggingContext;
+import org.apache.kafka.connect.util.TopicAdmin;
+import org.apache.kafka.connect.util.TopicCreationGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
+
+
+/**
+ * WorkerTask that uses a SourceTask to ingest data into Kafka, with support for exactly-once delivery guarantees.
+ */
+class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask {
+    private static final Logger log = LoggerFactory.getLogger(ExactlyOnceWorkerSourceTask.class);
+
+ private boolean transactionOpen;
+    private final LinkedHashMap<SourceRecord, RecordMetadata> commitableRecords;
+
+ private final TransactionManager transactionManager;
+ private final TransactionMetricsGroup transactionMetrics;
+
+ private final ConnectorOffsetBackingStore offsetBackingStore;
+ private final Runnable preProducerCheck;
+ private final Runnable postProducerCheck;
+
+ public ExactlyOnceWorkerSourceTask(ConnectorTaskId id,
+ SourceTask task,
+ TaskStatus.Listener statusListener,
+ TargetState initialState,
+ Converter keyConverter,
+ Converter valueConverter,
+ HeaderConverter headerConverter,
+                                       TransformationChain<SourceRecord> transformationChain,
+ Producer<byte[], byte[]> producer,
+ TopicAdmin admin,
+                                       Map<String, TopicCreationGroup> topicGroups,
+                                       CloseableOffsetStorageReader offsetReader,
+                                       OffsetStorageWriter offsetWriter,
+                                       ConnectorOffsetBackingStore offsetBackingStore,
+ WorkerConfig workerConfig,
+ ClusterConfigState configState,
+ ConnectMetrics connectMetrics,
+ ClassLoader loader,
+ Time time,
+                                       RetryWithToleranceOperator retryWithToleranceOperator,
+ StatusBackingStore statusBackingStore,
+ SourceConnectorConfig sourceConfig,
+ Executor closeExecutor,
+ Runnable preProducerCheck,
+ Runnable postProducerCheck) {
+        super(id, task, statusListener, initialState, keyConverter, valueConverter, headerConverter, transformationChain,
+                new WorkerSourceTaskContext(offsetReader, id, configState, buildTransactionContext(sourceConfig)),
+                producer, admin, topicGroups, offsetReader, offsetWriter, offsetBackingStore, workerConfig, connectMetrics,
+                loader, time, retryWithToleranceOperator, statusBackingStore, closeExecutor);
+
+ this.transactionOpen = false;
+ this.commitableRecords = new LinkedHashMap<>();
+ this.offsetBackingStore = offsetBackingStore;
+
+ this.preProducerCheck = preProducerCheck;
+ this.postProducerCheck = postProducerCheck;
+
+        this.transactionManager = buildTransactionManager(workerConfig, sourceConfig, sourceTaskContext.transactionContext());
+        this.transactionMetrics = new TransactionMetricsGroup(id, connectMetrics);
+ }
+
+    private static WorkerTransactionContext buildTransactionContext(SourceConnectorConfig sourceConfig) {
+        return TransactionBoundary.CONNECTOR.equals(sourceConfig.transactionBoundary())
+ ? new WorkerTransactionContext()
+ : null;
+ }
+
+ @Override
+ protected void prepareToInitializeTask() {
+ preProducerCheck.run();
+
+        // Try not to start up the offset store (which has its own producer and consumer) if we've already been shut down at this point
+ if (isStopping())
+ return;
+ offsetBackingStore.start();
+
+        // Try not to initialize the transactional producer (which may accidentally fence out other, later task generations) if we've already
+ // been shut down at this point
+ if (isStopping())
+ return;
+ producer.initTransactions();
+
+ postProducerCheck.run();
+ }
+
+ @Override
+ protected void prepareToEnterSendLoop() {
+ transactionManager.initialize();
+ }
+
+ @Override
+ protected void beginSendIteration() {
+ // No-op
+ }
+
+ @Override
+ protected void prepareToPollTask() {
+ // No-op
+ }
+
+ @Override
+ protected void recordDropped(SourceRecord record) {
+ synchronized (this) {
+ commitableRecords.put(record, null);
+ }
+ transactionManager.maybeCommitTransactionForRecord(record);
+ }
+
+ @Override
+ protected Optional<SubmittedRecords.SubmittedRecord> prepareToSendRecord(
+ SourceRecord sourceRecord,
+ ProducerRecord<byte[], byte[]> producerRecord
+ ) {
+        if (offsetBackingStore.primaryOffsetsTopic().equals(producerRecord.topic())) {
+            // This is to prevent deadlock that occurs when:
+            // 1. A task provides a record whose topic is the task's offsets topic
+            // 2. That record is dispatched to the task's producer in a transaction that remains open
+            //    at least until the worker polls the task again
+            // 3. In the subsequent call to SourceTask::poll, the task requests offsets from the worker
+            //    (which requires a read to the end of the offsets topic, and will block until any open
+            //    transactions on the topic are either committed or aborted)
+            throw new ConnectException("Source tasks may not produce to their own offsets topics when exactly-once support is enabled");
+ }
+ maybeBeginTransaction();
+ return Optional.empty();
+ }
+
+ @Override
+ protected void recordDispatched(SourceRecord record) {
+ // Offsets are converted & serialized in the OffsetWriter
+        // Important: we only save offsets for the record after it has been accepted by the producer; this way,
+        // we commit those offsets if and only if the record is sent successfully.
+ offsetWriter.offset(record.sourcePartition(), record.sourceOffset());
+ transactionMetrics.addRecord();
+ transactionManager.maybeCommitTransactionForRecord(record);
+ }
+
+ @Override
+ protected void batchDispatched() {
+ transactionManager.maybeCommitTransactionForBatch();
+ }
+
+ @Override
+ protected void recordSent(
+ SourceRecord sourceRecord,
+ ProducerRecord<byte[], byte[]> producerRecord,
+ RecordMetadata recordMetadata
+ ) {
+ synchronized (this) {
+ commitableRecords.put(sourceRecord, recordMetadata);
+ }
+ }
+
+ @Override
+ protected void producerSendFailed(
+ boolean synchronous,
+ ProducerRecord<byte[], byte[]> producerRecord,
+ SourceRecord preTransformRecord,
+ Exception e
+ ) {
+ if (synchronous) {
+ throw maybeWrapProducerSendException(
+ "Unrecoverable exception trying to send",
+ e
+ );
+ } else {
+            // No-op; all asynchronously-reported producer exceptions should be bubbled up again by Producer::commitTransaction
+ }
+ }
+
+ @Override
+ protected void finalOffsetCommit(boolean failed) {
+ if (failed) {
+ log.debug("Skipping final offset commit as task has failed");
+ return;
+ }
+
+        // It should be safe to commit here even if we were in the middle of retrying on RetriableExceptions in the
+        // send loop since we only track source offsets for records that have been successfully dispatched to the
+        // producer.
+        // Any records that we were retrying on (and any records after them in the batch) won't be included in the
+        // transaction and their offsets won't be committed, but (unless the user has requested connector-defined
+ // transaction boundaries), it's better to commit some data than none.
+ transactionManager.maybeCommitFinalTransaction();
+ }
+
+ @Override
+ protected void onPause() {
+ super.onPause();
+        // Commit the transaction now so that we don't end up with a hanging transaction, or worse, get fenced out
+ // and fail the task once unpaused
+ transactionManager.maybeCommitFinalTransaction();
+ }
+
+ private void maybeBeginTransaction() {
+ if (!transactionOpen) {
+ producer.beginTransaction();
+ transactionOpen = true;
+ }
+ }
+
+ private void commitTransaction() {
+ log.debug("{} Committing offsets", this);
+
+ long started = time.milliseconds();
+
+        // We might have just aborted a transaction, in which case we'll have to begin a new one
+ // in order to commit offsets
+ maybeBeginTransaction();
+
+ AtomicReference<Throwable> flushError = new AtomicReference<>();
+ Future<Void> offsetFlush = null;
+ if (offsetWriter.beginFlush()) {
+ // Now we can actually write the offsets to the internal topic.
+ offsetFlush = offsetWriter.doFlush((error, result) -> {
+ if (error != null) {
+                    log.error("{} Failed to flush offsets to storage: ", ExactlyOnceWorkerSourceTask.this, error);
+ flushError.compareAndSet(null, error);
+ } else {
+                    log.trace("{} Finished flushing offsets to storage", ExactlyOnceWorkerSourceTask.this);
+ }
+ });
+ }
+
+ // Commit the transaction
+ // Blocks until all outstanding records have been sent and ack'd
+ try {
+ producer.commitTransaction();
+ if (offsetFlush != null) {
+                // Although it's guaranteed by the above call to Producer::commitTransaction that all outstanding
+                // records for the task's producer (including those sent to the offsets topic) have been delivered and
+                // ack'd, there is no guarantee that the producer callbacks for those records have been completed. So,
+                // we add this call to Future::get to ensure that these callbacks are invoked successfully before
+ // proceeding.
+ offsetFlush.get();
+ }
+ } catch (Throwable t) {
+ flushError.compareAndSet(null, t);
+ }
+
+ transactionOpen = false;
+
+ Throwable error = flushError.get();
+ if (error != null) {
+ recordCommitFailure(time.milliseconds() - started, null);
+ offsetWriter.cancelFlush();
+ throw maybeWrapProducerSendException(
+ "Failed to flush offsets and/or records for task " + id,
+ error
+ );
+ }
+
+ transactionMetrics.commitTransaction();
+
+ long durationMillis = time.milliseconds() - started;
+ recordCommitSuccess(durationMillis);
+        log.debug("{} Finished commitOffsets successfully in {} ms", this, durationMillis);
+
+        // No need for any synchronization here; all other accesses to this field take place in producer callbacks,
+ // which should all be completed by this point
Review Comment:
That's a fair point. Right now this is a result of the threading model of
the producer, which blocks on the completion of all in-flight batches and their
callbacks before sending a request to the broker to end a transaction. This is
not technically guaranteed by the documented behavior for
`Producer::commitTransaction`, though; the only guarantees we get from the
[Javadocs](https://kafka.apache.org/32/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#commitTransaction())
are:
> This method will flush any unsent records before actually committing the transaction. Further, if any of the [send(ProducerRecord)](https://kafka.apache.org/32/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send(org.apache.kafka.clients.producer.ProducerRecord)) calls which were part of the transaction hit irrecoverable errors, this method will throw the last received exception immediately and the transaction will not be committed.

If we don't want to rely on this technically-undocumented behavior, we can add some more bookkeeping to ensure this ourselves. Alternatively, we can rely on this behavior and I can remove the comment above about producer callback guarantees and the bookkeeping we already do on that front (which I've done for now).
@hachikuji @guozhangwang do you have thoughts about whether it should be
part of the contract for `Producer::commitTransaction` that all user-supplied
callbacks for records within the transaction are fired before the method
returns? The use case is tracking record metadata that's only available after
the record is ack'd and then reporting it to source tasks, before clearing a
local cache of that metadata.
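
For what it's worth, here is a rough sketch of what that extra bookkeeping could look like if we'd rather not lean on the producer's current blocking behavior. The `outstandingCallbacks` field and the `awaitAllCallbacks` helper below are hypothetical names for illustration only and are not part of this PR:

```java
// Hypothetical sketch (not part of this PR): count outstanding producer callbacks ourselves
// instead of relying on Producer::commitTransaction waiting for them to complete.
// Would require: import java.util.concurrent.atomic.AtomicInteger;
private final AtomicInteger outstandingCallbacks = new AtomicInteger();

private void sendRecordWithBookkeeping(ProducerRecord<byte[], byte[]> producerRecord, SourceRecord sourceRecord) {
    outstandingCallbacks.incrementAndGet();
    producer.send(producerRecord, (metadata, exception) -> {
        try {
            if (exception == null) {
                synchronized (this) {
                    commitableRecords.put(sourceRecord, metadata);
                }
            }
        } finally {
            // Signal any thread waiting for all callbacks to finish
            synchronized (outstandingCallbacks) {
                if (outstandingCallbacks.decrementAndGet() == 0) {
                    outstandingCallbacks.notifyAll();
                }
            }
        }
    });
}

// Called after producer.commitTransaction() so that every callback has run before we
// report record metadata to the task and clear commitableRecords.
private void awaitAllCallbacks() throws InterruptedException {
    synchronized (outstandingCallbacks) {
        while (outstandingCallbacks.get() != 0) {
            outstandingCallbacks.wait();
        }
    }
}
```

That would keep the metadata handoff correct even if `commitTransaction` stopped blocking on callback completion, at the cost of a little extra synchronization on the send path.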
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -784,6 +871,10 @@ private static Map<String, Object> connectorClientConfigOverrides(ConnectorTaskI
return clientOverrides;
}
+    public static String transactionalId(String groupId, String connector, int taskId) {
Review Comment:
Ack, done.