This is an automated email from the ASF dual-hosted git repository. fpaul pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git
commit b7cd97b02d7f39b5190d27fbaf6e6287f127b9a9 Author: Fabian Paul <[email protected]> AuthorDate: Thu Oct 21 10:20:25 2021 +0200 [FLINK-24596][table] Allow using unified Sinks with the DataStreamSinkProvider --- .../connectors/cassandra/CassandraSink.java | 2 +- .../flink/streaming/api/datastream/DataStream.java | 2 +- .../streaming/api/datastream/DataStreamSink.java | 8 ++- .../streaming/api/datastream/KeyedStream.java | 4 +- .../api/operators/collect/CollectStreamSink.java | 8 ++- .../transformations/LegacySinkTransformation.java | 5 +- .../LegacySinkTransformationTranslator.java | 2 +- .../api/datastream/DataStreamSinkTest.java | 10 ++- .../plan/nodes/exec/batch/BatchExecLegacySink.java | 3 +- .../nodes/exec/common/CommonExecLegacySink.java | 18 +++--- .../nodes/exec/stream/StreamExecLegacySink.java | 3 +- .../nodes/exec/common/CommonExecSinkITCase.java | 75 +++++++++++++++++++--- 12 files changed, 105 insertions(+), 35 deletions(-) diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java index 89aa10a..bf44f0a 100644 --- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java +++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java @@ -62,7 +62,7 @@ public class CassandraSink<IN> { } private LegacySinkTransformation<IN> getSinkTransformation() { - return sink1.getTransformation(); + return sink1.getLegacyTransformation(); } private Transformation<IN> getTransformation() { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index 726728b..1c1bca6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -1244,7 +1244,7 @@ public class DataStream<T> { DataStreamSink<T> sink = new DataStreamSink<>(this, sinkOperator); - getExecutionEnvironment().addOperator(sink.getTransformation()); + getExecutionEnvironment().addOperator(sink.getLegacyTransformation()); return sink; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java index eea988e..3aca069 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java @@ -23,6 +23,7 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.operators.ResourceSpec; import org.apache.flink.api.common.operators.SlotSharingGroup; import org.apache.flink.api.connector.sink.Sink; +import org.apache.flink.api.dag.Transformation; import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.StreamSink; import org.apache.flink.streaming.api.transformations.LegacySinkTransformation; @@ -64,7 +65,12 @@ public class DataStreamSink<T> { /** Returns the transformation that contains the actual sink operator of this sink. */ @Internal - public LegacySinkTransformation<T> getTransformation() { + public Transformation<T> getTransformation() { + return transformation; + } + + @Internal + public LegacySinkTransformation<T> getLegacyTransformation() { if (transformation instanceof LegacySinkTransformation) { return (LegacySinkTransformation<T>) transformation; } else { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java index c299a68..ada3a82 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java @@ -299,8 +299,8 @@ public class KeyedStream<T, KEY> extends DataStream<T> { @Override public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) { DataStreamSink<T> result = super.addSink(sinkFunction); - result.getTransformation().setStateKeySelector(keySelector); - result.getTransformation().setStateKeyType(keyType); + result.getLegacyTransformation().setStateKeySelector(keySelector); + result.getLegacyTransformation().setStateKeyType(keyType); return result; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectStreamSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectStreamSink.java index d53c3e6..a68043d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectStreamSink.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectStreamSink.java @@ -18,10 +18,12 @@ package org.apache.flink.streaming.api.operators.collect; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.dag.Transformation; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.transformations.LegacySinkTransformation; +import org.apache.flink.streaming.api.transformations.PhysicalTransformation; /** * A {@link DataStreamSink} which is used to collect results of a data stream. It completely @@ -30,17 +32,17 @@ import org.apache.flink.streaming.api.transformations.LegacySinkTransformation; @Internal public class CollectStreamSink<T> extends DataStreamSink<T> { - private final LegacySinkTransformation<T> transformation; + private final PhysicalTransformation<T> transformation; public CollectStreamSink(DataStream<T> inputStream, CollectSinkOperatorFactory<T> factory) { super(inputStream, (CollectSinkOperator<T>) factory.getOperator()); this.transformation = - new LegacySinkTransformation<>( + new LegacySinkTransformation<T>( inputStream.getTransformation(), "Collect Stream Sink", factory, 1); } @Override - public LegacySinkTransformation<T> getTransformation() { + public Transformation<T> getTransformation() { return transformation; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySinkTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySinkTransformation.java index d240329..8977c70 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySinkTransformation.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySinkTransformation.java @@ -23,7 +23,6 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.dag.Transformation; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; import org.apache.flink.streaming.api.operators.StreamOperatorFactory; @@ -40,7 +39,7 @@ import java.util.List; * @param <T> The type of the elements in the input {@code LegacySinkTransformation} */ @Internal -public class LegacySinkTransformation<T> extends PhysicalTransformation<Object> { +public class LegacySinkTransformation<T> extends PhysicalTransformation<T> { private final Transformation<T> input; @@ -70,7 +69,7 @@ public class LegacySinkTransformation<T> extends PhysicalTransformation<Object> String name, StreamOperatorFactory<Object> operatorFactory, int parallelism) { - super(name, TypeExtractor.getForClass(Object.class), parallelism); + super(name, input.getOutputType(), parallelism); this.input = input; this.operatorFactory = operatorFactory; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/LegacySinkTransformationTranslator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/LegacySinkTransformationTranslator.java index fa2b1d3..378a717 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/LegacySinkTransformationTranslator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/LegacySinkTransformationTranslator.java @@ -42,7 +42,7 @@ import static org.apache.flink.util.Preconditions.checkState; */ @Internal public class LegacySinkTransformationTranslator<IN> - extends SimpleTransformationTranslator<Object, LegacySinkTransformation<IN>> { + extends SimpleTransformationTranslator<IN, LegacySinkTransformation<IN>> { @Override protected Collection<Integer> translateForBatchInternal( diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/DataStreamSinkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/DataStreamSinkTest.java index b8d39ad..b2de771 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/DataStreamSinkTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/DataStreamSinkTest.java @@ -17,18 +17,24 @@ package org.apache.flink.streaming.api.datastream; +import org.apache.flink.api.dag.Transformation; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.transformations.SinkTransformation; import org.apache.flink.streaming.runtime.operators.sink.TestSink; import org.junit.Test; +import static org.junit.Assert.assertTrue; + /** Unit test for {@link DataStreamSink}. */ public class DataStreamSinkTest { - @Test(expected = IllegalStateException.class) + @Test public void throwExceptionWhenGettingTransformationWithNewSinkAPI() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.fromElements(1, 2).sinkTo(TestSink.newBuilder().build()).getTransformation(); + final Transformation<Integer> transformation = + env.fromElements(1, 2).sinkTo(TestSink.newBuilder().build()).getTransformation(); + assertTrue(transformation instanceof SinkTransformation); } @Test(expected = UnsupportedOperationException.class) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecLegacySink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecLegacySink.java index 5aa1dea..049920c 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecLegacySink.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecLegacySink.java @@ -37,8 +37,7 @@ import java.lang.reflect.Modifier; * * @param <T> The return type of the {@link TableSink}. */ -public class BatchExecLegacySink<T> extends CommonExecLegacySink<T> - implements BatchExecNode<Object> { +public class BatchExecLegacySink<T> extends CommonExecLegacySink<T> implements BatchExecNode<T> { public BatchExecLegacySink( TableSink<T> tableSink, diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLegacySink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLegacySink.java index 8cb2b66..ca29944 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLegacySink.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLegacySink.java @@ -58,8 +58,8 @@ import java.util.List; * * @param <T> The return type of the {@link TableSink}. */ -public abstract class CommonExecLegacySink<T> extends ExecNodeBase<Object> - implements MultipleTransformationTranslator<Object> { +public abstract class CommonExecLegacySink<T> extends ExecNodeBase<T> + implements MultipleTransformationTranslator<T> { protected final TableSink<T> tableSink; protected final @Nullable String[] upsertKeys; protected final boolean needRetraction; @@ -82,7 +82,7 @@ public abstract class CommonExecLegacySink<T> extends ExecNodeBase<Object> @SuppressWarnings("unchecked") @Override - protected Transformation<Object> translateToPlanInternal(PlannerBase planner) { + protected Transformation<T> translateToPlanInternal(PlannerBase planner) { if (tableSink instanceof StreamTableSink) { final Transformation<T> transform; if (tableSink instanceof RetractStreamTableSink) { @@ -121,8 +121,9 @@ public abstract class CommonExecLegacySink<T> extends ExecNodeBase<Object> } final DataStream<T> dataStream = new DataStream<T>(planner.getExecEnv(), transform); - final DataStreamSink<?> dsSink = - ((StreamTableSink<T>) tableSink).consumeDataStream(dataStream); + final DataStreamSink<T> dsSink = + (DataStreamSink<T>) + ((StreamTableSink<T>) tableSink).consumeDataStream(dataStream); if (dsSink == null) { throw new TableException( String.format( @@ -131,15 +132,14 @@ public abstract class CommonExecLegacySink<T> extends ExecNodeBase<Object> + "However, %s doesn't implement this method.", tableSink.getClass().getCanonicalName())); } - return dsSink.getTransformation(); + return dsSink.getLegacyTransformation(); } else if (tableSink instanceof DataStreamTableSink) { // In case of table to DataStream through // StreamTableEnvironment#toAppendStream/toRetractStream, // we insert a DataStreamTableSink that wraps the given DataStream as a LogicalSink. It // is no real table sink, so we just need translate its input to Transformation. - return (Transformation<Object>) - translateToTransformation( - planner, ((DataStreamTableSink<T>) tableSink).withChangeFlag()); + return translateToTransformation( + planner, ((DataStreamTableSink<T>) tableSink).withChangeFlag()); } else { throw new TableException( String.format( diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLegacySink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLegacySink.java index af9ee1d..b1e4ee6 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLegacySink.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLegacySink.java @@ -40,8 +40,7 @@ import java.util.stream.Collectors; * * @param <T> The return type of the {@link TableSink}. */ -public class StreamExecLegacySink<T> extends CommonExecLegacySink<T> - implements StreamExecNode<Object> { +public class StreamExecLegacySink<T> extends CommonExecLegacySink<T> implements StreamExecNode<T> { public StreamExecLegacySink( TableSink<T> tableSink, diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java index 744ced7..60c489b 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java @@ -50,7 +50,9 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import static org.apache.flink.table.api.DataTypes.INT; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.not; @@ -93,7 +95,8 @@ public class CommonExecSinkITCase extends AbstractTestBase { DynamicTableSink.Context context) { return SinkProvider.of( TestSink.newBuilder() - .setWriter(new TestWriter(timestamps)) + .setWriter( + new TestTimestampWriter(timestamps)) .setCommittableSerializer( TestSink.StringCommittableSerializer .INSTANCE) @@ -136,7 +139,7 @@ public class CommonExecSinkITCase extends AbstractTestBase { public void invoke( RowData value, Context context) { - addTimestamp( + addElement( timestamps, context.timestamp()); } @@ -153,6 +156,35 @@ public class CommonExecSinkITCase extends AbstractTestBase { } @Test + public void testUnifiedSinksAreUsableWithDataStreamSinkProvider() + throws ExecutionException, InterruptedException { + final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); + final SharedReference<List<RowData>> fetched = sharedObjects.add(new ArrayList<>()); + final List<Row> rows = Arrays.asList(Row.of(1), Row.of(2)); + + final TableDescriptor sourceDescriptor = + TableFactoryHarness.newBuilder() + .schema(Schema.newBuilder().column("a", INT()).build()) + .source(new TimestampTestSource(rows)) + .sink( + new TableFactoryHarness.SinkBase() { + @Override + public DataStreamSinkProvider getSinkRuntimeProvider( + DynamicTableSink.Context context) { + return buildRecordTestSinkProvider(fetched); + } + }) + .build(); + tableEnv.createTable("T1", sourceDescriptor); + String sqlStmt = "INSERT INTO T1 SELECT * FROM T1"; + tableEnv.executeSql(sqlStmt).await(); + final List<Integer> fetchedRows = + fetched.get().stream().map(r -> r.getInt(0)).sorted().collect(Collectors.toList()); + assertEquals(fetchedRows.get(0).intValue(), 1); + assertEquals(fetchedRows.get(1).intValue(), 2); + } + + @Test public void testStreamRecordTimestampInserterNotApplied() { final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); final SharedReference<List<Long>> timestamps = sharedObjects.add(new ArrayList<>()); @@ -175,7 +207,8 @@ public class CommonExecSinkITCase extends AbstractTestBase { DynamicTableSink.Context context) { return SinkProvider.of( TestSink.newBuilder() - .setWriter(new TestWriter(timestamps)) + .setWriter( + new TestTimestampWriter(timestamps)) .setCommittableSerializer( TestSink.StringCommittableSerializer .INSTANCE) @@ -187,8 +220,19 @@ public class CommonExecSinkITCase extends AbstractTestBase { assertPlan(tableEnv, "INSERT INTO T1 SELECT * FROM T1", false); } - private static void addTimestamp(SharedReference<List<Long>> timestamps, Long timestamp) { - timestamps.applySync(l -> l.add(timestamp)); + private static DataStreamSinkProvider buildRecordTestSinkProvider( + SharedReference<List<RowData>> fetched) { + return dataStream -> + dataStream.sinkTo( + TestSink.newBuilder() + .setWriter(new RecordWriter(fetched)) + .setCommittableSerializer( + TestSink.StringCommittableSerializer.INSTANCE) + .build()); + } + + private static <T> void addElement(SharedReference<List<T>> elements, T element) { + elements.applySync(l -> l.add(element)); } private static void assertPlan( @@ -264,17 +308,32 @@ public class CommonExecSinkITCase extends AbstractTestBase { public void cancel() {} } - private static class TestWriter extends TestSink.DefaultSinkWriter<RowData> { + private static class TestTimestampWriter extends TestSink.DefaultSinkWriter<RowData> { private final SharedReference<List<Long>> timestamps; - private TestWriter(SharedReference<List<Long>> timestamps) { + private TestTimestampWriter(SharedReference<List<Long>> timestamps) { this.timestamps = timestamps; } @Override public void write(RowData element, Context context) { - addTimestamp(timestamps, context.timestamp()); + addElement(timestamps, context.timestamp()); + super.write(element, context); + } + } + + private static class RecordWriter extends TestSink.DefaultSinkWriter<RowData> { + + private final SharedReference<List<RowData>> rows; + + private RecordWriter(SharedReference<List<RowData>> rows) { + this.rows = rows; + } + + @Override + public void write(RowData element, Context context) { + addElement(rows, element); super.write(element, context); } }
