[FLINK-9852] [table] Expose descriptor-based sink creation and introduce update 
mode

This commit exposes the new unified sink creation through the table 
environments and the external catalog table. It introduces a new update-mode 
property in order to distinguish between append, retract, and upsert table 
sources and sinks. This commit refactors the top-level API classes one last time 
and adds more documentation. This commit completes the unified table 
sources/sinks story from an API point of view.

Brief change log:
- Introduction of TableEnvironment.connect() and corresponding API builder 
classes
- Introduction of the update-mode property and update of existing connectors
- External catalog support with proper source/sink discovery and API

This closes #6343.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7bb07e4e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7bb07e4e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7bb07e4e

Branch: refs/heads/release-1.6
Commit: 7bb07e4e74ed7167cd98599af1a851afa1d77252
Parents: 4862101
Author: Timo Walther <[email protected]>
Authored: Fri Jul 20 08:47:53 2018 +0200
Committer: Timo Walther <[email protected]>
Committed: Fri Jul 20 09:33:48 2018 +0200

----------------------------------------------------------------------
 .../kafka/Kafka010AvroTableSource.java          |   6 +-
 .../kafka/Kafka010JsonTableSource.java          |   6 +-
 .../kafka/Kafka011AvroTableSource.java          |   6 +-
 .../kafka/Kafka011JsonTableSource.java          |   6 +-
 .../kafka/Kafka08AvroTableSource.java           |   6 +-
 .../kafka/Kafka08JsonTableSource.java           |   6 +-
 .../kafka/Kafka09AvroTableSource.java           |   6 +-
 .../kafka/Kafka09JsonTableSource.java           |   6 +-
 .../connectors/kafka/KafkaAvroTableSource.java  |   6 +-
 .../connectors/kafka/KafkaJsonTableSource.java  |   4 +-
 .../connectors/kafka/KafkaTableSource.java      |   4 +-
 .../kafka/KafkaTableSourceFactory.java          |   3 +
 .../KafkaJsonTableSourceFactoryTestBase.java    |  11 +-
 .../kafka/KafkaTableSourceFactoryTestBase.java  |  11 +-
 .../apache/flink/table/client/config/Sink.java  |   4 +-
 .../flink/table/client/config/Source.java       |   4 +-
 .../flink/table/client/config/SourceSink.java   |   2 +-
 .../client/gateway/local/EnvironmentTest.java   |  14 +-
 .../gateway/local/ExecutionContextTest.java     |   6 +-
 .../gateway/local/LocalExecutorITCase.java      |  12 +-
 .../gateway/utils/TestTableSinkFactory.java     |   3 +
 .../gateway/utils/TestTableSourceFactory.java   |   3 +
 .../resources/test-sql-client-defaults.yaml     |   3 +
 .../test/resources/test-sql-client-factory.yaml |   1 +
 ...rg.apache.flink.table.factories.TableFactory |   6 +-
 .../flink/table/api/BatchTableEnvironment.scala |  36 +-
 .../table/api/StreamTableEnvironment.scala      |  26 +-
 .../flink/table/api/TableEnvironment.scala      |  32 +-
 .../table/catalog/ExternalCatalogSchema.scala   |   2 +-
 .../table/catalog/ExternalCatalogTable.scala    | 335 +++++++++++++++++--
 .../table/catalog/ExternalTableSourceUtil.scala |  70 ----
 .../flink/table/catalog/ExternalTableUtil.scala | 102 ++++++
 .../descriptors/BatchTableDescriptor.scala      |  31 ++
 .../BatchTableSourceDescriptor.scala            |  87 -----
 .../descriptors/ConnectTableDescriptor.scala    | 108 ++++++
 .../flink/table/descriptors/Descriptor.scala    |   9 +-
 .../descriptors/DescriptorProperties.scala      |  10 +
 .../descriptors/RegistrableDescriptor.scala     |  49 +++
 .../table/descriptors/SchematicDescriptor.scala |  35 ++
 .../descriptors/StreamTableDescriptor.scala     | 101 ++++++
 .../StreamTableDescriptorValidator.scala        |  48 +++
 .../StreamTableSourceDescriptor.scala           |  90 -----
 .../descriptors/StreamableDescriptor.scala      |  67 ++++
 .../table/descriptors/TableDescriptor.scala     |  20 +-
 .../descriptors/TableDescriptorValidator.scala  |  29 --
 .../table/descriptors/TableSinkDescriptor.scala |  32 --
 .../descriptors/TableSourceDescriptor.scala     |  57 ----
 .../flink/table/factories/TableFactory.scala    |   6 +
 .../table/factories/TableFactoryService.scala   |   8 +-
 .../table/factories/TableFactoryUtil.scala      |  82 +++++
 .../table/factories/TableFormatFactory.scala    |   2 +
 .../table/sinks/CsvAppendTableSinkFactory.scala |  45 +++
 .../table/sinks/CsvBatchTableSinkFactory.scala  |  38 +++
 .../flink/table/sinks/CsvTableSinkFactory.scala | 112 -------
 .../table/sinks/CsvTableSinkFactoryBase.scala   |  96 ++++++
 .../sources/CsvAppendTableSourceFactory.scala   |  45 +++
 .../sources/CsvBatchTableSourceFactory.scala    |  38 +++
 .../table/sources/CsvTableSourceFactory.scala   | 139 --------
 .../sources/CsvTableSourceFactoryBase.scala     | 123 +++++++
 .../flink/table/api/ExternalCatalogTest.scala   |  20 +-
 .../catalog/ExternalCatalogSchemaTest.scala     |   2 +-
 .../catalog/InMemoryExternalCatalogTest.scala   |   8 +-
 .../table/descriptors/DescriptorTestBase.scala  |  37 +-
 .../table/descriptors/TableDescriptorTest.scala | 110 ++++++
 .../descriptors/TableSourceDescriptorTest.scala | 114 -------
 .../table/runtime/utils/CommonTestData.scala    |  32 +-
 .../table/utils/MockTableEnvironment.scala      |   4 +-
 67 files changed, 1669 insertions(+), 913 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
 
b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
index ebbadcf..d9d0a91 100644
--- 
a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
+++ 
b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
@@ -39,7 +39,7 @@ import java.util.Properties;
  * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} 
descriptor together
  *             with descriptors for schema and format instead. Descriptors 
allow for
  *             implementation-agnostic definition of tables. See also
