Repository: flink
Updated Branches:
  refs/heads/master 8c15d379d -> 19040a632


[FLINK-9578] [sql-client] Allow to define an auto watermark interval in SQL 
Client

This closes #6160.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/19040a63
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/19040a63
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/19040a63

Branch: refs/heads/master
Commit: 19040a632c66af64d49707cdf07adb25af04d92b
Parents: 8c15d37
Author: Timo Walther <twal...@apache.org>
Authored: Wed Jun 13 16:49:00 2018 +0200
Committer: Timo Walther <twal...@apache.org>
Committed: Tue Jul 3 13:53:11 2018 +0200

----------------------------------------------------------------------
 docs/dev/table/sqlClient.md                     |  3 +-
 .../conf/sql-client-defaults.yaml               |  2 +
 .../flink/table/client/config/Execution.java    | 10 ++++
 .../table/client/config/PropertyStrings.java    |  2 +
 .../client/gateway/local/ExecutionContext.java  |  8 ++-
 .../client/gateway/local/EnvironmentTest.java   |  4 +-
 .../gateway/local/ExecutionContextTest.java     | 63 ++++++++++++++++++++
 .../gateway/local/LocalExecutorITCase.java      |  3 +-
 .../resources/test-sql-client-defaults.yaml     |  1 +
 9 files changed, 91 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/19040a63/docs/dev/table/sqlClient.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sqlClient.md b/docs/dev/table/sqlClient.md
index 4c27356..2bdec2b 100644
--- a/docs/dev/table/sqlClient.md
+++ b/docs/dev/table/sqlClient.md
@@ -185,12 +185,13 @@ tables:
 
 execution:
   type: streaming                   # required: execution mode either 'batch' 
or 'streaming'
+  result-mode: table                # required: either 'table' or 'changelog'
   time-characteristic: event-time   # optional: 'processing-time' or 
'event-time' (default)
   parallelism: 1                    # optional: Flink's parallelism (1 by 
default)
+  periodic-watermarks-interval: 200 # optional: interval for periodic 
watermarks (200 ms by default)
   max-parallelism: 16               # optional: Flink's maximum parallelism 
(128 by default)
   min-idle-state-retention: 0       # optional: table program's minimum idle 
state time
   max-idle-state-retention: 0       # optional: table program's maximum idle 
state time
-  result-mode: table                # required: either 'table' or 'changelog'
 
 # Deployment properties allow for describing the cluster to which table 
programs are submitted.
 

http://git-wip-us.apache.org/repos/asf/flink/blob/19040a63/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml 
b/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml
index 5fd01d9..7ca7776 100644
--- a/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml
+++ b/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml
@@ -46,6 +46,8 @@ execution:
   type: streaming
   # allow 'event-time' or only 'processing-time' in sources
   time-characteristic: event-time
+  # interval in ms for emitting periodic watermarks
+  periodic-watermarks-interval: 200
   # 'changelog' or 'table' presentation of results
   result-mode: changelog
   # parallelism of the program

http://git-wip-us.apache.org/repos/asf/flink/blob/19040a63/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java
 
b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java
index 3ca99c1..9a5ae47 100644
--- 
a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java
+++ 
b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java
@@ -50,6 +50,12 @@ public class Execution {
                        PropertyStrings.EXECUTION_TYPE_VALUE_STREAMING);
        }
 
