This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-hive.git
commit ddf03a305ca37fa36ffe1e99f5c06d7a9f0f64d4 Author: Alexander Fedulov <1492164+afedu...@users.noreply.github.com> AuthorDate: Fri Apr 19 18:48:42 2024 +0200 [FLINK-28048][connectors] Introduce Source API alternative to FiniteTestSource (#23777) --- flink-connector-hive/pom.xml | 8 ++++ .../flink/connectors/hive/HiveTableSinkITCase.java | 49 +++++++++++++--------- .../connectors/hive/HiveTableSourceITCase.java | 40 ++++++++++-------- 3 files changed, 60 insertions(+), 37 deletions(-) diff --git a/flink-connector-hive/pom.xml b/flink-connector-hive/pom.xml index b0ae95df..924b0780 100644 --- a/flink-connector-hive/pom.xml +++ b/flink-connector-hive/pom.xml @@ -742,6 +742,14 @@ under the License. <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-datagen</artifactId> + <version>${flink.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <!--flink-java and flink-clients test dependencies used for HiveInputFormatTest--> <dependency> <groupId>org.apache.flink</groupId> diff --git a/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java b/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java index 0e66923f..f84b1554 100644 --- a/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java +++ b/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java @@ -19,14 +19,15 @@ package org.apache.flink.connectors.hive; import org.apache.flink.FlinkVersion; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.connector.datagen.source.TestDataGenerators; import org.apache.flink.connector.file.table.FileSystemConnectorOptions; 
import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.util.FiniteTestSource; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.ExplainDetail; import org.apache.flink.table.api.Expressions; @@ -805,15 +806,21 @@ class HiveTableSinkITCase { Row.of(3, "x", "y", "2020-05-03", "9"), Row.of(4, "x", "y", "2020-05-03", "10"), Row.of(5, "x", "y", "2020-05-03", "11")); + RowTypeInfo rowTypeInfo = + new RowTypeInfo( + Types.INT, + Types.STRING, + Types.STRING, + Types.STRING, + Types.STRING); + DataStream<Row> stream = - env.addSource( - new FiniteTestSource<>(data), - new RowTypeInfo( - Types.INT, - Types.STRING, - Types.STRING, - Types.STRING, - Types.STRING)); + env.fromSource( + TestDataGenerators.fromDataWithSnapshotsLatch( + data, rowTypeInfo), + WatermarkStrategy.noWatermarks(), + "Test Source"); + tEnv.createTemporaryView( "my_table", stream, $("a"), $("b"), $("c"), $("d"), $("e")); @@ -899,17 +906,21 @@ class HiveTableSinkITCase { Row.of(3, "x", "y", "2020-05-03", "9"), Row.of(4, "x", "y", "2020-05-03", "10"), Row.of(5, "x", "y", "2020-05-03", "11")); + RowTypeInfo rowTypeInfo = + new RowTypeInfo( + Types.INT, + Types.STRING, + Types.STRING, + Types.STRING, + Types.STRING); + DataStream<Row> stream = - env.addSource( - new FiniteTestSource<>(data), - new RowTypeInfo( - Types.INT, - Types.STRING, - Types.STRING, - Types.STRING, - Types.STRING)); - /*tEnv.createTemporaryView( - "my_table", stream, $("a"), $("b"), $("c"), $("d"), $("e"));*/ + env.fromSource( + TestDataGenerators.fromDataWithSnapshotsLatch( + data, rowTypeInfo), + WatermarkStrategy.noWatermarks(), + "Test Source"); + tEnv.createTemporaryView( "my_table", stream, diff --git a/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java 
b/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java index da948bb7..5b8ed9b1 100644 --- a/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java +++ b/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java @@ -19,6 +19,7 @@ package org.apache.flink.connectors.hive; import org.apache.flink.FlinkVersion; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; @@ -26,11 +27,11 @@ import org.apache.flink.api.dag.Transformation; import org.apache.flink.api.java.typeutils.MapTypeInfo; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.connector.datagen.source.TestDataGenerators; import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.util.FiniteTestSource; import org.apache.flink.table.HiveVersionTestUtil; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.SqlDialect; @@ -1003,25 +1004,28 @@ public class HiveTableSourceITCase extends BatchAbstractTestBase { List<Row> rows = generateRows(); List<Row> expectedRows = generateExpectedRows(rows); + + RowTypeInfo typeInfo = + new RowTypeInfo( + new TypeInformation[] { + Types.INT, + Types.STRING, + new RowTypeInfo( + new TypeInformation[] {Types.STRING, Types.INT, Types.INT}, + new String[] {"c1", "c2", "c3"}), + new MapTypeInfo<>(Types.STRING, Types.STRING), + Types.OBJECT_ARRAY(Types.STRING), + Types.STRING + }, + new String[] {"a", "b", "c", "d", "e", "f"}); + 
DataStream<Row> stream = - env.addSource( - new FiniteTestSource<>(rows), - new RowTypeInfo( - new TypeInformation[] { - Types.INT, - Types.STRING, - new RowTypeInfo( - new TypeInformation[] { - Types.STRING, Types.INT, Types.INT - }, - new String[] {"c1", "c2", "c3"}), - new MapTypeInfo<>(Types.STRING, Types.STRING), - Types.OBJECT_ARRAY(Types.STRING), - Types.STRING - }, - new String[] {"a", "b", "c", "d", "e", "f"})) + env.fromSource( + TestDataGenerators.fromDataWithSnapshotsLatch(rows, typeInfo), + WatermarkStrategy.noWatermarks(), + "Test Source") .filter((FilterFunction<Row>) value -> true) - .setParallelism(3); // to parallel tasks + .setParallelism(3); tEnv.createTemporaryView("my_table", stream); assertResults(executeAndGetResult(tEnv), expectedRows);