Clarkkkkk commented on a change in pull request #6675: [FLINK-10258] 
[sql-client] Allow streaming sources to be present for batch executions
URL: https://github.com/apache/flink/pull/6675#discussion_r217061232
 
 

 ##########
 File path: 
flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
 ##########
 @@ -385,6 +385,55 @@ public void testStreamQueryExecutionSink() throws 
Exception {
                }
        }
 
+       @Test
+       public void testReuseStreamingCsvSourceIBatchExecution() throws 
Exception {
+               final URL url = 
getClass().getClassLoader().getResource("test-data.csv");
+               Objects.requireNonNull(url);
+               final Map<String, String> replaceVars = new HashMap<>();
+               replaceVars.put("$VAR_0", url.getPath());
+               replaceVars.put("$VAR_1", "/");
+               replaceVars.put("$VAR_2", "streaming");
+               replaceVars.put("$VAR_3", "table");
+               replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
+               replaceVars.put("$VAR_MAX_ROWS", "100");
+
+               final Executor executor = createModifiedExecutor(clusterClient, 
replaceVars);
+               final SessionContext session = new 
SessionContext("test-session", new Environment());
+
+               final String query = "SELECT scalarUDF(IntegerField1), 
StringField1 FROM TableNumber1";
+
+               final List<String> expectedResults = new ArrayList<>();
+               expectedResults.add("47,Hello World");
+               expectedResults.add("27,Hello World");
+               expectedResults.add("37,Hello World");
+               expectedResults.add("37,Hello World");
+               expectedResults.add("47,Hello World");
+               expectedResults.add("57,Hello World!!!!");
+
+               try {
+                       ResultDescriptor desc = executor.executeQuery(session, 
query);
+
+                       assertTrue(desc.isMaterialized());
+
+                       List<String> actualResults = 
retrieveTableResult(executor, session, desc.getResultId());
+
+                       TestBaseUtils.compareResultCollections(expectedResults, 
actualResults, Comparator.naturalOrder());
+
+                       //switch to batch mode
+                       session.setSessionProperty("execution.type", "batch");
+
+                       desc = executor.executeQuery(session, query);
+
+                       assertTrue(desc.isMaterialized());
+
+                       actualResults = retrieveTableResult(executor, session, 
desc.getResultId());
+
+                       TestBaseUtils.compareResultCollections(expectedResults, 
actualResults, Comparator.naturalOrder());
+               } finally {
 
 Review comment:
   We should make sure things works well when we switch to the stream mode 
again. It should be good to add some code to test that.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to