AHeise commented on a change in pull request #16676:
URL: https://github.com/apache/flink/pull/16676#discussion_r681534451
########## File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/sink/DefaultKafkaSinkContext.java ########## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.sink; + +import org.apache.flink.api.connector.sink.Sink; + +import org.apache.kafka.common.PartitionInfo; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; + +/** + * Context providing information to assist constructing a {@link + * org.apache.kafka.clients.producer.ProducerRecord}. + */ +class DefaultKafkaSinkContext implements KafkaRecordSerializationSchema.KafkaSinkContext { + + private final Sink.InitContext context; + private final FlinkKafkaInternalProducer<?, ?> producer; + + public DefaultKafkaSinkContext( + Sink.InitContext context, FlinkKafkaInternalProducer<?, ?> producer) { + this.context = context; + this.producer = producer; + } + + @Override + public int getParallelInstanceId() { + return context.getSubtaskId(); + } + + @Override + public int getNumberOfParallelInstances() { + return context.getNumberOfParallelSubtasks(); + } + + @Override + public int[] getPartitionsForTopic(String topic) { + // the fetched list is immutable, so we're creating a mutable copy in order to sort it + final List<PartitionInfo> partitionsList = new ArrayList<>(producer.partitionsFor(topic)); + + return partitionsList.stream() + .sorted(Comparator.comparing(PartitionInfo::partition)) + .map(PartitionInfo::partition) + .mapToInt(Integer::intValue) + .toArray(); Review comment: Cache? If cache, please add documentation that it's cached. ########## File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/sink/FlinkKafkaInternalProducer.java ########## @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka.sink; + +import org.apache.flink.util.Preconditions; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.internals.TransactionalRequestResult; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Map; +import java.util.Properties; + +/** + * A {@link KafkaProducer} that exposes private fields to allow resume producing from a given state. + */ +class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaInternalProducer.class); + + @Nullable private final String transactionalId; + + public FlinkKafkaInternalProducer(Properties properties) { + super(properties); + this.transactionalId = properties.getProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG); + } + + @Override + public Map<MetricName, ? extends Metric> metrics() { + return null; + } + + @Override + public void close() { + throw new UnsupportedOperationException( + "Close without timeout is now allowed because it can leave lingering Kafka threads."); + } + + @Override + public void flush() { + super.flush(); + if (transactionalId != null) { + flushNewPartitions(); + } + } + + public short getEpoch() { + Object transactionManager = getField(this, "transactionManager"); + Object producerIdAndEpoch = getField(transactionManager, "producerIdAndEpoch"); + return (short) getField(producerIdAndEpoch, "epoch"); + } + + public long getProducerId() { + Object transactionManager = getField(this, "transactionManager"); + Object producerIdAndEpoch = getField(transactionManager, "producerIdAndEpoch"); + return (long) getField(producerIdAndEpoch, "producerId"); + } + + /** + * Besides committing {@link org.apache.kafka.clients.producer.KafkaProducer#commitTransaction} + * is also adding new partitions to the transaction. flushNewPartitions method is moving this + * logic to pre-commit/flush, to make resumeTransaction simpler. Otherwise resumeTransaction + * would require to restore state of the not yet added/"in-flight" partitions. + */ + private void flushNewPartitions() { + LOG.info("Flushing new partitions"); + TransactionalRequestResult result = enqueueNewPartitions(); + Object sender = getField(this, "sender"); + invoke(sender, "wakeup"); + result.await(); + } + + /** + * Enqueues new transactions at the transaction manager and returns a {@link + * TransactionalRequestResult} that allows waiting on them. + * + * <p>If there are no new transactions we return a {@link TransactionalRequestResult} that is + * already done. 
+ */ + private TransactionalRequestResult enqueueNewPartitions() { + Object transactionManager = getField(this, "transactionManager"); + synchronized (transactionManager) { + Object newPartitionsInTransaction = + getField(transactionManager, "newPartitionsInTransaction"); + Object newPartitionsInTransactionIsEmpty = + invoke(newPartitionsInTransaction, "isEmpty"); + TransactionalRequestResult result; + if (newPartitionsInTransactionIsEmpty instanceof Boolean + && !((Boolean) newPartitionsInTransactionIsEmpty)) { + Object txnRequestHandler = + invoke(transactionManager, "addPartitionsToTransactionHandler"); + invoke( + transactionManager, + "enqueueRequest", + new Class[] {txnRequestHandler.getClass().getSuperclass()}, + new Object[] {txnRequestHandler}); + result = + (TransactionalRequestResult) + getField( + txnRequestHandler, + txnRequestHandler.getClass().getSuperclass(), + "result"); + } else { + // we don't have an operation but this operation string is also used in + // addPartitionsToTransactionHandler. + result = new TransactionalRequestResult("AddPartitionsToTxn"); + result.done(); + } + return result; + } + } + + private static Object invoke(Object object, String methodName, Object... args) { + Class<?>[] argTypes = new Class[args.length]; + for (int i = 0; i < args.length; i++) { + argTypes[i] = args[i].getClass(); + } + return invoke(object, methodName, argTypes, args); + } + + private static Object invoke( + Object object, String methodName, Class<?>[] argTypes, Object[] args) { + try { + Method method = + object.getClass().getSuperclass().getDeclaredMethod(methodName, argTypes); + method.setAccessible(true); + return method.invoke(object, args); + } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) { + throw new RuntimeException("Incompatible KafkaProducer version", e); + } + } + + /** + * Gets and returns the field {@code fieldName} from the given Object {@code object} using + * reflection. + */ + private static Object getField(Object object, String fieldName) { + return getField(object, object.getClass(), fieldName); + } + + /** + * Gets and returns the field {@code fieldName} from the given Object {@code object} using + * reflection. + */ + private static Object getField(Object object, Class<?> clazz, String fieldName) { + try { + Field field = clazz.getSuperclass().getDeclaredField(fieldName); + field.setAccessible(true); + return field.get(object); + } catch (NoSuchFieldException | IllegalAccessException e) { + throw new RuntimeException("Incompatible KafkaProducer version", e); + } + } + + /** + * Instead of obtaining producerId and epoch from the transaction coordinator, re-use previously + * obtained ones, so that we can resume transaction after a restart. Implementation of this + * method is based on {@link KafkaProducer#initTransactions}. 
+ * https://github.com/apache/kafka/commit/5d2422258cb975a137a42a4e08f03573c49a387e#diff-f4ef1afd8792cd2a2e9069cd7ddea630 + */ + public void resumeTransaction(long producerId, short epoch) { + Preconditions.checkState( + producerId >= 0 && epoch >= 0, + "Incorrect values for producerId %s and epoch %s", + producerId, + epoch); + LOG.info( + "Attempting to resume transaction {} with producerId {} and epoch {}", + transactionalId, + producerId, + epoch); + + Object transactionManager = getField(this, "transactionManager"); + synchronized (transactionManager) { + Object topicPartitionBookkeeper = + getField(transactionManager, "topicPartitionBookkeeper"); + + invoke( + transactionManager, + "transitionTo", + getEnum( + "org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING")); + invoke(topicPartitionBookkeeper, "reset"); + + Object producerIdAndEpoch = getField(transactionManager, "producerIdAndEpoch"); + setField(producerIdAndEpoch, "producerId", producerId); + setField(producerIdAndEpoch, "epoch", epoch); + + invoke( + transactionManager, + "transitionTo", + getEnum( + "org.apache.kafka.clients.producer.internals.TransactionManager$State.READY")); + + invoke( + transactionManager, + "transitionTo", + getEnum( + "org.apache.kafka.clients.producer.internals.TransactionManager$State.IN_TRANSACTION")); + setField(transactionManager, "transactionStarted", true); + } + } + + /** + * Sets the field {@code fieldName} on the given Object {@code object} to {@code value} using + * reflection. + */ + private static void setField(Object object, String fieldName, Object value) { + try { + Field field = object.getClass().getDeclaredField(fieldName); + field.setAccessible(true); + field.set(object, value); + } catch (NoSuchFieldException | IllegalAccessException e) { + throw new RuntimeException("Incompatible KafkaProducer version", e); + } + } + + private static Enum<?> getEnum(String enumFullName) { Review comment: Please split arguments on call-site. ########## File path: flink-connectors/flink-connector-kafka/pom.xml ########## @@ -78,6 +78,18 @@ under the License. <version>${kafka.version}</version> </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_${scala.binary.version}</artifactId> + <version>${kafka.version}</version> + <exclusions> Review comment: Either exclude all or try to avoid this dependency. ########## File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/sink/KafkaCommittable.java ########## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka.sink; + +import java.util.Objects; +import java.util.Properties; + +/** + * This class holds the necessary information to construct a new {@link FlinkKafkaInternalProducer} + * to commit transactions in {@link KafkaCommitter}. + */ +class KafkaCommittable { + + private final long producerId; + private final short epoch; + private final Properties kafkaProducerConfig; Review comment: Replace with `transactionalId` and inject Properties in `createCommitter`. ########## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/DeliveryGuarantee.java ########## @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base; + +/** + * Semantics that can be chosen. + * <li>{@link #EXACTLY_ONCE} + * <li>{@link #AT_LEAST_ONCE} + * <li>{@link #NONE} + */ +public enum DeliveryGuarantee { + EXACTLY_ONCE, Review comment: Add JavaDoc and make clear that source+sink need to be EOS. ########## File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/sink/KafkaSink.java ########## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka.sink; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.connector.sink.Committer; +import org.apache.flink.api.connector.sink.GlobalCommitter; +import org.apache.flink.api.connector.sink.InitContextInitializationContextAdapter; +import org.apache.flink.api.connector.sink.Sink; +import org.apache.flink.api.connector.sink.SinkWriter; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.time.Duration; +import java.util.List; +import java.util.Optional; +import java.util.Properties; + +import static java.util.Objects.requireNonNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Flink Sink to produce data into a Kafka topic. The sink supports all delivery guarantees + * described by {@link DeliveryGuarantee}. + * <li>{@link DeliveryGuarantee#NONE} is it not guaranteed that no messages are lost and in case of + * a failure messages may be duplicated. + * <li>{@link DeliveryGuarantee#AT_LEAST_ONCE} the sink will wait for all outstanding records in the + * Kafka buffers to be acknowledged by the Kafka producer on a checkpoint. + * <li>{@link DeliveryGuarantee#EXACTLY_ONCE} TODO: FLINK-23124 + * + * <p>When creating the sink it is required to specify {@code deliveryGuarantee}, {@link + * Properties} to configure the Kafka Producer and one has to implement the {@link + * KafkaRecordSerializationSchema}. 
+ * + * @param <IN> type of the records written to Kafka + */ +class KafkaSink<IN> implements Sink<IN, KafkaCommittable, KafkaWriterState, Void> { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaSink.class); + private static final Duration DEFAULT_KAFKA_TRANSACTION_TIMEOUT = Duration.ofHours(1); + + private final DeliveryGuarantee deliveryGuarantee; + private final KafkaRecordSerializationSchema<IN> recordSerializer; + private final Properties kafkaProducerConfig; + private final int kafkaProducerPoolSize; + + private KafkaSink( + DeliveryGuarantee deliveryGuarantee, + Properties kafkaProducerConfig, + int kafkaProducerPoolSize, + KafkaRecordSerializationSchema<IN> recordSerializer) { + this.deliveryGuarantee = deliveryGuarantee; + this.kafkaProducerPoolSize = kafkaProducerPoolSize; + this.kafkaProducerConfig = kafkaProducerConfig; + this.recordSerializer = recordSerializer; + } + + @Override + public SinkWriter<IN, KafkaCommittable, KafkaWriterState> createWriter( + InitContext context, List<KafkaWriterState> states) throws IOException { + return new KafkaWriter<>( + deliveryGuarantee, + kafkaProducerConfig, + kafkaProducerPoolSize, + context, + recordSerializer, + new InitContextInitializationContextAdapter( + context, metricGroup -> metricGroup.addGroup("user")), + states); + } + + @Override + public Optional<Committer<KafkaCommittable>> createCommitter() throws IOException { + return Optional.empty(); + } + + @Override + public Optional<GlobalCommitter<KafkaCommittable, Void>> createGlobalCommitter() + throws IOException { + return Optional.empty(); + } + + @Override + public Optional<SimpleVersionedSerializer<KafkaCommittable>> getCommittableSerializer() { + return Optional.empty(); + } + + @Override + public Optional<SimpleVersionedSerializer<Void>> getGlobalCommittableSerializer() { + return Optional.empty(); + } + + @Override + public Optional<SimpleVersionedSerializer<KafkaWriterState>> getWriterStateSerializer() { + return Optional.of(new KafkaWriterStateSerializer(kafkaProducerConfig)); + } + + /** + * Builder to construct {@link KafkaSink}. + * + * @param <IN> type of the records written to Kafka + */ + public static class Builder<IN> { + + private DeliveryGuarantee deliveryGuarantee = DeliveryGuarantee.NONE; + private int kafkaProducersPoolSize = 5; + + private Properties kafkaProducerConfig; + private KafkaRecordSerializationSchema<IN> recordSerializer; + + public Builder<IN> setSemantic(DeliveryGuarantee deliveryGuarantee) { Review comment: `setDeliveryGuarantee` ########## File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/sink/KafkaSink.java ########## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka.sink; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.connector.sink.Committer; +import org.apache.flink.api.connector.sink.GlobalCommitter; +import org.apache.flink.api.connector.sink.InitContextInitializationContextAdapter; +import org.apache.flink.api.connector.sink.Sink; +import org.apache.flink.api.connector.sink.SinkWriter; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.time.Duration; +import java.util.List; +import java.util.Optional; +import java.util.Properties; + +import static java.util.Objects.requireNonNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Flink Sink to produce data into a Kafka topic. The sink supports all delivery guarantees + * described by {@link DeliveryGuarantee}. + * <li>{@link DeliveryGuarantee#NONE} is it not guaranteed that no messages are lost and in case of + * a failure messages may be duplicated. + * <li>{@link DeliveryGuarantee#AT_LEAST_ONCE} the sink will wait for all outstanding records in the + * Kafka buffers to be acknowledged by the Kafka producer on a checkpoint. + * <li>{@link DeliveryGuarantee#EXACTLY_ONCE} TODO: FLINK-23124 + * + * <p>When creating the sink it is required to specify {@code deliveryGuarantee}, {@link + * Properties} to configure the Kafka Producer and one has to implement the {@link + * KafkaRecordSerializationSchema}. 
+ * + * @param <IN> type of the records written to Kafka + */ +class KafkaSink<IN> implements Sink<IN, KafkaCommittable, KafkaWriterState, Void> { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaSink.class); + private static final Duration DEFAULT_KAFKA_TRANSACTION_TIMEOUT = Duration.ofHours(1); + + private final DeliveryGuarantee deliveryGuarantee; + private final KafkaRecordSerializationSchema<IN> recordSerializer; + private final Properties kafkaProducerConfig; + private final int kafkaProducerPoolSize; + + private KafkaSink( + DeliveryGuarantee deliveryGuarantee, + Properties kafkaProducerConfig, + int kafkaProducerPoolSize, + KafkaRecordSerializationSchema<IN> recordSerializer) { + this.deliveryGuarantee = deliveryGuarantee; + this.kafkaProducerPoolSize = kafkaProducerPoolSize; + this.kafkaProducerConfig = kafkaProducerConfig; + this.recordSerializer = recordSerializer; + } + + @Override + public SinkWriter<IN, KafkaCommittable, KafkaWriterState> createWriter( + InitContext context, List<KafkaWriterState> states) throws IOException { + return new KafkaWriter<>( + deliveryGuarantee, + kafkaProducerConfig, + kafkaProducerPoolSize, + context, + recordSerializer, + new InitContextInitializationContextAdapter( + context, metricGroup -> metricGroup.addGroup("user")), + states); + } + + @Override + public Optional<Committer<KafkaCommittable>> createCommitter() throws IOException { + return Optional.empty(); + } + + @Override + public Optional<GlobalCommitter<KafkaCommittable, Void>> createGlobalCommitter() + throws IOException { + return Optional.empty(); + } + + @Override + public Optional<SimpleVersionedSerializer<KafkaCommittable>> getCommittableSerializer() { + return Optional.empty(); + } + + @Override + public Optional<SimpleVersionedSerializer<Void>> getGlobalCommittableSerializer() { + return Optional.empty(); + } + + @Override + public Optional<SimpleVersionedSerializer<KafkaWriterState>> getWriterStateSerializer() { + return Optional.of(new KafkaWriterStateSerializer(kafkaProducerConfig)); + } + + /** + * Builder to construct {@link KafkaSink}. + * + * @param <IN> type of the records written to Kafka + */ + public static class Builder<IN> { Review comment: Add setBootstrapServers ########## File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/sink/KafkaSink.java ########## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka.sink; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.connector.sink.Committer; +import org.apache.flink.api.connector.sink.GlobalCommitter; +import org.apache.flink.api.connector.sink.InitContextInitializationContextAdapter; +import org.apache.flink.api.connector.sink.Sink; +import org.apache.flink.api.connector.sink.SinkWriter; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.time.Duration; +import java.util.List; +import java.util.Optional; +import java.util.Properties; + +import static java.util.Objects.requireNonNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Flink Sink to produce data into a Kafka topic. The sink supports all delivery guarantees + * described by {@link DeliveryGuarantee}. + * <li>{@link DeliveryGuarantee#NONE} is it not guaranteed that no messages are lost and in case of + * a failure messages may be duplicated. + * <li>{@link DeliveryGuarantee#AT_LEAST_ONCE} the sink will wait for all outstanding records in the + * Kafka buffers to be acknowledged by the Kafka producer on a checkpoint. + * <li>{@link DeliveryGuarantee#EXACTLY_ONCE} TODO: FLINK-23124 + * + * <p>When creating the sink it is required to specify {@code deliveryGuarantee}, {@link + * Properties} to configure the Kafka Producer and one has to implement the {@link + * KafkaRecordSerializationSchema}. 
+ * + * @param <IN> type of the records written to Kafka + */ +class KafkaSink<IN> implements Sink<IN, KafkaCommittable, KafkaWriterState, Void> { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaSink.class); + private static final Duration DEFAULT_KAFKA_TRANSACTION_TIMEOUT = Duration.ofHours(1); + + private final DeliveryGuarantee deliveryGuarantee; + private final KafkaRecordSerializationSchema<IN> recordSerializer; + private final Properties kafkaProducerConfig; + private final int kafkaProducerPoolSize; + + private KafkaSink( + DeliveryGuarantee deliveryGuarantee, + Properties kafkaProducerConfig, + int kafkaProducerPoolSize, + KafkaRecordSerializationSchema<IN> recordSerializer) { + this.deliveryGuarantee = deliveryGuarantee; + this.kafkaProducerPoolSize = kafkaProducerPoolSize; + this.kafkaProducerConfig = kafkaProducerConfig; + this.recordSerializer = recordSerializer; + } + + @Override + public SinkWriter<IN, KafkaCommittable, KafkaWriterState> createWriter( + InitContext context, List<KafkaWriterState> states) throws IOException { + return new KafkaWriter<>( + deliveryGuarantee, + kafkaProducerConfig, + kafkaProducerPoolSize, + context, + recordSerializer, + new InitContextInitializationContextAdapter( + context, metricGroup -> metricGroup.addGroup("user")), + states); + } + + @Override + public Optional<Committer<KafkaCommittable>> createCommitter() throws IOException { + return Optional.empty(); + } + + @Override + public Optional<GlobalCommitter<KafkaCommittable, Void>> createGlobalCommitter() + throws IOException { + return Optional.empty(); + } + + @Override + public Optional<SimpleVersionedSerializer<KafkaCommittable>> getCommittableSerializer() { + return Optional.empty(); + } + + @Override + public Optional<SimpleVersionedSerializer<Void>> getGlobalCommittableSerializer() { + return Optional.empty(); + } + + @Override + public Optional<SimpleVersionedSerializer<KafkaWriterState>> getWriterStateSerializer() { + return Optional.of(new KafkaWriterStateSerializer(kafkaProducerConfig)); + } + + /** + * Builder to construct {@link KafkaSink}. + * + * @param <IN> type of the records written to Kafka + */ + public static class Builder<IN> { + + private DeliveryGuarantee deliveryGuarantee = DeliveryGuarantee.NONE; + private int kafkaProducersPoolSize = 5; + + private Properties kafkaProducerConfig; + private KafkaRecordSerializationSchema<IN> recordSerializer; + + public Builder<IN> setSemantic(DeliveryGuarantee deliveryGuarantee) { + this.deliveryGuarantee = requireNonNull(deliveryGuarantee, "semantic"); + return this; + } + + public Builder<IN> setKafkaProducerConfig(Properties kafkaProducerConfig) { + this.kafkaProducerConfig = requireNonNull(kafkaProducerConfig, "kafkaProducerConfig"); + // set the producer configuration properties for kafka record key value serializers. 
+ if (!kafkaProducerConfig.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) { + kafkaProducerConfig.put( + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + ByteArraySerializer.class.getName()); + } else { + LOG.warn( + "Overwriting the '{}' is not recommended", + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG); + } + + if (!kafkaProducerConfig.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) { + kafkaProducerConfig.put( + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + ByteArraySerializer.class.getName()); + } else { + LOG.warn( + "Overwriting the '{}' is not recommended", + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG); + } + + if (!kafkaProducerConfig.containsKey(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG)) { + final long timeout = DEFAULT_KAFKA_TRANSACTION_TIMEOUT.toMillis(); + checkState( + timeout < Integer.MAX_VALUE && timeout > 0, + "timeout does not fit into 32 bit integer"); + kafkaProducerConfig.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, (int) timeout); + LOG.warn( + "Property [{}] not specified. Setting it to {}", + ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, + DEFAULT_KAFKA_TRANSACTION_TIMEOUT); + } + return this; + } + + public Builder<IN> setRecordSerializer( + KafkaRecordSerializationSchema<IN> recordSerializer) { + this.recordSerializer = recordSerializer; Review comment: checkNotNull ########## File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/sink/KafkaSink.java ########## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.sink; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.connector.sink.Committer; +import org.apache.flink.api.connector.sink.GlobalCommitter; +import org.apache.flink.api.connector.sink.InitContextInitializationContextAdapter; +import org.apache.flink.api.connector.sink.Sink; +import org.apache.flink.api.connector.sink.SinkWriter; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.time.Duration; +import java.util.List; +import java.util.Optional; +import java.util.Properties; + +import static java.util.Objects.requireNonNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Flink Sink to produce data into a Kafka topic. The sink supports all delivery guarantees Review comment: Add link to builder. 
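
Taken together, the builder-related comments in this review (rename setSemantic to setDeliveryGuarantee, add setBootstrapServers, checkNotNull the serializer, link to and expose the builder) point toward usage roughly like the sketch below. This is a hypothetical sketch only: KafkaSink.builder(), setDeliveryGuarantee, build(), and a public KafkaSink are the reviewer's suggestions or assumptions, not the API as posted in this revision (the diff currently shows a package-private KafkaSink with Builder#setSemantic); setKafkaProducerConfig and setRecordSerializer are taken from the posted diff.

import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.sink.KafkaSink;

import java.util.Properties;

class KafkaSinkUsageSketch {

    /**
     * Attaches a KafkaSink to a stream. The record serializer is passed in because its
     * exact shape is defined elsewhere in this PR.
     */
    static void attachSink(
            DataStream<String> stream, KafkaRecordSerializationSchema<String> serializer) {
        Properties producerConfig = new Properties();
        // With the suggested setBootstrapServers(...), this property would move into a
        // dedicated builder call instead of the raw Properties object.
        producerConfig.put("bootstrap.servers", "localhost:9092");

        KafkaSink<String> sink =
                KafkaSink.<String>builder()                     // suggested KafkaSink#builder() entry point
                        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) // suggested rename of setSemantic
                        .setKafkaProducerConfig(producerConfig) // defaults byte-array serializers, txn timeout
                        .setRecordSerializer(serializer)        // to be guarded with checkNotNull per review
                        .build();                               // assumed terminal call on the builder

        // sinkTo accepts the new unified Sink interface that KafkaSink implements.
        stream.sinkTo(sink);
    }
}
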
########## File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/sink/KafkaSink.java ########## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.sink; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.connector.sink.Committer; +import org.apache.flink.api.connector.sink.GlobalCommitter; +import org.apache.flink.api.connector.sink.InitContextInitializationContextAdapter; +import org.apache.flink.api.connector.sink.Sink; +import org.apache.flink.api.connector.sink.SinkWriter; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.time.Duration; +import java.util.List; +import java.util.Optional; +import java.util.Properties; + +import static java.util.Objects.requireNonNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Flink Sink to produce data into a Kafka topic. The sink supports all delivery guarantees + * described by {@link DeliveryGuarantee}. + * <li>{@link DeliveryGuarantee#NONE} is it not guaranteed that no messages are lost and in case of + * a failure messages may be duplicated. + * <li>{@link DeliveryGuarantee#AT_LEAST_ONCE} the sink will wait for all outstanding records in the + * Kafka buffers to be acknowledged by the Kafka producer on a checkpoint. + * <li>{@link DeliveryGuarantee#EXACTLY_ONCE} TODO: FLINK-23124 + * + * <p>When creating the sink it is required to specify {@code deliveryGuarantee}, {@link + * Properties} to configure the Kafka Producer and one has to implement the {@link + * KafkaRecordSerializationSchema}. 
+ * + * @param <IN> type of the records written to Kafka + */ +class KafkaSink<IN> implements Sink<IN, KafkaCommittable, KafkaWriterState, Void> { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaSink.class); + private static final Duration DEFAULT_KAFKA_TRANSACTION_TIMEOUT = Duration.ofHours(1); + + private final DeliveryGuarantee deliveryGuarantee; + private final KafkaRecordSerializationSchema<IN> recordSerializer; + private final Properties kafkaProducerConfig; + private final int kafkaProducerPoolSize; + + private KafkaSink( + DeliveryGuarantee deliveryGuarantee, + Properties kafkaProducerConfig, + int kafkaProducerPoolSize, + KafkaRecordSerializationSchema<IN> recordSerializer) { + this.deliveryGuarantee = deliveryGuarantee; + this.kafkaProducerPoolSize = kafkaProducerPoolSize; + this.kafkaProducerConfig = kafkaProducerConfig; + this.recordSerializer = recordSerializer; + } + + @Override + public SinkWriter<IN, KafkaCommittable, KafkaWriterState> createWriter( + InitContext context, List<KafkaWriterState> states) throws IOException { + return new KafkaWriter<>( + deliveryGuarantee, + kafkaProducerConfig, + kafkaProducerPoolSize, + context, + recordSerializer, + new InitContextInitializationContextAdapter( + context, metricGroup -> metricGroup.addGroup("user")), + states); + } + + @Override + public Optional<Committer<KafkaCommittable>> createCommitter() throws IOException { + return Optional.empty(); + } + + @Override + public Optional<GlobalCommitter<KafkaCommittable, Void>> createGlobalCommitter() + throws IOException { + return Optional.empty(); + } + + @Override + public Optional<SimpleVersionedSerializer<KafkaCommittable>> getCommittableSerializer() { + return Optional.empty(); + } + + @Override + public Optional<SimpleVersionedSerializer<Void>> getGlobalCommittableSerializer() { + return Optional.empty(); + } + + @Override + public Optional<SimpleVersionedSerializer<KafkaWriterState>> getWriterStateSerializer() { + return Optional.of(new KafkaWriterStateSerializer(kafkaProducerConfig)); + } + + /** + * Builder to construct {@link KafkaSink}. + * + * @param <IN> type of the records written to Kafka + */ + public static class Builder<IN> { + + private DeliveryGuarantee deliveryGuarantee = DeliveryGuarantee.NONE; + private int kafkaProducersPoolSize = 5; + + private Properties kafkaProducerConfig; + private KafkaRecordSerializationSchema<IN> recordSerializer; + + public Builder<IN> setSemantic(DeliveryGuarantee deliveryGuarantee) { + this.deliveryGuarantee = requireNonNull(deliveryGuarantee, "semantic"); Review comment: checkNotNull ########## File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/sink/KafkaSink.java ########## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.sink; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.connector.sink.Committer; +import org.apache.flink.api.connector.sink.GlobalCommitter; +import org.apache.flink.api.connector.sink.InitContextInitializationContextAdapter; +import org.apache.flink.api.connector.sink.Sink; +import org.apache.flink.api.connector.sink.SinkWriter; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.time.Duration; +import java.util.List; +import java.util.Optional; +import java.util.Properties; + +import static java.util.Objects.requireNonNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Flink Sink to produce data into a Kafka topic. The sink supports all delivery guarantees + * described by {@link DeliveryGuarantee}. + * <li>{@link DeliveryGuarantee#NONE} is it not guaranteed that no messages are lost and in case of + * a failure messages may be duplicated. + * <li>{@link DeliveryGuarantee#AT_LEAST_ONCE} the sink will wait for all outstanding records in the + * Kafka buffers to be acknowledged by the Kafka producer on a checkpoint. + * <li>{@link DeliveryGuarantee#EXACTLY_ONCE} TODO: FLINK-23124 + * + * <p>When creating the sink it is required to specify {@code deliveryGuarantee}, {@link + * Properties} to configure the Kafka Producer and one has to implement the {@link + * KafkaRecordSerializationSchema}. 
+ * + * @param <IN> type of the records written to Kafka + */ +class KafkaSink<IN> implements Sink<IN, KafkaCommittable, KafkaWriterState, Void> { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaSink.class); + private static final Duration DEFAULT_KAFKA_TRANSACTION_TIMEOUT = Duration.ofHours(1); + + private final DeliveryGuarantee deliveryGuarantee; + private final KafkaRecordSerializationSchema<IN> recordSerializer; + private final Properties kafkaProducerConfig; + private final int kafkaProducerPoolSize; + + private KafkaSink( + DeliveryGuarantee deliveryGuarantee, + Properties kafkaProducerConfig, + int kafkaProducerPoolSize, + KafkaRecordSerializationSchema<IN> recordSerializer) { + this.deliveryGuarantee = deliveryGuarantee; + this.kafkaProducerPoolSize = kafkaProducerPoolSize; + this.kafkaProducerConfig = kafkaProducerConfig; + this.recordSerializer = recordSerializer; + } + + @Override + public SinkWriter<IN, KafkaCommittable, KafkaWriterState> createWriter( + InitContext context, List<KafkaWriterState> states) throws IOException { + return new KafkaWriter<>( + deliveryGuarantee, + kafkaProducerConfig, + kafkaProducerPoolSize, + context, + recordSerializer, + new InitContextInitializationContextAdapter( + context, metricGroup -> metricGroup.addGroup("user")), + states); + } + + @Override + public Optional<Committer<KafkaCommittable>> createCommitter() throws IOException { + return Optional.empty(); + } + + @Override + public Optional<GlobalCommitter<KafkaCommittable, Void>> createGlobalCommitter() + throws IOException { + return Optional.empty(); + } + + @Override + public Optional<SimpleVersionedSerializer<KafkaCommittable>> getCommittableSerializer() { + return Optional.empty(); + } + + @Override + public Optional<SimpleVersionedSerializer<Void>> getGlobalCommittableSerializer() { + return Optional.empty(); + } + + @Override + public Optional<SimpleVersionedSerializer<KafkaWriterState>> getWriterStateSerializer() { + return Optional.of(new KafkaWriterStateSerializer(kafkaProducerConfig)); + } + + /** + * Builder to construct {@link KafkaSink}. + * + * @param <IN> type of the records written to Kafka + */ + public static class Builder<IN> { Review comment: Pull out Builder as KafkaSinkBuilder. ########## File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/sink/KafkaSink.java ########## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka.sink; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.connector.sink.Committer; +import org.apache.flink.api.connector.sink.GlobalCommitter; +import org.apache.flink.api.connector.sink.InitContextInitializationContextAdapter; +import org.apache.flink.api.connector.sink.Sink; +import org.apache.flink.api.connector.sink.SinkWriter; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.time.Duration; +import java.util.List; +import java.util.Optional; +import java.util.Properties; + +import static java.util.Objects.requireNonNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Flink Sink to produce data into a Kafka topic. The sink supports all delivery guarantees + * described by {@link DeliveryGuarantee}. + * <li>{@link DeliveryGuarantee#NONE} is it not guaranteed that no messages are lost and in case of + * a failure messages may be duplicated. + * <li>{@link DeliveryGuarantee#AT_LEAST_ONCE} the sink will wait for all outstanding records in the + * Kafka buffers to be acknowledged by the Kafka producer on a checkpoint. + * <li>{@link DeliveryGuarantee#EXACTLY_ONCE} TODO: FLINK-23124 + * + * <p>When creating the sink it is required to specify {@code deliveryGuarantee}, {@link + * Properties} to configure the Kafka Producer and one has to implement the {@link + * KafkaRecordSerializationSchema}. + * + * @param <IN> type of the records written to Kafka + */ +class KafkaSink<IN> implements Sink<IN, KafkaCommittable, KafkaWriterState, Void> { Review comment: +public ########## File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/sink/KafkaSinkITCase.java ########## @@ -0,0 +1,659 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka.sink; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.CheckpointListener; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.configuration.StateBackendOptions; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.CheckpointConfig; +import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; +import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.testutils.junit.SharedObjects; +import org.apache.flink.testutils.junit.SharedReference; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.CreateTopicsResult; +import org.apache.kafka.clients.admin.DeleteTopicsResult; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.utility.DockerImageName; + +import javax.annotation.Nullable; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.attribute.BasicFileAttributes; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import 
java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.LongStream; + +import static org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess.CHECKPOINT_DIR_PREFIX; +import static org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess.METADATA_FILE_NAME; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.hasItems; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** Tests for using KafkaSink writing to a Kafka cluster. */ +public class KafkaSinkITCase extends TestLogger { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaSinkITCase.class); + private static final Slf4jLogConsumer LOG_CONSUMER = new Slf4jLogConsumer(LOG); + private static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka"; + private static final Network NETWORK = Network.newNetwork(); + private static final int ZK_TIMEOUT_MILLIS = 30000; + private static final short TOPIC_REPLICATION_FACTOR = 1; + private static final Duration CONSUMER_POLL_DURATION = Duration.ofSeconds(1); + + private String topic; + private SharedReference<AtomicLong> emittedRecordsCount; + private SharedReference<AtomicLong> emittedRecordsWithCheckpoint; + private SharedReference<AtomicBoolean> failed; + private SharedReference<AtomicLong> lastCheckpointedRecord; + + @ClassRule + public static final KafkaContainer KAFKA_CONTAINER = + new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.5.2")) + .withEmbeddedZookeeper() + .withEnv( + ImmutableMap.of( + "KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", + "1", + "KAFKA_TRANSACTION_MAX_TIMEOUT_MS", + String.valueOf(Duration.ofHours(2).toMillis()), + "KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", + "1", + "KAFKA_MIN_INSYNC_REPLICAS", + "1")) + .withNetwork(NETWORK) + .withLogConsumer(LOG_CONSUMER) + .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS); + + @Rule public final SharedObjects sharedObjects = SharedObjects.create(); + + @Rule public final TemporaryFolder temp = new TemporaryFolder(); + + @Before + public void setUp() throws ExecutionException, InterruptedException, TimeoutException { + emittedRecordsCount = sharedObjects.add(new AtomicLong()); + emittedRecordsWithCheckpoint = sharedObjects.add(new AtomicLong()); + failed = sharedObjects.add(new AtomicBoolean(false)); + lastCheckpointedRecord = sharedObjects.add(new AtomicLong(0)); + topic = UUID.randomUUID().toString(); + createTestTopic(topic, 1, TOPIC_REPLICATION_FACTOR); + } + + @After + public void tearDown() throws ExecutionException, InterruptedException, TimeoutException { + deleteTestTopic(topic); + } + + @Test + public void testWriteRecordsToKafkaWithAtLeastOnceGuarantee() throws Exception { + writeRecordsToKafka(DeliveryGuarantee.AT_LEAST_ONCE, emittedRecordsCount); + } + + @Test + public void testWriteRecordsToKafkaWithNoneGuarantee() throws Exception { + writeRecordsToKafka(DeliveryGuarantee.NONE, emittedRecordsCount); + } + + @Test + public void testWriteRecordsToKafkaWithExactlyOnceGuarantee() 
throws Exception { + writeRecordsToKafka(DeliveryGuarantee.EXACTLY_ONCE, emittedRecordsWithCheckpoint); + } + + @Test + public void testRecoveryWithAtLeastOnceGuarantee() throws Exception { + testRecoveryWithAssertion( + DeliveryGuarantee.AT_LEAST_ONCE, + (records) -> + assertThat(records, hasItems(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L))); + } + + @Test + public void testRecoveryWithExactlyOnceGuarantee() throws Exception { + testRecoveryWithAssertion( + DeliveryGuarantee.EXACTLY_ONCE, + (records) -> + assertEquals( + records, + LongStream.range(1, lastCheckpointedRecord.get().get() + 1) + .boxed() + .collect(Collectors.toList()))); + } + + @Test + public void testAbortTransactionsOfPendingCheckpointsAfterFailure() throws Exception { + // Run a first job failing during the async phase of a checkpoint to leave some + // lingering transactions + final Configuration config = new Configuration(); + config.setString(StateBackendOptions.STATE_BACKEND, "filesystem"); + final File checkpointDir = temp.newFolder(); + config.setString( + CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString()); + config.set( + ExecutionCheckpointingOptions.EXTERNALIZED_CHECKPOINT, + CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); + config.set(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, 2); + try { + executeWithMapper(new FailAsyncCheckpointMapper(1), config, "firstPrefix"); + } catch (Exception e) { + assertThat( + e.getCause().getCause().getMessage(), + containsString("Exceeded checkpoint tolerable failure")); + } + final File completedCheckpoint = + Files.find(checkpointDir.toPath(), 2, this::isCompletedCheckpoint) + .max(Comparator.comparing(Path::toString)) + .map(Path::toFile) + .orElseThrow(() -> new IllegalStateException("Cannot generate checkpoint")); Review comment: Extract `org.apache.flink.test.util.TestUtils` ########## File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/sink/KafkaSink.java ########## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka.sink; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.connector.sink.Committer; +import org.apache.flink.api.connector.sink.GlobalCommitter; +import org.apache.flink.api.connector.sink.InitContextInitializationContextAdapter; +import org.apache.flink.api.connector.sink.Sink; +import org.apache.flink.api.connector.sink.SinkWriter; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.time.Duration; +import java.util.List; +import java.util.Optional; +import java.util.Properties; + +import static java.util.Objects.requireNonNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Flink Sink to produce data into a Kafka topic. The sink supports all delivery guarantees + * described by {@link DeliveryGuarantee}. + * <li>{@link DeliveryGuarantee#NONE} is it not guaranteed that no messages are lost and in case of + * a failure messages may be duplicated. + * <li>{@link DeliveryGuarantee#AT_LEAST_ONCE} the sink will wait for all outstanding records in the + * Kafka buffers to be acknowledged by the Kafka producer on a checkpoint. + * <li>{@link DeliveryGuarantee#EXACTLY_ONCE} TODO: FLINK-23124 + * + * <p>When creating the sink it is required to specify {@code deliveryGuarantee}, {@link + * Properties} to configure the Kafka Producer and one has to implement the {@link + * KafkaRecordSerializationSchema}. 
+ * + * @param <IN> type of the records written to Kafka + */ +class KafkaSink<IN> implements Sink<IN, KafkaCommittable, KafkaWriterState, Void> { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaSink.class); + private static final Duration DEFAULT_KAFKA_TRANSACTION_TIMEOUT = Duration.ofHours(1); + + private final DeliveryGuarantee deliveryGuarantee; + private final KafkaRecordSerializationSchema<IN> recordSerializer; + private final Properties kafkaProducerConfig; + private final int kafkaProducerPoolSize; + + private KafkaSink( + DeliveryGuarantee deliveryGuarantee, + Properties kafkaProducerConfig, + int kafkaProducerPoolSize, + KafkaRecordSerializationSchema<IN> recordSerializer) { + this.deliveryGuarantee = deliveryGuarantee; + this.kafkaProducerPoolSize = kafkaProducerPoolSize; + this.kafkaProducerConfig = kafkaProducerConfig; + this.recordSerializer = recordSerializer; + } + + @Override + public SinkWriter<IN, KafkaCommittable, KafkaWriterState> createWriter( + InitContext context, List<KafkaWriterState> states) throws IOException { + return new KafkaWriter<>( + deliveryGuarantee, + kafkaProducerConfig, + kafkaProducerPoolSize, + context, + recordSerializer, + new InitContextInitializationContextAdapter( + context, metricGroup -> metricGroup.addGroup("user")), + states); + } + + @Override + public Optional<Committer<KafkaCommittable>> createCommitter() throws IOException { + return Optional.empty(); + } + + @Override + public Optional<GlobalCommitter<KafkaCommittable, Void>> createGlobalCommitter() + throws IOException { + return Optional.empty(); + } + + @Override + public Optional<SimpleVersionedSerializer<KafkaCommittable>> getCommittableSerializer() { + return Optional.empty(); + } + + @Override + public Optional<SimpleVersionedSerializer<Void>> getGlobalCommittableSerializer() { + return Optional.empty(); + } + + @Override + public Optional<SimpleVersionedSerializer<KafkaWriterState>> getWriterStateSerializer() { + return Optional.of(new KafkaWriterStateSerializer(kafkaProducerConfig)); + } + + /** + * Builder to construct {@link KafkaSink}. + * + * @param <IN> type of the records written to Kafka + */ + public static class Builder<IN> { Review comment: Add KafkaSink#builder() -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
