sjwiesman commented on a change in pull request #16793:
URL: https://github.com/apache/flink/pull/16793#discussion_r687923616



##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
##########
@@ -501,10 +527,87 @@ public void processElement(
         config.setLocalTimeZone(originalZone);
     }
 
+    @Test
+    public void testComplexUnifiedPipelineBatch() throws Exception {
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+
+        final Table resultTable = getComplexUnifiedPipeline(env);
+
+        testResult(resultTable.execute(), Row.of("Bob", 1L), Row.of("Alice", 1L));
+    }
+
+    @Test
+    public void testComplexUnifiedPipelineStreaming() throws Exception {
+        final Table resultTable = getComplexUnifiedPipeline(env);
+
+        // more rows than in batch mode due to incremental computations
+        testResult(
+                resultTable.execute(),
+                Row.of("Bob", 1L),
+                Row.of("Bob", 2L),
+                Row.of("Bob", 3L),
+                Row.of("Alice", 1L));
+    }
+
     // 
--------------------------------------------------------------------------------------------
     // Helper methods
     // 
--------------------------------------------------------------------------------------------
 
+    private Table getComplexUnifiedPipeline(StreamExecutionEnvironment env) {
+
+        final DataStream<String> allowedNamesStream = env.fromElements("Bob", "Alice");
+
+        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+
+        tableEnv.createTemporaryView(
+                "AllowedNamesTable", tableEnv.fromDataStream(allowedNamesStream).as("allowedName"));
+
+        final Table nameCountTable =
+                tableEnv.sqlQuery(
+                        "SELECT name, COUNT(*) AS c "
+                                + "FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) "
+                                + "WHERE name IN (SELECT allowedName FROM AllowedNamesTable)"
+                                + "GROUP BY name");
+
+        final DataStream<Row> nameCountStream = tableEnv.toChangelogStream(nameCountTable);

Review comment:
        I noticed you are not checking the `RowKind` in the process function.
In batch mode, will we only see the final result for each key? Do we consider
this DataStream a set of "final inserts"? Will this method ever emit `DELETE`s?
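
To make the concern concrete, here is a minimal, self-contained sketch of what checking the row kind could look like in a consumer. It does not use Flink: `Kind` is a stand-in enum whose constant names mirror Flink's `RowKind` (`INSERT`, `UPDATE_BEFORE`, `UPDATE_AFTER`, `DELETE`), and `Change`, `materialize`, and the sample changelog are hypothetical names and data, not taken from the PR.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ChangelogFoldSketch {

    // Stand-in for Flink's RowKind; only the constant names are borrowed.
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // Hypothetical minimal change record: (kind, name, count).
    static final class Change {
        final Kind kind;
        final String name;
        final long count;

        Change(Kind kind, String name, long count) {
            this.kind = kind;
            this.name = name;
            this.count = count;
        }
    }

    // Folds a changelog into the latest value per key by honoring the kind.
    static Map<String, Long> materialize(List<Change> changelog) {
        Map<String, Long> state = new LinkedHashMap<>();
        for (Change c : changelog) {
            switch (c.kind) {
                case INSERT:
                case UPDATE_AFTER:
                    // Additions: the row becomes the current value for its key.
                    state.put(c.name, c.count);
                    break;
                case UPDATE_BEFORE:
                case DELETE:
                    // Retractions: the previously emitted row is withdrawn.
                    state.remove(c.name);
                    break;
            }
        }
        return state;
    }

    public static void main(String[] args) {
        // Assumed streaming-style changelog for an incrementally updated count.
        List<Change> changelog = new ArrayList<>();
        changelog.add(new Change(Kind.INSERT, "Bob", 1L));
        changelog.add(new Change(Kind.INSERT, "Alice", 1L));
        changelog.add(new Change(Kind.UPDATE_BEFORE, "Bob", 1L));
        changelog.add(new Change(Kind.UPDATE_AFTER, "Bob", 2L));
        System.out.println(materialize(changelog));
    }
}
```

If batch mode only ever produces final inserts, the retraction branch would simply never run there. A consumer that ignores the kind, on the other hand, would count a retraction as one more row in streaming mode.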



