Repository: flink
Updated Branches:
  refs/heads/master da3fc4fde -> 310f3de62


[FLINK-8850] [sql-client] Add support for event-time in SQL Client

This closes #5683.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/310f3de6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/310f3de6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/310f3de6

Branch: refs/heads/master
Commit: 310f3de62e52f1f977c217d918cc5aac79b87277
Parents: da3fc4f
Author: Timo Walther <[email protected]>
Authored: Mon Mar 12 14:56:32 2018 +0100
Committer: Timo Walther <[email protected]>
Committed: Wed Mar 14 15:55:48 2018 +0100

----------------------------------------------------------------------
 .../conf/sql-client-defaults.yaml               | 32 +++++++++++++------
 .../flink/table/client/config/Execution.java    | 16 ++++++++++
 .../table/client/config/PropertyStrings.java    |  6 ++++
 .../flink/table/client/gateway/Executor.java    |  3 +-
 .../client/gateway/local/ExecutionContext.java  |  1 +
 .../client/gateway/local/LocalExecutor.java     |  5 +--
 .../table/client/gateway/local/ResultStore.java |  2 +-
 .../client/gateway/local/DependencyTest.java    |  1 +
 .../gateway/local/LocalExecutorITCase.java      |  1 +
 .../gateway/utils/TestTableSourceFactory.java   | 33 ++++++++++++++++++--
 .../resources/test-sql-client-defaults.yaml     |  1 +
 .../test/resources/test-sql-client-factory.yaml |  7 +++++
 .../apache/flink/table/api/TableSchema.scala    | 24 ++++++++++++--
 13 files changed, 113 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/310f3de6/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml 
b/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml
index 76ccd0c..3558422 100644
--- a/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml
+++ b/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml
@@ -41,12 +41,20 @@ sources: [] # empty list
 # Execution properties allow for changing the behavior of a table program.
 
 execution:
-  type: streaming              # 'batch' or 'streaming' execution
-  result-mode: changelog       # 'changelog' or 'table' presentation of results
-  parallelism: 1               # parallelism of the program
-  max-parallelism: 128         # maximum parallelism
-  min-idle-state-retention: 0  # minimum idle state retention in ms
-  max-idle-state-retention: 0  # maximum idle state retention in ms
+  # 'batch' or 'streaming' execution
+  type: streaming
+  # allow 'event-time' or only 'processing-time' in sources
+  time-characteristic: event-time
+  # 'changelog' or 'table' presentation of results
+  result-mode: changelog
+  # parallelism of the program
+  parallelism: 1
+  # maximum parallelism
+  max-parallelism: 128
+  # minimum idle state retention in ms
+  min-idle-state-retention: 0
+  # maximum idle state retention in ms
+  max-idle-state-retention: 0
 
 #==============================================================================
 # Deployment properties
@@ -56,9 +64,13 @@ execution:
 # programs are submitted to.
 
 deployment:
-  type: standalone             # only the 'standalone' deployment is supported
-  response-timeout: 5000       # general cluster communication timeout in ms
-  gateway-address: ""          # (optional) address from cluster to gateway
-  gateway-port: 0              # (optional) port from cluster to gateway
+  # only the 'standalone' deployment is supported
+  type: standalone
+  # general cluster communication timeout in ms
+  response-timeout: 5000
+  # (optional) address from cluster to gateway
+  gateway-address: ""
+  # (optional) port from cluster to gateway
+  gateway-port: 0
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/310f3de6/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java
 
b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java
index 37d1a34..d84c35b 100644
--- 
a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java
+++ 
b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.table.client.config;
 
+import org.apache.flink.streaming.api.TimeCharacteristic;
+
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -54,6 +56,20 @@ public class Execution {
                        PropertyStrings.EXECUTION_TYPE_VALUE_BATCH);
        }
 
