[FLINK-8240] [table] Create unified interfaces to configure and instantiate 
TableSources

This closes #5240.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2cb58960
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2cb58960
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2cb58960

Branch: refs/heads/master
Commit: 2cb58960e78bee83d29fa66c2c29647353194f75
Parents: a5476cd
Author: twalthr <[email protected]>
Authored: Fri Dec 15 10:18:20 2017 +0100
Committer: twalthr <[email protected]>
Committed: Wed Jan 31 12:05:43 2018 +0100

----------------------------------------------------------------------
 .../connectors/kafka/KafkaJsonTableSource.java  |   3 +-
 .../flink/api/java/typeutils/PojoTypeInfo.java  |   1 -
 .../flink/table/annotation/TableType.java       |   6 +-
 ...pache.flink.table.sources.TableSourceFactory |  16 +
 .../flink/table/api/BatchTableEnvironment.scala |  24 +-
 .../table/api/StreamTableEnvironment.scala      |  24 +-
 .../flink/table/api/TableEnvironment.scala      |  30 +-
 .../org/apache/flink/table/api/exceptions.scala |  49 ++
 .../table/api/java/BatchTableEnvironment.scala  |   2 +-
 .../table/catalog/ExternalCatalogSchema.scala   |  12 +-
 .../table/catalog/ExternalCatalogTable.scala    | 290 ++++++++++-
 .../table/catalog/ExternalTableSourceUtil.scala | 112 +++--
 .../table/catalog/TableSourceConverter.scala    |   4 +
 .../BatchTableSourceDescriptor.scala            |  72 +++
 .../table/descriptors/ConnectorDescriptor.scala |  54 ++
 .../ConnectorDescriptorValidator.scala          |  39 ++
 .../apache/flink/table/descriptors/Csv.scala    | 166 +++++++
 .../flink/table/descriptors/CsvValidator.scala  |  53 ++
 .../flink/table/descriptors/Descriptor.scala    |  32 ++
 .../descriptors/DescriptorProperties.scala      | 489 +++++++++++++++++++
 .../table/descriptors/DescriptorValidator.scala |  32 ++
 .../flink/table/descriptors/FileSystem.scala    |  60 +++
 .../table/descriptors/FileSystemValidator.scala |  41 ++
 .../table/descriptors/FormatDescriptor.scala    |  49 ++
 .../descriptors/FormatDescriptorValidator.scala |  39 ++
 .../apache/flink/table/descriptors/Json.scala   |  78 +++
 .../flink/table/descriptors/JsonValidator.scala |  41 ++
 .../flink/table/descriptors/Metadata.scala      |  81 +++
 .../table/descriptors/MetadataValidator.scala   |  43 ++
 .../flink/table/descriptors/Rowtime.scala       | 133 +++++
 .../table/descriptors/RowtimeValidator.scala    | 134 +++++
 .../apache/flink/table/descriptors/Schema.scala | 164 +++++++
 .../table/descriptors/SchemaValidator.scala     |  80 +++
 .../flink/table/descriptors/Statistics.scala    | 157 ++++++
 .../table/descriptors/StatisticsValidator.scala | 119 +++++
 .../StreamTableSourceDescriptor.scala           |  75 +++
 .../descriptors/TableSourceDescriptor.scala     |  75 +++
 .../flink/table/plan/stats/FlinkStatistic.scala |   2 +-
 .../flink/table/sources/CsvTableSource.scala    |   2 +-
 .../table/sources/CsvTableSourceConverter.scala |   4 +
 .../table/sources/CsvTableSourceFactory.scala   | 113 +++++
 .../table/sources/TableSourceFactory.scala      |  76 +++
 .../sources/TableSourceFactoryService.scala     | 144 ++++++
 .../tsextractors/TimestampExtractor.scala       |   2 +-
 .../wmstrategies/AscendingTimestamps.scala      |   4 +-
 .../BoundedOutOfOrderTimestamps.scala           |   4 +-
 .../wmstrategies/watermarkStrategies.scala      |   2 +-
 .../flink/table/typeutils/TypeStringUtils.scala | 252 ++++++++++
 ...pache.flink.table.sources.TableSourceFactory |  16 +
 .../flink/table/api/TableSourceTest.scala       |  38 ++
 .../catalog/ExternalCatalogSchemaTest.scala     |   4 +-
 .../catalog/ExternalTableSourceUtilTest.scala   |   6 +-
 .../flink/table/descriptors/CsvTest.scala       | 102 ++++
 .../table/descriptors/DescriptorTestBase.scala  |  54 ++
 .../table/descriptors/FileSystemTest.scala      |  53 ++
 .../flink/table/descriptors/JsonTest.scala      |  77 +++
 .../flink/table/descriptors/MetadataTest.scala  |  55 +++
 .../flink/table/descriptors/RowtimeTest.scala   |  72 +++
 .../flink/table/descriptors/SchemaTest.scala    |  76 +++
 .../table/descriptors/StatisticsTest.scala      |  91 ++++
 .../StreamTableSourceDescriptorTest.scala       |  79 +++
 .../sources/TableSourceFactoryServiceTest.scala |  82 ++++
 .../table/sources/TestTableSourceFactory.scala  |  60 +++
 .../table/typeutils/TypeStringUtilsTest.scala   | 104 ++++
 .../table/utils/MockTableEnvironment.scala      |   7 +-
 65 files changed, 4383 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
index 7de7f34..b3b545e 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
@@ -161,7 +161,8 @@ public abstract class KafkaJsonTableSource extends 
KafkaTableSource implements D
                /**
                 * Sets flag whether to fail if a field is missing or not.
                 *
-                * @param failOnMissingField If set to true, the TableSource 
fails if a missing fields.
+                * @param failOnMissingField If set to true, the TableSource 
fails if there is a missing
+                *                           field.
                 *                           If set to false, a missing field 
is set to null.
                 * @return The builder.
                 */

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
index 211b7ef..7f41f5d 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
@@ -268,7 +268,6 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
                return new PojoTypeComparatorBuilder();
        }
 
