twalthr commented on a change in pull request #16793:
URL: https://github.com/apache/flink/pull/16793#discussion_r688288828
##########
File path:
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
##########
@@ -501,10 +527,87 @@ public void processElement(
config.setLocalTimeZone(originalZone);
}
+ @Test
+ public void testComplexUnifiedPipelineBatch() throws Exception {
+ env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+
+ final Table resultTable = getComplexUnifiedPipeline(env);
+
+ testResult(resultTable.execute(), Row.of("Bob", 1L), Row.of("Alice",
1L));
+ }
+
+ @Test
+ public void testComplexUnifiedPipelineStreaming() throws Exception {
+ final Table resultTable = getComplexUnifiedPipeline(env);
+
+ // more rows than in batch mode due to incremental computations
+ testResult(
+ resultTable.execute(),
+ Row.of("Bob", 1L),
+ Row.of("Bob", 2L),
+ Row.of("Bob", 3L),
+ Row.of("Alice", 1L));
+ }
+
//
--------------------------------------------------------------------------------------------
// Helper methods
//
--------------------------------------------------------------------------------------------
+ private Table getComplexUnifiedPipeline(StreamExecutionEnvironment env) {
+
+ final DataStream<String> allowedNamesStream = env.fromElements("Bob",
"Alice");
+
+ final StreamTableEnvironment tableEnv =
StreamTableEnvironment.create(env);
+
+ tableEnv.createTemporaryView(
+ "AllowedNamesTable",
tableEnv.fromDataStream(allowedNamesStream).as("allowedName"));
+
+ final Table nameCountTable =
+ tableEnv.sqlQuery(
+ "SELECT name, COUNT(*) AS c "
+ + "FROM (VALUES ('Bob'), ('Alice'), ('Greg'),
('Bob')) AS NameTable(name) "
+ + "WHERE name IN (SELECT allowedName FROM
AllowedNamesTable)"
+ + "GROUP BY name");
+
+ final DataStream<Row> nameCountStream =
tableEnv.toChangelogStream(nameCountTable);
Review comment:
This is more of a testing example. It doesn't really serve a purpose.
I'm just counting the changes that come out of the Table API to have mixed
stateful operators.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]