This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 66cf720388c5697d5bf98c10b3dd348a0704a710
Author: Fabian Paul <[email protected]>
AuthorDate: Thu Oct 21 10:19:29 2021 +0200

    [FLINK-24596][kafka] Make passed lambdas of UpsertKafka serializable
---
 .../connectors/kafka/table/KafkaDynamicSink.java   |  3 +-
 .../connectors/kafka/table/ReducingUpsertSink.java |  6 +-
 .../kafka/table/SinkBufferFlushMode.java           |  3 +-
 .../kafka/table/ReducingUpsertWriterTest.java      |  3 +-
 .../kafka/table/UpsertKafkaTableITCase.java        | 96 ++++++++++++++++++++++
 5 files changed, 103 insertions(+), 8 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
index a67472b..32c343e4 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
@@ -56,7 +56,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Properties;
-import java.util.function.Function;
 import java.util.stream.Stream;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -231,7 +230,7 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
                                                                 context,
                                                                 
dataStream.getExecutionConfig())
                                                         ::copy
-                                                : Function.identity());
+                                                : rowData -> rowData);
                         final DataStreamSink<RowData> end = 
dataStream.sinkTo(sink);
                         if (parallelism != null) {
                             end.setParallelism(parallelism);
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertSink.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertSink.java
index 1cb7231..e4a479a 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertSink.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertSink.java
@@ -24,11 +24,11 @@ import org.apache.flink.api.connector.sink.SinkWriter;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.function.SerializableFunction;
 
 import java.io.IOException;
 import java.util.List;
 import java.util.Optional;
-import java.util.function.Function;
 
 /**
  * A wrapper of a {@link Sink}. It will buffer the data emitted by the wrapper 
{@link SinkWriter}
@@ -43,14 +43,14 @@ class ReducingUpsertSink<WriterState> implements Sink<RowData, Void, WriterState
     private final DataType physicalDataType;
     private final int[] keyProjection;
     private final SinkBufferFlushMode bufferFlushMode;
-    private final Function<RowData, RowData> valueCopyFunction;
+    private final SerializableFunction<RowData, RowData> valueCopyFunction;
 
     ReducingUpsertSink(
             Sink<RowData, ?, WriterState, ?> wrappedSink,
             DataType physicalDataType,
             int[] keyProjection,
             SinkBufferFlushMode bufferFlushMode,
-            Function<RowData, RowData> valueCopyFunction) {
+            SerializableFunction<RowData, RowData> valueCopyFunction) {
         this.wrappedSink = wrappedSink;
         this.physicalDataType = physicalDataType;
         this.keyProjection = keyProjection;
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SinkBufferFlushMode.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SinkBufferFlushMode.java
index d5c0c2d..91a897a 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SinkBufferFlushMode.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SinkBufferFlushMode.java
@@ -18,10 +18,11 @@
 
 package org.apache.flink.streaming.connectors.kafka.table;
 
+import java.io.Serializable;
 import java.util.Objects;
 
 /** Sink buffer flush configuration. */
-public class SinkBufferFlushMode {
+public class SinkBufferFlushMode implements Serializable {
 
     private static final int DISABLED_BATCH_SIZE = 0;
     private static final long DISABLED_BATCH_INTERVAL = 0L;
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java
index a318189..1c72cc7 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java
@@ -45,7 +45,6 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.function.Function;
 
 import static org.apache.flink.types.RowKind.DELETE;
 import static org.apache.flink.types.RowKind.INSERT;
@@ -321,7 +320,7 @@ public class ReducingUpsertWriterTest {
                 },
                 enableObjectReuse
                         ? typeInformation.createSerializer(new 
ExecutionConfig())::copy
-                        : Function.identity());
+                        : r -> r);
     }
 
     private static class MockedSinkWriter implements SinkWriter<RowData, Void, 
