This is an automated email from the ASF dual-hosted git repository. fpaul pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 66cf720388c5697d5bf98c10b3dd348a0704a710 Author: Fabian Paul <[email protected]> AuthorDate: Thu Oct 21 10:19:29 2021 +0200 [FLINK-24596][kafka] Make passed lambdas of UpsertKafka serializable --- .../connectors/kafka/table/KafkaDynamicSink.java | 3 +- .../connectors/kafka/table/ReducingUpsertSink.java | 6 +- .../kafka/table/SinkBufferFlushMode.java | 3 +- .../kafka/table/ReducingUpsertWriterTest.java | 3 +- .../kafka/table/UpsertKafkaTableITCase.java | 96 ++++++++++++++++++++++ 5 files changed, 103 insertions(+), 8 deletions(-) diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java index a67472b..32c343e4 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java @@ -56,7 +56,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Properties; -import java.util.function.Function; import java.util.stream.Stream; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -231,7 +230,7 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada context, dataStream.getExecutionConfig()) ::copy - : Function.identity()); + : rowData -> rowData); final DataStreamSink<RowData> end = dataStream.sinkTo(sink); if (parallelism != null) { end.setParallelism(parallelism); diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertSink.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertSink.java index 1cb7231..e4a479a 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertSink.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertSink.java @@ -24,11 +24,11 @@ import org.apache.flink.api.connector.sink.SinkWriter; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.DataType; +import org.apache.flink.util.function.SerializableFunction; import java.io.IOException; import java.util.List; import java.util.Optional; -import java.util.function.Function; /** * A wrapper of a {@link Sink}. It will buffer the data emitted by the wrapper {@link SinkWriter} @@ -43,14 +43,14 @@ class ReducingUpsertSink<WriterState> implements Sink<RowData, Void, WriterState private final DataType physicalDataType; private final int[] keyProjection; private final SinkBufferFlushMode bufferFlushMode; - private final Function<RowData, RowData> valueCopyFunction; + private final SerializableFunction<RowData, RowData> valueCopyFunction; ReducingUpsertSink( Sink<RowData, ?, WriterState, ?> wrappedSink, DataType physicalDataType, int[] keyProjection, SinkBufferFlushMode bufferFlushMode, - Function<RowData, RowData> valueCopyFunction) { + SerializableFunction<RowData, RowData> valueCopyFunction) { this.wrappedSink = wrappedSink; this.physicalDataType = physicalDataType; this.keyProjection = keyProjection; diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SinkBufferFlushMode.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SinkBufferFlushMode.java index d5c0c2d..91a897a 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SinkBufferFlushMode.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SinkBufferFlushMode.java @@ -18,10 +18,11 @@ package org.apache.flink.streaming.connectors.kafka.table; +import java.io.Serializable; import java.util.Objects; /** Sink buffer flush configuration. */ -public class SinkBufferFlushMode { +public class SinkBufferFlushMode implements Serializable { private static final int DISABLED_BATCH_SIZE = 0; private static final long DISABLED_BATCH_INTERVAL = 0L; diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java index a318189..1c72cc7 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java @@ -45,7 +45,6 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.function.Function; import static org.apache.flink.types.RowKind.DELETE; import static org.apache.flink.types.RowKind.INSERT; @@ -321,7 +320,7 @@ public class ReducingUpsertWriterTest { }, enableObjectReuse ? typeInformation.createSerializer(new ExecutionConfig())::copy - : Function.identity()); + : r -> r); } private static class MockedSinkWriter implements SinkWriter<RowData, Void, Void> { diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java index 4f9d485..9b11e0b 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java @@ -18,6 +18,9 @@ package org.apache.flink.streaming.connectors.kafka.table; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.planner.factories.TestValuesTableFactory; import org.apache.flink.table.utils.LegacyRowResource; @@ -37,6 +40,10 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import static org.apache.flink.api.common.typeinfo.Types.INT; +import static org.apache.flink.api.common.typeinfo.Types.LOCAL_DATE_TIME; +import static org.apache.flink.api.common.typeinfo.Types.ROW_NAMED; +import static org.apache.flink.api.common.typeinfo.Types.STRING; import static org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestUtils.collectRows; import static org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestUtils.comparedWithKeyAndOrder; import static org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestUtils.waitingExpectedResults; @@ -99,6 +106,95 @@ public class UpsertKafkaTableITCase extends KafkaTableTestBase { } @Test + public void testBufferedUpsertSink() throws Exception { + final String topic = "buffered_upsert_topic_" + format; + createTestTopic(topic, 1, 1); + String bootstraps = getBootstrapServers(); + env.setParallelism(1); + + Table table = + tEnv.fromDataStream( + env.fromElements( + Row.of( + 1, + LocalDateTime.parse("2020-03-08T13:12:11.12"), + "payload 1"), + Row.of( + 2, + LocalDateTime.parse("2020-03-09T13:12:11.12"), + "payload 2"), + Row.of( + 3, + LocalDateTime.parse("2020-03-10T13:12:11.12"), + "payload 3"), + Row.of( + 3, + LocalDateTime.parse("2020-03-11T13:12:11.12"), + "payload")) + .returns( + ROW_NAMED( + new String[] {"k_id", "ts", "payload"}, + INT, + LOCAL_DATE_TIME, + STRING)), + Schema.newBuilder() + .column("k_id", DataTypes.INT()) + .column("ts", DataTypes.TIMESTAMP(3)) + .column("payload", DataTypes.STRING()) + .watermark("ts", "ts") + .build()); + + final String createTable = + String.format( + "CREATE TABLE upsert_kafka (\n" + + " `k_id` INTEGER,\n" + + " `ts` TIMESTAMP(3),\n" + + " `payload` STRING,\n" + + " PRIMARY KEY (k_id) NOT ENFORCED" + + ") WITH (\n" + + " 'connector' = 'upsert-kafka',\n" + + " 'topic' = '%s',\n" + + " 'properties.bootstrap.servers' = '%s',\n" + + " 'key.format' = '%s',\n" + + " 'key.fields-prefix' = 'k_',\n" + + " 'sink.buffer-flush.max-rows' = '2',\n" + + " 'sink.buffer-flush.interval' = '100000',\n" + + " 'value.format' = '%s',\n" + + " 'value.fields-include' = 'EXCEPT_KEY'\n" + + ")", + topic, bootstraps, format, format); + + tEnv.executeSql(createTable); + + table.executeInsert("upsert_kafka").await(); + + final List<Row> result = collectRows(tEnv.sqlQuery("SELECT * FROM upsert_kafka"), 3); + final List<Row> expected = + Arrays.asList( + changelogRow( + "+I", + 1, + LocalDateTime.parse("2020-03-08T13:12:11.120"), + "payload 1"), + changelogRow( + "+I", + 2, + LocalDateTime.parse("2020-03-09T13:12:11.120"), + "payload 2"), + changelogRow( + "+I", + 3, + LocalDateTime.parse("2020-03-11T13:12:11.120"), + "payload")); + + assertThat(result, deepEqualTo(expected, true)); + + // ------------- cleanup ------------------- + + deleteTestTopic(topic); + } + + @Test public void testSourceSinkWithKeyAndPartialValue() throws Exception { // we always use a different topic name for each parameterized topic, // in order to make sure the topic can be created.
