Repository: flink Updated Branches: refs/heads/master 8c15d379d -> 19040a632
[FLINK-9578] [sql-client] Allow to define an auto watermark interval in SQL Client This closes #6160. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/19040a63 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/19040a63 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/19040a63 Branch: refs/heads/master Commit: 19040a632c66af64d49707cdf07adb25af04d92b Parents: 8c15d37 Author: Timo Walther <twal...@apache.org> Authored: Wed Jun 13 16:49:00 2018 +0200 Committer: Timo Walther <twal...@apache.org> Committed: Tue Jul 3 13:53:11 2018 +0200 ---------------------------------------------------------------------- docs/dev/table/sqlClient.md | 3 +- .../conf/sql-client-defaults.yaml | 2 + .../flink/table/client/config/Execution.java | 10 ++++ .../table/client/config/PropertyStrings.java | 2 + .../client/gateway/local/ExecutionContext.java | 8 ++- .../client/gateway/local/EnvironmentTest.java | 4 +- .../gateway/local/ExecutionContextTest.java | 63 ++++++++++++++++++++ .../gateway/local/LocalExecutorITCase.java | 3 +- .../resources/test-sql-client-defaults.yaml | 1 + 9 files changed, 91 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/19040a63/docs/dev/table/sqlClient.md ---------------------------------------------------------------------- diff --git a/docs/dev/table/sqlClient.md b/docs/dev/table/sqlClient.md index 4c27356..2bdec2b 100644 --- a/docs/dev/table/sqlClient.md +++ b/docs/dev/table/sqlClient.md @@ -185,12 +185,13 @@ tables: execution: type: streaming # required: execution mode either 'batch' or 'streaming' + result-mode: table # required: either 'table' or 'changelog' time-characteristic: event-time # optional: 'processing-time' or 'event-time' (default) parallelism: 1 # optional: Flink's parallelism (1 by default) + periodic-watermarks-interval: 200 # optional: interval for periodic watermarks (200 ms by default) max-parallelism: 16 # optional: Flink's maximum parallelism (128 by default) min-idle-state-retention: 0 # optional: table program's minimum idle state time max-idle-state-retention: 0 # optional: table program's maximum idle state time - result-mode: table # required: either 'table' or 'changelog' # Deployment properties allow for describing the cluster to which table programs are submitted to. http://git-wip-us.apache.org/repos/asf/flink/blob/19040a63/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml b/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml index 5fd01d9..7ca7776 100644 --- a/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml +++ b/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml @@ -46,6 +46,8 @@ execution: type: streaming # allow 'event-time' or only 'processing-time' in sources time-characteristic: event-time + # interval in ms for emitting periodic watermarks + periodic-watermarks-interval: 200 # 'changelog' or 'table' presentation of results result-mode: changelog # parallelism of the program http://git-wip-us.apache.org/repos/asf/flink/blob/19040a63/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java index 3ca99c1..9a5ae47 100644 --- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java +++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java @@ -50,6 +50,12 @@ public class Execution { PropertyStrings.EXECUTION_TYPE_VALUE_STREAMING); } + public boolean isBatchExecution() { + return Objects.equals( + properties.get(PropertyStrings.EXECUTION_TYPE), + PropertyStrings.EXECUTION_TYPE_VALUE_BATCH); + } + public TimeCharacteristic getTimeCharacteristic() { final String s = properties.getOrDefault( PropertyStrings.EXECUTION_TIME_CHARACTERISTIC, @@ -64,6 +70,10 @@ public class Execution { } } + public long getPeriodicWatermarksInterval() { + return Long.parseLong(properties.getOrDefault(PropertyStrings.EXECUTION_PERIODIC_WATERMARKS_INTERVAL, Long.toString(200L))); + } + public long getMinStateRetention() { return Long.parseLong(properties.getOrDefault(PropertyStrings.EXECUTION_MIN_STATE_RETENTION, Long.toString(Long.MIN_VALUE))); } http://git-wip-us.apache.org/repos/asf/flink/blob/19040a63/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java index b7a3101..76e52de 100644 --- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java +++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java @@ -41,6 +41,8 @@ public final class PropertyStrings { public static final String EXECUTION_TIME_CHARACTERISTIC_VALUE_PROCESSING_TIME = "processing-time"; + public static final String EXECUTION_PERIODIC_WATERMARKS_INTERVAL = "periodic-watermarks-interval"; + public static final String EXECUTION_MIN_STATE_RETENTION = "min-idle-state-retention"; public static final String EXECUTION_MAX_STATE_RETENTION = "max-idle-state-retention"; http://git-wip-us.apache.org/repos/asf/flink/blob/19040a63/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java index 2d54fc8..c62faa4 100644 --- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java +++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java @@ -35,6 +35,7 @@ import org.apache.flink.optimizer.costs.DefaultCostEstimator; import org.apache.flink.optimizer.plan.FlinkPlan; import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.table.api.BatchQueryConfig; @@ -193,10 +194,12 @@ public class ExecutionContext<T> { streamExecEnv = createStreamExecutionEnvironment(); execEnv = null; tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv); - } else { + } else if (mergedEnv.getExecution().isBatchExecution()) { streamExecEnv = null; execEnv = createExecutionEnvironment(); tableEnv = TableEnvironment.getTableEnvironment(execEnv); + } else { + throw new SqlExecutionException("Unsupported execution type specified."); } // create query config @@ -265,6 +268,9 @@ public class ExecutionContext<T> { env.setParallelism(mergedEnv.getExecution().getParallelism()); env.setMaxParallelism(mergedEnv.getExecution().getMaxParallelism()); env.setStreamTimeCharacteristic(mergedEnv.getExecution().getTimeCharacteristic()); + if (env.getStreamTimeCharacteristic() == TimeCharacteristic.EventTime) { + env.getConfig().setAutoWatermarkInterval(mergedEnv.getExecution().getPeriodicWatermarksInterval()); + } return env; } http://git-wip-us.apache.org/repos/asf/flink/blob/19040a63/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java index 384ad4f..ef1ca88 100644 --- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java +++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java @@ -52,8 +52,8 @@ public class EnvironmentTest { tables.add("TableNumber2"); tables.add("NewTable"); - assertEquals(merged.getTables().keySet(), tables); + assertEquals(tables, merged.getTables().keySet()); assertTrue(merged.getExecution().isStreamingExecution()); - assertEquals(merged.getExecution().getMaxParallelism(), 16); + assertEquals(16, merged.getExecution().getMaxParallelism()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/19040a63/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java new file mode 100644 index 0000000..8ba88ec --- /dev/null +++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client.gateway.local; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.client.cli.DefaultCLI; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.client.config.Environment; +import org.apache.flink.table.client.gateway.SessionContext; +import org.apache.flink.table.client.gateway.utils.EnvironmentFileUtil; + +import org.apache.commons.cli.Options; +import org.junit.Test; + +import java.util.Collections; + +import static org.junit.Assert.assertEquals; + +/** + * Test for {@link ExecutionContext}. + */ +public class ExecutionContextTest { + + private static final String DEFAULTS_ENVIRONMENT_FILE = "test-sql-client-defaults.yaml"; + + @Test + public void testExecutionConfig() throws Exception { + final ExecutionContext<?> context = createExecutionContext(); + final ExecutionConfig config = context.createEnvironmentInstance().getExecutionConfig(); + assertEquals(99, config.getAutoWatermarkInterval()); + } + + private <T> ExecutionContext<T> createExecutionContext() throws Exception { + final Environment env = EnvironmentFileUtil.parseModified( + DEFAULTS_ENVIRONMENT_FILE, + Collections.singletonMap("$VAR_2", "streaming")); + final SessionContext session = new SessionContext("test-session", new Environment()); + final Configuration flinkConfig = new Configuration(); + return new ExecutionContext<>( + env, + session, + Collections.emptyList(), + flinkConfig, + new Options(), + Collections.singletonList(new DefaultCLI(flinkConfig))); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/19040a63/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java index c4f282b..b78a6ac 100644 --- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java +++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java @@ -123,6 +123,7 @@ public class LocalExecutorITCase extends TestLogger { final Map<String, String> expectedProperties = new HashMap<>(); expectedProperties.put("execution.type", "streaming"); expectedProperties.put("execution.time-characteristic", "event-time"); + expectedProperties.put("execution.periodic-watermarks-interval", "99"); expectedProperties.put("execution.parallelism", "1"); expectedProperties.put("execution.max-parallelism", "16"); expectedProperties.put("execution.max-idle-state-retention", "0"); @@ -266,7 +267,7 @@ public class LocalExecutorITCase extends TestLogger { private <T> LocalExecutor createDefaultExecutor(ClusterClient<T> clusterClient) throws Exception { return new LocalExecutor( - EnvironmentFileUtil.parseUnmodified(DEFAULTS_ENVIRONMENT_FILE), + EnvironmentFileUtil.parseModified(DEFAULTS_ENVIRONMENT_FILE, Collections.singletonMap("$VAR_2", "batch")), Collections.emptyList(), clusterClient.getFlinkConfiguration(), new DummyCustomCommandLine<T>(clusterClient)); http://git-wip-us.apache.org/repos/asf/flink/blob/19040a63/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml index 3c21ffb..a9b4161 100644 --- a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml +++ b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml @@ -66,6 +66,7 @@ tables: execution: type: "$VAR_2" time-characteristic: event-time + periodic-watermarks-interval: 99 parallelism: 1 max-parallelism: 16 min-idle-state-retention: 0