+       public TimeCharacteristic getTimeCharacteristic() {
+               final String s = properties.getOrDefault(
+                       PropertyStrings.EXECUTION_TIME_CHARACTERISTIC,
+                       
PropertyStrings.EXECUTION_TIME_CHARACTERISTIC_VALUE_EVENT_TIME);
+               switch (s) {
+                       case 
PropertyStrings.EXECUTION_TIME_CHARACTERISTIC_VALUE_EVENT_TIME:
+                               return TimeCharacteristic.EventTime;
+                       case 
PropertyStrings.EXECUTION_TIME_CHARACTERISTIC_VALUE_PROCESSING_TIME:
+                               return TimeCharacteristic.ProcessingTime;
+                       default:
+                               return TimeCharacteristic.EventTime;
+               }
+       }
+
        public long getMinStateRetention() {
                return 
Long.parseLong(properties.getOrDefault(PropertyStrings.EXECUTION_MIN_STATE_RETENTION,
 Long.toString(Long.MIN_VALUE)));
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/310f3de6/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java
 
b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java
index ba0759d..b7a3101 100644
--- 
a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java
+++ 
b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java
@@ -35,6 +35,12 @@ public final class PropertyStrings {
 
        public static final String EXECUTION_TYPE_VALUE_BATCH = "batch";
 
+       public static final String EXECUTION_TIME_CHARACTERISTIC = 
"time-characteristic";
+
+       public static final String 
EXECUTION_TIME_CHARACTERISTIC_VALUE_EVENT_TIME = "event-time";
+
+       public static final String 
EXECUTION_TIME_CHARACTERISTIC_VALUE_PROCESSING_TIME = "processing-time";
+
        public static final String EXECUTION_MIN_STATE_RETENTION = 
"min-idle-state-retention";
 
        public static final String EXECUTION_MAX_STATE_RETENTION = 
"max-idle-state-retention";

http://git-wip-us.apache.org/repos/asf/flink/blob/310f3de6/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
 
b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
index 4a41222..74e6a6b 100644
--- 
a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
+++ 
b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
@@ -46,7 +46,8 @@ public interface Executor {
        List<String> listTables(SessionContext session) throws 
SqlExecutionException;
 
        /**
-        * Returns the schema of a table. Throws an exception if the table 
could not be found.
+        * Returns the schema of a table. Throws an exception if the table 
could not be found. The
+        * schema might contain time attribute types to help the user 
while debugging a query.
         */
        TableSchema getTableSchema(SessionContext session, String name) throws 
SqlExecutionException;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/310f3de6/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
 
b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index 15a3c12..a013afc 100644
--- 
a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ 
b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -149,6 +149,7 @@ public class ExecutionContext {
                final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(mergedEnv.getExecution().getParallelism());
                
env.setMaxParallelism(mergedEnv.getExecution().getMaxParallelism());
+               
env.setStreamTimeCharacteristic(mergedEnv.getExecution().getTimeCharacteristic());
                return env;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/310f3de6/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
 
b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index 35d7da9..fa6c9d2 100644
--- 
a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ 
b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -228,6 +228,7 @@ public class LocalExecutor implements Executor {
 
                // create table here to fail quickly for wrong queries
                final Table table = createTable(context, query);
+               final TableSchema resultSchema = 
table.getSchema().withoutTimeAttributes();
 
                // deployment
                final ClusterClient<?> clusterClient = 
createDeployment(mergedEnv.getDeployment());
@@ -235,7 +236,7 @@ public class LocalExecutor implements Executor {
                // initialize result
                final DynamicResult result = resultStore.createResult(
                        mergedEnv,
-                       table.getSchema(),
+                       resultSchema,
                        context.getExecutionConfig());
 
                // create job graph with jars
@@ -275,7 +276,7 @@ public class LocalExecutor implements Executor {
                // start result retrieval
                result.startRetrieval(program);
 
-               return new ResultDescriptor(resultId, table.getSchema(), 
result.isMaterialized());
+               return new ResultDescriptor(resultId, resultSchema, 
result.isMaterialized());
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/310f3de6/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java
 
b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java
index 1f3dc84..19a440e 100644
--- 
a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java
+++ 
b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java
@@ -55,7 +55,7 @@ public class ResultStore {
        }
 
        /**
-        * Creates a result. Might start thread or opens sockets so every 
creates result must be closed.
+        * Creates a result. Might start threads or open sockets so every 
created result must be closed.
         */
        public DynamicResult createResult(Environment env, TableSchema schema, 
ExecutionConfig config) {
                if (!env.getExecution().isStreamingExecution()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/310f3de6/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java
 
b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java
index 715d2db..40a1c2c 100644
--- 
a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java
+++ 
b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java
@@ -65,6 +65,7 @@ public class DependencyTest {
                final TableSchema expected = TableSchema.builder()
                        .field("IntegerField1", Types.INT())
                        .field("StringField1", Types.STRING())
+                       .field("rowtimeField", Types.SQL_TIMESTAMP())
                        .build();
 
                assertEquals(expected, result);

http://git-wip-us.apache.org/repos/asf/flink/blob/310f3de6/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
 
b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index a2ae281..4536978 100644
--- 
a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ 
b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -101,6 +101,7 @@ public class LocalExecutorITCase extends TestLogger {
 
                final Map<String, String> expectedProperties = new HashMap<>();
                expectedProperties.put("execution.type", "streaming");
+               expectedProperties.put("execution.time-characteristic", 
"event-time");
                expectedProperties.put("execution.parallelism", "1");
                expectedProperties.put("execution.max-parallelism", "16");
                expectedProperties.put("execution.max-idle-state-retention", 
"0");

http://git-wip-us.apache.org/repos/asf/flink/blob/310f3de6/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactory.java
 
b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactory.java
index 40a7e7b..1b0a30e 100644
--- 
a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactory.java
+++ 
b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactory.java
@@ -25,6 +25,10 @@ import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.Types;
 import org.apache.flink.table.client.gateway.local.DependencyTest;
 import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.SchemaValidator;
+import org.apache.flink.table.sources.DefinedProctimeAttribute;
+import org.apache.flink.table.sources.DefinedRowtimeAttributes;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
 import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.table.sources.TableSource;
 import org.apache.flink.table.sources.TableSourceFactory;
@@ -34,8 +38,11 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import static 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
+import static 
org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_TYPE;
+import static 
org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_TYPE;
 import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA;
 import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_NAME;
 import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_TYPE;
@@ -58,6 +65,8 @@ public class TestTableSourceFactory implements 
TableSourceFactory<Row> {
                properties.add("connector.test-property");
                properties.add(SCHEMA() + ".#." + SCHEMA_TYPE());
                properties.add(SCHEMA() + ".#." + SCHEMA_NAME());
+               properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_TYPE());
+               properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_TYPE());
                return properties;
        }
 
@@ -65,9 +74,13 @@ public class TestTableSourceFactory implements 
TableSourceFactory<Row> {
        public TableSource<Row> create(Map<String, String> properties) {
                final DescriptorProperties params = new 
DescriptorProperties(true);
                params.putProperties(properties);
+               final Optional<String> proctime = 
SchemaValidator.deriveProctimeAttribute(params);
+               final List<RowtimeAttributeDescriptor> rowtime = 
SchemaValidator.deriveRowtimeAttributes(params);
                return new TestTableSource(
                        params.getTableSchema(SCHEMA()),
-                       properties.get("connector.test-property"));
+                       properties.get("connector.test-property"),
+                       proctime.orElse(null),
+                       rowtime);
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -75,14 +88,18 @@ public class TestTableSourceFactory implements 
TableSourceFactory<Row> {
        /**
         * Test table source.
         */
-       public static class TestTableSource implements StreamTableSource<Row> {
+       public static class TestTableSource implements StreamTableSource<Row>, 
DefinedRowtimeAttributes, DefinedProctimeAttribute {
 
                private final TableSchema schema;
                private final String property;
+               private final String proctime;
+               private final List<RowtimeAttributeDescriptor> rowtime;
 
-               public TestTableSource(TableSchema schema, String property) {
+               public TestTableSource(TableSchema schema, String property, 
String proctime, List<RowtimeAttributeDescriptor> rowtime) {
                        this.schema = schema;
                        this.property = property;
+                       this.proctime = proctime;
+                       this.rowtime = rowtime;
                }
 
                public String getProperty() {
@@ -108,5 +125,15 @@ public class TestTableSourceFactory implements 
TableSourceFactory<Row> {
                public String explainSource() {
                        return "TestTableSource";
                }
+
+               @Override
+               public List<RowtimeAttributeDescriptor> 
getRowtimeAttributeDescriptors() {
+                       return rowtime;
+               }
+
+               @Override
+               public String getProctimeAttribute() {
+                       return proctime;
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/310f3de6/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
 
b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
index 9cbecb0..5a598f1 100644
--- 
a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
+++ 
b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
@@ -63,6 +63,7 @@ sources:
 
 execution:
   type: streaming
+  time-characteristic: event-time
   parallelism: 1
   max-parallelism: 16
   min-idle-state-retention: 0

http://git-wip-us.apache.org/repos/asf/flink/blob/310f3de6/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-factory.yaml
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-factory.yaml
 
b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-factory.yaml
index 1bb69e5..daa1fd1 100644
--- 
a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-factory.yaml
+++ 
b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-factory.yaml
@@ -30,6 +30,13 @@ sources:
         type: INT
       - name: StringField1
         type: VARCHAR
+      - name: rowtimeField
+        type: TIMESTAMP
+        rowtime:
+          timestamps:
+            type: from-source
+          watermarks:
+            type: from-source
     connector:
       type: "$VAR_0"
       $VAR_1: "$VAR_2"

http://git-wip-us.apache.org/repos/asf/flink/blob/310f3de6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
index 6958b3d..6389b55 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
@@ -23,6 +23,8 @@ import org.apache.flink.api.common.typeutils.CompositeType
 import _root_.scala.collection.mutable.ArrayBuffer
 import _root_.java.util.Objects
 
+import org.apache.flink.table.calcite.FlinkTypeFactory
+
 /**
   * A TableSchema represents a Table's structure.
   */
@@ -30,6 +32,8 @@ class TableSchema(
   private val columnNames: Array[String],
   private val columnTypes: Array[TypeInformation[_]]) {
 
+  private val columnNameToIndex: Map[String, Int] = 
columnNames.zipWithIndex.toMap
+
   if (columnNames.length != columnTypes.length) {
     throw new TableException(
       s"Number of field names and field types must be equal.\n" +
@@ -52,8 +56,6 @@ class TableSchema(
         s"List of all fields: ${columnNames.mkString("[", ", ", "]")}.")
   }
 
-  val columnNameToIndex: Map[String, Int] = columnNames.zipWithIndex.toMap
-
   /**
     * Returns a deep copy of the TableSchema.
     */
@@ -115,6 +117,24 @@ class TableSchema(
     }
   }
 
+  /**
+    * Converts a table schema into a schema that represents the result that 
would be written
+    * into a table sink or operator outside of the Table & SQL API. Time 
attributes are replaced
+    * by proper TIMESTAMP data types.
+    *
+    * @return a table schema with no time attributes
+    */
+  def withoutTimeAttributes: TableSchema = {
+    val converted = columnTypes.map { t =>
+      if (FlinkTypeFactory.isTimeIndicatorType(t)) {
+        Types.SQL_TIMESTAMP
+      } else {
+        t
+      }
+    }
+    new TableSchema(columnNames, converted)
+  }
+
   override def toString: String = {
     val builder = new StringBuilder
     builder.append("root\n")

Reply via email to