Void> {
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java
index 4f9d485..9b11e0b 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.streaming.connectors.kafka.table;
 
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.planner.factories.TestValuesTableFactory;
 import org.apache.flink.table.utils.LegacyRowResource;
@@ -37,6 +40,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.api.common.typeinfo.Types.INT;
+import static org.apache.flink.api.common.typeinfo.Types.LOCAL_DATE_TIME;
+import static org.apache.flink.api.common.typeinfo.Types.ROW_NAMED;
+import static org.apache.flink.api.common.typeinfo.Types.STRING;
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestUtils.collectRows;
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestUtils.comparedWithKeyAndOrder;
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestUtils.waitingExpectedResults;
@@ -99,6 +106,95 @@ public class UpsertKafkaTableITCase extends KafkaTableTestBase {
     }
 
     @Test
+    public void testBufferedUpsertSink() throws Exception {
+        final String topic = "buffered_upsert_topic_" + format;
+        createTestTopic(topic, 1, 1);
+        String bootstraps = getBootstrapServers();
+        env.setParallelism(1);
+
+        Table table =
+                tEnv.fromDataStream(
+                        env.fromElements(
+                                        Row.of(
+                                                1,
+                                                
LocalDateTime.parse("2020-03-08T13:12:11.12"),
+                                                "payload 1"),
+                                        Row.of(
+                                                2,
+                                                
LocalDateTime.parse("2020-03-09T13:12:11.12"),
+                                                "payload 2"),
+                                        Row.of(
+                                                3,
+                                                
LocalDateTime.parse("2020-03-10T13:12:11.12"),
+                                                "payload 3"),
+                                        Row.of(
+                                                3,
+                                                
LocalDateTime.parse("2020-03-11T13:12:11.12"),
+                                                "payload"))
+                                .returns(
+                                        ROW_NAMED(
+                                                new String[] {"k_id", "ts", 
"payload"},
+                                                INT,
+                                                LOCAL_DATE_TIME,
+                                                STRING)),
+                        Schema.newBuilder()
+                                .column("k_id", DataTypes.INT())
+                                .column("ts", DataTypes.TIMESTAMP(3))
+                                .column("payload", DataTypes.STRING())
+                                .watermark("ts", "ts")
+                                .build());
+
+        final String createTable =
+                String.format(
+                        "CREATE TABLE upsert_kafka (\n"
+                                + "  `k_id` INTEGER,\n"
+                                + "  `ts` TIMESTAMP(3),\n"
+                                + "  `payload` STRING,\n"
+                                + "  PRIMARY KEY (k_id) NOT ENFORCED"
+                                + ") WITH (\n"
+                                + "  'connector' = 'upsert-kafka',\n"
+                                + "  'topic' = '%s',\n"
+                                + "  'properties.bootstrap.servers' = '%s',\n"
+                                + "  'key.format' = '%s',\n"
+                                + "  'key.fields-prefix' = 'k_',\n"
+                                + "  'sink.buffer-flush.max-rows' = '2',\n"
+                                + "  'sink.buffer-flush.interval' = 
'100000',\n"
+                                + "  'value.format' = '%s',\n"
+                                + "  'value.fields-include' = 'EXCEPT_KEY'\n"
+                                + ")",
+                        topic, bootstraps, format, format);
+
+        tEnv.executeSql(createTable);
+
+        table.executeInsert("upsert_kafka").await();
+
+        final List<Row> result = collectRows(tEnv.sqlQuery("SELECT * FROM 
upsert_kafka"), 3);
+        final List<Row> expected =
+                Arrays.asList(
+                        changelogRow(
+                                "+I",
+                                1,
+                                LocalDateTime.parse("2020-03-08T13:12:11.120"),
+                                "payload 1"),
+                        changelogRow(
+                                "+I",
+                                2,
+                                LocalDateTime.parse("2020-03-09T13:12:11.120"),
+                                "payload 2"),
+                        changelogRow(
+                                "+I",
+                                3,
+                                LocalDateTime.parse("2020-03-11T13:12:11.120"),
+                                "payload"));
+
+        assertThat(result, deepEqualTo(expected, true));
+
+        // ------------- cleanup -------------------
+
+        deleteTestTopic(topic);
+    }
+
+    @Test
     public void testSourceSinkWithKeyAndPartialValue() throws Exception {
         // we always use a different topic name for each parameterized topic,
         // in order to make sure the topic can be created.

Reply via email to