fapaul commented on a change in pull request #16676:
URL: https://github.com/apache/flink/pull/16676#discussion_r684080102

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/sink/KafkaWriter.java
##########
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.sink;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricMutableWrapper;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class is responsible for writing records to a Kafka topic and for handling the different
+ * delivery {@link DeliveryGuarantee}s.
+ *
+ * @param <IN> The type of the input elements.
+ */
+class KafkaWriter<IN> implements SinkWriter<IN, KafkaCommittable, KafkaWriterState> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaWriter.class);
+    private static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
+    private static final String KAFKA_PRODUCER_METRIC_NAME = "KafkaProducer";
+
+    private final DeliveryGuarantee deliveryGuarantee;
+    private final Properties kafkaProducerConfig;
+    private final String transactionalIdPrefix;
+    private final KafkaRecordSerializationSchema<IN> recordSerializer;
+    private final Callback deliveryCallback;
+    private final AtomicLong pendingRecords = new AtomicLong();
+    private final KafkaRecordSerializationSchema.KafkaSinkContext kafkaSinkContext;
+    private final List<FlinkKafkaInternalProducer<byte[], byte[]>> producers = new ArrayList<>();
+    private final Map<String, KafkaMetricMutableWrapper> previouslyCreatedMetrics = new HashMap<>();
+    private final MetricGroup metricGroup;
+
+    private transient FlinkKafkaInternalProducer<byte[], byte[]> currentProducer;
+    private transient KafkaWriterState kafkaWriterState;
+    @Nullable private transient volatile Exception producerAsyncException;
+
+    /**
+     * Constructor creating a Kafka writer.
+     *
+     * <p>It will throw a {@link RuntimeException} if {@link
+     * KafkaRecordSerializationSchema#open(SerializationSchema.InitializationContext)} fails.
+     *
+     * @param deliveryGuarantee the Sink's delivery guarantee
+     * @param kafkaProducerConfig the properties to configure the {@link FlinkKafkaInternalProducer}
+     * @param transactionalIdPrefix used to create the transactionalIds
+     * @param sinkInitContext context to provide information about the runtime environment
+     * @param recordSerializer serializer to transform the incoming records to {@link ProducerRecord}
+     * @param schemaContext context used to initialize the {@link KafkaRecordSerializationSchema}
+     * @param recoveredStates state recovered from a previous execution
+     */
+    KafkaWriter(
+            DeliveryGuarantee deliveryGuarantee,
+            Properties kafkaProducerConfig,
+            String transactionalIdPrefix,
+            Sink.InitContext sinkInitContext,
+            KafkaRecordSerializationSchema<IN> recordSerializer,
+            SerializationSchema.InitializationContext schemaContext,
+            List<KafkaWriterState> recoveredStates) {
+        this.deliveryGuarantee = checkNotNull(deliveryGuarantee, "deliveryGuarantee");
+        this.kafkaProducerConfig = checkNotNull(kafkaProducerConfig, "kafkaProducerConfig");
+        this.transactionalIdPrefix = checkNotNull(transactionalIdPrefix, "transactionalIdPrefix");
+        this.recordSerializer = checkNotNull(recordSerializer, "recordSerializer");
+        try {
+            recordSerializer.open(schemaContext);
+        } catch (Exception e) {
+            throw new RuntimeException("Cannot initialize schema.", e);
+        }
+        this.deliveryCallback =
+                (metadata, exception) -> {
+                    if (exception != null && producerAsyncException == null) {
+                        producerAsyncException = exception;
+                    }
+                    acknowledgeMessage();
+                };
+        this.metricGroup = sinkInitContext.metricGroup();
+        this.kafkaSinkContext =
+                new DefaultKafkaSinkContext(
+                        checkNotNull(sinkInitContext, "sinkInitContext"), kafkaProducerConfig);
+        this.kafkaWriterState =
+                recoverAndInitializeState(checkNotNull(recoveredStates, "recoveredStates"));
+        this.currentProducer = beginTransaction();
+        producers.add(currentProducer);
+    }
+
+    @Override
+    public void write(IN element, Context context) throws IOException {
+        checkErroneous();
+        final ProducerRecord<byte[], byte[]> record =
+                recordSerializer.serialize(element, kafkaSinkContext, context.timestamp());
+        pendingRecords.incrementAndGet();
+        currentProducer.send(record, deliveryCallback);
+    }
+
+    @Override
+    public List<KafkaCommittable> prepareCommit(boolean flush) throws IOException {
+        flushRecords(flush);
+        if (!flush) {
+            currentProducer = beginTransaction();
+        }
+        final List<KafkaCommittable> committables = commit();
+        producers.add(currentProducer);
+        return committables;
+    }
+
+    @Override
+    public List<KafkaWriterState> snapshotState() throws IOException {
+        return ImmutableList.of(kafkaWriterState);
+    }
+
+    @Override
+    public void close() throws Exception {
+        currentProducer.close(Duration.ZERO);
+    }
+
+    private KafkaWriterState recoverAndInitializeState(List<KafkaWriterState> recoveredStates) {
+        final int subtaskId = kafkaSinkContext.getParallelInstanceId();
+        if (recoveredStates.isEmpty()) {
+            final KafkaWriterState state =
+                    new KafkaWriterState(transactionalIdPrefix, subtaskId, 0);
+            abortTransactions(getTransactionsToAbort(state, new ArrayList<>()));
+            return state;
+        }
+        final Map<Integer, KafkaWriterState> taskOffsetMapping =
+                recoveredStates.stream()
+                        .collect(
+                                Collectors.toMap(
+                                        KafkaWriterState::getSubtaskId, Function.identity()));
+        if (!taskOffsetMapping.containsKey(subtaskId)) {
+            throw new IllegalStateException(
+                    "It is expected that state from previous executions is distributed to the same subtask id.");
+        }
+        final KafkaWriterState lastState = taskOffsetMapping.get(subtaskId);
+        taskOffsetMapping.remove(subtaskId);
+        abortTransactions(
+                getTransactionsToAbort(lastState, new ArrayList<>(taskOffsetMapping.values())));
+        if (!lastState.getTransactionalIdPrefix().equals(transactionalIdPrefix)) {
+            LOG.warn(
+                    "Transactional id prefix from previous execution {} has changed to {}.",
+                    lastState.getTransactionalIdPrefix(),
+                    transactionalIdPrefix);
+            return new KafkaWriterState(transactionalIdPrefix, subtaskId, 0);
+        }
+        return new KafkaWriterState(
+                transactionalIdPrefix, subtaskId, lastState.getTransactionalIdOffset());
+    }
+
+    private void abortTransactions(Set<String> transactionsToAbort) {
+        transactionsToAbort.forEach(
+                transaction -> {
+                    // Do not mess with the original configuration or any other properties
+                    // of the original object -> create an internal kafka producer on our
+                    // own and do not rely on initTransactionalProducer().
+                    final Properties myConfig = new Properties();
+                    myConfig.putAll(kafkaProducerConfig);
+                    myConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transaction);
+                    LOG.info("Aborting Kafka transaction {}.", transaction);
+                    FlinkKafkaInternalProducer<byte[], byte[]> kafkaProducer = null;

Review comment:
   Hmm, currently the KafkaCommitter leverages try-with-resources because it is important to block until the transaction is finished. By changing this behavior we would also need to change the committer's try-with-resources block. I am not sure we gain much here.
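
For illustration, below is a minimal sketch of the try-with-resources pattern the comment refers to, written against the plain Kafka client API rather than Flink's internal FlinkKafkaInternalProducer; the TransactionAborter class and its abortTransactions helper are illustrative names only and are not part of this PR. Creating a producer with a given transactional.id and calling initTransactions() fences the previous producer that used the same id and aborts any transaction it left open; the try-with-resources block guarantees that close() has run before the next id is handled.

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;

import java.util.Properties;
import java.util.Set;

/** Hypothetical helper, only meant to illustrate the blocking abort pattern. */
class TransactionAborter {

    void abortTransactions(Properties baseConfig, Set<String> transactionalIds) {
        for (String transactionalId : transactionalIds) {
            // Copy the configuration so the original properties are never mutated.
            final Properties config = new Properties();
            config.putAll(baseConfig);
            config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
            config.put(
                    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    ByteArraySerializer.class.getName());
            config.put(
                    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    ByteArraySerializer.class.getName());
            try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(config)) {
                // Fences the previous producer for this id and aborts its open transaction.
                producer.initTransactions();
            } // close() completes here, blocking until the producer has shut down.
        }
    }
}
```

Replacing the try-with-resources block with a different lifecycle would change exactly this property: without the blocking close(), the caller could no longer rely on the abort having completed when the method returns, which appears to be what the comment wants to preserve.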
