Repository: flink Updated Branches: refs/heads/master da3fc4fde -> 310f3de62
[FLINK-8850] [sql-client] Add support for event-time in SQL Client This closes #5683. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/310f3de6 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/310f3de6 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/310f3de6 Branch: refs/heads/master Commit: 310f3de62e52f1f977c217d918cc5aac79b87277 Parents: da3fc4f Author: Timo Walther <[email protected]> Authored: Mon Mar 12 14:56:32 2018 +0100 Committer: Timo Walther <[email protected]> Committed: Wed Mar 14 15:55:48 2018 +0100 ---------------------------------------------------------------------- .../conf/sql-client-defaults.yaml | 32 +++++++++++++------ .../flink/table/client/config/Execution.java | 16 ++++++++++ .../table/client/config/PropertyStrings.java | 6 ++++ .../flink/table/client/gateway/Executor.java | 3 +- .../client/gateway/local/ExecutionContext.java | 1 + .../client/gateway/local/LocalExecutor.java | 5 +-- .../table/client/gateway/local/ResultStore.java | 2 +- .../client/gateway/local/DependencyTest.java | 1 + .../gateway/local/LocalExecutorITCase.java | 1 + .../gateway/utils/TestTableSourceFactory.java | 33 ++++++++++++++++++-- .../resources/test-sql-client-defaults.yaml | 1 + .../test/resources/test-sql-client-factory.yaml | 7 +++++ .../apache/flink/table/api/TableSchema.scala | 24 ++++++++++++-- 13 files changed, 113 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/310f3de6/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml b/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml index 76ccd0c..3558422 100644 --- a/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml +++ b/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml @@ -41,12 +41,20 @@ sources: [] # empty list # Execution properties allow for changing the behavior of a table program. execution: - type: streaming # 'batch' or 'streaming' execution - result-mode: changelog # 'changelog' or 'table' presentation of results - parallelism: 1 # parallelism of the program - max-parallelism: 128 # maximum parallelism - min-idle-state-retention: 0 # minimum idle state retention in ms - max-idle-state-retention: 0 # maximum idle state retention in ms + # 'batch' or 'streaming' execution + type: streaming + # allow 'event-time' or only 'processing-time' in sources + time-characteristic: event-time + # 'changelog' or 'table' presentation of results + result-mode: changelog + # parallelism of the program + parallelism: 1 + # maximum parallelism + max-parallelism: 128 + # minimum idle state retention in ms + min-idle-state-retention: 0 + # maximum idle state retention in ms + max-idle-state-retention: 0 #============================================================================== # Deployment properties @@ -56,9 +64,13 @@ execution: # programs are submitted to. deployment: - type: standalone # only the 'standalone' deployment is supported - response-timeout: 5000 # general cluster communication timeout in ms - gateway-address: "" # (optional) address from cluster to gateway - gateway-port: 0 # (optional) port from cluster to gateway + # only the 'standalone' deployment is supported + type: standalone + # general cluster communication timeout in ms + response-timeout: 5000 + # (optional) address from cluster to gateway + gateway-address: "" + # (optional) port from cluster to gateway + gateway-port: 0 http://git-wip-us.apache.org/repos/asf/flink/blob/310f3de6/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java index 37d1a34..d84c35b 100644 --- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java +++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java @@ -18,6 +18,8 @@ package org.apache.flink.table.client.config; +import org.apache.flink.streaming.api.TimeCharacteristic; + import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -54,6 +56,20 @@ public class Execution { PropertyStrings.EXECUTION_TYPE_VALUE_BATCH); } + public TimeCharacteristic getTimeCharacteristic() { + final String s = properties.getOrDefault( + PropertyStrings.EXECUTION_TIME_CHARACTERISTIC, + PropertyStrings.EXECUTION_TIME_CHARACTERISTIC_VALUE_EVENT_TIME); + switch (s) { + case PropertyStrings.EXECUTION_TIME_CHARACTERISTIC_VALUE_EVENT_TIME: + return TimeCharacteristic.EventTime; + case PropertyStrings.EXECUTION_TIME_CHARACTERISTIC_VALUE_PROCESSING_TIME: + return TimeCharacteristic.ProcessingTime; + default: + return TimeCharacteristic.EventTime; + } + } + public long getMinStateRetention() { return Long.parseLong(properties.getOrDefault(PropertyStrings.EXECUTION_MIN_STATE_RETENTION, Long.toString(Long.MIN_VALUE))); } http://git-wip-us.apache.org/repos/asf/flink/blob/310f3de6/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java index ba0759d..b7a3101 100644 --- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java +++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java @@ -35,6 +35,12 @@ public final class PropertyStrings { public static final String EXECUTION_TYPE_VALUE_BATCH = "batch"; + public static final String EXECUTION_TIME_CHARACTERISTIC = "time-characteristic"; + + public static final String EXECUTION_TIME_CHARACTERISTIC_VALUE_EVENT_TIME = "event-time"; + + public static final String EXECUTION_TIME_CHARACTERISTIC_VALUE_PROCESSING_TIME = "processing-time"; + public static final String EXECUTION_MIN_STATE_RETENTION = "min-idle-state-retention"; public static final String EXECUTION_MAX_STATE_RETENTION = "max-idle-state-retention"; http://git-wip-us.apache.org/repos/asf/flink/blob/310f3de6/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java index 4a41222..74e6a6b 100644 --- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java +++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java @@ -46,7 +46,8 @@ public interface Executor { List<String> listTables(SessionContext session) throws SqlExecutionException; /** - * Returns the schema of a table. Throws an exception if the table could not be found. + * Returns the schema of a table. Throws an exception if the table could not be found. The + * schema might contain time attribute types for helping the user during debugging a query. */ TableSchema getTableSchema(SessionContext session, String name) throws SqlExecutionException; http://git-wip-us.apache.org/repos/asf/flink/blob/310f3de6/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java index 15a3c12..a013afc 100644 --- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java +++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java @@ -149,6 +149,7 @@ public class ExecutionContext { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(mergedEnv.getExecution().getParallelism()); env.setMaxParallelism(mergedEnv.getExecution().getMaxParallelism()); + env.setStreamTimeCharacteristic(mergedEnv.getExecution().getTimeCharacteristic()); return env; } http://git-wip-us.apache.org/repos/asf/flink/blob/310f3de6/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java index 35d7da9..fa6c9d2 100644 --- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java +++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java @@ -228,6 +228,7 @@ public class LocalExecutor implements Executor { // create table here to fail quickly for wrong queries final Table table = createTable(context, query); + final TableSchema resultSchema = table.getSchema().withoutTimeAttributes(); // deployment final ClusterClient<?> clusterClient = createDeployment(mergedEnv.getDeployment()); @@ -235,7 +236,7 @@ public class LocalExecutor implements Executor { // initialize result final DynamicResult result = resultStore.createResult( mergedEnv, - table.getSchema(), + resultSchema, context.getExecutionConfig()); // create job graph with jars @@ -275,7 +276,7 @@ public class LocalExecutor implements Executor { // start result retrieval result.startRetrieval(program); - return new ResultDescriptor(resultId, table.getSchema(), result.isMaterialized()); + return new ResultDescriptor(resultId, resultSchema, result.isMaterialized()); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/310f3de6/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java index 1f3dc84..19a440e 100644 --- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java +++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java @@ -55,7 +55,7 @@ public class ResultStore { } /** - * Creates a result. Might start thread or opens sockets so every creates result must be closed. + * Creates a result. Might start threads or opens sockets so every created result must be closed. */ public DynamicResult createResult(Environment env, TableSchema schema, ExecutionConfig config) { if (!env.getExecution().isStreamingExecution()) { http://git-wip-us.apache.org/repos/asf/flink/blob/310f3de6/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java index 715d2db..40a1c2c 100644 --- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java +++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java @@ -65,6 +65,7 @@ public class DependencyTest { final TableSchema expected = TableSchema.builder() .field("IntegerField1", Types.INT()) .field("StringField1", Types.STRING()) + .field("rowtimeField", Types.SQL_TIMESTAMP()) .build(); assertEquals(expected, result); http://git-wip-us.apache.org/repos/asf/flink/blob/310f3de6/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java index a2ae281..4536978 100644 --- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java +++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java @@ -101,6 +101,7 @@ public class LocalExecutorITCase extends TestLogger { final Map<String, String> expectedProperties = new HashMap<>(); expectedProperties.put("execution.type", "streaming"); + expectedProperties.put("execution.time-characteristic", "event-time"); expectedProperties.put("execution.parallelism", "1"); expectedProperties.put("execution.max-parallelism", "16"); expectedProperties.put("execution.max-idle-state-retention", "0"); http://git-wip-us.apache.org/repos/asf/flink/blob/310f3de6/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactory.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactory.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactory.java index 40a7e7b..1b0a30e 100644 --- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactory.java +++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactory.java @@ -25,6 +25,10 @@ import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.Types; import org.apache.flink.table.client.gateway.local.DependencyTest; import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.descriptors.SchemaValidator; +import org.apache.flink.table.sources.DefinedProctimeAttribute; +import org.apache.flink.table.sources.DefinedRowtimeAttributes; +import org.apache.flink.table.sources.RowtimeAttributeDescriptor; import org.apache.flink.table.sources.StreamTableSource; import org.apache.flink.table.sources.TableSource; import org.apache.flink.table.sources.TableSourceFactory; @@ -34,8 +38,11 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE; +import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_TYPE; +import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_TYPE; import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA; import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_NAME; import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_TYPE; @@ -58,6 +65,8 @@ public class TestTableSourceFactory implements TableSourceFactory<Row> { properties.add("connector.test-property"); properties.add(SCHEMA() + ".#." + SCHEMA_TYPE()); properties.add(SCHEMA() + ".#." + SCHEMA_NAME()); + properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_TYPE()); + properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_TYPE()); return properties; } @@ -65,9 +74,13 @@ public class TestTableSourceFactory implements TableSourceFactory<Row> { public TableSource<Row> create(Map<String, String> properties) { final DescriptorProperties params = new DescriptorProperties(true); params.putProperties(properties); + final Optional<String> proctime = SchemaValidator.deriveProctimeAttribute(params); + final List<RowtimeAttributeDescriptor> rowtime = SchemaValidator.deriveRowtimeAttributes(params); return new TestTableSource( params.getTableSchema(SCHEMA()), - properties.get("connector.test-property")); + properties.get("connector.test-property"), + proctime.orElse(null), + rowtime); } // -------------------------------------------------------------------------------------------- @@ -75,14 +88,18 @@ public class TestTableSourceFactory implements TableSourceFactory<Row> { /** * Test table source. */ - public static class TestTableSource implements StreamTableSource<Row> { + public static class TestTableSource implements StreamTableSource<Row>, DefinedRowtimeAttributes, DefinedProctimeAttribute { private final TableSchema schema; private final String property; + private final String proctime; + private final List<RowtimeAttributeDescriptor> rowtime; - public TestTableSource(TableSchema schema, String property) { + public TestTableSource(TableSchema schema, String property, String proctime, List<RowtimeAttributeDescriptor> rowtime) { this.schema = schema; this.property = property; + this.proctime = proctime; + this.rowtime = rowtime; } public String getProperty() { @@ -108,5 +125,15 @@ public class TestTableSourceFactory implements TableSourceFactory<Row> { public String explainSource() { return "TestTableSource"; } + + @Override + public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() { + return rowtime; + } + + @Override + public String getProctimeAttribute() { + return proctime; + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/310f3de6/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml index 9cbecb0..5a598f1 100644 --- a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml +++ b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml @@ -63,6 +63,7 @@ sources: execution: type: streaming + time-characteristic: event-time parallelism: 1 max-parallelism: 16 min-idle-state-retention: 0 http://git-wip-us.apache.org/repos/asf/flink/blob/310f3de6/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-factory.yaml ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-factory.yaml b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-factory.yaml index 1bb69e5..daa1fd1 100644 --- a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-factory.yaml +++ b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-factory.yaml @@ -30,6 +30,13 @@ sources: type: INT - name: StringField1 type: VARCHAR + - name: rowtimeField + type: TIMESTAMP + rowtime: + timestamps: + type: from-source + watermarks: + type: from-source connector: type: "$VAR_0" $VAR_1: "$VAR_2" http://git-wip-us.apache.org/repos/asf/flink/blob/310f3de6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala index 6958b3d..6389b55 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala @@ -23,6 +23,8 @@ import org.apache.flink.api.common.typeutils.CompositeType import _root_.scala.collection.mutable.ArrayBuffer import _root_.java.util.Objects +import org.apache.flink.table.calcite.FlinkTypeFactory + /** * A TableSchema represents a Table's structure. */ @@ -30,6 +32,8 @@ class TableSchema( private val columnNames: Array[String], private val columnTypes: Array[TypeInformation[_]]) { + private val columnNameToIndex: Map[String, Int] = columnNames.zipWithIndex.toMap + if (columnNames.length != columnTypes.length) { throw new TableException( s"Number of field names and field types must be equal.\n" + @@ -52,8 +56,6 @@ class TableSchema( s"List of all fields: ${columnNames.mkString("[", ", ", "]")}.") } - val columnNameToIndex: Map[String, Int] = columnNames.zipWithIndex.toMap - /** * Returns a deep copy of the TableSchema. */ @@ -115,6 +117,24 @@ class TableSchema( } } + /** + * Converts a table schema into a schema that represents the result that would be written + * into a table sink or operator outside of the Table & SQL API. Time attributes are replaced + * by proper TIMESTAMP data types. + * + * @return a table schema with no time attributes + */ + def withoutTimeAttributes: TableSchema = { + val converted = columnTypes.map { t => + if (FlinkTypeFactory.isTimeIndicatorType(t)) { + Types.SQL_TIMESTAMP + } else { + t + } + } + new TableSchema(columnNames, converted) + } + override def toString: String = { val builder = new StringBuilder builder.append("root\n")