+       public boolean isBatchExecution() {
+               return Objects.equals(
+                       properties.get(PropertyStrings.EXECUTION_TYPE),
+                       PropertyStrings.EXECUTION_TYPE_VALUE_BATCH);
+       }
+
        public TimeCharacteristic getTimeCharacteristic() {
                final String s = properties.getOrDefault(
                        PropertyStrings.EXECUTION_TIME_CHARACTERISTIC,
@@ -64,6 +70,10 @@ public class Execution {
                }
        }
 
+       public long getPeriodicWatermarksInterval() {
+               return 
Long.parseLong(properties.getOrDefault(PropertyStrings.EXECUTION_PERIODIC_WATERMARKS_INTERVAL,
 Long.toString(200L)));
+       }
+
        public long getMinStateRetention() {
                return 
Long.parseLong(properties.getOrDefault(PropertyStrings.EXECUTION_MIN_STATE_RETENTION,
 Long.toString(Long.MIN_VALUE)));
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/19040a63/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java
 
b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java
index b7a3101..76e52de 100644
--- 
a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java
+++ 
b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java
@@ -41,6 +41,8 @@ public final class PropertyStrings {
 
        public static final String 
EXECUTION_TIME_CHARACTERISTIC_VALUE_PROCESSING_TIME = "processing-time";
 
+       public static final String EXECUTION_PERIODIC_WATERMARKS_INTERVAL = 
"periodic-watermarks-interval";
+
        public static final String EXECUTION_MIN_STATE_RETENTION = 
"min-idle-state-retention";
 
        public static final String EXECUTION_MAX_STATE_RETENTION = 
"max-idle-state-retention";

http://git-wip-us.apache.org/repos/asf/flink/blob/19040a63/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
 
b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index 2d54fc8..c62faa4 100644
--- 
a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ 
b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -35,6 +35,7 @@ import org.apache.flink.optimizer.costs.DefaultCostEstimator;
 import org.apache.flink.optimizer.plan.FlinkPlan;
 import 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.table.api.BatchQueryConfig;
@@ -193,10 +194,12 @@ public class ExecutionContext<T> {
                                streamExecEnv = 
createStreamExecutionEnvironment();
                                execEnv = null;
                                tableEnv = 
TableEnvironment.getTableEnvironment(streamExecEnv);
-                       } else {
+                       } else if (mergedEnv.getExecution().isBatchExecution()) 
{
                                streamExecEnv = null;
                                execEnv = createExecutionEnvironment();
                                tableEnv = 
TableEnvironment.getTableEnvironment(execEnv);
+                       } else {
+                               throw new SqlExecutionException("Unsupported 
execution type specified.");
                        }
 
                        // create query config
@@ -265,6 +268,9 @@ public class ExecutionContext<T> {
                        
env.setParallelism(mergedEnv.getExecution().getParallelism());
                        
env.setMaxParallelism(mergedEnv.getExecution().getMaxParallelism());
                        
env.setStreamTimeCharacteristic(mergedEnv.getExecution().getTimeCharacteristic());
+                       if (env.getStreamTimeCharacteristic() == 
TimeCharacteristic.EventTime) {
+                               
env.getConfig().setAutoWatermarkInterval(mergedEnv.getExecution().getPeriodicWatermarksInterval());
+                       }
                        return env;
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/19040a63/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java
 
b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java
index 384ad4f..ef1ca88 100644
--- 
a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java
+++ 
b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java
@@ -52,8 +52,8 @@ public class EnvironmentTest {
                tables.add("TableNumber2");
                tables.add("NewTable");
 
-               assertEquals(merged.getTables().keySet(), tables);
+               assertEquals(tables, merged.getTables().keySet());
                assertTrue(merged.getExecution().isStreamingExecution());
-               assertEquals(merged.getExecution().getMaxParallelism(), 16);
+               assertEquals(16, merged.getExecution().getMaxParallelism());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/19040a63/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
 
b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
new file mode 100644
index 0000000..8ba88ec
--- /dev/null
+++ 
b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway.local;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.client.cli.DefaultCLI;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.client.config.Environment;
+import org.apache.flink.table.client.gateway.SessionContext;
+import org.apache.flink.table.client.gateway.utils.EnvironmentFileUtil;
+
+import org.apache.commons.cli.Options;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test for {@link ExecutionContext}.
+ */
+public class ExecutionContextTest {
+
+       private static final String DEFAULTS_ENVIRONMENT_FILE = 
"test-sql-client-defaults.yaml";
+
+       @Test
+       public void testExecutionConfig() throws Exception {
+               final ExecutionContext<?> context = createExecutionContext();
+               final ExecutionConfig config = 
context.createEnvironmentInstance().getExecutionConfig();
+               assertEquals(99, config.getAutoWatermarkInterval());
+       }
+
+       private <T> ExecutionContext<T> createExecutionContext() throws 
Exception {
+               final Environment env = EnvironmentFileUtil.parseModified(
+                       DEFAULTS_ENVIRONMENT_FILE,
+                       Collections.singletonMap("$VAR_2", "streaming"));
+               final SessionContext session = new 
SessionContext("test-session", new Environment());
+               final Configuration flinkConfig = new Configuration();
+               return new ExecutionContext<>(
+                       env,
+                       session,
+                       Collections.emptyList(),
+                       flinkConfig,
+                       new Options(),
+                       Collections.singletonList(new DefaultCLI(flinkConfig)));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/19040a63/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
 
b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index c4f282b..b78a6ac 100644
--- 
a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ 
b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -123,6 +123,7 @@ public class LocalExecutorITCase extends TestLogger {
                final Map<String, String> expectedProperties = new HashMap<>();
                expectedProperties.put("execution.type", "streaming");
                expectedProperties.put("execution.time-characteristic", 
"event-time");
+               
expectedProperties.put("execution.periodic-watermarks-interval", "99");
                expectedProperties.put("execution.parallelism", "1");
                expectedProperties.put("execution.max-parallelism", "16");
                expectedProperties.put("execution.max-idle-state-retention", 
"0");
@@ -266,7 +267,7 @@ public class LocalExecutorITCase extends TestLogger {
 
        private <T> LocalExecutor createDefaultExecutor(ClusterClient<T> 
clusterClient) throws Exception {
                return new LocalExecutor(
-                       
EnvironmentFileUtil.parseUnmodified(DEFAULTS_ENVIRONMENT_FILE),
+                       
EnvironmentFileUtil.parseModified(DEFAULTS_ENVIRONMENT_FILE, 
Collections.singletonMap("$VAR_2", "batch")),
                        Collections.emptyList(),
                        clusterClient.getFlinkConfiguration(),
                        new DummyCustomCommandLine<T>(clusterClient));

http://git-wip-us.apache.org/repos/asf/flink/blob/19040a63/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
 
b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
index 3c21ffb..a9b4161 100644
--- 
a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
+++ 
b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
@@ -66,6 +66,7 @@ tables:
 execution:
   type: "$VAR_2"
   time-characteristic: event-time
+  periodic-watermarks-interval: 99
   parallelism: 1
   max-parallelism: 16
   min-idle-state-retention: 0

Reply via email to