This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git

commit 157813eac012847b4682313b4d1ae65428c5a354
Author: Arvid Heise <[email protected]>
AuthorDate: Fri Sep 27 17:14:01 2024 +0200

    [FLINK-36177] Introduce KafkaPartitioner to replace FlinkKafkaPartitioner
    
    Relocate FlinkKafkaPartitioner to KafkaSink package and turn it into a
    functional interface.
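    
    With the functional interface, a partitioner can now be written as a
    lambda. A minimal illustrative sketch (the key-hash logic and the name
    byKeyHash are hypothetical, not part of this change):
    
        // Hypothetical: choose the partition by hashing the serialized key;
        // Math.floorMod keeps the index non-negative for negative hash codes.
        KafkaPartitioner<String> byKeyHash =
                (record, key, value, targetTopic, partitions) ->
                        partitions[Math.floorMod(java.util.Arrays.hashCode(key), partitions.length)];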
---
 .../kafka/sink/KafkaPartitioner.java}              | 22 ++++-----------
 .../KafkaRecordSerializationSchemaBuilder.java     | 20 +++++++++++--
 .../kafka/partitioner/FlinkKafkaPartitioner.java   | 33 +++-------------------
 .../DynamicKafkaRecordSerializationSchema.java     |  6 ++--
 .../kafka/table/KafkaConnectorOptionsUtil.java     | 16 +++++------
 .../connectors/kafka/table/KafkaDynamicSink.java   |  6 ++--
 .../kafka/table/KafkaDynamicTableFactory.java      |  4 +--
 7 files changed, 43 insertions(+), 64 deletions(-)

diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaPartitioner.java
similarity index 69%
copy from flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
copy to flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaPartitioner.java
index 7318ed69..19a2d473 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaPartitioner.java
@@ -15,25 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.streaming.connectors.kafka.partitioner;
+package org.apache.flink.connector.kafka.sink;
 
 import org.apache.flink.annotation.PublicEvolving;
 
 import java.io.Serializable;
 
 /**
- * A {@link FlinkKafkaPartitioner} wraps logic on how to partition records across partitions of
- * multiple Kafka topics.
- *
- * @deprecated Will be turned into internal class when {@link
- *     org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer} is removed.
+ * A {@code KafkaPartitioner} wraps logic on how to partition records across partitions of multiple
+ * Kafka topics.
  */
 @PublicEvolving
-@Deprecated
-public abstract class FlinkKafkaPartitioner<T> implements Serializable {
-
-    private static final long serialVersionUID = -9086719227828020494L;
-
+public interface KafkaPartitioner<T> extends Serializable {
     /**
      * Initializer for the partitioner. This is called once on each parallel sink instance of the
      * Flink Kafka producer. This method should be overridden if necessary.
@@ -41,9 +34,7 @@ public abstract class FlinkKafkaPartitioner<T> implements Serializable {
      * @param parallelInstanceId 0-indexed id of the parallel sink instance in Flink
      * @param parallelInstances the total number of parallel instances
      */
-    public void open(int parallelInstanceId, int parallelInstances) {
-        // overwrite this method if needed.
-    }
+    default void open(int parallelInstanceId, int parallelInstances) {}
 
     /**
      * Determine the id of the partition that the record should be written to.
@@ -55,6 +46,5 @@ public abstract class FlinkKafkaPartitioner<T> implements Serializable {
      * @param partitions found partitions for the target topic
      * @return the id of the target partition
      */
-    public abstract int partition(
-            T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
+    int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
 }
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
index 92eb625b..e9fc413b 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
@@ -82,7 +82,7 @@ public class KafkaRecordSerializationSchemaBuilder<IN> {
 
     @Nullable private Function<? super IN, String> topicSelector;
     @Nullable private SerializationSchema<? super IN> valueSerializationSchema;
-    @Nullable private FlinkKafkaPartitioner<? super IN> partitioner;
+    @Nullable private KafkaPartitioner<? super IN> partitioner;
     @Nullable private SerializationSchema<? super IN> keySerializationSchema;
     @Nullable private HeaderProvider<? super IN> headerProvider;
 
@@ -91,6 +91,7 @@ public class KafkaRecordSerializationSchemaBuilder<IN> {
      *
      * @param partitioner
      * @return {@code this}
+     * @deprecated use {@link #setPartitioner(KafkaPartitioner)}
      */
     public <T extends IN> KafkaRecordSerializationSchemaBuilder<T> setPartitioner(
             FlinkKafkaPartitioner<? super T> partitioner) {
@@ -99,6 +100,19 @@ public class KafkaRecordSerializationSchemaBuilder<IN> {
         return self;
     }
 
+    /**
+     * Sets a custom partitioner determining the target partition of the target topic.
+     *
+     * @param partitioner
+     * @return {@code this}
+     */
+    public <T extends IN> KafkaRecordSerializationSchemaBuilder<T> setPartitioner(
+            KafkaPartitioner<? super T> partitioner) {
+        KafkaRecordSerializationSchemaBuilder<T> self = self();
+        self.partitioner = checkNotNull(partitioner);
+        return self;
+    }
+
     /**
      * Sets a fixed topic which used as destination for all records.
      *
@@ -295,7 +309,7 @@ public class KafkaRecordSerializationSchemaBuilder<IN> {
             implements KafkaRecordSerializationSchema<IN> {
         private final SerializationSchema<? super IN> valueSerializationSchema;
         private final Function<? super IN, String> topicSelector;
-        private final FlinkKafkaPartitioner<? super IN> partitioner;
+        private final KafkaPartitioner<? super IN> partitioner;
         private final SerializationSchema<? super IN> keySerializationSchema;
         private final HeaderProvider<? super IN> headerProvider;
 
@@ -303,7 +317,7 @@ public class KafkaRecordSerializationSchemaBuilder<IN> {
                 Function<? super IN, String> topicSelector,
                 SerializationSchema<? super IN> valueSerializationSchema,
                 @Nullable SerializationSchema<? super IN> keySerializationSchema,
-                @Nullable FlinkKafkaPartitioner<? super IN> partitioner,
+                @Nullable KafkaPartitioner<? super IN> partitioner,
                 @Nullable HeaderProvider<? super IN> headerProvider) {
             this.topicSelector = checkNotNull(topicSelector);
             this.valueSerializationSchema = checkNotNull(valueSerializationSchema);
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
index 7318ed69..9568349a 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
@@ -18,43 +18,18 @@
 package org.apache.flink.streaming.connectors.kafka.partitioner;
 
 import org.apache.flink.annotation.PublicEvolving;
-
-import java.io.Serializable;
+import org.apache.flink.connector.kafka.sink.KafkaPartitioner;
 
 /**
 * A {@link FlinkKafkaPartitioner} wraps logic on how to partition records across partitions of
  * multiple Kafka topics.
  *
- * @deprecated Will be turned into internal class when {@link
- *     org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer} is removed.
+ * @deprecated Use {@link KafkaPartitioner} instead for {@link
+ *     org.apache.flink.connector.kafka.sink.KafkaSink}.
  */
 @PublicEvolving
 @Deprecated
-public abstract class FlinkKafkaPartitioner<T> implements Serializable {
+public abstract class FlinkKafkaPartitioner<T> implements KafkaPartitioner<T> {
 
     private static final long serialVersionUID = -9086719227828020494L;
-
-    /**
-     * Initializer for the partitioner. This is called once on each parallel sink instance of the
-     * Flink Kafka producer. This method should be overridden if necessary.
-     *
-     * @param parallelInstanceId 0-indexed id of the parallel sink instance in Flink
-     * @param parallelInstances the total number of parallel instances
-     */
-    public void open(int parallelInstanceId, int parallelInstances) {
-        // overwrite this method if needed.
-    }
-
-    /**
-     * Determine the id of the partition that the record should be written to.
-     *
-     * @param record the record value
-     * @param key serialized key of the record
-     * @param value serialized value of the record
-     * @param targetTopic target topic for the record
-     * @param partitions found partitions for the target topic
-     * @return the id of the target partition
-     */
-    public abstract int partition(
-            T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
 }
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java
index f3a7acb3..229b08b5 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java
@@ -19,9 +19,9 @@ package org.apache.flink.streaming.connectors.kafka.table;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.connector.kafka.sink.KafkaPartitioner;
 import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
 import org.apache.flink.connector.kafka.sink.KafkaSink;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.types.RowKind;
@@ -46,7 +46,7 @@ class DynamicKafkaRecordSerializationSchema implements KafkaRecordSerializationS
 
     private final Set<String> topics;
     private final Pattern topicPattern;
-    private final FlinkKafkaPartitioner<RowData> partitioner;
+    private final KafkaPartitioner<RowData> partitioner;
     @Nullable private final SerializationSchema<RowData> keySerialization;
     private final SerializationSchema<RowData> valueSerialization;
     private final RowData.FieldGetter[] keyFieldGetters;
@@ -59,7 +59,7 @@ class DynamicKafkaRecordSerializationSchema implements KafkaRecordSerializationS
     DynamicKafkaRecordSerializationSchema(
             @Nullable List<String> topics,
             @Nullable Pattern topicPattern,
-            @Nullable FlinkKafkaPartitioner<RowData> partitioner,
+            @Nullable KafkaPartitioner<RowData> partitioner,
             @Nullable SerializationSchema<RowData> keySerialization,
             SerializationSchema<RowData> valueSerialization,
             RowData.FieldGetter[] keyFieldGetters,
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java
index f752276a..5960a709 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java
@@ -24,11 +24,11 @@ import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.kafka.sink.KafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.config.BoundedMode;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanBoundedMode;
 import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanStartupMode;
 import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ValueFieldsStrategy;
@@ -386,7 +386,7 @@ class KafkaConnectorOptionsUtil {
      * The partitioner can be either "fixed", "round-robin" or a customized partitioner full class
      * name.
      */
-    public static Optional<FlinkKafkaPartitioner<RowData>> getFlinkKafkaPartitioner(
+    public static Optional<KafkaPartitioner<RowData>> getFlinkKafkaPartitioner(
             ReadableConfig tableOptions, ClassLoader classLoader) {
         return tableOptions
                 .getOptional(SINK_PARTITIONER)
@@ -465,19 +465,19 @@ class KafkaConnectorOptionsUtil {
     }
 
     /** Returns a class value with the given class name. */
-    private static <T> FlinkKafkaPartitioner<T> initializePartitioner(
+    private static <T> KafkaPartitioner<T> initializePartitioner(
             String name, ClassLoader classLoader) {
         try {
             Class<?> clazz = Class.forName(name, true, classLoader);
-            if (!FlinkKafkaPartitioner.class.isAssignableFrom(clazz)) {
+            if (!KafkaPartitioner.class.isAssignableFrom(clazz)) {
                 throw new ValidationException(
                         String.format(
-                                "Sink partitioner class '%s' should extend from the required class %s",
-                                name, FlinkKafkaPartitioner.class.getName()));
+                                "Sink partitioner class '%s' should implement the required class %s",
+                                name, KafkaPartitioner.class.getName()));
             }
             @SuppressWarnings("unchecked")
-            final FlinkKafkaPartitioner<T> kafkaPartitioner =
-                    InstantiationUtil.instantiate(name, FlinkKafkaPartitioner.class, classLoader);
+            final KafkaPartitioner<T> kafkaPartitioner =
+                    InstantiationUtil.instantiate(name, KafkaPartitioner.class, classLoader);
 
             return kafkaPartitioner;
         } catch (ClassNotFoundException | FlinkException e) {
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
index 8ab0f10c..2bb52c94 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
@@ -24,11 +24,11 @@ import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.kafka.sink.KafkaPartitioner;
 import org.apache.flink.connector.kafka.sink.KafkaSink;
 import org.apache.flink.connector.kafka.sink.KafkaSinkBuilder;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.Projection;
@@ -125,7 +125,7 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
     protected final Properties properties;
 
     /** Partitioner to select Kafka partition for each item. */
-    protected final @Nullable FlinkKafkaPartitioner<RowData> partitioner;
+    protected final @Nullable KafkaPartitioner<RowData> partitioner;
 
     /**
      * Flag to determine sink mode. In upsert mode sink transforms the delete/update-before message
@@ -150,7 +150,7 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
             @Nullable List<String> topics,
             @Nullable Pattern topicPattern,
             Properties properties,
-            @Nullable FlinkKafkaPartitioner<RowData> partitioner,
+            @Nullable KafkaPartitioner<RowData> partitioner,
             DeliveryGuarantee deliveryGuarantee,
             boolean upsertMode,
             SinkBufferFlushMode flushMode,
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
index 7c23923b..8124691a 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
@@ -26,11 +26,11 @@ import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.kafka.sink.KafkaPartitioner;
 import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
 import org.apache.flink.streaming.connectors.kafka.config.BoundedMode;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.BoundedOptions;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -427,7 +427,7 @@ public class KafkaDynamicTableFactory
             @Nullable List<String> topics,
             @Nullable Pattern topicPattern,
             Properties properties,
-            FlinkKafkaPartitioner<RowData> partitioner,
+            KafkaPartitioner<RowData> partitioner,
             DeliveryGuarantee deliveryGuarantee,
             Integer parallelism,
             @Nullable String transactionalIdPrefix) {
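
A hedged sketch of wiring the new setPartitioner overload through the builder
(the topic name and value schema are assumptions, not taken from this commit):

    // Hypothetical: a lambda can only target the KafkaPartitioner overload,
    // since the deprecated FlinkKafkaPartitioner is an abstract class.
    KafkaPartitioner<String> fixed =
            (record, key, value, targetTopic, partitions) -> partitions[0];
    KafkaRecordSerializationSchema<String> schema =
            KafkaRecordSerializationSchema.builder()
                    .setTopic("example-topic") // hypothetical topic name
                    .setValueSerializationSchema(new SimpleStringSchema())
                    .setPartitioner(fixed)
                    .build();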
