Repository: flink
Updated Branches:
  refs/heads/master 7baf7649e -> bc9982c36


[FLINK-9059] [table] Replace "sources" with "tables" in environment file

This closes #5758.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bc9982c3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bc9982c3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bc9982c3

Branch: refs/heads/master
Commit: bc9982c364f54c40223d14eeb4823a882c021e7a
Parents: 7baf764
Author: Shuyi Chen <sh...@uber.com>
Authored: Thu Mar 22 23:00:00 2018 -0700
Committer: Timo Walther <twal...@apache.org>
Committed: Fri Apr 6 14:08:08 2018 +0200

----------------------------------------------------------------------
 .../conf/sql-client-defaults.yaml               |  5 ++-
 .../flink/table/client/config/Environment.java  | 47 ++++++++++++--------
 .../client/gateway/local/ExecutionContext.java  | 12 +++--
 .../resources/test-sql-client-defaults.yaml     |  4 +-
 .../test/resources/test-sql-client-factory.yaml |  3 +-
 .../table/descriptors/TableDescriptor.scala     | 40 +++++++++++++++++
 .../descriptors/TableDescriptorValidator.scala  | 43 ++++++++++++++++++
 .../descriptors/TableSourceDescriptor.scala     | 32 ++++++-------
 .../sources/TableSourceFactoryServiceTest.scala |  4 +-
 .../table/sources/TestTableSourceFactory.scala  |  4 +-
 10 files changed, 145 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bc9982c3/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml b/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml
index 4ec64d6..5fd01d9 100644
--- a/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml
+++ b/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml
@@ -25,11 +25,12 @@
 # Table Sources
 #==============================================================================
 
-# Define table sources here. See the Table API & SQL documentation for details.
+# Define table sources and sinks here. See the Table API & SQL documentation for details.
 
-sources: [] # empty list
+tables: [] # empty list
 # A typical table source definition looks like:
 # - name: ...
+#   type: source
 #   connector: ...
 #   format: ...
 #   schema: ...

http://git-wip-us.apache.org/repos/asf/flink/blob/bc9982c3/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
index a910c49..7169fe1 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
@@ -19,6 +19,8 @@
 package org.apache.flink.table.client.config;
 
 import org.apache.flink.table.client.SqlClientException;
+import org.apache.flink.table.descriptors.TableDescriptor;
+import org.apache.flink.table.descriptors.TableDescriptorValidator;
 
 import java.io.IOException;
 import java.net.URL;