-       // used for testing. Maybe use mockito here
        @PublicEvolving
        public PojoField getPojoFieldAt(int pos) {
                if (pos < 0 || pos >= this.fields.length) {

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/java/org/apache/flink/table/annotation/TableType.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/annotation/TableType.java
 
b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/annotation/TableType.java
index 2d2a7af..c564528 100644
--- 
a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/annotation/TableType.java
+++ 
b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/annotation/TableType.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.annotation;
 
-import org.apache.flink.annotation.Public;
 import org.apache.flink.table.catalog.TableSourceConverter;
 
 import java.lang.annotation.Documented;
@@ -29,11 +28,14 @@ import java.lang.annotation.Target;
 
 /**
  * Annotates a table type of a {@link TableSourceConverter}.
+ *
+ * @deprecated Use the more generic 
[[org.apache.flink.table.sources.TableSourceFactory]] interface
+ * with Java service loaders instead.
  */
 @Documented
 @Target(ElementType.TYPE)
 @Retention(RetentionPolicy.RUNTIME)
-@Public
+@Deprecated
 public @interface TableType {
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
 
b/flink-libraries/flink-table/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
new file mode 100644
index 0000000..ff43eed
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.table.sources.CsvTableSourceFactory

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index c920d23..05255fd 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -86,17 +86,20 @@ abstract class BatchTableEnvironment(
   }
 
   /** Returns a unique table name according to the internal naming pattern. */
-  protected def createUniqueTableName(): String = "_DataSetTable_" + 
nameCntr.getAndIncrement()
+  override protected def createUniqueTableName(): String =
+    "_DataSetTable_" + nameCntr.getAndIncrement()
 
   /**
-    * Registers an external [[BatchTableSource]] in this 
[[TableEnvironment]]'s catalog.
-    * Registered tables can be referenced in SQL queries.
+    * Registers an internal [[BatchTableSource]] in this 
[[TableEnvironment]]'s catalog without
+    * name checking. Registered tables can be referenced in SQL queries.
     *
     * @param name        The name under which the [[TableSource]] is 
registered.
     * @param tableSource The [[TableSource]] to register.
     */
-  override def registerTableSource(name: String, tableSource: TableSource[_]): 
Unit = {
-    checkValidTableName(name)
+  override protected def registerTableSourceInternal(
+      name: String,
+      tableSource: TableSource[_])
+    : Unit = {
 
     tableSource match {
       case batchTableSource: BatchTableSource[_] =>
@@ -107,6 +110,17 @@ abstract class BatchTableEnvironment(
     }
   }
 
+// TODO expose this once we have enough table source factories that can deal 
with it
+//  /**
+//    * Creates a table from a descriptor that describes the source connector, 
source encoding,
+//    * the resulting table schema, and other properties.
+//    *
+//    * @param connectorDescriptor connector descriptor describing the source 
of the table
+//    */
+//  def from(connectorDescriptor: ConnectorDescriptor): 
BatchTableSourceDescriptor = {
+//    new BatchTableSourceDescriptor(this, connectorDescriptor)
+//  }
+
   /**
     * Registers an external [[TableSink]] with given field names and types in 
this
     * [[TableEnvironment]]'s catalog.

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index 9d94f54..84d7240 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -97,17 +97,20 @@ abstract class StreamTableEnvironment(
   }
 
   /** Returns a unique table name according to the internal naming pattern. */
-  protected def createUniqueTableName(): String = "_DataStreamTable_" + 
nameCntr.getAndIncrement()
+  override protected def createUniqueTableName(): String =
+    "_DataStreamTable_" + nameCntr.getAndIncrement()
 
   /**
-    * Registers an external [[StreamTableSource]] in this 
[[TableEnvironment]]'s catalog.
-    * Registered tables can be referenced in SQL queries.
+    * Registers an internal [[StreamTableSource]] in this 
[[TableEnvironment]]'s catalog without
+    * name checking. Registered tables can be referenced in SQL queries.
     *
     * @param name        The name under which the [[TableSource]] is 
registered.
     * @param tableSource The [[TableSource]] to register.
     */
-  override def registerTableSource(name: String, tableSource: TableSource[_]): 
Unit = {
-    checkValidTableName(name)
+  override protected def registerTableSourceInternal(
+      name: String,
+      tableSource: TableSource[_])
+    : Unit = {
 
     tableSource match {
       case streamTableSource: StreamTableSource[_] =>
@@ -125,6 +128,17 @@ abstract class StreamTableEnvironment(
     }
   }
 
+// TODO expose this once we have enough table source factories that can deal 
with it
+//  /**
+//    * Creates a table from a descriptor that describes the source connector, 
source encoding,
+//    * the resulting table schema, and other properties.
+//    *
+//    * @param connectorDescriptor connector descriptor describing the source 
of the table
+//    */
+//  def from(connectorDescriptor: ConnectorDescriptor): 
StreamTableSourceDescriptor = {
+//    new StreamTableSourceDescriptor(this, connectorDescriptor)
+//  }
+
   /**
     * Registers an external [[TableSink]] with given field names and types in 
this
     * [[TableEnvironment]]'s catalog.

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 2569cd4..3b314e9 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -290,6 +290,17 @@ abstract class TableEnvironment(val config: TableConfig) {
   }
 
   /**
+    * Creates a table from a table source.
+    *
+    * @param source table source used as table
+    */
+  def fromTableSource(source: TableSource[_]): Table = {
+    val name = createUniqueTableName()
+    registerTableSourceInternal(name, source)
+    scan(name)
+  }
+
+  /**
     * Registers an [[ExternalCatalog]] under a unique name in the 
TableEnvironment's schema.
     * All tables registered in the [[ExternalCatalog]] can be accessed.
     *
@@ -302,7 +313,7 @@ abstract class TableEnvironment(val config: TableConfig) {
     }
     this.externalCatalogs.put(name, externalCatalog)
     // create an external catalog Calcite schema, register it on the root 
schema
-    ExternalCatalogSchema.registerCatalog(rootSchema, name, externalCatalog)
+    ExternalCatalogSchema.registerCatalog(this, rootSchema, name, 
externalCatalog)
   }
 
   /**
@@ -422,7 +433,19 @@ abstract class TableEnvironment(val config: TableConfig) {
     * @param name        The name under which the [[TableSource]] is 
registered.
     * @param tableSource The [[TableSource]] to register.
     */
-  def registerTableSource(name: String, tableSource: TableSource[_]): Unit
+  def registerTableSource(name: String, tableSource: TableSource[_]): Unit = {
+    checkValidTableName(name)
+    registerTableSourceInternal(name, tableSource)
+  }
+
+  /**
+    * Registers an internal [[TableSource]] in this [[TableEnvironment]]'s 
catalog without
+    * name checking. Registered tables can be referenced in SQL queries.
+    *
+    * @param name        The name under which the [[TableSource]] is 
registered.
+    * @param tableSource The [[TableSource]] to register.
+    */
+  protected def registerTableSourceInternal(name: String, tableSource: 
TableSource[_]): Unit
 
   /**
     * Registers an external [[TableSink]] with given field names and types in 
this
@@ -714,6 +737,9 @@ abstract class TableEnvironment(val config: TableConfig) {
     }
   }
 
+  /** Returns a unique table name according to the internal naming pattern. */
+  protected def createUniqueTableName(): String
+
   /**
     * Checks if the chosen table name is valid.
     *

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
index 7ea17fa..f5a713a 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
@@ -39,6 +39,9 @@ case class SqlParserException(
 
 /**
   * General Exception for all errors during table handling.
+  *
+  * This exception indicates that an internal error occurred or that a feature 
is not supported
+  * yet. Usually, this exception does not indicate a fault of the user.
   */
 case class TableException(
     msg: String,
@@ -55,6 +58,8 @@ object TableException {
 
 /**
   * Exception for all errors occurring during validation phase.
+  *
+  * This exception indicates that the user did something wrong.
   */
 case class ValidationException(
     msg: String,
@@ -137,11 +142,51 @@ case class CatalogAlreadyExistException(
 }
 
 /**
+  * Exception for not finding a 
[[org.apache.flink.table.sources.TableSourceFactory]] for the
+  * given properties.
+  *
+  * @param properties properties that describe the table source
+  * @param cause the cause
+  */
+case class NoMatchingTableSourceException(
+    properties: Map[String, String],
+    cause: Throwable)
+    extends RuntimeException(
+      s"Could not find a table source factory in the classpath satisfying the 
" +
+        s"following properties: \n${properties.map(e => e._1 + "=" +  e._2 
).mkString("\n")}",
+      cause) {
+
+  def this(properties: Map[String, String]) = this(properties, null)
+}
+
+/**
+  * Exception for finding more than one 
[[org.apache.flink.table.sources.TableSourceFactory]] for
+  * the given properties.
+  *
+  * @param properties properties that describe the table source
+  * @param cause the cause
+  */
+case class AmbiguousTableSourceException(
+    properties: Map[String, String],
+    cause: Throwable)
+    extends RuntimeException(
+      s"More than one table source factory in the classpath satisfying the " +
+        s"following properties: \n${properties.map(e => e._1 + "=" +  e._2 
).mkString("\n")}",
+      cause) {
+
+  def this(properties: Map[String, String]) = this(properties, null)
+}
+
+/**
   * Exception for not finding a [[TableSourceConverter]] for a given table 
type.
   *
   * @param tableType table type
   * @param cause the cause
+  * @deprecated Use table source factories instead
+  *            (see [[org.apache.flink.table.sources.TableSourceFactory]]).
   */
+@Deprecated
+@deprecated("Use table factories (see TableSourceFactory) instead.")
 case class NoMatchedTableSourceConverterException(
     tableType: String,
     cause: Throwable)
@@ -156,7 +201,11 @@ case class NoMatchedTableSourceConverterException(
   *
   * @param tableType table type
   * @param cause the cause
+  * @deprecated Use table source factories instead
+  *            (see [[org.apache.flink.table.sources.TableSourceFactory]]).
   */
+@Deprecated
+@deprecated("Use table factories (see TableSourceFactory) instead.")
 case class AmbiguousTableSourceConverterException(
     tableType: String,
     cause: Throwable)

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala
index b79d161..f8f35eb 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala
@@ -20,8 +20,8 @@ package org.apache.flink.table.api.java
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.TypeExtractor
 import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
-import org.apache.flink.table.expressions.ExpressionParser
 import org.apache.flink.table.api._
+import org.apache.flink.table.expressions.ExpressionParser
 import org.apache.flink.table.functions.{AggregateFunction, TableFunction}
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
index d87d665..776ddee 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
@@ -22,7 +22,7 @@ import java.util.{Collection => JCollection, Collections => 
JCollections, Linked
 
 import org.apache.calcite.linq4j.tree.Expression
 import org.apache.calcite.schema._
-import org.apache.flink.table.api.{CatalogNotExistException, 
TableNotExistException}
+import org.apache.flink.table.api.{CatalogNotExistException, TableEnvironment, 
TableNotExistException}
 import org.apache.flink.table.util.Logging
 
 import scala.collection.JavaConverters._
@@ -33,10 +33,12 @@ import scala.collection.JavaConverters._
   * The external catalog and all included sub-catalogs and tables is 
registered as
   * sub-schemas and tables in Calcite.
   *
+  * @param tableEnv the environment for this schema
   * @param catalogIdentifier external catalog name
   * @param catalog           external catalog
   */
 class ExternalCatalogSchema(
+    tableEnv: TableEnvironment,
     catalogIdentifier: String,
     catalog: ExternalCatalog) extends Schema with Logging {
 
@@ -50,7 +52,7 @@ class ExternalCatalogSchema(
   override def getSubSchema(name: String): Schema = {
     try {
       val db = catalog.getSubCatalog(name)
-      new ExternalCatalogSchema(name, db)
+      new ExternalCatalogSchema(tableEnv, name, db)
     } catch {
       case _: CatalogNotExistException =>
         LOG.warn(s"Sub-catalog $name does not exist in externalCatalog 
$catalogIdentifier")
@@ -75,7 +77,7 @@ class ExternalCatalogSchema(
     */
   override def getTable(name: String): Table = try {
     val externalCatalogTable = catalog.getTable(name)
-    ExternalTableSourceUtil.fromExternalCatalogTable(externalCatalogTable)
+    ExternalTableSourceUtil.fromExternalCatalogTable(tableEnv, 
externalCatalogTable)
   } catch {
     case TableNotExistException(table, _, _) => {
       LOG.warn(s"Table $table does not exist in externalCatalog 
$catalogIdentifier")
@@ -111,15 +113,17 @@ object ExternalCatalogSchema {
   /**
     * Registers an external catalog in a Calcite schema.
     *
+    * @param tableEnv                  The environment the catalog will be 
part of.
     * @param parentSchema              Parent schema into which the catalog is 
registered
     * @param externalCatalogIdentifier Identifier of the external catalog
     * @param externalCatalog           The external catalog to register
     */
   def registerCatalog(
+      tableEnv: TableEnvironment,
       parentSchema: SchemaPlus,
       externalCatalogIdentifier: String,
       externalCatalog: ExternalCatalog): Unit = {
-    val newSchema = new ExternalCatalogSchema(externalCatalogIdentifier, 
externalCatalog)
+    val newSchema = new ExternalCatalogSchema(tableEnv, 
externalCatalogIdentifier, externalCatalog)
     val schemaPlusOfNewSchema = parentSchema.add(externalCatalogIdentifier, 
newSchema)
     newSchema.registerSubSchemas(schemaPlusOfNewSchema)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
index ae20718..a7c20ac 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
@@ -18,28 +18,296 @@
 
 package org.apache.flink.table.catalog
 
-import java.util.{HashMap => JHashMap, Map => JMap}
 import java.lang.{Long => JLong}
+import java.util.{HashMap => JHashMap, Map => JMap}
 
-import org.apache.flink.table.api.TableSchema
+import org.apache.flink.table.api.{TableException, TableSchema}
+import org.apache.flink.table.catalog.ExternalCatalogTable._
+import org.apache.flink.table.descriptors.MetadataValidator.{METADATA_COMMENT, 
METADATA_CREATION_TIME, METADATA_LAST_ACCESS_TIME}
+import org.apache.flink.table.descriptors._
 import org.apache.flink.table.plan.stats.TableStats
 
+import scala.collection.JavaConverters._
+
 /**
   * Defines a table in an [[ExternalCatalog]].
   *
-  * @param tableType            Table type, e.g csv, hbase, kafka
-  * @param schema               Schema of the table (column names and types)
-  * @param properties           Properties of the table
-  * @param stats                Statistics of the table
-  * @param comment              Comment of the table
-  * @param createTime           Create timestamp of the table
-  * @param lastAccessTime       Timestamp of last access of the table
+  * For backwards compatibility this class supports both the legacy table type 
(see
+  * [[org.apache.flink.table.annotation.TableType]]) and the factory-based (see
+  * [[org.apache.flink.table.sources.TableSourceFactory]]) approach.
+  *
+  * @param connectorDesc describes the system to connect to
+  * @param formatDesc describes the data format of a connector
+  * @param schemaDesc describes the schema of the result table
+  * @param statisticsDesc describes the estimated statistics of the result 
table
+  * @param metadataDesc describes additional metadata of a table
   */
-case class ExternalCatalogTable(
+class ExternalCatalogTable(
+    connectorDesc: ConnectorDescriptor,
+    formatDesc: Option[FormatDescriptor],
+    schemaDesc: Option[Schema],
+    statisticsDesc: Option[Statistics],
+    metadataDesc: Option[Metadata])
+  extends TableSourceDescriptor(connectorDesc) {
+
+  this.formatDescriptor = formatDesc
+  this.schemaDescriptor = schemaDesc
+  this.statisticsDescriptor = statisticsDesc
+  this.metaDescriptor = metadataDesc
+
+  // expose statistics for external table source util
+  override def getTableStats: Option[TableStats] = super.getTableStats
+
+  // 
----------------------------------------------------------------------------------------------
+  // NOTE: the following code is used for backwards compatibility to the 
TableType approach
+  // 
----------------------------------------------------------------------------------------------
+
+  /**
+    * Returns the legacy table type of an external catalog table.
+    *
+    * @deprecated for backwards compatibility.
+    */
+  @Deprecated
+  @deprecated("For backwards compatibility.")
+  lazy val tableType: String = {
+    val props = new DescriptorProperties()
+    connectorDesc.addProperties(props)
+    props
+      .getString(CONNECTOR_LEGACY_TYPE)
+      .getOrElse(throw new TableException("Could not find a legacy table type 
to return."))
+  }
+
+  /**
+    * Returns the legacy schema of an external catalog table.
+    *
+    * @deprecated for backwards compatibility.
+    */
+  @Deprecated
+  @deprecated("For backwards compatibility.")
+  lazy val schema: TableSchema = {
+    val props = new DescriptorProperties()
+    connectorDesc.addProperties(props)
+    props
+      .getTableSchema(CONNECTOR_LEGACY_SCHEMA)
+      .getOrElse(throw new TableException("Could not find a legacy schema to 
return."))
+  }
+
+  /**
+    * Returns the legacy properties of an external catalog table.
+    *
+    * @deprecated for backwards compatibility.
+    */
+  @Deprecated
+  @deprecated("For backwards compatibility.")
+  lazy val properties: JMap[String, String] = {
+    // skip normalization
+    val props = new DescriptorProperties(normalizeKeys = false)
+    val legacyProps = new JHashMap[String, String]()
+    connectorDesc.addProperties(props)
+    props.asMap.flatMap { case (k, v) =>
+      if (k.startsWith(CONNECTOR_LEGACY_PROPERTY)) {
+        // remove "connector.legacy-property-"
+        Some(legacyProps.put(k.substring(CONNECTOR_LEGACY_PROPERTY.length + 
1), v))
+      } else {
+        None
+      }
+    }
+    legacyProps
+  }
+
+  /**
+    * Returns the legacy statistics of an external catalog table.
+    *
+    * @deprecated for backwards compatibility.
+    */
+  @Deprecated
+  @deprecated("For backwards compatibility.")
+  lazy val stats: TableStats = getTableStats.orNull
+
+  /**
+    * Returns the legacy comment of an external catalog table.
+    *
+    * @deprecated for backwards compatibility.
+    */
+  @Deprecated
+  @deprecated("For backwards compatibility.")
+  lazy val comment: String = {
+    val normalizedProps = new DescriptorProperties()
+
+    metadataDesc match {
+      case Some(meta) =>
+        meta.addProperties(normalizedProps)
+        normalizedProps.getString(METADATA_COMMENT).orNull
+      case None =>
+        null
+    }
+  }
+
+  /**
+    * Returns the legacy creation time of an external catalog table.
+    *
+    * @deprecated for backwards compatibility.
+    */
+  @Deprecated
+  @deprecated("For backwards compatibility.")
+  lazy val createTime: JLong = {
+    val normalizedProps = new DescriptorProperties()
+
+    metadataDesc match {
+      case Some(meta) =>
+        meta.addProperties(normalizedProps)
+        normalizedProps.getLong(METADATA_CREATION_TIME).map(v => 
Long.box(v)).orNull
+      case None =>
+        null
+    }
+  }
+
+  /**
+    * Returns the legacy last access time of an external catalog table.
+    *
+    * @deprecated for backwards compatibility.
+    */
+  @Deprecated
+  @deprecated("For backwards compatibility.")
+  lazy val lastAccessTime: JLong = {
+    val normalizedProps = new DescriptorProperties()
+
+    metadataDesc match {
+      case Some(meta) =>
+        meta.addProperties(normalizedProps)
+        normalizedProps.getLong(METADATA_LAST_ACCESS_TIME).map(v => 
Long.box(v)).orNull
+      case None =>
+        null
+    }
+  }
+
+  /**
+    * Defines a table in an [[ExternalCatalog]].
+    *
+    * @param tableType            Table type, e.g. csv, hbase, kafka
+    * @param schema               Schema of the table (column names and types)
+    * @param properties           Properties of the table
+    * @param stats                Statistics of the table
+    * @param comment              Comment of the table
+    * @param createTime           Create timestamp of the table
+    * @param lastAccessTime       Timestamp of last access of the table
+    * @deprecated Use a descriptor-based constructor instead.
+    */
+  @Deprecated
+  @deprecated("Use a descriptor-based constructor instead.")
+  def this(
+    tableType: String,
+    schema: TableSchema,
+    properties: JMap[String, String] = new JHashMap(),
+    stats: TableStats = null,
+    comment: String = null,
+    createTime: JLong = System.currentTimeMillis,
+    lastAccessTime: JLong = -1L) = {
+
+    this(
+      toConnectorDescriptor(tableType, schema, properties),
+      None,
+      None,
+      Some(toStatisticsDescriptor(stats)),
+      Some(toMetadataDescriptor(comment, createTime, lastAccessTime)))
+  }
+
+  /**
+    * Returns whether this external catalog table uses the legacy table type.
+    *
+    * @deprecated for backwards compatibility.
+    */
+  @Deprecated
+  @deprecated("For backwards compatibility.")
+  def isLegacyTableType: Boolean = 
connectorDesc.isInstanceOf[TableTypeConnector]
+}
+
+object ExternalCatalogTable {
+
+  val CONNECTOR_TYPE_VALUE = "legacy-table-type"
+  val CONNECTOR_LEGACY_TYPE = "connector.legacy-type"
+  val CONNECTOR_LEGACY_SCHEMA = "connector.legacy-schema"
+  val CONNECTOR_LEGACY_PROPERTY = "connector.legacy-property"
+
+  /**
+    * Defines a table in an [[ExternalCatalog]].
+    *
+    * @param tableType            Table type, e.g. csv, hbase, kafka
+    * @param schema               Schema of the table (column names and types)
+    * @param properties           Properties of the table
+    * @param stats                Statistics of the table
+    * @param comment              Comment of the table
+    * @param createTime           Create timestamp of the table
+    * @param lastAccessTime       Timestamp of last access of the table
+    * @deprecated Use a descriptor-based constructor instead.
+    */
+  @Deprecated
+  @deprecated("Use a descriptor-based constructor instead.")
+  def apply(
     tableType: String,
     schema: TableSchema,
     properties: JMap[String, String] = new JHashMap(),
     stats: TableStats = null,
     comment: String = null,
     createTime: JLong = System.currentTimeMillis,
-    lastAccessTime: JLong = -1L)
+    lastAccessTime: JLong = -1L): ExternalCatalogTable = {
+
+    new ExternalCatalogTable(
+      tableType,
+      schema,
+      properties,
+      stats,
+      comment,
+      createTime,
+      lastAccessTime)
+  }
+
+  class TableTypeConnector(
+      tableType: String,
+      schema: TableSchema,
+      legacyProperties: JMap[String, String])
+    extends ConnectorDescriptor(CONNECTOR_TYPE_VALUE, version = 1) {
+
+    override protected def addConnectorProperties(properties: 
DescriptorProperties): Unit = {
+      properties.putString(CONNECTOR_LEGACY_TYPE, tableType)
+      properties.putTableSchema(CONNECTOR_LEGACY_SCHEMA, schema)
+      legacyProperties.asScala.foreach { case (k, v) =>
+          properties.putString(s"$CONNECTOR_LEGACY_PROPERTY-$k", v)
+      }
+    }
+
+    override private[flink] def needsFormat() = false
+  }
+
+  def toConnectorDescriptor(
+      tableType: String,
+      schema: TableSchema,
+      properties: JMap[String, String])
+    : ConnectorDescriptor = {
+
+    new TableTypeConnector(tableType, schema, properties)
+  }
+
+  def toStatisticsDescriptor(stats: TableStats): Statistics = {
+    val statsDesc = Statistics()
+    if (stats != null) {
+      statsDesc.tableStats(stats)
+    }
+    statsDesc
+  }
+
+  def toMetadataDescriptor(
+      comment: String,
+      createTime: JLong,
+      lastAccessTime: JLong)
+    : Metadata = {
+
+    val metadataDesc = Metadata()
+    if (comment != null) {
+      metadataDesc.comment(comment)
+    }
+    metadataDesc
+      .creationTime(createTime)
+      .lastAccessTime(lastAccessTime)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
index d2e297f..3bc5dc0 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
@@ -23,22 +23,74 @@ import java.net.URL
 import org.apache.commons.configuration.{ConfigurationException, 
ConversionException, PropertiesConfiguration}
 import org.apache.flink.annotation.VisibleForTesting
 import org.apache.flink.table.annotation.TableType
-import org.apache.flink.table.api.{AmbiguousTableSourceConverterException, 
NoMatchedTableSourceConverterException, TableException}
+import org.apache.flink.table.api._
 import org.apache.flink.table.plan.schema.{BatchTableSourceTable, 
StreamTableSourceTable, TableSourceTable}
 import org.apache.flink.table.plan.stats.FlinkStatistic
-import org.apache.flink.table.sources.{BatchTableSource, StreamTableSource, 
TableSource}
+import org.apache.flink.table.sources.{BatchTableSource, StreamTableSource, 
TableSource, TableSourceFactoryService}
 import org.apache.flink.table.util.Logging
 import org.apache.flink.util.InstantiationUtil
 import org.reflections.Reflections
 
-import scala.collection.JavaConverters._
-import scala.collection.mutable
+import _root_.scala.collection.JavaConverters._
+import _root_.scala.collection.mutable
 
 /**
   * The utility class is used to convert ExternalCatalogTable to 
TableSourceTable.
   */
 object ExternalTableSourceUtil extends Logging {
 
+  /**
+    * Converts an [[ExternalCatalogTable]] instance to a [[TableSourceTable]] instance
+    *
+    * @param tableEnv the table environment in which the table source is used (determines
+    *                 whether a batch or a stream table source is expected)
+    * @param externalCatalogTable the [[ExternalCatalogTable]] instance to convert
+    * @return converted [[TableSourceTable]] instance from the input catalog table
+    */
+  def fromExternalCatalogTable(
+      tableEnv: TableEnvironment,
+      externalCatalogTable: ExternalCatalogTable)
+    : TableSourceTable[_] = {
+
+    // check for the legacy external catalog path
+    if (externalCatalogTable.isLegacyTableType) {
+      LOG.warn("External catalog tables based on TableType annotations are 
deprecated. " +
+        "Please consider updating them to TableSourceFactories.")
+      fromExternalCatalogTableType(externalCatalogTable)
+    }
+    // use the factory approach
+    else {
+      val source = 
TableSourceFactoryService.findTableSourceFactory(externalCatalogTable)
+      tableEnv match {
+        // check for a batch table source in this batch environment
+        case _: BatchTableEnvironment =>
+          source match {
+            case bts: BatchTableSource[_] =>
+              new BatchTableSourceTable(
+                bts,
+                new FlinkStatistic(externalCatalogTable.getTableStats))
+            case _ => throw new TableException(
+              s"Found table source '${source.getClass.getCanonicalName}' is 
not applicable " +
+                s"in a batch environment.")
+          }
+        // check for a stream table source in this streaming environment
+        case _: StreamTableEnvironment =>
+          source match {
+            case sts: StreamTableSource[_] =>
+              new StreamTableSourceTable(
+                sts,
+                new FlinkStatistic(externalCatalogTable.getTableStats))
+            case _ => throw new TableException(
+              s"Found table source '${source.getClass.getCanonicalName}' is 
not applicable " +
+                s"in a streaming environment.")
+          }
+        case _ => throw new TableException("Unsupported table environment.")
+      }
+    }
+  }
+
+  // 
----------------------------------------------------------------------------------------------
+  // NOTE: the following lines can be removed once we drop support for 
TableType
+  // 
----------------------------------------------------------------------------------------------
+
   // config file to specify scan package to search TableSourceConverter
   private val tableSourceConverterConfigFileName = 
"tableSourceConverter.properties"
 
@@ -48,7 +100,7 @@ object ExternalTableSourceUtil extends Logging {
     val registeredConverters =
       new mutable.HashMap[String, mutable.Set[Class[_ <: 
TableSourceConverter[_]]]]
           with mutable.MultiMap[String, Class[_ <: TableSourceConverter[_]]]
-    // scan all config files to find TableSourceConverters which are 
annotationed with TableType.
+    // scan all config files to find TableSourceConverters which are annotated 
with TableType.
     val resourceUrls = 
getClass.getClassLoader.getResources(tableSourceConverterConfigFileName)
     while (resourceUrls.hasMoreElements) {
       val url = resourceUrls.nextElement()
@@ -89,12 +141,31 @@ object ExternalTableSourceUtil extends Logging {
   }
 
   /**
-    * Converts an [[ExternalCatalogTable]] instance to a [[TableSourceTable]] 
instance
+    * Parses scan package set from input config file
     *
-    * @param externalCatalogTable the [[ExternalCatalogTable]] instance which 
to convert
-    * @return converted [[TableSourceTable]] instance from the input catalog 
table
+    * @param url url of config file
+    * @return scan package set
     */
-  def fromExternalCatalogTable(externalCatalogTable: ExternalCatalogTable): 
TableSourceTable[_] = {
+  private def parseScanPackagesFromConfigFile(url: URL): Set[String] = {
+    try {
+      val config = new PropertiesConfiguration(url)
+      config.setListDelimiter(',')
+      config.getStringArray("scan.packages").filterNot(_.isEmpty).toSet
+    } catch {
+      case e: ConfigurationException =>
+        LOG.warn(s"Error happened while loading the properties file [$url]", e)
+        Set.empty
+      case e1: ConversionException =>
+        LOG.warn(s"Error happened while parsing 'scan.packages' field of 
properties file [$url]. " +
+            s"The value is not a String or List of Strings.", e1)
+        Set.empty
+    }
+  }
+
+  @VisibleForTesting
+  def fromExternalCatalogTableType(externalCatalogTable: ExternalCatalogTable)
+    : TableSourceTable[_] = {
+
     val tableType = externalCatalogTable.tableType
     val propertyKeys = externalCatalogTable.properties.keySet()
     tableTypeToTableSourceConvertersClazz.get(tableType) match {
@@ -141,27 +212,4 @@ object ExternalTableSourceUtil extends Logging {
         throw new NoMatchedTableSourceConverterException(tableType)
     }
   }
-
-  /**
-    * Parses scan package set from input config file
-    *
-    * @param url url of config file
-    * @return scan package set
-    */
-  private def parseScanPackagesFromConfigFile(url: URL): Set[String] = {
-    try {
-      val config = new PropertiesConfiguration(url)
-      config.setListDelimiter(',')
-      config.getStringArray("scan.packages").filterNot(_.isEmpty).toSet
-    } catch {
-      case e: ConfigurationException =>
-        LOG.warn(s"Error happened while loading the properties file [$url]", e)
-        Set.empty
-      case e1: ConversionException =>
-        LOG.warn(s"Error happened while parsing 'scan.packages' field of 
properties file [$url]. " +
-            s"The value is not a String or List of Strings.", e1)
-        Set.empty
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/TableSourceConverter.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/TableSourceConverter.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/TableSourceConverter.scala
index ca6df9a..e6248b0 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/TableSourceConverter.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/TableSourceConverter.scala
@@ -29,7 +29,11 @@ import org.apache.flink.table.sources.TableSource
   * table is supported.
   *
   * @tparam T The [[TableSource]] to be created by this converter.
+  *
+  * @deprecated Use the more generic 
[[org.apache.flink.table.sources.TableSourceFactory]] instead.
   */
+@Deprecated
+@deprecated("Use the more generic table source factories instead.")
 trait TableSourceConverter[T <: TableSource[_]] {
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala
new file mode 100644
index 0000000..3c8366d
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.{BatchTableEnvironment, Table, 
TableException}
+import org.apache.flink.table.sources.{BatchTableSource, TableSource, 
TableSourceFactoryService}
+
+class BatchTableSourceDescriptor(tableEnv: BatchTableEnvironment, connector: 
ConnectorDescriptor)
+  extends TableSourceDescriptor(connector) {
+
+  /**
+    * Searches for the specified table source, configures it accordingly, and 
returns it.
+    */
+  def toTableSource: TableSource[_] = {
+    val source = TableSourceFactoryService.findTableSourceFactory(this)
+    source match {
+      case _: BatchTableSource[_] => source
+      case _ => throw new TableException(
+        s"Found table source '${source.getClass.getCanonicalName}' is not 
applicable " +
+          s"in a batch environment.")
+    }
+  }
+
+  /**
+    * Searches for the specified table source, configures it accordingly, and 
returns it as a table.
+    */
+  def toTable: Table = {
+    tableEnv.fromTableSource(toTableSource)
+  }
+
+  /**
+    * Searches for the specified table source, configures it accordingly, and 
registers it as
+    * a table under the given name.
+    *
+    * @param name table name to be registered in the table environment
+    */
+  def register(name: String): Unit = {
+    tableEnv.registerTableSource(name, toTableSource)
+  }
+
+  /**
+    * Specifies the format that defines how to read data from a connector.
+    */
+  def withFormat(format: FormatDescriptor): BatchTableSourceDescriptor = {
+    formatDescriptor = Some(format)
+    this
+  }
+
+  /**
+    * Specifies the resulting table schema.
+    */
+  def withSchema(schema: Schema): BatchTableSourceDescriptor = {
+    schemaDescriptor = Some(schema)
+    this
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptor.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptor.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptor.scala
new file mode 100644
index 0000000..f691b4f
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptor.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_TYPE,
 CONNECTOR_VERSION}
+
+/**
+  * Describes a connector to another system.
+  *
+  * @param tpe string identifier for the connector
+  */
+abstract class ConnectorDescriptor(
+    private val tpe: String,
+    private val version: Int)
+  extends Descriptor {
+
+  override def toString: String = this.getClass.getSimpleName
+
+  /**
+    * Internal method for properties conversion.
+    */
+  final private[flink] def addProperties(properties: DescriptorProperties): 
Unit = {
+    properties.putString(CONNECTOR_TYPE, tpe)
+    properties.putLong(CONNECTOR_VERSION, version)
+    addConnectorProperties(properties)
+  }
+
+  /**
+    * Internal method for connector properties conversion.
+    */
+  protected def addConnectorProperties(properties: DescriptorProperties): Unit
+
+  /**
+    * Internal method that defines whether this connector requires a format descriptor.
+    */
+  private[flink] def needsFormat(): Boolean
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptorValidator.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptorValidator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptorValidator.scala
new file mode 100644
index 0000000..8ab0f45
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptorValidator.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_TYPE,
 CONNECTOR_VERSION}
+
+/**
+  * Validator for [[ConnectorDescriptor]].
+  */
+class ConnectorDescriptorValidator extends DescriptorValidator {
+
+  override def validate(properties: DescriptorProperties): Unit = {
+    properties.validateString(CONNECTOR_TYPE, isOptional = false, minLen = 1)
+    properties.validateInt(CONNECTOR_VERSION, isOptional = true, 0, 
Integer.MAX_VALUE)
+  }
+}
+
+object ConnectorDescriptorValidator {
+
+  val CONNECTOR_TYPE = "connector.type"
+  val CONNECTOR_VERSION = "connector.version"
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Csv.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Csv.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Csv.scala
new file mode 100644
index 0000000..0493d99
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Csv.scala
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.{TableSchema, ValidationException}
+import org.apache.flink.table.descriptors.CsvValidator._
+
+import scala.collection.mutable
+
+/**
+  * Format descriptor for comma-separated values (CSV).
+  */
+class Csv extends FormatDescriptor(FORMAT_TYPE_VALUE, version = 1) {
+
+  private var fieldDelim: Option[String] = None
+  private var lineDelim: Option[String] = None
+  private val formatSchema: mutable.LinkedHashMap[String, String] =
+      mutable.LinkedHashMap[String, String]()
+  private var quoteCharacter: Option[Character] = None
+  private var commentPrefix: Option[String] = None
+  private var isIgnoreFirstLine: Option[Boolean] = None
+  private var lenient: Option[Boolean] = None
+
+  /**
+    * Sets the field delimiter, "," by default.
+    *
+    * @param delim the field delimiter
+    */
+  def fieldDelimiter(delim: String): Csv = {
+    this.fieldDelim = Some(delim)
+    this
+  }
+
+  /**
+    * Sets the line delimiter, "\n" by default.
+    *
+    * @param delim the line delimiter
+    */
+  def lineDelimiter(delim: String): Csv = {
+    this.lineDelim = Some(delim)
+    this
+  }
+
+  /**
+    * Sets the format schema with field names and the types. Required.
+    * The table schema must not contain nested fields.
+    *
+    * This method overwrites existing fields added with [[field()]].
+    *
+    * @param schema the table schema
+    */
+  def schema(schema: TableSchema): Csv = {
+    this.formatSchema.clear()
+    DescriptorProperties.normalizeTableSchema(schema).foreach {
+      case (n, t) => field(n, t)
+    }
+    this
+  }
+
+  /**
+    * Adds a format field with the field name and the type information. 
Required.
+    * This method can be called multiple times. The call order of this method 
defines
+    * also the order of the fields in the format.
+    *
+    * @param fieldName the field name
+    * @param fieldType the type information of the field
+    */
+  def field(fieldName: String, fieldType: TypeInformation[_]): Csv = {
+    field(fieldName, DescriptorProperties.normalizeTypeInfo(fieldType))
+    this
+  }
+
+  /**
+    * Adds a format field with the field name and the type string. Required.
+    * This method can be called multiple times. The call order of this method 
defines
+    * also the order of the fields in the format.
+    *
+    * @param fieldName the field name
+    * @param fieldType the type string of the field
+    */
+  def field(fieldName: String, fieldType: String): Csv = {
+    if (formatSchema.contains(fieldName)) {
+      throw new ValidationException(s"Duplicate field name $fieldName.")
+    }
+    formatSchema += (fieldName -> fieldType)
+    this
+  }
+
+  /**
+    * Sets a quote character for String values, null by default.
+    *
+    * @param quote the quote character
+    */
+  def quoteCharacter(quote: Character): Csv = {
+    this.quoteCharacter = Option(quote)
+    this
+  }
+
+  /**
+    * Sets a prefix to indicate comments, null by default.
+    *
+    * @param prefix the prefix to indicate comments
+    */
+  def commentPrefix(prefix: String): Csv = {
+    this.commentPrefix = Option(prefix)
+    this
+  }
+
+  /**
+    * Ignores the first line. By default, the first line is not skipped.
+    */
+  def ignoreFirstLine(): Csv = {
+    this.isIgnoreFirstLine = Some(true)
+    this
+  }
+
+  /**
+    * Skips records with parse errors instead of failing. By default, an exception is thrown.
+    */
+  def ignoreParseErrors(): Csv = {
+    this.lenient = Some(true)
+    this
+  }
+
+  /**
+    * Internal method for format properties conversion.
+    */
+  override protected def addFormatProperties(properties: 
DescriptorProperties): Unit = {
+    fieldDelim.foreach(properties.putString(FORMAT_FIELD_DELIMITER, _))
+    lineDelim.foreach(properties.putString(FORMAT_LINE_DELIMITER, _))
+    properties.putTableSchema(FORMAT_FIELDS, formatSchema.toIndexedSeq)
+    quoteCharacter.foreach(properties.putCharacter(FORMAT_QUOTE_CHARACTER, _))
+    commentPrefix.foreach(properties.putString(FORMAT_COMMENT_PREFIX, _))
+    isIgnoreFirstLine.foreach(properties.putBoolean(FORMAT_IGNORE_FIRST_LINE, 
_))
+    lenient.foreach(properties.putBoolean(FORMAT_IGNORE_PARSE_ERRORS, _))
+  }
+}
+
+/**
+  * Format descriptor for comma-separated values (CSV).
+  */
+object Csv {
+
+  /**
+    * Format descriptor for comma-separated values (CSV).
+    */
+  def apply(): Csv = new Csv()
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/CsvValidator.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/CsvValidator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/CsvValidator.scala
new file mode 100644
index 0000000..d49314e
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/CsvValidator.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.descriptors.CsvValidator._
+import org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_TYPE
+
/**
  * Validator for [[Csv]]. Checks completeness and well-formedness of all
  * CSV format properties.
  */
class CsvValidator extends FormatDescriptorValidator {

  override def validate(properties: DescriptorProperties): Unit = {
    super.validate(properties)
    // the format type must be exactly "csv"
    properties.validateValue(FORMAT_TYPE, FORMAT_TYPE_VALUE, isOptional = false)
    // delimiters are optional but must be non-empty strings if present
    Seq(FORMAT_FIELD_DELIMITER, FORMAT_LINE_DELIMITER).foreach { delimiterKey =>
      properties.validateString(delimiterKey, isOptional = true, minLen = 1)
    }
    // a quote character must consist of exactly one character
    properties.validateString(FORMAT_QUOTE_CHARACTER, isOptional = true, minLen = 1, maxLen = 1)
    properties.validateString(FORMAT_COMMENT_PREFIX, isOptional = true, minLen = 1)
    // boolean flags are optional
    Seq(FORMAT_IGNORE_FIRST_LINE, FORMAT_IGNORE_PARSE_ERRORS).foreach { flagKey =>
      properties.validateBoolean(flagKey, isOptional = true)
    }
    // the field schema is mandatory for the CSV format
    properties.validateTableSchema(FORMAT_FIELDS, isOptional = false)
  }
}
+
object CsvValidator {

  // expected value of the format type property for this format
  val FORMAT_TYPE_VALUE = "csv"
  // delimiter between fields of a record (optional)
  val FORMAT_FIELD_DELIMITER = "format.field-delimiter"
  // delimiter between records (optional)
  val FORMAT_LINE_DELIMITER = "format.line-delimiter"
  // single character used for quoting values (optional)
  val FORMAT_QUOTE_CHARACTER = "format.quote-character"
  // prefix marking comment lines (optional)
  val FORMAT_COMMENT_PREFIX = "format.comment-prefix"
  // whether the first line is skipped (optional)
  val FORMAT_IGNORE_FIRST_LINE = "format.ignore-first-line"
  // whether records with parse errors are skipped instead of failing (optional)
  val FORMAT_IGNORE_PARSE_ERRORS = "format.ignore-parse-errors"
  // schema of the CSV fields (required)
  val FORMAT_FIELDS = "format.fields"

}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Descriptor.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Descriptor.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Descriptor.scala
new file mode 100644
index 0000000..ad97ded
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Descriptor.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
/**
  * A class that adds a set of string-based, normalized properties for describing a
  * table source or table sink.
  */
abstract class Descriptor {

  /**
    * Internal method for properties conversion. Implementations add their
    * key-value pairs to the given [[DescriptorProperties]].
    */
  private[flink] def addProperties(properties: DescriptorProperties): Unit

}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
new file mode 100644
index 0000000..43d63b3
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
@@ -0,0 +1,489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import java.io.Serializable
+import java.lang.{Boolean => JBoolean, Double => JDouble, Integer => JInt, 
Long => JLong}
+import java.util
+import java.util.regex.Pattern
+
+import org.apache.commons.codec.binary.Base64
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.{TableSchema, ValidationException}
+import org.apache.flink.table.descriptors.DescriptorProperties.{NAME, TYPE, 
normalizeTableSchema}
+import org.apache.flink.table.typeutils.TypeStringUtils
+import org.apache.flink.util.InstantiationUtil
+import org.apache.flink.util.Preconditions.checkNotNull
+
+import scala.collection.mutable
+
+import scala.collection.JavaConverters._
+
/**
  * Utility class for having a unified string-based representation of Table API related classes
  * such as [[TableSchema]], [[TypeInformation]], etc.
  *
  * @param normalizeKeys flag that indicates if keys should be normalized (this flag is
  *                      necessary for backwards compatibility)
  */
class DescriptorProperties(normalizeKeys: Boolean = true) {

  // backing store; keys are stored lower-cased if normalization is enabled
  private val properties: mutable.Map[String, String] = new mutable.HashMap[String, String]()

  /**
    * Adds a property. Every (normalized) key may only be added once.
    */
  private def put(key: String, value: String): Unit = {
    // normalize before the duplicate check; otherwise two keys that differ only in
    // case would pass the check and silently overwrite each other when
    // normalization is enabled
    val normalizedKey = if (normalizeKeys) key.toLowerCase else key
    if (properties.contains(normalizedKey)) {
      throw new IllegalStateException("Property already present.")
    }
    properties.put(normalizedKey, value)
  }

  // for testing
  private[flink] def unsafePut(key: String, value: String): Unit = {
    properties.put(key, value)
  }

  // for testing
  private[flink] def unsafeRemove(key: String): Unit = {
    properties.remove(key)
  }

  /** Adds all entries of the given Scala map as properties. */
  def putProperties(properties: Map[String, String]): Unit = {
    properties.foreach { case (k, v) =>
      put(k, v)
    }
  }

  /** Adds all entries of the given Java map as properties. */
  def putProperties(properties: java.util.Map[String, String]): Unit = {
    properties.asScala.foreach { case (k, v) =>
      put(k, v)
    }
  }

  /**
    * Adds a class name under the given key after verifying that the class can
    * be instantiated.
    */
  def putClass(key: String, clazz: Class[_]): Unit = {
    checkNotNull(key)
    checkNotNull(clazz)
    val error = InstantiationUtil.checkForInstantiationError(clazz)
    if (error != null) {
      throw new ValidationException(s"Class '${clazz.getName}' is not supported: $error")
    }
    put(key, clazz.getName)
  }

  /** Adds a string under the given key. */
  def putString(key: String, str: String): Unit = {
    checkNotNull(key)
    checkNotNull(str)
    put(key, str)
  }

  /** Adds a boolean under the given key. */
  def putBoolean(key: String, b: Boolean): Unit = {
    checkNotNull(key)
    put(key, b.toString)
  }

  /** Adds a long under the given key. */
  def putLong(key: String, l: Long): Unit = {
    checkNotNull(key)
    put(key, l.toString)
  }

  /** Adds an integer under the given key. */
  def putInt(key: String, i: Int): Unit = {
    checkNotNull(key)
    put(key, i.toString)
  }

  /** Adds a character under the given key. */
  def putCharacter(key: String, c: Character): Unit = {
    checkNotNull(key)
    checkNotNull(c)
    put(key, c.toString)
  }

  /** Adds a table schema under the given key. */
  def putTableSchema(key: String, schema: TableSchema): Unit = {
    putTableSchema(key, normalizeTableSchema(schema))
  }

  /** Adds a table schema, given as (name, type) pairs, under the given key. */
  def putTableSchema(key: String, nameAndType: Seq[(String, String)]): Unit = {
    putIndexedFixedProperties(
      key,
      Seq(NAME, TYPE),
      nameAndType.map(t => Seq(t._1, t._2))
    )
  }

  /**
    * Adds an indexed sequence of properties (with sub-properties) under a common key.
    *
    * For example:
    *
    * schema.fields.0.type = INT, schema.fields.0.name = test
    * schema.fields.1.type = LONG, schema.fields.1.name = test2
    *
    * The arity of each propertyValue must match the arity of propertyKeys.
    */
  def putIndexedFixedProperties(
      key: String,
      propertyKeys: Seq[String],
      propertyValues: Seq[Seq[String]])
    : Unit = {
    checkNotNull(key)
    checkNotNull(propertyValues)
    propertyValues.zipWithIndex.foreach { case (values, idx) =>
      if (values.lengthCompare(propertyKeys.size) != 0) {
        throw new ValidationException("Values must have same arity as keys.")
      }
      values.zipWithIndex.foreach { case (value, keyIdx) =>
        put(s"$key.$idx.${propertyKeys(keyIdx)}", value)
      }
    }
  }

  /**
    * Adds an indexed mapping of properties under a common key.
    *
    * For example:
    *
    * schema.fields.0.type = INT, schema.fields.0.name = test
    *                             schema.fields.1.name = test2
    *
    * The arity of the propertySets can differ.
    */
  def putIndexedVariableProperties(
      key: String,
      propertySets: Seq[Map[String, String]])
    : Unit = {
    checkNotNull(key)
    checkNotNull(propertySets)
    propertySets.zipWithIndex.foreach { case (propertySet, idx) =>
      propertySet.foreach { case (k, v) =>
        put(s"$key.$idx.$k", v)
      }
    }
  }

  // ----------------------------------------------------------------------------------------------

  /** Returns the string under the given key if it exists. */
  def getString(key: String): Option[String] = {
    properties.get(key)
  }

  /**
    * Returns the character under the given key if it exists.
    * Fails validation if the value is longer than one character.
    */
  def getCharacter(key: String): Option[Character] = getString(key) match {
    case Some(c) =>
      if (c.length != 1) {
        throw new ValidationException(s"The value of $key must only contain one character.")
      }
      Some(c.charAt(0))

    case None => None
  }

  /** Returns the boolean under the given key if it exists. */
  def getBoolean(key: String): Option[Boolean] = getString(key) match {
    case Some(b) => Some(JBoolean.parseBoolean(b))

    case None => None
  }

  /** Returns the integer under the given key if it exists. */
  def getInt(key: String): Option[Int] = getString(key) match {
    case Some(l) => Some(JInt.parseInt(l))

    case None => None
  }

  /** Returns the long under the given key if it exists. */
  def getLong(key: String): Option[Long] = getString(key) match {
    case Some(l) => Some(JLong.parseLong(l))

    case None => None
  }

  /** Returns the double under the given key if it exists. */
  def getDouble(key: String): Option[Double] = getString(key) match {
    case Some(d) => Some(JDouble.parseDouble(d))

    case None => None
  }

  /**
    * Returns the table schema stored under the given key if it exists.
    */
  def getTableSchema(key: String): Option[TableSchema] = {
    // determine the number of columns; match the exact "key.<idx>.name" pattern so
    // that properties of sibling keys sharing the same prefix are not counted
    val fieldCount = getIndexedProperty(key, NAME).size

    if (fieldCount == 0) {
      return None
    }

    // validate fields and build schema
    val schemaBuilder = TableSchema.builder()
    for (i <- 0 until fieldCount) {
      val name = s"$key.$i.$NAME"
      val tpe = s"$key.$i.$TYPE"
      schemaBuilder.field(
        properties.getOrElse(name, throw new ValidationException(s"Invalid table schema. " +
          s"Could not find name for field '$key.$i'.")
        ),
        TypeStringUtils.readTypeInfo(
          properties.getOrElse(tpe, throw new ValidationException(s"Invalid table schema. " +
          s"Could not find type for field '$key.$i'."))
        )
      )
    }
    Some(schemaBuilder.build())
  }

  // ----------------------------------------------------------------------------------------------

  /** Validates a string property; length bounds are inclusive. */
  def validateString(
      key: String,
      isOptional: Boolean,
      minLen: Int = 0, // inclusive
      maxLen: Int = Integer.MAX_VALUE) // inclusive
    : Unit = {

    if (!properties.contains(key)) {
      if (!isOptional) {
        throw new ValidationException(s"Could not find required property '$key'.")
      }
    } else {
      val len = properties(key).length
      if (len < minLen || len > maxLen) {
        throw new ValidationException(
          s"Property '$key' must have a length between $minLen and $maxLen but " +
            s"was: ${properties(key)}")
      }
    }
  }

  /** Validates an integer property; bounds are inclusive. */
  def validateInt(
      key: String,
      isOptional: Boolean,
      min: Int = Int.MinValue, // inclusive
      max: Int = Int.MaxValue) // inclusive
    : Unit = {

    if (!properties.contains(key)) {
      if (!isOptional) {
        throw new ValidationException(s"Could not find required property '$key'.")
      }
    } else {
      try {
        val value = Integer.parseInt(properties(key))
        if (value < min || value > max) {
          throw new ValidationException(s"Property '$key' must be an integer value between $min " +
            s"and $max but was: ${properties(key)}")
        }
      } catch {
        case _: NumberFormatException =>
          throw new ValidationException(
            s"Property '$key' must be an integer value but was: ${properties(key)}")
      }
    }
  }

  /** Validates a long property; bounds are inclusive. */
  def validateLong(
      key: String,
      isOptional: Boolean,
      min: Long = Long.MinValue, // inclusive
      max: Long = Long.MaxValue) // inclusive
    : Unit = {

    if (!properties.contains(key)) {
      if (!isOptional) {
        throw new ValidationException(s"Could not find required property '$key'.")
      }
    } else {
      try {
        val value = JLong.parseLong(properties(key))
        if (value < min || value > max) {
          throw new ValidationException(s"Property '$key' must be a long value between $min " +
            s"and $max but was: ${properties(key)}")
        }
      } catch {
        case _: NumberFormatException =>
          throw new ValidationException(
            s"Property '$key' must be a long value but was: ${properties(key)}")
      }
    }
  }

  /** Validates that a property equals the given fixed value. */
  def validateValue(key: String, value: String, isOptional: Boolean): Unit = {
    if (!properties.contains(key)) {
      if (!isOptional) {
        throw new ValidationException(s"Could not find required property '$key'.")
      }
    } else {
      if (properties(key) != value) {
        throw new ValidationException(
          s"Could not find required value '$value' for property '$key'.")
      }
    }
  }

  /** Validates a boolean property ("true"/"false", case-insensitive). */
  def validateBoolean(key: String, isOptional: Boolean): Unit = {
    if (!properties.contains(key)) {
      if (!isOptional) {
        throw new ValidationException(s"Could not find required property '$key'.")
      }
    } else {
      val value = properties(key)
      if (!value.equalsIgnoreCase("true") && !value.equalsIgnoreCase("false")) {
        throw new ValidationException(
          s"Property '$key' must be a boolean value (true/false) but was: $value")
      }
    }
  }

  /** Validates a double property; bounds are inclusive. */
  def validateDouble(
      key: String,
      isOptional: Boolean,
      min: Double = Double.MinValue, // inclusive
      max: Double = Double.MaxValue) // inclusive
    : Unit = {

    if (!properties.contains(key)) {
      if (!isOptional) {
        throw new ValidationException(s"Could not find required property '$key'.")
      }
    } else {
      try {
        val value = JDouble.parseDouble(properties(key))
        if (value < min || value > max) {
          throw new ValidationException(s"Property '$key' must be a double value between $min " +
            s"and $max but was: ${properties(key)}")
        }
      } catch {
        case _: NumberFormatException =>
          throw new ValidationException(
            s"Property '$key' must be a double value but was: ${properties(key)}")
      }
    }
  }

  /** Validates an indexed table schema (field names and types) under the given key. */
  def validateTableSchema(key: String, isOptional: Boolean): Unit = {
    // filter for name columns
    val names = getIndexedProperty(key, NAME)
    // filter for type columns
    val types = getIndexedProperty(key, TYPE)
    if (names.isEmpty && types.isEmpty && !isOptional) {
      throw new ValidationException(
        s"Could not find the required schema for property '$key'.")
    }
    // every field index must provide both a name and a type
    for (i <- 0 until Math.max(names.size, types.size)) {
      validateString(s"$key.$i.$NAME", isOptional = false, minLen = 1)
      validateType(s"$key.$i.$TYPE", isOptional = false)
    }
  }

  /**
    * Validates an enum property. The value must be one of the map's keys; the
    * corresponding validation function is executed for further checks.
    */
  def validateEnum(
      key: String,
      isOptional: Boolean,
      enumToValidation: Map[String, () => Unit])
    : Unit = {

    if (!properties.contains(key)) {
      if (!isOptional) {
        throw new ValidationException(s"Could not find required property '$key'.")
      }
    } else {
      val value = properties(key)
      if (!enumToValidation.contains(value)) {
        throw new ValidationException(s"Unknown value for property '$key'. " +
          s"Supported values [${enumToValidation.keys.mkString(", ")}] but was: $value")
      } else {
        enumToValidation(value).apply() // run validation logic
      }
    }
  }

  /** Validates that a property contains a parsable type string. */
  def validateType(key: String, isOptional: Boolean): Unit = {
    if (!properties.contains(key)) {
      if (!isOptional) {
        throw new ValidationException(s"Could not find required property '$key'.")
      }
    } else {
      TypeStringUtils.readTypeInfo(properties(key)) // throws validation exceptions
    }
  }

  /** Validates that no property key starts with the given prefix. */
  def validatePrefixExclusion(prefix: String): Unit = {
    val invalidField = properties.find(_._1.startsWith(prefix))
    if (invalidField.isDefined) {
      throw new ValidationException(
        s"Property '${invalidField.get._1}' is not allowed in this context.")
    }
  }

  /** Validates that the given key is not present. */
  def validateExclusion(key: String): Unit = {
    if (properties.contains(key)) {
      throw new ValidationException(s"Property '$key' is not allowed in this context.")
    }
  }

  // ----------------------------------------------------------------------------------------------

  /**
    * Returns all properties whose key matches the exact pattern
    * "key.&lt;index&gt;.property".
    */
  def getIndexedProperty(key: String, property: String): Map[String, String] = {
    val escapedKey = Pattern.quote(key)
    properties.filterKeys(k => k.matches(s"$escapedKey\\.\\d+\\.$property")).toMap
  }

  /** Returns true if any property key contains the given string. */
  def contains(str: String): Boolean = {
    properties.exists(e => e._1.contains(str))
  }

  /** Returns true if any property key starts with the given prefix. */
  def hasPrefix(prefix: String): Boolean = {
    properties.exists(e => e._1.startsWith(prefix))
  }

  /** Returns an immutable snapshot of all properties. */
  def asMap: Map[String, String] = {
    properties.toMap
  }
}
+
object DescriptorProperties {

  val TYPE = "type"
  val NAME = "name"

  /**
    * Normalizes a type information into its string representation
    * (the string representation should be equal to SqlTypeName).
    */
  def normalizeTypeInfo(typeInfo: TypeInformation[_]): String = {
    checkNotNull(typeInfo)
    TypeStringUtils.writeTypeInfo(typeInfo)
  }

  /** Converts a table schema into (field name, normalized type string) pairs. */
  def normalizeTableSchema(schema: TableSchema): Seq[(String, String)] = {
    val names = schema.getColumnNames
    val normalizedTypes = schema.getTypes.map(normalizeTypeInfo)
    names.zip(normalizedTypes)
  }

  /**
    * Serializes the given object into a URL-safe Base64 string after checking
    * that its class is publicly instantiable.
    */
  def serialize(obj: Serializable): String = {
    // test public accessibility
    val error = InstantiationUtil.checkForInstantiationError(obj.getClass)
    if (error != null) {
      throw new ValidationException(s"Class '${obj.getClass.getName}' is not supported: $error")
    }
    try {
      Base64.encodeBase64URLSafeString(InstantiationUtil.serializeObject(obj))
    } catch {
      case e: Exception =>
        throw new ValidationException(
          s"Unable to serialize class '${obj.getClass.getCanonicalName}'.", e)
    }
  }
}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorValidator.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorValidator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorValidator.scala
new file mode 100644
index 0000000..007a406
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorValidator.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
/**
  * Validator for a descriptor. We put the validation methods and utilities in separate classes
  * to keep the descriptor interfaces clean.
  */
trait DescriptorValidator {

  /**
    * Performs basic validation such as completeness tests. Throws on invalid
    * or missing properties.
    */
  def validate(properties: DescriptorProperties): Unit

}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FileSystem.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FileSystem.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FileSystem.scala
new file mode 100644
index 0000000..b1d900f
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FileSystem.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.descriptors.FileSystemValidator.{CONNECTOR_PATH, 
CONNECTOR_TYPE_VALUE}
+
/**
  * Connector descriptor for a file system.
  */
class FileSystem extends ConnectorDescriptor(CONNECTOR_TYPE_VALUE, version = 1) {

  // optional path; set via the path(...) method
  private var path: Option[String] = None

  /**
    * Sets the path to a file or directory in a file system.
    *
    * @param path the path a file or directory
    */
  def path(path: String): FileSystem = {
    this.path = Some(path)
    this
  }

  /**
    * Internal method for properties conversion.
    */
  override protected def addConnectorProperties(properties: DescriptorProperties): Unit = {
    path match {
      case Some(p) => properties.putString(CONNECTOR_PATH, p)
      case None => // no path has been configured yet
    }
  }

  // a file system connector always requires an explicit format
  override private[flink] def needsFormat(): Boolean = true
}
+
/**
  * Connector descriptor for a file system.
  */
object FileSystem {

  /**
    * Creates a new connector descriptor for a file system.
    */
  def apply(): FileSystem = new FileSystem()

}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FileSystemValidator.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FileSystemValidator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FileSystemValidator.scala
new file mode 100644
index 0000000..b065b5f
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FileSystemValidator.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE
+import org.apache.flink.table.descriptors.FileSystemValidator.{CONNECTOR_PATH, 
CONNECTOR_TYPE_VALUE}
+
/**
  * Validator for [[FileSystem]].
  */
class FileSystemValidator extends ConnectorDescriptorValidator {

  override def validate(properties: DescriptorProperties): Unit = {
    super.validate(properties)
    // the connector type must be exactly "filesystem"
    properties.validateValue(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE, isOptional = false)
    // a non-empty path is mandatory for a file system connector
    properties.validateString(CONNECTOR_PATH, isOptional = false, minLen = 1)
  }
}
+
object FileSystemValidator {

  // expected value of the connector type property for this connector
  val CONNECTOR_TYPE_VALUE = "filesystem"
  // property key for the file or directory path (required)
  val CONNECTOR_PATH = "connector.path"

}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptor.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptor.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptor.scala
new file mode 100644
index 0000000..86f6229
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptor.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import 
org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_TYPE, 
FORMAT_VERSION}
+
/**
  * Describes the format of data.
  *
  * @param tpe string identifier for the format
  * @param version version of this format descriptor's properties
  */
abstract class FormatDescriptor(
    private val tpe: String,
    private val version: Int)
  extends Descriptor {

  override def toString: String = this.getClass.getSimpleName

  /**
    * Internal method for properties conversion. Writes the format type and
    * version before delegating to the format-specific properties.
    */
  final private[flink] def addProperties(properties: DescriptorProperties): Unit = {
    properties.putString(FORMAT_TYPE, tpe)
    properties.putInt(FORMAT_VERSION, version)
    addFormatProperties(properties)
  }

  /**
    * Internal method for format properties conversion. Implementations add the
    * format-specific key-value pairs.
    */
  protected def addFormatProperties(properties: DescriptorProperties): Unit

}

Reply via email to