- *             {@link 
org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
+ *             {@link 
org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
  */
 @Deprecated
 public class Kafka010AvroTableSource extends KafkaAvroTableSource {
@@ -120,7 +120,7 @@ public class Kafka010AvroTableSource extends 
KafkaAvroTableSource {
         * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} 
descriptor together
         *             with descriptors for schema and format instead. 
Descriptors allow for
         *             implementation-agnostic definition of tables. See also
-        *             {@link 
org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
+        *             {@link 
org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
         */
        @Deprecated
        public static Builder builder() {
@@ -133,7 +133,7 @@ public class Kafka010AvroTableSource extends 
KafkaAvroTableSource {
         * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} 
descriptor together
         *             with descriptors for schema and format instead. 
Descriptors allow for
         *             implementation-agnostic definition of tables. See also
-        *             {@link 
org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
+        *             {@link 
org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
         */
        @Deprecated
        public static class Builder extends 
KafkaAvroTableSource.Builder<Kafka010AvroTableSource, 
Kafka010AvroTableSource.Builder> {

http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
 
b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
index a5e33a1..38d9034 100644
--- 
a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
+++ 
b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
@@ -36,7 +36,7 @@ import java.util.Properties;
  * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} 
descriptor together
  *             with descriptors for schema and format instead. Descriptors 
allow for
  *             implementation-agnostic definition of tables. See also
- *             {@link 
org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
+ *             {@link 
org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
  */
 @Deprecated
 public class Kafka010JsonTableSource extends KafkaJsonTableSource {
@@ -121,7 +121,7 @@ public class Kafka010JsonTableSource extends 
KafkaJsonTableSource {
         * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} 
descriptor together
         *             with descriptors for schema and format instead. 
Descriptors allow for
         *             implementation-agnostic definition of tables. See also
-        *             {@link 
org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
+        *             {@link 
org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
         */
        @Deprecated
        public static Kafka010JsonTableSource.Builder builder() {
@@ -134,7 +134,7 @@ public class Kafka010JsonTableSource extends 
KafkaJsonTableSource {
         * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} 
descriptor together
         *             with descriptors for schema and format instead. 
Descriptors allow for
         *             implementation-agnostic definition of tables. See also
-        *             {@link 
org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
+        *             {@link 
org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
         */
        @Deprecated
        public static class Builder extends 
KafkaJsonTableSource.Builder<Kafka010JsonTableSource, 
Kafka010JsonTableSource.Builder> {

http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java
index b3f4e0a..fab592f 100644
--- 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java
@@ -39,7 +39,7 @@ import java.util.Properties;
  * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} 
descriptor together
  *             with descriptors for schema and format instead. Descriptors 
allow for
  *             implementation-agnostic definition of tables. See also
- *             {@link 
org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
+ *             {@link 
org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
  */
 @Deprecated
 public class Kafka011AvroTableSource extends KafkaAvroTableSource {
@@ -119,7 +119,7 @@ public class Kafka011AvroTableSource extends 
KafkaAvroTableSource {
         * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} 
descriptor together
         *             with descriptors for schema and format instead. 
Descriptors allow for
         *             implementation-agnostic definition of tables. See also
-        *             {@link 
org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
+        *             {@link 
org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
         */
        @Deprecated
        public static Builder builder() {
@@ -132,7 +132,7 @@ public class Kafka011AvroTableSource extends 
KafkaAvroTableSource {
         * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} 
descriptor together
         *             with descriptors for schema and format instead. 
Descriptors allow for
         *             implementation-agnostic definition of tables. See also
-        *             {@link 
org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
+        *             {@link 
org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
         */
        @Deprecated
        public static class Builder extends 
KafkaAvroTableSource.Builder<Kafka011AvroTableSource, 
Kafka011AvroTableSource.Builder> {

http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java
index 74c5007..375eead 100644
--- 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java
@@ -36,7 +36,7 @@ import java.util.Properties;
  * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} 
descriptor together
  *             with descriptors for schema and format instead. Descriptors 
allow for
  *             implementation-agnostic definition of tables. See also
- *             {@link 
org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
+ *             {@link 
org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
  */
 @Deprecated
 public class Kafka011JsonTableSource extends KafkaJsonTableSource {
@@ -121,7 +121,7 @@ public class Kafka011JsonTableSource extends 
KafkaJsonTableSource {
         * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} 
descriptor together
         *             with descriptors for schema and format instead. 
Descriptors allow for
         *             implementation-agnostic definition of tables. See also
-        *             {@link 
org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
+        *             {@link 
org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
         */
        @Deprecated
        public static Kafka011JsonTableSource.Builder builder() {
@@ -134,7 +134,7 @@ public class Kafka011JsonTableSource extends 
KafkaJsonTableSource {
         * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} 
descriptor together
         *             with descriptors for schema and format instead. 
Descriptors allow for
         *             implementation-agnostic definition of tables. See also
-        *             {@link 
org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
+        *             {@link 
org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
         */
        @Deprecated
        public static class Builder extends 
KafkaJsonTableSource.Builder<Kafka011JsonTableSource, 
Kafka011JsonTableSource.Builder> {

http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java
 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java
index 8206287..61c96bf 100644
--- 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java
+++ 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java
@@ -39,7 +39,7 @@ import java.util.Properties;
  * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} 
descriptor together
  *             with descriptors for schema and format instead. Descriptors 
allow for
  *             implementation-agnostic definition of tables. See also
- *             {@link 
org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
+ *             {@link 
org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
  */
 @Deprecated
 public class Kafka08AvroTableSource extends KafkaAvroTableSource {
@@ -120,7 +120,7 @@ public class Kafka08AvroTableSource extends 
KafkaAvroTableSource {
         * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} 
descriptor together
         *             with descriptors for schema and format instead. 
Descriptors allow for
         *             implementation-agnostic definition of tables. See also
-        *             {@link 
org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
+        *             {@link 
org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
         */
        @Deprecated
        public static Builder builder() {
@@ -133,7 +133,7 @@ public class Kafka08AvroTableSource extends 
KafkaAvroTableSource {
         * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} 
descriptor together
         *             with descriptors for schema and format instead. 
Descriptors allow for
         *             implementation-agnostic definition of tables. See also
-        *             {@link 
org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
+        *             {@link 
org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
         */
        @Deprecated
        public static class Builder extends 
KafkaAvroTableSource.Builder<Kafka08AvroTableSource, 
Kafka08AvroTableSource.Builder> {

http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
index acb5783..dc5a077 100644
--- 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
+++ 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
@@ -36,7 +36,7 @@ import java.util.Properties;
  * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} 
descriptor together
  *             with descriptors for schema and format instead. Descriptors 
allow for
  *             implementation-agnostic definition of tables. See also
- *             {@link 
org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
+ *             {@link 
org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
  */
 @Deprecated
 public class Kafka08JsonTableSource extends KafkaJsonTableSource {
@@ -120,7 +120,7 @@ public class Kafka08JsonTableSource extends 
KafkaJsonTableSource {
         * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} 
descriptor together
         *             with descriptors for schema and format instead. 
Descriptors allow for
         *             implementation-agnostic definition of tables. See also
-        *             {@link 
org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
+        *             {@link 
org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
         */
        @Deprecated
        public static Kafka08JsonTableSource.Builder builder() {
@@ -133,7 +133,7 @@ public class Kafka08JsonTableSource extends 
KafkaJsonTableSource {
         * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} 
descriptor together
         *             with descriptors for schema and format instead. 
Descriptors allow for
         *             implementation-agnostic definition of tables. See also
-        *             {@link 
org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
+        *             {@link 
org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
         */
        @Deprecated
        public static class Builder extends 
KafkaJsonTableSource.Builder<Kafka08JsonTableSource, 
Kafka08JsonTableSource.Builder> {

http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSource.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSource.java
index cd4bad9..4352d7e4 100644
--- 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSource.java
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSource.java
@@ -39,7 +39,7 @@ import java.util.Properties;
  * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} 
descriptor together
  *             with descriptors for schema and format instead. Descriptors 
allow for
  *             implementation-agnostic definition of tables. See also
- *             {@link 
org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
+ *             {@link 
org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
  */
 @Deprecated
 public class Kafka09AvroTableSource extends KafkaAvroTableSource {
@@ -118,7 +118,7 @@ public class Kafka09AvroTableSource extends 
KafkaAvroTableSource {
         * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} 
descriptor together
         *             with descriptors for schema and format instead. 
Descriptors allow for
         *             implementation-agnostic definition of tables. See also
-        *             {@link 
org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
+        *             {@link 
org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
         */
        @Deprecated
        public static Builder builder() {
@@ -131,7 +131,7 @@ public class Kafka09AvroTableSource extends 
KafkaAvroTableSource {
         * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} 
descriptor together
         *             with descriptors for schema and format instead. 
Descriptors allow for
         *             implementation-agnostic definition of tables. See also
-        *             {@link 
org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
+        *             {@link 
org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
         */
        @Deprecated
        public static class Builder extends 
KafkaAvroTableSource.Builder<Kafka09AvroTableSource, 
Kafka09AvroTableSource.Builder> {

http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
index ad4e0d8..db1df3d 100644
--- 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
@@ -36,7 +36,7 @@ import java.util.Properties;
  * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} 
descriptor together
  *             with descriptors for schema and format instead. Descriptors 
allow for
  *             implementation-agnostic definition of tables. See also
- *             {@link 
org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
+ *             {@link 
org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
  */
 @Deprecated
 public class Kafka09JsonTableSource extends KafkaJsonTableSource {
@@ -120,7 +120,7 @@ public class Kafka09JsonTableSource extends 
KafkaJsonTableSource {
         * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} 
descriptor together
         *             with descriptors for schema and format instead. 
Descriptors allow for
         *             implementation-agnostic definition of tables. See also
-        *             {@link 
org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
+        *             {@link 
org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
         */
        @Deprecated
        public static Kafka09JsonTableSource.Builder builder() {
@@ -133,7 +133,7 @@ public class Kafka09JsonTableSource extends 
KafkaJsonTableSource {
         * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} 
descriptor together
         *             with descriptors for schema and format instead. 
Descriptors allow for
         *             implementation-agnostic definition of tables. See also
-        *             {@link 
org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
+        *             {@link 
org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
         */
        @Deprecated
        public static class Builder extends 
KafkaJsonTableSource.Builder<Kafka09JsonTableSource, 
Kafka09JsonTableSource.Builder> {

http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
index 3e9f2b0..86fd21d 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
@@ -40,7 +40,7 @@ import java.util.Properties;
  * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} 
descriptor together
  *             with descriptors for schema and format instead. Descriptors 
allow for
  *             implementation-agnostic definition of tables. See also
- *             {@link 
org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
+ *             {@link 
org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
  */
 @Deprecated
 @Internal
@@ -56,7 +56,7 @@ public abstract class KafkaAvroTableSource extends 
KafkaTableSource {
         * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} 
descriptor together
         *             with descriptors for schema and format instead. 
Descriptors allow for
         *             implementation-agnostic definition of tables. See also
-        *             {@link 
org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
+        *             {@link 
org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
         */
        @Deprecated
        protected KafkaAvroTableSource(
@@ -89,7 +89,7 @@ public abstract class KafkaAvroTableSource extends 
KafkaTableSource {
         * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} 
descriptor together
         *             with descriptors for schema and format instead. 
Descriptors allow for
         *             implementation-agnostic definition of tables. See also
-        *             {@link 
org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
+        *             {@link 
org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
         */
        @Deprecated
        protected abstract static class Builder<T extends KafkaAvroTableSource, 
B extends KafkaAvroTableSource.Builder>

http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
index bd0d0de..70b286b 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
@@ -39,7 +39,7 @@ import java.util.Properties;
  * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} 
descriptor together
  *             with descriptors for schema and format instead. Descriptors 
allow for
  *             implementation-agnostic definition of tables. See also
- *             {@link 
org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
+ *             {@link 
org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
  */
 @Deprecated
 @Internal
@@ -98,7 +98,7 @@ public abstract class KafkaJsonTableSource extends 
KafkaTableSource {
         * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} 
descriptor together
         *             with descriptors for schema and format instead. 
Descriptors allow for
         *             implementation-agnostic definition of tables. See also
-        *             {@link 
org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
+        *             {@link 
org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
         */
        @Deprecated
        protected abstract static class Builder<T extends KafkaJsonTableSource, 
B extends KafkaJsonTableSource.Builder>

http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
index 78b373b..474c22f 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
@@ -406,7 +406,7 @@ public abstract class KafkaTableSource implements
         * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} 
descriptor together
         *             with descriptors for schema and format instead. 
Descriptors allow for
         *             implementation-agnostic definition of tables. See also
-        *             {@link 
org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
+        *             {@link 
org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
         */
        @Deprecated
        protected abstract static class Builder<T extends KafkaTableSource, B 
extends KafkaTableSource.Builder> {
@@ -676,7 +676,7 @@ public abstract class KafkaTableSource implements
                 * @deprecated Use the {@link 
org.apache.flink.table.descriptors.Kafka} descriptor together
                 *             with descriptors for schema and format instead. 
Descriptors allow for
                 *             implementation-agnostic definition of tables. 
See also
-                *             {@link 
org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
+                *             {@link 
org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
                 */
                @Deprecated
                protected abstract B builder();

http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactory.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactory.java
index 380d657..d7e42f5 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactory.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactory.java
@@ -67,6 +67,8 @@ import static 
org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_FROM;
 import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_NAME;
 import static 
org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_PROCTIME;
 import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_TYPE;
+import static 
org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE;
+import static 
org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_APPEND;
 
 /**
  * Factory for creating configured instances of {@link KafkaTableSource}.
@@ -76,6 +78,7 @@ public abstract class KafkaTableSourceFactory implements 
StreamTableSourceFactor
        @Override
        public Map<String, String> requiredContext() {
                Map<String, String> context = new HashMap<>();
+               context.put(UPDATE_MODE(), UPDATE_MODE_VALUE_APPEND()); // 
append mode
                context.put(CONNECTOR_TYPE(), CONNECTOR_TYPE_VALUE_KAFKA); // 
kafka
                context.put(CONNECTOR_VERSION(), kafkaVersion()); // version
                context.put(CONNECTOR_PROPERTY_VERSION(), "1"); // backwards 
compatibility

http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java
index 51017f4..20da156 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java
@@ -28,7 +28,7 @@ import org.apache.flink.table.descriptors.Json;
 import org.apache.flink.table.descriptors.Kafka;
 import org.apache.flink.table.descriptors.Rowtime;
 import org.apache.flink.table.descriptors.Schema;
-import org.apache.flink.table.descriptors.TestTableSourceDescriptor;
+import org.apache.flink.table.descriptors.TestTableDescriptor;
 import org.apache.flink.table.factories.StreamTableSourceFactory;
 import org.apache.flink.table.factories.TableFactoryService;
 import org.apache.flink.table.sources.TableSource;
@@ -135,20 +135,21 @@ public abstract class KafkaJsonTableSourceFactoryTestBase 
{
                offsets.put(0, 100L);
                offsets.put(1, 123L);
 
-               final TestTableSourceDescriptor testDesc = new 
TestTableSourceDescriptor(
+               final TestTableDescriptor testDesc = new TestTableDescriptor(
                                new Kafka()
                                        .version(version())
                                        .topic(TOPIC)
                                        .properties(props)
                                        .startFromSpecificOffsets(offsets))
-                       .addFormat(format)
-                       .addSchema(
+                       .withFormat(format)
+                       .withSchema(
                                new Schema()
                                                .field("fruit-name", 
Types.STRING).from("name")
                                                .field("count", Types.BIG_DEC) 
// no from so it must match with the input
                                                .field("event-time", 
Types.SQL_TIMESTAMP).rowtime(
                                                        new 
Rowtime().timestampsFromField("time").watermarksPeriodicAscending())
-                                               .field("proc-time", 
Types.SQL_TIMESTAMP).proctime());
+                                               .field("proc-time", 
Types.SQL_TIMESTAMP).proctime())
+                       .inAppendMode();
 
                DescriptorProperties properties = new 
DescriptorProperties(true);
                testDesc.addProperties(properties);

http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactoryTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactoryTestBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactoryTestBase.java
index 1e8266d..96f1607 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactoryTestBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactoryTestBase.java
@@ -31,7 +31,7 @@ import 
org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.descriptors.Kafka;
 import org.apache.flink.table.descriptors.Rowtime;
 import org.apache.flink.table.descriptors.Schema;
-import org.apache.flink.table.descriptors.TestTableSourceDescriptor;
+import org.apache.flink.table.descriptors.TestTableDescriptor;
 import org.apache.flink.table.factories.StreamTableSourceFactory;
 import org.apache.flink.table.factories.TableFactoryService;
 import org.apache.flink.table.factories.utils.TestDeserializationSchema;
@@ -129,20 +129,21 @@ public abstract class KafkaTableSourceFactoryTestBase 
extends TestLogger {
                offsets.put(PARTITION_0, OFFSET_0);
                offsets.put(PARTITION_1, OFFSET_1);
 
-               final TestTableSourceDescriptor testDesc = new 
TestTableSourceDescriptor(
+               final TestTableDescriptor testDesc = new TestTableDescriptor(
                                new Kafka()
                                        .version(getKafkaVersion())
                                        .topic(TOPIC)
                                        .properties(KAFKA_PROPERTIES)
                                        .startFromSpecificOffsets(offsets))
-                       .addFormat(new TestTableFormat())
-                       .addSchema(
+                       .withFormat(new TestTableFormat())
+                       .withSchema(
                                new Schema()
                                        .field(FRUIT_NAME, 
Types.STRING()).from(NAME)
                                        .field(COUNT, Types.DECIMAL()) // no 
from so it must match with the input
                                        .field(EVENT_TIME, 
Types.SQL_TIMESTAMP()).rowtime(
                                                new 
Rowtime().timestampsFromField(TIME).watermarksPeriodicAscending())
-                                       .field(PROC_TIME, 
Types.SQL_TIMESTAMP()).proctime());
+                                       .field(PROC_TIME, 
Types.SQL_TIMESTAMP()).proctime())
+                       .inAppendMode();
                final DescriptorProperties descriptorProperties = new 
DescriptorProperties(true);
                testDesc.addProperties(descriptorProperties);
                final Map<String, String> propertiesMap = 
descriptorProperties.asMap();

http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Sink.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Sink.java
 
b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Sink.java
index 0de65fb..49ac14a 100644
--- 
a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Sink.java
+++ 
b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Sink.java
@@ -19,14 +19,14 @@
 package org.apache.flink.table.client.config;
 
 import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.table.descriptors.TableSinkDescriptor;
+import org.apache.flink.table.descriptors.TableDescriptor;
 
 import java.util.Map;
 
 /**
  * Configuration of a table sink.
  */
-public class Sink extends TableSinkDescriptor {
+public class Sink implements TableDescriptor {
 
        private String name;
        private Map<String, String> properties;

http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Source.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Source.java
 
b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Source.java
index ef80596..1eb0ad9 100644
--- 
a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Source.java
+++ 
b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Source.java
@@ -19,14 +19,14 @@
 package org.apache.flink.table.client.config;
 
 import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.table.descriptors.TableSourceDescriptor;
+import org.apache.flink.table.descriptors.TableDescriptor;
 
 import java.util.Map;
 
 /**
  * Configuration of a table source.
  */
-public class Source extends TableSourceDescriptor {
+public class Source implements TableDescriptor {
 
        private String name;
        private Map<String, String> properties;

http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SourceSink.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SourceSink.java
 
b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SourceSink.java
index bfa3c44..b6b9bea 100644
--- 
a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SourceSink.java
+++ 
b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SourceSink.java
@@ -26,7 +26,7 @@ import java.util.Map;
 /**
  * Common class for all descriptors describing a table source and sink 
together.
  */
-public class SourceSink extends TableDescriptor {
+public class SourceSink implements TableDescriptor {
 
        private String name;
        private Map<String, String> properties;

http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java
 
b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java
index dcd94e2..6fbf29d 100644
--- 
a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java
+++ 
b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java
@@ -23,8 +23,9 @@ import 
org.apache.flink.table.client.gateway.utils.EnvironmentFileUtil;
 
 import org.junit.Test;
 
-import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
@@ -40,10 +41,17 @@ public class EnvironmentTest {
 
        @Test
        public void testMerging() throws Exception {
-               final Environment env1 = 
EnvironmentFileUtil.parseUnmodified(DEFAULTS_ENVIRONMENT_FILE);
+               final Map<String, String> replaceVars1 = new HashMap<>();
+               replaceVars1.put("$VAR_UPDATE_MODE", "update-mode: append");
+               final Environment env1 = EnvironmentFileUtil.parseModified(
+                       DEFAULTS_ENVIRONMENT_FILE,
+                       replaceVars1);
+
+               final Map<String, String> replaceVars2 = new 
HashMap<>(replaceVars1);
+               replaceVars2.put("TableNumber1", "NewTable");
                final Environment env2 = EnvironmentFileUtil.parseModified(
                        FACTORY_ENVIRONMENT_FILE,
-                       Collections.singletonMap("TableNumber1", "NewTable"));
+                       replaceVars2);
 
                final Environment merged = Environment.merge(env1, env2);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
 
b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
index f9bcaf3..bc29f6f 100644
--- 
a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
+++ 
b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
@@ -35,6 +35,7 @@ import org.junit.Test;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 
@@ -112,9 +113,12 @@ public class ExecutionContextTest {
        }
 
        private <T> ExecutionContext<T> createExecutionContext() throws 
Exception {
+               final Map<String, String> replaceVars = new HashMap<>();
+               replaceVars.put("$VAR_2", "streaming");
+               replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
                final Environment env = EnvironmentFileUtil.parseModified(
                        DEFAULTS_ENVIRONMENT_FILE,
-                       Collections.singletonMap("$VAR_2", "streaming"));
+                       replaceVars);
                final SessionContext session = new 
SessionContext("test-session", new Environment());
                final Configuration flinkConfig = new Configuration();
                return new ExecutionContext<>(

http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
 
b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index b1bf304..d8452e4 100644
--- 
a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ 
b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -136,13 +136,12 @@ public class LocalExecutorITCase extends TestLogger {
                executor.getSessionProperties(session);
 
                // modify defaults
-               session.setSessionProperty("execution.type", "streaming");
                session.setSessionProperty("execution.result-mode", "table");
 
                final Map<String, String> actualProperties = 
executor.getSessionProperties(session);
 
                final Map<String, String> expectedProperties = new HashMap<>();
-               expectedProperties.put("execution.type", "streaming");
+               expectedProperties.put("execution.type", "batch");
                expectedProperties.put("execution.time-characteristic", 
"event-time");
                
expectedProperties.put("execution.periodic-watermarks-interval", "99");
                expectedProperties.put("execution.parallelism", "1");
@@ -178,6 +177,7 @@ public class LocalExecutorITCase extends TestLogger {
                replaceVars.put("$VAR_1", "/");
                replaceVars.put("$VAR_2", "streaming");
                replaceVars.put("$VAR_3", "changelog");
+               replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
 
                final Executor executor = createModifiedExecutor(clusterClient, 
replaceVars);
                final SessionContext session = new 
SessionContext("test-session", new Environment());
@@ -216,6 +216,7 @@ public class LocalExecutorITCase extends TestLogger {
                replaceVars.put("$VAR_1", "/");
                replaceVars.put("$VAR_2", "streaming");
                replaceVars.put("$VAR_3", "table");
+               replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
 
                final Executor executor = createModifiedExecutor(clusterClient, 
replaceVars);
                final SessionContext session = new 
SessionContext("test-session", new Environment());
@@ -253,6 +254,7 @@ public class LocalExecutorITCase extends TestLogger {
                replaceVars.put("$VAR_1", "/");
                replaceVars.put("$VAR_2", "batch");
                replaceVars.put("$VAR_3", "table");
+               replaceVars.put("$VAR_UPDATE_MODE", "");
 
                final Executor executor = createModifiedExecutor(clusterClient, 
replaceVars);
                final SessionContext session = new 
SessionContext("test-session", new Environment());
@@ -287,6 +289,7 @@ public class LocalExecutorITCase extends TestLogger {
                replaceVars.put("$VAR_0", url.getPath());
                replaceVars.put("$VAR_2", "streaming");
                replaceVars.put("$VAR_4", csvOutputPath);
+               replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
 
                final Executor executor = createModifiedExecutor(clusterClient, 
replaceVars);
                final SessionContext session = new 
SessionContext("test-session", new Environment());
@@ -333,8 +336,11 @@ public class LocalExecutorITCase extends TestLogger {
        }
 
        private <T> LocalExecutor createDefaultExecutor(ClusterClient<T> 
clusterClient) throws Exception {
+               final Map<String, String> replaceVars = new HashMap<>();
+               replaceVars.put("$VAR_2", "batch");
+               replaceVars.put("$VAR_UPDATE_MODE", "");
                return new LocalExecutor(
-                       
EnvironmentFileUtil.parseModified(DEFAULTS_ENVIRONMENT_FILE, 
Collections.singletonMap("$VAR_2", "batch")),
+                       
EnvironmentFileUtil.parseModified(DEFAULTS_ENVIRONMENT_FILE, replaceVars),
                        Collections.emptyList(),
                        clusterClient.getFlinkConfiguration(),
                        new DummyCustomCommandLine<T>(clusterClient));

http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSinkFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSinkFactory.java
 
b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSinkFactory.java
index 4b50581..207d5d2 100644
--- 
a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSinkFactory.java
+++ 
b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSinkFactory.java
@@ -45,6 +45,8 @@ import static 
org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERM
 import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA;
 import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_NAME;
 import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_TYPE;
+import static 
org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE;
+import static 
org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_APPEND;
 
 /**
  * Table sink factory for testing the classloading in {@link DependencyTest}.
@@ -54,6 +56,7 @@ public class TestTableSinkFactory implements 
StreamTableSinkFactory<Row> {
        @Override
        public Map<String, String> requiredContext() {
                final Map<String, String> context = new HashMap<>();
+               context.put(UPDATE_MODE(), UPDATE_MODE_VALUE_APPEND());
                context.put(CONNECTOR_TYPE(), CONNECTOR_TYPE_VALUE);
                return context;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactory.java
 
b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactory.java
index a08472c..81f00e5 100644
--- 
a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactory.java
+++ 
b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactory.java
@@ -48,6 +48,8 @@ import static 
org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERM
 import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA;
 import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_NAME;
 import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_TYPE;
+import static 
org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE;
+import static 
org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_APPEND;
 
 /**
  * Table source factory for testing the classloading in {@link DependencyTest}.
@@ -57,6 +59,7 @@ public class TestTableSourceFactory implements 
StreamTableSourceFactory<Row> {
        @Override
        public Map<String, String> requiredContext() {
                final Map<String, String> context = new HashMap<>();
+               context.put(UPDATE_MODE(), UPDATE_MODE_VALUE_APPEND());
                context.put(CONNECTOR_TYPE(), CONNECTOR_TYPE_VALUE);
                return context;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
 
b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
index 2a886f0..b759874 100644
--- 
a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
+++ 
b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
@@ -26,6 +26,7 @@
 tables:
   - name: TableNumber1
     type: source
+    $VAR_UPDATE_MODE
     schema:
       - name: IntegerField1
         type: INT
@@ -45,6 +46,7 @@ tables:
       comment-prefix: "#"
   - name: TableNumber2
     type: source
+    $VAR_UPDATE_MODE
     schema:
       - name: IntegerField2
         type: INT
@@ -64,6 +66,7 @@ tables:
       comment-prefix: "#"
   - name: TableSourceSink
     type: both
+    $VAR_UPDATE_MODE
     schema:
       - name: BooleanField
         type: BOOLEAN

http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-factory.yaml
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-factory.yaml
 
b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-factory.yaml
index 01ad63d..3ce513e 100644
--- 
a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-factory.yaml
+++ 
b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-factory.yaml
@@ -26,6 +26,7 @@
 tables:
   - name: TableNumber1
     type: both
+    update-mode: append
     schema:
       - name: IntegerField1
         type: INT

http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-table/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
 
b/flink-libraries/flink-table/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
index 4cda0ad..ece39eb 100644
--- 
a/flink-libraries/flink-table/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++ 
b/flink-libraries/flink-table/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -13,5 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.table.sources.CsvTableSourceFactory
-org.apache.flink.table.sinks.CsvTableSinkFactory
+org.apache.flink.table.sources.CsvBatchTableSourceFactory
+org.apache.flink.table.sources.CsvAppendTableSourceFactory
+org.apache.flink.table.sinks.CsvBatchTableSinkFactory
+org.apache.flink.table.sinks.CsvAppendTableSinkFactory

http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index a239ad5..9265f0f 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -31,7 +31,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.io.DiscardingOutputFormat
 import org.apache.flink.api.java.typeutils.GenericTypeInfo
 import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
-import org.apache.flink.table.descriptors.{BatchTableSourceDescriptor, 
ConnectorDescriptor}
+import org.apache.flink.table.descriptors.{BatchTableDescriptor, 
ConnectorDescriptor}
 import org.apache.flink.table.explain.PlanJsonParser
 import org.apache.flink.table.expressions.{Expression, TimeAttribute}
 import org.apache.flink.table.plan.nodes.FlinkConventions
@@ -126,7 +126,7 @@ abstract class BatchTableEnvironment(
           }
 
           // no table is registered
-          case None =>
+          case _ =>
             val newTable = new TableSourceSinkTable(
               Some(new BatchTableSourceTable(batchTableSource)),
               None)
@@ -141,26 +141,21 @@ abstract class BatchTableEnvironment(
   }
 
   /**
-    * Creates a table from a descriptor that describes the source connector, 
the source format,
-    * the resulting table schema, and other properties.
+    * Creates a table source and/or table sink from a descriptor.
     *
-    * Descriptors allow for declaring communication to external systems in an
-    * implementation-agnostic way. The classpath is scanned for connectors and 
matching connectors
-    * are configured accordingly.
+    * Descriptors allow for declaring the communication to external systems in 
an
+    * implementation-agnostic way. The classpath is scanned for suitable table 
factories that match
+    * the desired configuration.
     *
-    * The following example shows how to read from a Kafka connector using a 
JSON format and
-    * creating a table:
+    * The following example shows how to read from a connector using a JSON 
format and
+    * registering a table source as "MyTable":
     *
     * {{{
     *
     * tableEnv
-    *   .from(
-    *     new Kafka()
-    *       .version("0.11")
-    *       .topic("clicks")
-    *       .property("zookeeper.connect", "localhost")
-    *       .property("group.id", "click-group")
-    *       .startFromEarliest())
+    *   .connect(
+    *     new ExternalSystemXYZ()
+    *       .version("0.11"))
     *   .withFormat(
     *     new Json()
     *       .jsonSchema("{...}")
@@ -169,14 +164,13 @@ abstract class BatchTableEnvironment(
     *     new Schema()
     *       .field("user-name", "VARCHAR").from("u_name")
     *       .field("count", "DECIMAL")
-    *       .field("proc-time", "TIMESTAMP").proctime())
-    *   .toTable()
+    *   .registerSource("MyTable")
     * }}}
     *
-    * @param connectorDescriptor connector descriptor describing the source of 
the table
+    * @param connectorDescriptor connector descriptor describing the external 
system
     */
-  def from(connectorDescriptor: ConnectorDescriptor): 
BatchTableSourceDescriptor = {
-    new BatchTableSourceDescriptor(this, connectorDescriptor)
+  def connect(connectorDescriptor: ConnectorDescriptor): BatchTableDescriptor 
= {
+    new BatchTableDescriptor(this, connectorDescriptor)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index 33b984d..4c73032 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -37,7 +37,7 @@ import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.calcite.{FlinkTypeFactory, 
RelTimeIndicatorConverter}
-import org.apache.flink.table.descriptors.{ConnectorDescriptor, 
StreamTableSourceDescriptor}
+import org.apache.flink.table.descriptors.{ConnectorDescriptor, 
StreamTableDescriptor}
 import org.apache.flink.table.explain.PlanJsonParser
 import org.apache.flink.table.expressions._
 import org.apache.flink.table.plan.nodes.FlinkConventions
@@ -145,7 +145,7 @@ abstract class StreamTableEnvironment(
           }
 
           // no table is registered
-          case None =>
+          case _ =>
             val newTable = new TableSourceSinkTable(
               Some(new StreamTableSourceTable(streamTableSource)),
               None)
@@ -160,20 +160,19 @@ abstract class StreamTableEnvironment(
   }
 
   /**
-    * Creates a table from a descriptor that describes the source connector, 
the source format,
-    * the resulting table schema, and other properties.
+    * Creates a table source and/or table sink from a descriptor.
     *
-    * Descriptors allow for declaring communication to external systems in an
-    * implementation-agnostic way. The classpath is scanned for connectors and 
matching connectors
-    * are configured accordingly.
+    * Descriptors allow for declaring the communication to external systems in 
an
+    * implementation-agnostic way. The classpath is scanned for suitable table 
factories that match
+    * the desired configuration.
     *
     * The following example shows how to read from a Kafka connector using a 
JSON format and
-    * creating a table:
+    * registering a table source "MyTable" in append mode:
     *
     * {{{
     *
     * tableEnv
-    *   .from(
+    *   .connect(
     *     new Kafka()
     *       .version("0.11")
     *       .topic("clicks")
@@ -189,13 +188,14 @@ abstract class StreamTableEnvironment(
     *       .field("user-name", "VARCHAR").from("u_name")
     *       .field("count", "DECIMAL")
     *       .field("proc-time", "TIMESTAMP").proctime())
-    *   .toTable()
+    *   .inAppendMode()
+    *   .registerSource("MyTable")
     * }}}
     *
-    * @param connectorDescriptor connector descriptor describing the source of 
the table
+    * @param connectorDescriptor connector descriptor describing the external 
system
     */
-  def from(connectorDescriptor: ConnectorDescriptor): 
StreamTableSourceDescriptor = {
-    new StreamTableSourceDescriptor(this, connectorDescriptor)
+  def connect(connectorDescriptor: ConnectorDescriptor): StreamTableDescriptor 
= {
+    new StreamTableDescriptor(this, connectorDescriptor)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 6a299dd..37f6d02 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -49,7 +49,7 @@ import 
org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTabl
 import org.apache.flink.table.calcite.{FlinkPlannerImpl, FlinkRelBuilder, 
FlinkTypeFactory, FlinkTypeSystem}
 import org.apache.flink.table.catalog.{ExternalCatalog, ExternalCatalogSchema}
 import org.apache.flink.table.codegen.{ExpressionReducer, 
FunctionCodeGenerator, GeneratedFunction}
-import org.apache.flink.table.descriptors.{ConnectorDescriptor, 
TableSourceDescriptor}
+import org.apache.flink.table.descriptors.{ConnectorDescriptor, 
TableDescriptor}
 import org.apache.flink.table.expressions._
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
 import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, 
TableFunction}
@@ -523,26 +523,21 @@ abstract class TableEnvironment(val config: TableConfig) {
   }
 
   /**
-    * Creates a table from a descriptor that describes the source connector, 
the source format,
-    * the resulting table schema, and other properties.
+    * Creates a table source and/or table sink from a descriptor.
     *
-    * Descriptors allow for declaring communication to external systems in an
-    * implementation-agnostic way. The classpath is scanned for connectors and 
matching connectors
-    * are configured accordingly.
+    * Descriptors allow for declaring the communication to external systems in 
an
+    * implementation-agnostic way. The classpath is scanned for suitable table 
factories that match
+    * the desired configuration.
     *
-    * The following example shows how to read from a Kafka connector using a 
JSON format and
-    * creating table:
+    * The following example shows how to read from a connector using a JSON 
format and
+    * registering a table source as "MyTable":
     *
     * {{{
     *
     * tableEnv
-    *   .from(
-    *     new Kafka()
-    *       .version("0.11")
-    *       .topic("clicks")
-    *       .property("zookeeper.connect", "localhost")
-    *       .property("group.id", "click-group")
-    *       .startFromEarliest())
+    *   .connect(
+    *     new ExternalSystemXYZ()
+    *       .version("0.11"))
     *   .withFormat(
     *     new Json()
     *       .jsonSchema("{...}")
@@ -551,13 +546,12 @@ abstract class TableEnvironment(val config: TableConfig) {
     *     new Schema()
     *       .field("user-name", "VARCHAR").from("u_name")
     *       .field("count", "DECIMAL")
-    *       .field("proc-time", "TIMESTAMP").proctime())
-    *   .toTable()
+    *   .registerSource("MyTable")
     * }}}
     *
-    * @param connectorDescriptor connector descriptor describing the source of 
the table
+    * @param connectorDescriptor connector descriptor describing the external 
system
     */
-  def from(connectorDescriptor: ConnectorDescriptor): TableSourceDescriptor
+  def connect(connectorDescriptor: ConnectorDescriptor): TableDescriptor
 
   private[flink] def scanInternal(tablePath: Array[String]): Option[Table] = {
     require(tablePath != null && !tablePath.isEmpty, "tablePath must not be 
null or empty.")

http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
index 776ddee..adac938 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
@@ -77,7 +77,7 @@ class ExternalCatalogSchema(
     */
   override def getTable(name: String): Table = try {
     val externalCatalogTable = catalog.getTable(name)
-    ExternalTableSourceUtil.fromExternalCatalogTable(tableEnv, 
externalCatalogTable)
+    ExternalTableUtil.fromExternalCatalogTable(tableEnv, externalCatalogTable)
   } catch {
     case TableNotExistException(table, _, _) => {
       LOG.warn(s"Table $table does not exist in externalCatalog 
$catalogIdentifier")

http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
index c41a350..79da852 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
@@ -18,33 +18,326 @@
 
 package org.apache.flink.table.catalog
 
+import org.apache.flink.table.descriptors.DescriptorProperties.toScala
+import 
org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, 
STATISTICS_ROW_COUNT, readColumnStats}
+import 
org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, 
UPDATE_MODE_VALUE_APPEND, UPDATE_MODE_VALUE_RETRACT, UPDATE_MODE_VALUE_UPSERT}
 import org.apache.flink.table.descriptors._
+import org.apache.flink.table.factories.TableFactory
 import org.apache.flink.table.plan.stats.TableStats
 
+import scala.collection.JavaConverters._
+
 /**
-  * Defines a table in an [[ExternalCatalog]].
+  * Defines a table in an [[ExternalCatalog]]. External catalog tables 
describe table sources
+  * and/or sinks for both batch and stream environments.
+  *
+  * See also [[TableFactory]] for more information about how to target 
suitable factories.
+  *
+  * Use [[ExternalCatalogTableBuilder]] to integrate with the normalized 
descriptor-based API.
   *
-  * @param connectorDesc describes the system to connect to
-  * @param formatDesc describes the data format of a connector
-  * @param schemaDesc describes the schema of the result table
-  * @param statisticsDesc describes the estimated statistics of the result 
table
-  * @param metadataDesc describes additional metadata of a table
+  * @param isBatch Flag whether this external table is intended for batch 
environments.
+  * @param isStreaming Flag whether this external table is intended for 
streaming environments.
+  * @param isSource Flag whether this external table is declared as table 
source.
+  * @param isSink Flag whether this external table is declared as table sink.
+  * @param properties Properties that describe the table and should match with 
a [[TableFactory]].
   */
 class ExternalCatalogTable(
-    connectorDesc: ConnectorDescriptor,
-    formatDesc: Option[FormatDescriptor],
-    schemaDesc: Option[Schema],
-    statisticsDesc: Option[Statistics],
-    metadataDesc: Option[Metadata])
-  extends TableSourceDescriptor {
-
-  this.connectorDescriptor = Some(connectorDesc)
-  this.formatDescriptor = formatDesc
-  this.schemaDescriptor = schemaDesc
-  this.statisticsDescriptor = statisticsDesc
-  this.metaDescriptor = metadataDesc
-
-  // expose statistics for external table source util
-  override def getTableStats: Option[TableStats] = super.getTableStats
+    private val isBatch: Boolean,
+    private val isStreaming: Boolean,
+    private val isSource: Boolean,
+    private val isSink: Boolean,
+    private val properties: java.util.Map[String, String])
+  extends TableDescriptor {
+
+  // 
----------------------------------------------------------------------------------------------
+  // Legacy code
+  // 
---------------------------------------------------------------------------------------------
+
+  /**
+    * Reads table statistics from the descriptors properties.
+    *
+    * @deprecated This method exists for backwards-compatibility only.
+    */
+  @Deprecated
+  @deprecated
+  def getTableStats: Option[TableStats] = {
+    val normalizedProps = new DescriptorProperties()
+    addProperties(normalizedProps)
+    val rowCount = 
toScala(normalizedProps.getOptionalLong(STATISTICS_ROW_COUNT))
+    rowCount match {
+      case Some(cnt) =>
+        val columnStats = readColumnStats(normalizedProps, STATISTICS_COLUMNS)
+        Some(TableStats(cnt, columnStats.asJava))
+      case None =>
+        None
+    }
+  }
+
+  // 
----------------------------------------------------------------------------------------------
+  // Getters
+  // 
----------------------------------------------------------------------------------------------
+
+  /**
+    * Returns whether this external table is declared as table source.
+    */
+  def isTableSource: Boolean = {
+    isSource
+  }
+
+  /**
+    * Returns whether this external table is declared as table sink.
+    */
+  def isTableSink: Boolean = {
+    isSink
+  }
+
+  /**
+    * Returns whether this external table is intended for batch environments.
+    */
+  def isBatchTable: Boolean = {
+    isBatch
+  }
+
+  /**
+    * Returns whether this external table is intended for stream environments.
+    */
+  def isStreamTable: Boolean = {
+    isStreaming
+  }
+
+  // 
----------------------------------------------------------------------------------------------
+
+  /**
+    * Internal method for properties conversion.
+    */
+  override private[flink] def addProperties(descriptorProperties: 
DescriptorProperties): Unit = {
+    descriptorProperties.putProperties(properties)
+  }
+}
+
+object ExternalCatalogTable {
+
+  /**
+    * Creates a builder for creating an [[ExternalCatalogTable]].
+    *
+    * It takes [[Descriptor]]s which allow for declaring the communication to 
external
+    * systems in an implementation-agnostic way. The classpath is scanned for 
suitable table
+    * factories that match the desired configuration.
+    *
+    * Use the provided builder methods to configure the external catalog table 
accordingly.
+    *
+    * The following example shows how to read from a connector using a JSON 
format and
+    * declaring it as a table source:
+    *
+    * {{{
+    *   ExternalCatalogTable.builder(
+    *     new ExternalSystemXYZ()
+    *       .version("0.11"))
+    *   .withFormat(
+    *     new Json()
+    *       .jsonSchema("{...}")
+    *       .failOnMissingField(false))
+    *   .withSchema(
+    *     new Schema()
+    *       .field("user-name", "VARCHAR").from("u_name")
+    *       .field("count", "DECIMAL"))
+    *   .supportsStreaming()
+    *   .asTableSource()
+    * }}}
+    *
+    * @param connectorDescriptor Connector descriptor describing the external 
system
+    * @return External catalog builder
+    */
+  def builder(connectorDescriptor: ConnectorDescriptor): 
ExternalCatalogTableBuilder = {
+    new ExternalCatalogTableBuilder(connectorDescriptor)
+  }
+}
+
+/**
+  * Builder for an [[ExternalCatalogTable]].
+  *
+  * @param connectorDescriptor Connector descriptor describing the external 
system
+  */
+class ExternalCatalogTableBuilder(private val connectorDescriptor: 
ConnectorDescriptor)
+  extends TableDescriptor
+  with SchematicDescriptor[ExternalCatalogTableBuilder]
+  with StreamableDescriptor[ExternalCatalogTableBuilder] {
+
+  private var isBatch: Boolean = true
+  private var isStreaming: Boolean = true
+
+  private var formatDescriptor: Option[FormatDescriptor] = None
+  private var schemaDescriptor: Option[Schema] = None
+  private var statisticsDescriptor: Option[Statistics] = None
+  private var metadataDescriptor: Option[Metadata] = None
+  private var updateMode: Option[String] = None
+
+  /**
+    * Specifies the format that defines how to read data from a connector.
+    */
+  override def withFormat(format: FormatDescriptor): 
ExternalCatalogTableBuilder = {
+    formatDescriptor = Some(format)
+    this
+  }
+
+  /**
+    * Specifies the resulting table schema.
+    */
+  override def withSchema(schema: Schema): ExternalCatalogTableBuilder = {
+    schemaDescriptor = Some(schema)
+    this
+  }
+
+  /**
+    * Declares how to perform the conversion between a dynamic table and an 
external connector.
+    *
+    * In append mode, a dynamic table and an external connector only exchange 
INSERT messages.
+    *
+    * @see See also [[inRetractMode()]] and [[inUpsertMode()]].
+    */
+  override def inAppendMode(): ExternalCatalogTableBuilder = {
+    updateMode = Some(UPDATE_MODE_VALUE_APPEND)
+    this
+  }
+
+  /**
+    * Declares how to perform the conversion between a dynamic table and an 
external connector.
+    *
+    * In retract mode, a dynamic table and an external connector exchange ADD 
and RETRACT messages.
+    *
+    * An INSERT change is encoded as an ADD message, a DELETE change as a 
RETRACT message, and an
+    * UPDATE change as a RETRACT message for the updated (previous) row and an 
ADD message for
+    * the updating (new) row.
+    *
+    * In this mode, a key must not be defined as opposed to upsert mode. 
However, every update
+    * consists of two messages which is less efficient.
+    *
+    * @see See also [[inAppendMode()]] and [[inUpsertMode()]].
+    */
+  override def inRetractMode(): ExternalCatalogTableBuilder = {
+    updateMode = Some(UPDATE_MODE_VALUE_RETRACT)
+    this
+  }
+
+  /**
+    * Declares how to perform the conversion between a dynamic table and an 
external connector.
+    *
+    * In upsert mode, a dynamic table and an external connector exchange 
UPSERT and DELETE messages.
+    *
+    * This mode requires a (possibly composite) unique key by which updates 
can be propagated. The
+    * external connector needs to be aware of the unique key attribute in 
order to apply messages
+    * correctly. INSERT and UPDATE changes are encoded as UPSERT messages. 
DELETE changes as
+    * DELETE messages.
+    *
+    * The main difference to a retract stream is that UPDATE changes are 
encoded with a single
+    * message and are therefore more efficient.
+    *
+    * @see See also [[inAppendMode()]] and [[inRetractMode()]].
+    */
+  override def inUpsertMode(): ExternalCatalogTableBuilder = {
+    updateMode = Some(UPDATE_MODE_VALUE_UPSERT)
+    this
+  }
+
+  /**
+    * Specifies the statistics for this external table.
+    */
+  def withStatistics(statistics: Statistics): ExternalCatalogTableBuilder = {
+    statisticsDescriptor = Some(statistics)
+    this
+  }
+
+  /**
+    * Specifies the metadata for this external table.
+    */
+  def withMetadata(metadata: Metadata): ExternalCatalogTableBuilder = {
+    metadataDescriptor = Some(metadata)
+    this
+  }
+
+  /**
+    * Explicitly declares this external table for supporting only stream 
environments.
+    */
+  def supportsStreaming(): ExternalCatalogTableBuilder = {
+    isBatch = false
+    isStreaming = true
+    this
+  }
+
+  /**
+    * Explicitly declares this external table for supporting only batch 
environments.
+    */
+  def supportsBatch(): ExternalCatalogTableBuilder = {
+    isBatch = true
+    isStreaming = false
+    this
+  }
+
+  /**
+    * Explicitly declares this external table for supporting both batch and 
stream environments.
+    */
+  def supportsBatchAndStreaming(): ExternalCatalogTableBuilder = {
+    isBatch = true
+    isStreaming = true
+    this
+  }
+
+  /**
+    * Declares this external table as a table source and returns the
+    * configured [[ExternalCatalogTable]].
+    *
+    * @return External catalog table
+    */
+  def asTableSource(): ExternalCatalogTable = {
+    new ExternalCatalogTable(
+      isBatch,
+      isStreaming,
+      isSource = true,
+      isSink = false,
+      DescriptorProperties.toJavaMap(this))
+  }
+
+  /**
+    * Declares this external table as a table sink and returns the
+    * configured [[ExternalCatalogTable]].
+    *
+    * @return External catalog table
+    */
+  def asTableSink(): ExternalCatalogTable = {
+    new ExternalCatalogTable(
+      isBatch,
+      isStreaming,
+      isSource = false,
+      isSink = true,
+      DescriptorProperties.toJavaMap(this))
+  }
+
+  /**
+    * Declares this external table as both a table source and sink. It returns 
the
+    * configured [[ExternalCatalogTable]].
+    *
+    * @return External catalog table
+    */
+  def asTableSourceAndSink(): ExternalCatalogTable = {
+    new ExternalCatalogTable(
+      isBatch,
+      isStreaming,
+      isSource = true,
+      isSink = true,
+      DescriptorProperties.toJavaMap(this))
+  }
+
+  // 
----------------------------------------------------------------------------------------------
 
+  /**
+    * Internal method for properties conversion.
+    */
+  override private[flink] def addProperties(properties: DescriptorProperties): 
Unit = {
+    connectorDescriptor.addProperties(properties)
+    formatDescriptor.foreach(_.addProperties(properties))
+    schemaDescriptor.foreach(_.addProperties(properties))
+    statisticsDescriptor.foreach(_.addProperties(properties))
+    metadataDescriptor.foreach(_.addProperties(properties))
+    updateMode.foreach(mode => properties.putString(UPDATE_MODE, mode))
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
deleted file mode 100644
index 011cbec..0000000
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.catalog
-
-import org.apache.flink.table.api._
-import org.apache.flink.table.descriptors.DescriptorProperties
-import org.apache.flink.table.factories.{BatchTableSourceFactory, 
StreamTableSourceFactory, TableFactoryService}
-import org.apache.flink.table.plan.schema.{BatchTableSourceTable, 
StreamTableSourceTable, TableSourceSinkTable, TableSourceTable}
-import org.apache.flink.table.plan.stats.FlinkStatistic
-import org.apache.flink.table.util.Logging
-
-/**
-  * The utility class is used to convert ExternalCatalogTable to 
TableSourceTable.
-  */
-object ExternalTableSourceUtil extends Logging {
-
-  /**
-    * Converts an [[ExternalCatalogTable]] instance to a [[TableSourceTable]] 
instance
-    *
-    * @param externalCatalogTable the [[ExternalCatalogTable]] instance which 
to convert
-    * @return converted [[TableSourceTable]] instance from the input catalog 
table
-    */
-  def fromExternalCatalogTable(
-      tableEnv: TableEnvironment,
-      externalCatalogTable: ExternalCatalogTable)
-    : TableSourceSinkTable[_, _] = {
-    val properties = new DescriptorProperties()
-    externalCatalogTable.addProperties(properties)
-    val javaMap = properties.asMap
-    tableEnv match {
-      // check for a batch table source in this batch environment
-      case _: BatchTableEnvironment =>
-        val source = TableFactoryService
-          .find(classOf[BatchTableSourceFactory[_]], javaMap)
-          .createBatchTableSource(javaMap)
-        val sourceTable = new BatchTableSourceTable(
-          source,
-          new FlinkStatistic(externalCatalogTable.getTableStats))
-        new TableSourceSinkTable(Some(sourceTable), None)
-
-      // check for a stream table source in this streaming environment
-      case _: StreamTableEnvironment =>
-        val source = TableFactoryService
-          .find(classOf[StreamTableSourceFactory[_]], javaMap)
-          .createStreamTableSource(javaMap)
-        val sourceTable = new StreamTableSourceTable(
-          source,
-          new FlinkStatistic(externalCatalogTable.getTableStats))
-        new TableSourceSinkTable(Some(sourceTable), None)
-
-      case _ => throw new TableException("Unsupported table environment.")
-    }
-  }
-}

Reply via email to