@@ -29,7 +31,7 @@ import java.util.Map;
 
 /**
  * Environment configuration that represents the content of an environment file. Environment files
- * define sources, execution, and deployment behavior. An environment might be defined by default or
+ * define tables, execution, and deployment behavior. An environment might be defined by default or
  * as part of a session. Environments can be merged or enriched with properties (e.g. from CLI command).
  *
  * <p>In future versions, we might restrict the merging or enrichment of deployment properties to not
@@ -37,30 +39,39 @@ import java.util.Map;
  */
 public class Environment {
 
-       private Map<String, Source> sources;
+       private Map<String, TableDescriptor> tables;
 
        private Execution execution;
 
        private Deployment deployment;
 
        public Environment() {
-               this.sources = Collections.emptyMap();
+               this.tables = Collections.emptyMap();
                this.execution = new Execution();
                this.deployment = new Deployment();
        }
 
-       public Map<String, Source> getSources() {
-               return sources;
+       public Map<String, TableDescriptor> getTables() {
+               return tables;
        }
 
-       public void setSources(List<Map<String, Object>> sources) {
-               this.sources = new HashMap<>(sources.size());
-               sources.forEach(config -> {
-                       final Source s = Source.create(config);
-                       if (this.sources.containsKey(s.getName())) {
-                               throw new SqlClientException("Duplicate source name '" + s + "'.");
+       public void setTables(List<Map<String, Object>> tables) {
+               this.tables = new HashMap<>(tables.size());
+               tables.forEach(config -> {
+                       if (!config.containsKey(TableDescriptorValidator.TABLE_TYPE())) {
+                               throw new SqlClientException("The 'type' attribute of a table is missing.");
+                       }
+                       if (config.get(TableDescriptorValidator.TABLE_TYPE()).equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE())) {
+                               config.remove(TableDescriptorValidator.TABLE_TYPE());
+                               final Source s = Source.create(config);
+                               if (this.tables.containsKey(s.getName())) {
+                                       throw new SqlClientException("Duplicate source name '" + s + "'.");
+                               }
+                               this.tables.put(s.getName(), s);
+                       } else {
+                               throw new SqlClientException(
+                                               "Invalid table 'type' attribute value, only 'source' is supported");
                        }
-                       this.sources.put(s.getName(), s);
                });
        }
 
@@ -102,10 +113,10 @@ public class Environment {
        public static Environment merge(Environment env1, Environment env2) {
                final Environment mergedEnv = new Environment();
 
-               // merge sources
-               final Map<String, Source> sources = new HashMap<>(env1.getSources());
-               mergedEnv.getSources().putAll(env2.getSources());
-               mergedEnv.sources = sources;
+               // merge tables
+               final Map<String, TableDescriptor> tables = new HashMap<>(env1.getTables());
+               mergedEnv.getTables().putAll(env2.getTables());
+               mergedEnv.tables = tables;
 
                // merge execution properties
                mergedEnv.execution = Execution.merge(env1.getExecution(), env2.getExecution());
@@ -119,8 +130,8 @@ public class Environment {
        public static Environment enrich(Environment env, Map<String, String> properties) {
                final Environment enrichedEnv = new Environment();
 
-               // merge sources
-               enrichedEnv.sources = new HashMap<>(env.getSources());
+               // merge tables
+               enrichedEnv.tables = new HashMap<>(env.getTables());
 
                // enrich execution properties
                enrichedEnv.execution = Execution.enrich(env.execution, properties);

http://git-wip-us.apache.org/repos/asf/flink/blob/bc9982c3/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index 81931e2..84b7b28 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -45,6 +45,7 @@ import org.apache.flink.table.client.config.Deployment;
 import org.apache.flink.table.client.config.Environment;
 import org.apache.flink.table.client.gateway.SessionContext;
 import org.apache.flink.table.client.gateway.SqlExecutionException;
+import org.apache.flink.table.descriptors.TableSourceDescriptor;
 import org.apache.flink.table.sources.TableSource;
 import org.apache.flink.table.sources.TableSourceFactoryService;
 import org.apache.flink.util.FlinkException;
@@ -60,7 +61,7 @@ import java.util.Map;
 /**
  * Context for executing table programs. This class caches everything that can be cached across
  * multiple queries as long as the session context does not change. This must be thread-safe as
- * it might be reused across different query submission.
+ * it might be reused across different query submissions.
  *
  * @param <T> cluster id
  */
@@ -92,9 +93,12 @@ public class ExecutionContext<T> {
 
                // create table sources
                tableSources = new HashMap<>();
-               mergedEnv.getSources().forEach((name, source) -> {
-                       final TableSource<?> tableSource = TableSourceFactoryService.findAndCreateTableSource(source, classLoader);
-                       tableSources.put(name, tableSource);
+               mergedEnv.getTables().forEach((name, descriptor) -> {
+                       if (descriptor instanceof TableSourceDescriptor) {
+                               TableSource<?> tableSource = TableSourceFactoryService.findAndCreateTableSource(
+                                               (TableSourceDescriptor) descriptor, classLoader);
+                               tableSources.put(name, tableSource);
+                       }
                });
 
                // convert deployment options into command line options that describe a cluster

http://git-wip-us.apache.org/repos/asf/flink/blob/bc9982c3/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
index 8f11d23..1186615 100644
--- a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
+++ b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
@@ -23,8 +23,9 @@
 
 # this file has variables that can be filled with content by replacing $VAR_XXX
 
-sources:
+tables:
   - name: TableNumber1
+    type: source
     schema:
       - name: IntegerField1
         type: INT
@@ -43,6 +44,7 @@ sources:
       line-delimiter: "\n"
       comment-prefix: "#"
   - name: TableNumber2
+    type: source
     schema:
       - name: IntegerField2
         type: INT

http://git-wip-us.apache.org/repos/asf/flink/blob/bc9982c3/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-factory.yaml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-factory.yaml b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-factory.yaml
index d0caf84..c7b6097 100644
--- a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-factory.yaml
+++ b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-factory.yaml
@@ -23,8 +23,9 @@
 
 # this file has variables that can be filled with content by replacing $VAR_XXX
 
-sources:
+tables:
   - name: TableNumber1
+    type: source
     schema:
       - name: IntegerField1
         type: INT

http://git-wip-us.apache.org/repos/asf/flink/blob/bc9982c3/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptor.scala
new file mode 100644
index 0000000..7b864d8
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptor.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+/**
+  * Common class for all descriptors describing table sources and sinks.
+  */
+abstract class TableDescriptor extends Descriptor {
+
+  protected var connectorDescriptor: Option[ConnectorDescriptor] = None
+  protected var formatDescriptor: Option[FormatDescriptor] = None
+  protected var schemaDescriptor: Option[Schema] = None
+  protected var metaDescriptor: Option[Metadata] = None
+
+  /**
+    * Internal method for properties conversion.
+    */
+  override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
+    connectorDescriptor.foreach(_.addProperties(properties))
+    formatDescriptor.foreach(_.addProperties(properties))
+    schemaDescriptor.foreach(_.addProperties(properties))
+    metaDescriptor.foreach(_.addProperties(properties))
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bc9982c3/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptorValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptorValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptorValidator.scala
new file mode 100644
index 0000000..b868a8a
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptorValidator.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+/**
+  * Validator for [[TableDescriptor]].
+  */
+class TableDescriptorValidator extends DescriptorValidator {
+
+  override def validate(properties: DescriptorProperties): Unit = {
+    // nothing to do
+  }
+}
+
+object TableDescriptorValidator {
+
+  /**
+    * Key for describing the type of this table, valid values are ('source').
+    */
+  val TABLE_TYPE = "type"
+
+  /**
+    * Valid TABLE_TYPE value.
+    */
+  val TABLE_TYPE_VALUE_SOURCE = "source"
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bc9982c3/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala
index 5118489..3ca39c2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala
@@ -27,37 +27,31 @@ import scala.collection.JavaConverters._
 /**
   * Common class for all descriptors describing a table source.
   */
-abstract class TableSourceDescriptor extends Descriptor {
+abstract class TableSourceDescriptor extends TableDescriptor {
 
-  protected var connectorDescriptor: Option[ConnectorDescriptor] = None
-  protected var formatDescriptor: Option[FormatDescriptor] = None
-  protected var schemaDescriptor: Option[Schema] = None
   protected var statisticsDescriptor: Option[Statistics] = None
-  protected var metaDescriptor: Option[Metadata] = None
 
   /**
     * Internal method for properties conversion.
     */
   override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
-    connectorDescriptor.foreach(_.addProperties(properties))
-    formatDescriptor.foreach(_.addProperties(properties))
-    schemaDescriptor.foreach(_.addProperties(properties))
-    metaDescriptor.foreach(_.addProperties(properties))
+    super.addProperties(properties)
+    statisticsDescriptor.foreach(_.addProperties(properties))
   }
 
   /**
     * Reads table statistics from the descriptors properties.
     */
   protected def getTableStats: Option[TableStats] = {
-      val normalizedProps = new DescriptorProperties()
-      addProperties(normalizedProps)
-      val rowCount = toScala(normalizedProps.getOptionalLong(STATISTICS_ROW_COUNT))
-      rowCount match {
-        case Some(cnt) =>
-          val columnStats = readColumnStats(normalizedProps, STATISTICS_COLUMNS)
-          Some(TableStats(cnt, columnStats.asJava))
-        case None =>
-          None
-      }
+    val normalizedProps = new DescriptorProperties()
+    addProperties(normalizedProps)
+    val rowCount = toScala(normalizedProps.getOptionalLong(STATISTICS_ROW_COUNT))
+    rowCount match {
+      case Some(cnt) =>
+        val columnStats = readColumnStats(normalizedProps, STATISTICS_COLUMNS)
+        Some(TableStats(cnt, columnStats.asJava))
+      case None =>
+        None
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bc9982c3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceFactoryServiceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceFactoryServiceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceFactoryServiceTest.scala
index 279e9a4..48db9da 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceFactoryServiceTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceFactoryServiceTest.scala
@@ -19,8 +19,8 @@
 package org.apache.flink.table.sources
 
 import org.apache.flink.table.api.{NoMatchingTableSourceException, TableException, ValidationException}
-import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_TYPE, CONNECTOR_PROPERTY_VERSION}
-import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_TYPE, FORMAT_PROPERTY_VERSION}
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_PROPERTY_VERSION, CONNECTOR_TYPE}
+import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_PROPERTY_VERSION, FORMAT_TYPE}
 import org.junit.Assert.assertTrue
 import org.junit.Test
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bc9982c3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TestTableSourceFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TestTableSourceFactory.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TestTableSourceFactory.scala
index ee3d637..b4aa08d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TestTableSourceFactory.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TestTableSourceFactory.scala
@@ -22,8 +22,8 @@ import java.util
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.api.TableSchema
-import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_TYPE, CONNECTOR_PROPERTY_VERSION}
-import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_TYPE, FORMAT_PROPERTY_VERSION}
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_PROPERTY_VERSION, CONNECTOR_TYPE}
+import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_PROPERTY_VERSION, FORMAT_TYPE}
 import org.apache.flink.types.Row
 
 class TestTableSourceFactory extends TableSourceFactory[Row] {

Reply via email to