This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-hive.git

commit ddf03a305ca37fa36ffe1e99f5c06d7a9f0f64d4
Author: Alexander Fedulov <1492164+afedu...@users.noreply.github.com>
AuthorDate: Fri Apr 19 18:48:42 2024 +0200

    [FLINK-28048][connectors] Introduce Source API alternative to 
FiniteTestSource (#23777)
---
 flink-connector-hive/pom.xml                       |  8 ++++
 .../flink/connectors/hive/HiveTableSinkITCase.java | 49 +++++++++++++---------
 .../connectors/hive/HiveTableSourceITCase.java     | 40 ++++++++++--------
 3 files changed, 60 insertions(+), 37 deletions(-)

diff --git a/flink-connector-hive/pom.xml b/flink-connector-hive/pom.xml
index b0ae95df..924b0780 100644
--- a/flink-connector-hive/pom.xml
+++ b/flink-connector-hive/pom.xml
@@ -742,6 +742,14 @@ under the License.
                        <scope>test</scope>
                </dependency>
 
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-connector-datagen</artifactId>
+                       <version>${flink.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
                <!--flink-java and flink-clients test dependencies used for 
HiveInputFormatTest-->
                <dependency>
                        <groupId>org.apache.flink</groupId>
diff --git 
a/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
 
b/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
index 0e66923f..f84b1554 100644
--- 
a/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
+++ 
b/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
@@ -19,14 +19,15 @@
 package org.apache.flink.connectors.hive;
 
 import org.apache.flink.FlinkVersion;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.connector.datagen.source.TestDataGenerators;
 import org.apache.flink.connector.file.table.FileSystemConnectorOptions;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.FiniteTestSource;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.ExplainDetail;
 import org.apache.flink.table.api.Expressions;
@@ -805,15 +806,21 @@ class HiveTableSinkITCase {
                                     Row.of(3, "x", "y", "2020-05-03", "9"),
                                     Row.of(4, "x", "y", "2020-05-03", "10"),
                                     Row.of(5, "x", "y", "2020-05-03", "11"));
+                    RowTypeInfo rowTypeInfo =
+                            new RowTypeInfo(
+                                    Types.INT,
+                                    Types.STRING,
+                                    Types.STRING,
+                                    Types.STRING,
+                                    Types.STRING);
+
                     DataStream<Row> stream =
-                            env.addSource(
-                                    new FiniteTestSource<>(data),
-                                    new RowTypeInfo(
-                                            Types.INT,
-                                            Types.STRING,
-                                            Types.STRING,
-                                            Types.STRING,
-                                            Types.STRING));
+                            env.fromSource(
+                                    
TestDataGenerators.fromDataWithSnapshotsLatch(
+                                            data, rowTypeInfo),
+                                    WatermarkStrategy.noWatermarks(),
+                                    "Test Source");
+
                     tEnv.createTemporaryView(
                             "my_table", stream, $("a"), $("b"), $("c"), 
$("d"), $("e"));
 
@@ -899,17 +906,21 @@ class HiveTableSinkITCase {
                                     Row.of(3, "x", "y", "2020-05-03", "9"),
                                     Row.of(4, "x", "y", "2020-05-03", "10"),
                                     Row.of(5, "x", "y", "2020-05-03", "11"));
+                    RowTypeInfo rowTypeInfo =
+                            new RowTypeInfo(
+                                    Types.INT,
+                                    Types.STRING,
+                                    Types.STRING,
+                                    Types.STRING,
+                                    Types.STRING);
+
                     DataStream<Row> stream =
-                            env.addSource(
-                                    new FiniteTestSource<>(data),
-                                    new RowTypeInfo(
-                                            Types.INT,
-                                            Types.STRING,
-                                            Types.STRING,
-                                            Types.STRING,
-                                            Types.STRING));
-                    /*tEnv.createTemporaryView(
-                    "my_table", stream, $("a"), $("b"), $("c"), $("d"), 
$("e"));*/
+                            env.fromSource(
+                                    
TestDataGenerators.fromDataWithSnapshotsLatch(
+                                            data, rowTypeInfo),
+                                    WatermarkStrategy.noWatermarks(),
+                                    "Test Source");
+
                     tEnv.createTemporaryView(
                             "my_table",
                             stream,
diff --git 
a/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
 
b/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
index da948bb7..5b8ed9b1 100644
--- 
a/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
+++ 
b/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.connectors.hive;
 
 import org.apache.flink.FlinkVersion;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
@@ -26,11 +27,11 @@ import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.api.java.typeutils.MapTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.datagen.source.TestDataGenerators;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.FiniteTestSource;
 import org.apache.flink.table.HiveVersionTestUtil;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.SqlDialect;
@@ -1003,25 +1004,28 @@ public class HiveTableSourceITCase extends 
BatchAbstractTestBase {
 
         List<Row> rows = generateRows();
         List<Row> expectedRows = generateExpectedRows(rows);
+
+        RowTypeInfo typeInfo =
+                new RowTypeInfo(
+                        new TypeInformation[] {
+                            Types.INT,
+                            Types.STRING,
+                            new RowTypeInfo(
+                                    new TypeInformation[] {Types.STRING, 
Types.INT, Types.INT},
+                                    new String[] {"c1", "c2", "c3"}),
+                            new MapTypeInfo<>(Types.STRING, Types.STRING),
+                            Types.OBJECT_ARRAY(Types.STRING),
+                            Types.STRING
+                        },
+                        new String[] {"a", "b", "c", "d", "e", "f"});
+
         DataStream<Row> stream =
-                env.addSource(
-                                new FiniteTestSource<>(rows),
-                                new RowTypeInfo(
-                                        new TypeInformation[] {
-                                            Types.INT,
-                                            Types.STRING,
-                                            new RowTypeInfo(
-                                                    new TypeInformation[] {
-                                                        Types.STRING, 
Types.INT, Types.INT
-                                                    },
-                                                    new String[] {"c1", "c2", 
"c3"}),
-                                            new MapTypeInfo<>(Types.STRING, 
Types.STRING),
-                                            Types.OBJECT_ARRAY(Types.STRING),
-                                            Types.STRING
-                                        },
-                                        new String[] {"a", "b", "c", "d", "e", 
"f"}))
+                env.fromSource(
+                                
TestDataGenerators.fromDataWithSnapshotsLatch(rows, typeInfo),
+                                WatermarkStrategy.noWatermarks(),
+                                "Test Source")
                         .filter((FilterFunction<Row>) value -> true)
-                        .setParallelism(3); // to parallel tasks
+                        .setParallelism(3);
 
         tEnv.createTemporaryView("my_table", stream);
         assertResults(executeAndGetResult(tEnv), expectedRows);

Reply via email to