This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
commit 157813eac012847b4682313b4d1ae65428c5a354
Author: Arvid Heise <[email protected]>
AuthorDate: Fri Sep 27 17:14:01 2024 +0200

    [FLINK-36177] Introduce KafkaPartitioner to replace FlinkKafkaPartitioner
    
    Relocate FlinkKafkaPartitioner to KafkaSink package and turn it into a functional interface.
---
 .../kafka/sink/KafkaPartitioner.java}              | 22 ++++-----------
 .../KafkaRecordSerializationSchemaBuilder.java     | 20 +++++++++++--
 .../kafka/partitioner/FlinkKafkaPartitioner.java   | 33 +++-------------------
 .../DynamicKafkaRecordSerializationSchema.java     |  6 ++--
 .../kafka/table/KafkaConnectorOptionsUtil.java     | 16 +++++------
 .../connectors/kafka/table/KafkaDynamicSink.java   |  6 ++--
 .../kafka/table/KafkaDynamicTableFactory.java      |  4 +--
 7 files changed, 43 insertions(+), 64 deletions(-)

diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaPartitioner.java
similarity index 69%
copy from flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
copy to flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaPartitioner.java
index 7318ed69..19a2d473 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaPartitioner.java
@@ -15,25 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.streaming.connectors.kafka.partitioner;
+package org.apache.flink.connector.kafka.sink;
 
 import org.apache.flink.annotation.PublicEvolving;
 
 import java.io.Serializable;
 
 /**
- * A {@link FlinkKafkaPartitioner} wraps logic on how to partition records across partitions of
- * multiple Kafka topics.
- *
- * @deprecated Will be turned into internal class when {@link
- *     org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer} is removed.
+ * A {@code KafkaPartitioner} wraps logic on how to partition records across partitions of multiple
+ * Kafka topics.
  */
 @PublicEvolving
-@Deprecated
-public abstract class FlinkKafkaPartitioner<T> implements Serializable {
-
-    private static final long serialVersionUID = -9086719227828020494L;
-
+public interface KafkaPartitioner<T> extends Serializable {
     /**
      * Initializer for the partitioner. This is called once on each parallel sink instance of the
      * Flink Kafka producer. This method should be overridden if necessary.
@@ -41,9 +34,7 @@ public abstract class FlinkKafkaPartitioner<T> implements Serializable {
      * @param parallelInstanceId 0-indexed id of the parallel sink instance in Flink
      * @param parallelInstances the total number of parallel instances
      */
-    public void open(int parallelInstanceId, int parallelInstances) {
-        // overwrite this method if needed.
-    }
+    default void open(int parallelInstanceId, int parallelInstances) {}
 
     /**
      * Determine the id of the partition that the record should be written to.
@@ -55,6 +46,5 @@ public abstract class FlinkKafkaPartitioner<T> implements Serializable {
      * @param partitions found partitions for the target topic
      * @return the id of the target partition
      */
-    public abstract int partition(
-            T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
+    int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
 }
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
index 92eb625b..e9fc413b 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
@@ -82,7 +82,7 @@ public class KafkaRecordSerializationSchemaBuilder<IN> {
 
     @Nullable private Function<? super IN, String> topicSelector;
     @Nullable private SerializationSchema<? super IN> valueSerializationSchema;
-    @Nullable private FlinkKafkaPartitioner<? super IN> partitioner;
+    @Nullable private KafkaPartitioner<? super IN> partitioner;
     @Nullable private SerializationSchema<? super IN> keySerializationSchema;
     @Nullable private HeaderProvider<? super IN> headerProvider;
 
@@ -91,6 +91,7 @@ public class KafkaRecordSerializationSchemaBuilder<IN> {
      *
      * @param partitioner
      * @return {@code this}
+     * @deprecated use {@link #setPartitioner(KafkaPartitioner)}
      */
     public <T extends IN> KafkaRecordSerializationSchemaBuilder<T> setPartitioner(
             FlinkKafkaPartitioner<? super T> partitioner) {
@@ -99,6 +100,19 @@ public class KafkaRecordSerializationSchemaBuilder<IN> {
         return self;
     }
 
+    /**
+     * Sets a custom partitioner determining the target partition of the target topic.
+     *
+     * @param partitioner
+     * @return {@code this}
+     */
+    public <T extends IN> KafkaRecordSerializationSchemaBuilder<T> setPartitioner(
+            KafkaPartitioner<? super T> partitioner) {
+        KafkaRecordSerializationSchemaBuilder<T> self = self();
+        self.partitioner = checkNotNull(partitioner);
+        return self;
+    }
+
     /**
      * Sets a fixed topic which used as destination for all records.
      *
@@ -295,7 +309,7 @@ public class KafkaRecordSerializationSchemaBuilder<IN> {
             implements KafkaRecordSerializationSchema<IN> {
         private final SerializationSchema<? super IN> valueSerializationSchema;
         private final Function<? super IN, String> topicSelector;
-        private final FlinkKafkaPartitioner<? super IN> partitioner;
+        private final KafkaPartitioner<? super IN> partitioner;
         private final SerializationSchema<? super IN> keySerializationSchema;
         private final HeaderProvider<? super IN> headerProvider;
 
@@ -303,7 +317,7 @@ public class KafkaRecordSerializationSchemaBuilder<IN> {
                 Function<? super IN, String> topicSelector,
                 SerializationSchema<? super IN> valueSerializationSchema,
                 @Nullable SerializationSchema<? super IN> keySerializationSchema,
-                @Nullable FlinkKafkaPartitioner<? super IN> partitioner,
+                @Nullable KafkaPartitioner<? super IN> partitioner,
                 @Nullable HeaderProvider<? super IN> headerProvider) {
             this.topicSelector = checkNotNull(topicSelector);
             this.valueSerializationSchema = checkNotNull(valueSerializationSchema);
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
index 7318ed69..9568349a 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
@@ -18,43 +18,18 @@
 package org.apache.flink.streaming.connectors.kafka.partitioner;
 
 import org.apache.flink.annotation.PublicEvolving;
-
-import java.io.Serializable;
+import org.apache.flink.connector.kafka.sink.KafkaPartitioner;
 
 /**
  * A {@link FlinkKafkaPartitioner} wraps logic on how to partition records across partitions of
  * multiple Kafka topics.
  *
- * @deprecated Will be turned into internal class when {@link
- *     org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer} is removed.
+ * @deprecated Use {@link KafkaPartitioner} instead for {@link
+ *     org.apache.flink.connector.kafka.sink.KafkaSink}.
 */
 @PublicEvolving
 @Deprecated
-public abstract class FlinkKafkaPartitioner<T> implements Serializable {
+public abstract class FlinkKafkaPartitioner<T> implements KafkaPartitioner<T> {
 
     private static final long serialVersionUID = -9086719227828020494L;
-
-    /**
-     * Initializer for the partitioner. This is called once on each parallel sink instance of the
-     * Flink Kafka producer. This method should be overridden if necessary.
-     *
-     * @param parallelInstanceId 0-indexed id of the parallel sink instance in Flink
-     * @param parallelInstances the total number of parallel instances
-     */
-    public void open(int parallelInstanceId, int parallelInstances) {
-        // overwrite this method if needed.
-    }
-
-    /**
-     * Determine the id of the partition that the record should be written to.
-     *
-     * @param record the record value
-     * @param key serialized key of the record
-     * @param value serialized value of the record
-     * @param targetTopic target topic for the record
-     * @param partitions found partitions for the target topic
-     * @return the id of the target partition
-     */
-    public abstract int partition(
-            T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
 }
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java
index f3a7acb3..229b08b5 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java
@@ -19,9 +19,9 @@
 package org.apache.flink.streaming.connectors.kafka.table;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.connector.kafka.sink.KafkaPartitioner;
 import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
 import org.apache.flink.connector.kafka.sink.KafkaSink;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.types.RowKind;
@@ -46,7 +46,7 @@ class DynamicKafkaRecordSerializationSchema implements KafkaRecordSerializationS
 
     private final Set<String> topics;
     private final Pattern topicPattern;
-    private final FlinkKafkaPartitioner<RowData> partitioner;
+    private final KafkaPartitioner<RowData> partitioner;
     @Nullable private final SerializationSchema<RowData> keySerialization;
     private final SerializationSchema<RowData> valueSerialization;
     private final RowData.FieldGetter[] keyFieldGetters;
@@ -59,7 +59,7 @@ class DynamicKafkaRecordSerializationSchema implements KafkaRecordSerializationS
     DynamicKafkaRecordSerializationSchema(
             @Nullable List<String> topics,
             @Nullable Pattern topicPattern,
-            @Nullable FlinkKafkaPartitioner<RowData> partitioner,
+            @Nullable KafkaPartitioner<RowData> partitioner,
             @Nullable SerializationSchema<RowData> keySerialization,
             SerializationSchema<RowData> valueSerialization,
             RowData.FieldGetter[] keyFieldGetters,
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java
index f752276a..5960a709 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java
@@ -24,11 +24,11 @@ import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.kafka.sink.KafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.config.BoundedMode;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanBoundedMode;
 import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanStartupMode;
 import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ValueFieldsStrategy;
@@ -386,7 +386,7 @@
      * The partitioner can be either "fixed", "round-robin" or a customized partitioner full class
      * name.
      */
-    public static Optional<FlinkKafkaPartitioner<RowData>> getFlinkKafkaPartitioner(
+    public static Optional<KafkaPartitioner<RowData>> getFlinkKafkaPartitioner(
             ReadableConfig tableOptions, ClassLoader classLoader) {
         return tableOptions
                 .getOptional(SINK_PARTITIONER)
@@ -465,19 +465,19 @@
     }
 
     /** Returns a class value with the given class name. */
-    private static <T> FlinkKafkaPartitioner<T> initializePartitioner(
+    private static <T> KafkaPartitioner<T> initializePartitioner(
             String name, ClassLoader classLoader) {
         try {
             Class<?> clazz = Class.forName(name, true, classLoader);
-            if (!FlinkKafkaPartitioner.class.isAssignableFrom(clazz)) {
+            if (!KafkaPartitioner.class.isAssignableFrom(clazz)) {
                 throw new ValidationException(
                         String.format(
-                                "Sink partitioner class '%s' should extend from the required class %s",
-                                name, FlinkKafkaPartitioner.class.getName()));
+                                "Sink partitioner class '%s' should implement the required class %s",
+                                name, KafkaPartitioner.class.getName()));
             }
             @SuppressWarnings("unchecked")
-            final FlinkKafkaPartitioner<T> kafkaPartitioner =
-                    InstantiationUtil.instantiate(name, FlinkKafkaPartitioner.class, classLoader);
+            final KafkaPartitioner<T> kafkaPartitioner =
+                    InstantiationUtil.instantiate(name, KafkaPartitioner.class, classLoader);
 
             return kafkaPartitioner;
         } catch (ClassNotFoundException | FlinkException e) {
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
index 8ab0f10c..2bb52c94 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
@@ -24,11 +24,11 @@ import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.kafka.sink.KafkaPartitioner;
 import org.apache.flink.connector.kafka.sink.KafkaSink;
 import org.apache.flink.connector.kafka.sink.KafkaSinkBuilder;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.Projection;
@@ -125,7 +125,7 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
     protected final Properties properties;
 
     /** Partitioner to select Kafka partition for each item. */
-    protected final @Nullable FlinkKafkaPartitioner<RowData> partitioner;
+    protected final @Nullable KafkaPartitioner<RowData> partitioner;
 
     /**
      * Flag to determine sink mode. In upsert mode sink transforms the delete/update-before message
@@ -150,7 +150,7 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
             @Nullable List<String> topics,
             @Nullable Pattern topicPattern,
             Properties properties,
-            @Nullable FlinkKafkaPartitioner<RowData> partitioner,
+            @Nullable KafkaPartitioner<RowData> partitioner,
             DeliveryGuarantee deliveryGuarantee,
             boolean upsertMode,
             SinkBufferFlushMode flushMode,
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
index 7c23923b..8124691a 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
@@ -26,11 +26,11 @@ import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.kafka.sink.KafkaPartitioner;
 import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
 import org.apache.flink.streaming.connectors.kafka.config.BoundedMode;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.BoundedOptions;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -427,7 +427,7 @@ public class KafkaDynamicTableFactory
             @Nullable List<String> topics,
             @Nullable Pattern topicPattern,
             Properties properties,
-            FlinkKafkaPartitioner<RowData> partitioner,
+            KafkaPartitioner<RowData> partitioner,
             DeliveryGuarantee deliveryGuarantee,
             Integer parallelism,
             @Nullable String transactionalIdPrefix) {
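
Since KafkaPartitioner has a single abstract method and open() now defaults to a no-op, the partitioning logic for a KafkaSink can be supplied as a plain lambda through the new setPartitioner(KafkaPartitioner) overload on KafkaRecordSerializationSchemaBuilder. A minimal sketch against the new API; the bootstrap servers, topic name, and key-hash routing below are illustrative and not part of this commit:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaPartitioner;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

import java.util.Arrays;

public class LambdaPartitionerExample {

    public static KafkaSink<String> buildSink(String bootstrapServers) {
        // KafkaPartitioner is now a functional interface, so the routing logic
        // fits in a lambda; picks a partition from the serialized key's hash.
        KafkaPartitioner<String> byKeyHash =
                (record, key, value, targetTopic, partitions) ->
                        partitions[(Arrays.hashCode(key) & 0x7fffffff) % partitions.length];

        return KafkaSink.<String>builder()
                .setBootstrapServers(bootstrapServers)
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.<String>builder()
                                .setTopic("example-topic") // illustrative topic name
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .setPartitioner(byKeyHash) // binds to the new overload
                                .build())
                .build();
    }
}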
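
On the table connector side, initializePartitioner now validates against KafkaPartitioner, so a custom 'sink.partitioner' class only has to implement the new interface; since InstantiationUtil creates it reflectively, it should keep a public no-arg constructor. A sketch with an invented class name and routing rule:

package com.example; // hypothetical package for illustration

import org.apache.flink.connector.kafka.sink.KafkaPartitioner;
import org.apache.flink.table.data.RowData;

/** Routes rows by the first byte of the serialized key; spreads keyless rows by subtask. */
public class FirstByteKeyPartitioner implements KafkaPartitioner<RowData> {

    private int parallelInstanceId;

    @Override
    public void open(int parallelInstanceId, int parallelInstances) {
        // Called once per parallel sink instance before any partition() call.
        this.parallelInstanceId = parallelInstanceId;
    }

    @Override
    public int partition(
            RowData record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        if (key != null && key.length > 0) {
            return partitions[(key[0] & 0xff) % partitions.length];
        }
        return partitions[parallelInstanceId % partitions.length];
    }
}

Such a class would be wired in with 'sink.partitioner' = 'com.example.FirstByteKeyPartitioner' in the table's WITH clause.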
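
Migration-wise, existing FlinkKafkaPartitioner subclasses keep compiling and running, because the deprecated abstract class now implements KafkaPartitioner<T> and so satisfies both setPartitioner overloads. One subtlety: an argument whose static type is FlinkKafkaPartitioner still binds to the deprecated overload, since Java overload resolution prefers the most specific parameter type; re-typing the variable as KafkaPartitioner, or replacing the subclass with a lambda, moves the call site onto the new overload.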
