This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d9e4bd9  [FLINK-13404][table-api] Port csv descriptors & factories to 
flink-table-api-java-bridge
d9e4bd9 is described below

commit d9e4bd9c1c9de7cabb00de281b6efe4bc3ae8ea1
Author: godfreyhe <[email protected]>
AuthorDate: Wed Jul 24 20:07:26 2019 +0800

    [FLINK-13404][table-api] Port csv descriptors & factories to 
flink-table-api-java-bridge
    
    This closes #9219
---
 flink-formats/flink-csv/pom.xml                    |  14 --
 .../org/apache/flink/table/descriptors/OldCsv.java | 195 ++++++++++++++++++++
 .../flink/table/descriptors/OldCsvValidator.java   |  54 ++++++
 .../table/sinks/CsvAppendTableSinkFactory.java     |  48 +++++
 .../table/sinks/CsvBatchTableSinkFactory.java}     |  27 ++-
 .../flink/table/sinks/CsvTableSinkFactoryBase.java | 106 +++++++++++
 .../table/sources/CsvAppendTableSourceFactory.java |  49 +++++
 .../table/sources/CsvBatchTableSourceFactory.java} |  27 ++-
 .../table/sources/CsvTableSourceFactoryBase.java   | 136 ++++++++++++++
 .../org.apache.flink.table.factories.TableFactory  |   4 +-
 .../org.apache.flink.table.factories.TableFactory  |   4 -
 .../apache/flink/table/descriptors/OldCsv.scala    | 200 ---------------------
 .../flink/table/descriptors/OldCsvValidator.scala  |  57 ------
 .../table/sinks/CsvAppendTableSinkFactory.scala    |  45 -----
 .../table/sinks/CsvTableSinkFactoryBase.scala      |  93 ----------
 .../sources/CsvAppendTableSourceFactory.scala      |  45 -----
 .../table/sources/CsvTableSourceFactoryBase.scala  | 120 -------------
 .../flink/table/descriptors/OldCsvTest.scala       |   4 +-
 .../table/descriptors/TableDescriptorTest.scala    |   2 +-
 .../flink/table/runtime/utils/CommonTestData.scala |   6 +-
 20 files changed, 621 insertions(+), 615 deletions(-)

diff --git a/flink-formats/flink-csv/pom.xml b/flink-formats/flink-csv/pom.xml
index 2c067aa..77fb461 100644
--- a/flink-formats/flink-csv/pom.xml
+++ b/flink-formats/flink-csv/pom.xml
@@ -69,20 +69,6 @@ under the License.
                        <type>test-jar</type>
                </dependency>
 
-               <!-- TODO This could be dropped if we change 
CsvRowFormatFactoryTest -->
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
-                       <version>${project.version}</version>
-                       <scope>test</scope>
-               </dependency>
-
-               <!-- TODO This could be dropped if we change 
CsvRowFormatFactoryTest -->
-               <dependency>
-                       <groupId>org.scala-lang</groupId>
-                       <artifactId>scala-compiler</artifactId>
-                       <scope>test</scope>
-               </dependency>
        </dependencies>
 
        <profiles>
diff --git 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/OldCsv.java
 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/OldCsv.java
new file mode 100644
index 0000000..f7cdc9f
--- /dev/null
+++ 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/OldCsv.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.utils.TypeStringUtils;
+
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.table.descriptors.OldCsvValidator.FORMAT_COMMENT_PREFIX;
+import static org.apache.flink.table.descriptors.OldCsvValidator.FORMAT_FIELDS;
+import static 
org.apache.flink.table.descriptors.OldCsvValidator.FORMAT_FIELD_DELIMITER;
+import static 
org.apache.flink.table.descriptors.OldCsvValidator.FORMAT_IGNORE_FIRST_LINE;
+import static 
org.apache.flink.table.descriptors.OldCsvValidator.FORMAT_IGNORE_PARSE_ERRORS;
+import static 
org.apache.flink.table.descriptors.OldCsvValidator.FORMAT_LINE_DELIMITER;
+import static 
org.apache.flink.table.descriptors.OldCsvValidator.FORMAT_QUOTE_CHARACTER;
+import static 
org.apache.flink.table.descriptors.OldCsvValidator.FORMAT_TYPE_VALUE;
+
+/**
+ * Format descriptor for comma-separated values (CSV).
+ *
+ * <p>Note: This descriptor describes Flink's non-standard CSV table 
source/sink. In the future, the
+ * descriptor will be replaced by a proper RFC-compliant version. Use the 
RFC-compliant {@code Csv}
+ * format in the dedicated flink-formats/flink-csv module instead when writing 
to Kafka.
+ * Use the old one for stream/batch filesystem operations for now.
+ *
+ * @deprecated Use the RFC-compliant {@code Csv} format in the dedicated
+ *             flink-formats/flink-csv module instead when writing to Kafka.
+ */
+@Deprecated
+@PublicEvolving
+public class OldCsv extends FormatDescriptor {
+
+       private Optional<String> fieldDelim = Optional.empty();
+       private Optional<String> lineDelim = Optional.empty();
+       private Map<String, String> schema = new LinkedHashMap<>();
+       private Optional<Character> quoteCharacter = Optional.empty();
+       private Optional<String> commentPrefix = Optional.empty();
+       private Optional<Boolean> isIgnoreFirstLine = Optional.empty();
+       private Optional<Boolean> lenient = Optional.empty();
+
+       public OldCsv() {
+               super(FORMAT_TYPE_VALUE, 1);
+       }
+
+       /**
+        * Sets the field delimiter, "," by default.
+        *
+        * @param delim the field delimiter
+        */
+       public OldCsv fieldDelimiter(String delim) {
+               this.fieldDelim = Optional.of(delim);
+               return this;
+       }
+
+       /**
+        * Sets the line delimiter, "\n" by default.
+        *
+        * @param delim the line delimiter
+        */
+       public OldCsv lineDelimiter(String delim) {
+               this.lineDelim = Optional.of(delim);
+               return this;
+       }
+
+       /**
+        * Sets the format schema with field names and the types. Required.
+        * The table schema must not contain nested fields.
+        *
+        * <p>This method overwrites existing fields added with [[field()]].
+        *
+        * @param schema the table schema
+        */
+       public OldCsv schema(TableSchema schema) {
+               this.schema.clear();
+               for (int i = 0; i < schema.getFieldCount(); ++i) {
+                       field(schema.getFieldNames()[i], 
schema.getFieldTypes()[i]);
+               }
+               return this;
+       }
+
+       /**
+        * Adds a format field with the field name and the type information. 
Required.
+        * This method can be called multiple times. The call order of this 
method defines
+        * also the order of the fields in the format.
+        *
+        * @param fieldName the field name
+        * @param fieldType the type information of the field
+        */
+       public OldCsv field(String fieldName, TypeInformation<?> fieldType) {
+               field(fieldName, TypeStringUtils.writeTypeInfo(fieldType));
+               return this;
+       }
+
+       /**
+        * Adds a format field with the field name and the type string. 
Required.
+        * This method can be called multiple times. The call order of this 
method defines
+        * also the order of the fields in the format.
+        *
+        * @param fieldName the field name
+        * @param fieldType the type string of the field
+        */
+       public OldCsv field(String fieldName, String fieldType) {
+               if (schema.containsKey(fieldName)) {
+                       throw new ValidationException("Duplicate field name " + 
fieldName + ".");
+               }
+               schema.put(fieldName, fieldType);
+               return this;
+       }
+
+       /**
+        * Sets a quote character for String values, null by default.
+        *
+        * @param quote the quote character
+        */
+       public OldCsv quoteCharacter(Character quote) {
+               this.quoteCharacter = Optional.of(quote);
+               return this;
+       }
+
+       /**
+        * Sets a prefix to indicate comments, null by default.
+        *
+        * @param prefix the prefix to indicate comments
+        */
+       public OldCsv commentPrefix(String prefix) {
+               this.commentPrefix = Optional.of(prefix);
+               return this;
+       }
+
+       /**
+        * Ignore the first line. Not skip the first line by default.
+        */
+       public OldCsv ignoreFirstLine() {
+               this.isIgnoreFirstLine = Optional.of(true);
+               return this;
+       }
+
+       /**
+        * Skip records with parse error instead to fail. Throw an exception by 
default.
+        */
+       public OldCsv ignoreParseErrors() {
+               this.lenient = Optional.of(true);
+               return this;
+       }
+
+       @Override
+       protected Map<String, String> toFormatProperties() {
+               DescriptorProperties properties = new DescriptorProperties();
+
+               fieldDelim.ifPresent(s -> 
properties.putString(FORMAT_FIELD_DELIMITER, s));
+               lineDelim.ifPresent(s -> 
properties.putString(FORMAT_LINE_DELIMITER, s));
+
+               List<String> subKeys = Arrays.asList(
+                               DescriptorProperties.TABLE_SCHEMA_NAME,
+                               DescriptorProperties.TABLE_SCHEMA_TYPE);
+
+               List<List<String>> subValues = schema.entrySet().stream()
+                               .map(e -> Arrays.asList(e.getKey(), 
e.getValue()))
+                               .collect(Collectors.toList());
+
+               properties.putIndexedFixedProperties(FORMAT_FIELDS, subKeys, 
subValues);
+
+               quoteCharacter.ifPresent(character -> 
properties.putCharacter(FORMAT_QUOTE_CHARACTER, character));
+               commentPrefix.ifPresent(s -> 
properties.putString(FORMAT_COMMENT_PREFIX, s));
+               isIgnoreFirstLine.ifPresent(aBoolean -> 
properties.putBoolean(FORMAT_IGNORE_FIRST_LINE, aBoolean));
+               lenient.ifPresent(aBoolean -> 
properties.putBoolean(FORMAT_IGNORE_PARSE_ERRORS, aBoolean));
+
+               return properties.asMap();
+       }
+}
diff --git 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/OldCsvValidator.java
 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/OldCsvValidator.java
new file mode 100644
index 0000000..1264494
--- /dev/null
+++ 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/OldCsvValidator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Validator for {@link OldCsv}.
+ *
+ * @deprecated Use the RFC-compliant {@code Csv} format in the dedicated
+ *             flink-formats/flink-csv module instead.
+ */
+@Deprecated
+@Internal
+public class OldCsvValidator extends FormatDescriptorValidator {
+
+       public static final String FORMAT_TYPE_VALUE = "csv";
+       public static final String FORMAT_FIELD_DELIMITER = 
"format.field-delimiter";
+       public static final String FORMAT_LINE_DELIMITER = 
"format.line-delimiter";
+       public static final String FORMAT_QUOTE_CHARACTER = 
"format.quote-character";
+       public static final String FORMAT_COMMENT_PREFIX = 
"format.comment-prefix";
+       public static final String FORMAT_IGNORE_FIRST_LINE = 
"format.ignore-first-line";
+       public static final String FORMAT_IGNORE_PARSE_ERRORS = 
"format.ignore-parse-errors";
+       public static final String FORMAT_FIELDS = "format.fields";
+
+       @Override
+       public void validate(DescriptorProperties properties) {
+               super.validate(properties);
+               properties.validateValue(FORMAT_TYPE, FORMAT_TYPE_VALUE, false);
+               properties.validateString(FORMAT_FIELD_DELIMITER, true, 1);
+               properties.validateString(FORMAT_LINE_DELIMITER, true, 1);
+               properties.validateString(FORMAT_QUOTE_CHARACTER, true, 1, 1);
+               properties.validateString(FORMAT_COMMENT_PREFIX, true, 1);
+               properties.validateBoolean(FORMAT_IGNORE_FIRST_LINE, true);
+               properties.validateBoolean(FORMAT_IGNORE_PARSE_ERRORS, true);
+               properties.validateTableSchema(FORMAT_FIELDS, false);
+       }
+}
diff --git 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvAppendTableSinkFactory.java
 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvAppendTableSinkFactory.java
new file mode 100644
index 0000000..968eb9c
--- /dev/null
+++ 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvAppendTableSinkFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.factories.StreamTableSinkFactory;
+import org.apache.flink.types.Row;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE;
+import static 
org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_APPEND;
+
+/**
+ * Factory base for creating configured instances of {@link CsvTableSink} in a 
stream environment.
+ */
+@PublicEvolving
+public class CsvAppendTableSinkFactory extends CsvTableSinkFactoryBase 
implements StreamTableSinkFactory<Row> {
+
+       @Override
+       public Map<String, String> requiredContext() {
+               Map<String, String> context = new 
HashMap<>(super.requiredContext());
+               context.put(UPDATE_MODE, UPDATE_MODE_VALUE_APPEND);
+               return context;
+       }
+
+       @Override
+       public StreamTableSink<Row> createStreamTableSink(Map<String, String> 
properties) {
+               return createTableSink(true, properties);
+       }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/CsvBatchTableSinkFactory.scala
 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvBatchTableSinkFactory.java
similarity index 58%
rename from 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/CsvBatchTableSinkFactory.scala
rename to 
flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvBatchTableSinkFactory.java
index 2687ed2..19a9a21 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/CsvBatchTableSinkFactory.scala
+++ 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvBatchTableSinkFactory.java
@@ -16,23 +16,22 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sinks
+package org.apache.flink.table.sinks;
 
-import java.util
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.factories.BatchTableSinkFactory;
+import org.apache.flink.types.Row;
 
-import org.apache.flink.table.factories.BatchTableSinkFactory
-import org.apache.flink.types.Row
+import java.util.Map;
 
 /**
-  * Factory base for creating configured instances of [[CsvTableSink]] in a 
batch environment.
-  */
-class CsvBatchTableSinkFactory
-  extends CsvTableSinkFactoryBase
-  with BatchTableSinkFactory[Row] {
+ * Factory base for creating configured instances of {@link CsvTableSink} in a 
batch environment.
+ */
+@PublicEvolving
+public class CsvBatchTableSinkFactory extends CsvTableSinkFactoryBase 
implements BatchTableSinkFactory<Row> {
 
-  override def createBatchTableSink(
-      properties: util.Map[String, String])
-    : BatchTableSink[Row] = {
-    createTableSink(isStreaming = false, properties)
-  }
+       @Override
+       public BatchTableSink<Row> createBatchTableSink(Map<String, String> 
properties) {
+               return createTableSink(false, properties);
+       }
 }
diff --git 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.java
 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.java
new file mode 100644
index 0000000..9c83b4a
--- /dev/null
+++ 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.FileSystemValidator;
+import org.apache.flink.table.descriptors.OldCsvValidator;
+import org.apache.flink.table.descriptors.SchemaValidator;
+import org.apache.flink.table.factories.TableFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
+import static 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
+import static 
org.apache.flink.table.descriptors.FileSystemValidator.CONNECTOR_PATH;
+import static 
org.apache.flink.table.descriptors.FileSystemValidator.CONNECTOR_TYPE_VALUE;
+import static 
org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_PROPERTY_VERSION;
+import static 
org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_TYPE;
+import static org.apache.flink.table.descriptors.OldCsvValidator.FORMAT_FIELDS;
+import static 
org.apache.flink.table.descriptors.OldCsvValidator.FORMAT_FIELD_DELIMITER;
+import static 
org.apache.flink.table.descriptors.OldCsvValidator.FORMAT_TYPE_VALUE;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+
+/**
+ * Factory base for creating configured instances of {@link CsvTableSink}.
+ */
+@Internal
+public abstract class CsvTableSinkFactoryBase implements TableFactory {
+
+       @Override
+       public Map<String, String> requiredContext() {
+               Map<String, String> context = new HashMap<>();
+               context.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE);
+               context.put(FORMAT_TYPE, FORMAT_TYPE_VALUE);
+               context.put(CONNECTOR_PROPERTY_VERSION, "1");
+               context.put(FORMAT_PROPERTY_VERSION, "1");
+               return context;
+       }
+
+       public List<String> supportedProperties() {
+               List<String> properties = new ArrayList<>();
+               // connector
+               properties.add(CONNECTOR_PATH);
+               // format
+               properties.add(FORMAT_FIELDS + ".#." + 
DescriptorProperties.TABLE_SCHEMA_TYPE);
+               properties.add(FORMAT_FIELDS + ".#." + 
DescriptorProperties.TABLE_SCHEMA_NAME);
+               properties.add(FORMAT_FIELD_DELIMITER);
+               properties.add(CONNECTOR_PATH);
+               // schema
+               properties.add(SCHEMA + ".#." + 
DescriptorProperties.TABLE_SCHEMA_TYPE);
+               properties.add(SCHEMA + ".#." + 
DescriptorProperties.TABLE_SCHEMA_NAME);
+               return properties;
+       }
+
+       protected CsvTableSink createTableSink(
+                       Boolean isStreaming,
+                       Map<String, String> properties) {
+
+               DescriptorProperties params = new DescriptorProperties();
+               params.putProperties(properties);
+
+               // validate
+               new FileSystemValidator().validate(params);
+               new OldCsvValidator().validate(params);
+               new SchemaValidator(isStreaming, false, false).validate(params);
+
+               // build
+               TableSchema formatSchema = params.getTableSchema(FORMAT_FIELDS);
+               TableSchema tableSchema = params.getTableSchema(SCHEMA);
+
+               if (!formatSchema.equals(tableSchema)) {
+                       throw new TableException(
+                                       "Encodings that differ from the schema 
are not supported yet for CsvTableSink.");
+               }
+
+               String path = params.getString(CONNECTOR_PATH);
+               String fieldDelimiter = 
params.getOptionalString(FORMAT_FIELD_DELIMITER).orElse(",");
+
+               CsvTableSink csvTableSink = new CsvTableSink(path, 
fieldDelimiter);
+
+               return (CsvTableSink) 
csvTableSink.configure(formatSchema.getFieldNames(), 
formatSchema.getFieldTypes());
+       }
+
+}
diff --git 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvAppendTableSourceFactory.java
 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvAppendTableSourceFactory.java
new file mode 100644
index 0000000..6857dd8
--- /dev/null
+++ 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvAppendTableSourceFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.factories.StreamTableSourceFactory;
+import org.apache.flink.types.Row;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE;
+import static 
org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_APPEND;
+
+/**
+ * Factory for creating configured instances of {@link CsvTableSource} in a 
stream environment.
+ */
+@PublicEvolving
+public class CsvAppendTableSourceFactory extends CsvTableSourceFactoryBase 
implements StreamTableSourceFactory<Row> {
+
+       @Override
+       public Map<String, String> requiredContext() {
+               Map<String, String> context = new 
HashMap<>(super.requiredContext());
+               context.put(UPDATE_MODE, UPDATE_MODE_VALUE_APPEND);
+               return context;
+       }
+
+       @Override
+       public StreamTableSource<Row> createStreamTableSource(Map<String, 
String> properties) {
+               return createTableSource(true, properties);
+       }
+
+}
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/CsvBatchTableSourceFactory.scala
 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvBatchTableSourceFactory.java
similarity index 57%
rename from 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/CsvBatchTableSourceFactory.scala
rename to 
flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvBatchTableSourceFactory.java
index 9d8fa40..a7a2b4d 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/CsvBatchTableSourceFactory.scala
+++ 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvBatchTableSourceFactory.java
@@ -16,23 +16,22 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.sources;
 
-import java.util
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.factories.BatchTableSourceFactory;
+import org.apache.flink.types.Row;
 
-import org.apache.flink.table.factories.BatchTableSourceFactory
-import org.apache.flink.types.Row
+import java.util.Map;
 
 /**
-  * Factory for creating configured instances of [[CsvTableSource]] in a batch 
environment.
-  */
-class CsvBatchTableSourceFactory
-  extends CsvTableSourceFactoryBase
-  with BatchTableSourceFactory[Row] {
+ * Factory for creating configured instances of {@link CsvTableSource} in a 
batch environment.
+ */
+@PublicEvolving
+public class CsvBatchTableSourceFactory extends CsvTableSourceFactoryBase 
implements BatchTableSourceFactory<Row> {
 
-  override def createBatchTableSource(
-      properties: util.Map[String, String])
-    : BatchTableSource[Row] = {
-    createTableSource(isStreaming = false, properties)
-  }
+       @Override
+       public BatchTableSource<Row> createBatchTableSource(Map<String, String> 
properties) {
+               return createTableSource(false, properties);
+       }
 }
diff --git 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSourceFactoryBase.java
 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSourceFactoryBase.java
new file mode 100644
index 0000000..4dc8e40
--- /dev/null
+++ 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSourceFactoryBase.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.FileSystemValidator;
+import org.apache.flink.table.descriptors.OldCsvValidator;
+import org.apache.flink.table.descriptors.SchemaValidator;
+import org.apache.flink.table.factories.TableFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
+import static 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
+import static 
org.apache.flink.table.descriptors.FileSystemValidator.CONNECTOR_PATH;
+import static 
org.apache.flink.table.descriptors.FileSystemValidator.CONNECTOR_TYPE_VALUE;
+import static 
org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_PROPERTY_VERSION;
+import static 
org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_TYPE;
+import static 
org.apache.flink.table.descriptors.OldCsvValidator.FORMAT_COMMENT_PREFIX;
+import static org.apache.flink.table.descriptors.OldCsvValidator.FORMAT_FIELDS;
+import static 
org.apache.flink.table.descriptors.OldCsvValidator.FORMAT_FIELD_DELIMITER;
+import static 
org.apache.flink.table.descriptors.OldCsvValidator.FORMAT_IGNORE_FIRST_LINE;
+import static 
org.apache.flink.table.descriptors.OldCsvValidator.FORMAT_IGNORE_PARSE_ERRORS;
+import static 
org.apache.flink.table.descriptors.OldCsvValidator.FORMAT_LINE_DELIMITER;
+import static 
org.apache.flink.table.descriptors.OldCsvValidator.FORMAT_QUOTE_CHARACTER;
+import static 
org.apache.flink.table.descriptors.OldCsvValidator.FORMAT_TYPE_VALUE;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+
+/**
+ * Factory base for creating configured instances of {@link CsvTableSource}.
+ */
+@Internal
+public abstract class CsvTableSourceFactoryBase implements TableFactory {
+
+       @Override
+       public Map<String, String> requiredContext() {
+               Map<String, String> context = new HashMap<>();
+               context.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE);
+               context.put(FORMAT_TYPE, FORMAT_TYPE_VALUE);
+               context.put(CONNECTOR_PROPERTY_VERSION, "1");
+               context.put(FORMAT_PROPERTY_VERSION, "1");
+               return context;
+       }
+
+       public List<String> supportedProperties() {
+               List<String> properties = new ArrayList<>();
+               // connector
+               properties.add(CONNECTOR_PATH);
+               // format
+               properties.add(FORMAT_FIELDS + ".#." + 
DescriptorProperties.TABLE_SCHEMA_TYPE);
+               properties.add(FORMAT_FIELDS + ".#." + 
DescriptorProperties.TABLE_SCHEMA_NAME);
+               properties.add(FORMAT_FIELD_DELIMITER);
+               properties.add(FORMAT_LINE_DELIMITER);
+               properties.add(FORMAT_QUOTE_CHARACTER);
+               properties.add(FORMAT_COMMENT_PREFIX);
+               properties.add(FORMAT_IGNORE_FIRST_LINE);
+               properties.add(FORMAT_IGNORE_PARSE_ERRORS);
+               properties.add(CONNECTOR_PATH);
+               // schema
+               properties.add(SCHEMA + ".#." + 
DescriptorProperties.TABLE_SCHEMA_TYPE);
+               properties.add(SCHEMA + ".#." + 
DescriptorProperties.TABLE_SCHEMA_NAME);
+               return properties;
+       }
+
+       protected CsvTableSource createTableSource(
+                       Boolean isStreaming,
+                       Map<String, String> properties) {
+
+               DescriptorProperties params = new DescriptorProperties();
+               params.putProperties(properties);
+
+               // validate
+               new FileSystemValidator().validate(params);
+               new OldCsvValidator().validate(params);
+               new SchemaValidator(isStreaming, false, false).validate(params);
+
+               // build
+               CsvTableSource.Builder csvTableSourceBuilder = new 
CsvTableSource.Builder();
+
+               TableSchema formatSchema = params.getTableSchema(FORMAT_FIELDS);
+               TableSchema tableSchema = params.getTableSchema(SCHEMA);
+
+               // the CsvTableSource needs some rework first
+               // for now the schema must be equal to the encoding
+               if (!formatSchema.equals(tableSchema)) {
+                       throw new TableException(
+                                       "Encodings that differ from the schema 
are not supported yet for CsvTableSources.");
+               }
+
+               
params.getOptionalString(CONNECTOR_PATH).ifPresent(csvTableSourceBuilder::path);
+               
params.getOptionalString(FORMAT_FIELD_DELIMITER).ifPresent(csvTableSourceBuilder::fieldDelimiter);
+               
params.getOptionalString(FORMAT_LINE_DELIMITER).ifPresent(csvTableSourceBuilder::lineDelimiter);
+
+               for (int i = 0; i < formatSchema.getFieldCount(); ++i) {
+                       
csvTableSourceBuilder.field(formatSchema.getFieldNames()[i], 
formatSchema.getFieldTypes()[i]);
+               }
+               
params.getOptionalCharacter(FORMAT_QUOTE_CHARACTER).ifPresent(csvTableSourceBuilder::quoteCharacter);
+               
params.getOptionalString(FORMAT_COMMENT_PREFIX).ifPresent(csvTableSourceBuilder::commentPrefix);
+               
params.getOptionalBoolean(FORMAT_IGNORE_FIRST_LINE).ifPresent(flag -> {
+                       if (flag) {
+                               csvTableSourceBuilder.ignoreFirstLine();
+                       }
+               });
+
+               
params.getOptionalBoolean(FORMAT_IGNORE_PARSE_ERRORS).ifPresent(flag -> {
+                       if (flag) {
+                               csvTableSourceBuilder.ignoreParseErrors();
+                       }
+               });
+
+               return csvTableSourceBuilder.build();
+       }
+
+}
diff --git 
a/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
 
b/flink-table/flink-table-api-java-bridge/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
similarity index 85%
copy from 
flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
copy to 
flink-table/flink-table-api-java-bridge/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
index 6bd2ea3..6b06ee6 100644
--- 
a/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++ 
b/flink-table/flink-table-api-java-bridge/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -16,6 +16,4 @@
 org.apache.flink.table.sources.CsvBatchTableSourceFactory
 org.apache.flink.table.sources.CsvAppendTableSourceFactory
 org.apache.flink.table.sinks.CsvBatchTableSinkFactory
-org.apache.flink.table.sinks.CsvAppendTableSinkFactory
-org.apache.flink.table.planner.StreamPlannerFactory
-org.apache.flink.table.executor.StreamExecutorFactory
+org.apache.flink.table.sinks.CsvAppendTableSinkFactory
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
 
b/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
index 6bd2ea3..58dbb51 100644
--- 
a/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++ 
b/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -13,9 +13,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.table.sources.CsvBatchTableSourceFactory
-org.apache.flink.table.sources.CsvAppendTableSourceFactory
-org.apache.flink.table.sinks.CsvBatchTableSinkFactory
-org.apache.flink.table.sinks.CsvAppendTableSinkFactory
 org.apache.flink.table.planner.StreamPlannerFactory
 org.apache.flink.table.executor.StreamExecutorFactory
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/OldCsv.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/OldCsv.scala
deleted file mode 100644
index 031e757..0000000
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/OldCsv.scala
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors
-
-import java.util
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.{TableSchema, ValidationException}
-import org.apache.flink.table.descriptors.OldCsvValidator._
-import org.apache.flink.table.utils.TypeStringUtils
-
-import scala.collection.mutable
-import scala.collection.JavaConverters._
-
-/**
-  * Format descriptor for comma-separated values (CSV).
-  *
-  * Note: This descriptor describes Flink's non-standard CSV table 
source/sink. In the future, the
-  * descriptor will be replaced by a proper RFC-compliant version. Use the 
RFC-compliant `Csv`
-  * format in the dedicated `flink-formats/flink-csv` module instead when 
writing to Kafka. Use the
-  * old one for stream/batch filesystem operations for now.
-  *
-  * @deprecated Use the RFC-compliant `Csv` format in the dedicated
-  *            `flink-formats/flink-csv` module instead when writing to Kafka.
-  */
-@Deprecated
-@deprecated
-class OldCsv extends FormatDescriptor(FORMAT_TYPE_VALUE, 1) {
-
-  private var fieldDelim: Option[String] = None
-  private var lineDelim: Option[String] = None
-  private val schema: mutable.LinkedHashMap[String, String] =
-    mutable.LinkedHashMap[String, String]()
-  private var quoteCharacter: Option[Character] = None
-  private var commentPrefix: Option[String] = None
-  private var isIgnoreFirstLine: Option[Boolean] = None
-  private var lenient: Option[Boolean] = None
-
-  /**
-    * Sets the field delimiter, "," by default.
-    *
-    * @param delim the field delimiter
-    */
-  def fieldDelimiter(delim: String): OldCsv = {
-    this.fieldDelim = Some(delim)
-    this
-  }
-
-  /**
-    * Sets the line delimiter, "\n" by default.
-    *
-    * @param delim the line delimiter
-    */
-  def lineDelimiter(delim: String): OldCsv = {
-    this.lineDelim = Some(delim)
-    this
-  }
-
-  /**
-    * Sets the format schema with field names and the types. Required.
-    * The table schema must not contain nested fields.
-    *
-    * This method overwrites existing fields added with [[field()]].
-    *
-    * @param schema the table schema
-    */
-  def schema(schema: TableSchema): OldCsv = {
-    this.schema.clear()
-    schema.getFieldNames.zip(schema.getFieldTypes).foreach { case (n, t) =>
-      field(n, t)
-    }
-    this
-  }
-
-  /**
-    * Adds a format field with the field name and the type information. 
Required.
-    * This method can be called multiple times. The call order of this method 
defines
-    * also the order of the fields in the format.
-    *
-    * @param fieldName the field name
-    * @param fieldType the type information of the field
-    */
-  def field(fieldName: String, fieldType: TypeInformation[_]): OldCsv = {
-    field(fieldName, TypeStringUtils.writeTypeInfo(fieldType))
-    this
-  }
-
-  /**
-    * Adds a format field with the field name and the type string. Required.
-    * This method can be called multiple times. The call order of this method 
defines
-    * also the order of the fields in the format.
-    *
-    * @param fieldName the field name
-    * @param fieldType the type string of the field
-    */
-  def field(fieldName: String, fieldType: String): OldCsv = {
-    if (schema.contains(fieldName)) {
-      throw new ValidationException(s"Duplicate field name $fieldName.")
-    }
-    schema += (fieldName -> fieldType)
-    this
-  }
-
-  /**
-    * Sets a quote character for String values, null by default.
-    *
-    * @param quote the quote character
-    */
-  def quoteCharacter(quote: Character): OldCsv = {
-    this.quoteCharacter = Option(quote)
-    this
-  }
-
-  /**
-    * Sets a prefix to indicate comments, null by default.
-    *
-    * @param prefix the prefix to indicate comments
-    */
-  def commentPrefix(prefix: String): OldCsv = {
-    this.commentPrefix = Option(prefix)
-    this
-  }
-
-  /**
-    * Ignore the first line. Not skip the first line by default.
-    */
-  def ignoreFirstLine(): OldCsv = {
-    this.isIgnoreFirstLine = Some(true)
-    this
-  }
-
-  /**
-    * Skip records with parse error instead to fail. Throw an exception by 
default.
-    */
-  def ignoreParseErrors(): OldCsv = {
-    this.lenient = Some(true)
-    this
-  }
-
-  override protected def toFormatProperties: util.Map[String, String] = {
-    val properties = new DescriptorProperties()
-
-    fieldDelim.foreach(properties.putString(FORMAT_FIELD_DELIMITER, _))
-    lineDelim.foreach(properties.putString(FORMAT_LINE_DELIMITER, _))
-
-    val subKeys = util.Arrays.asList(
-      DescriptorProperties.TABLE_SCHEMA_NAME,
-      DescriptorProperties.TABLE_SCHEMA_TYPE)
-
-    val subValues = schema.map(e => util.Arrays.asList(e._1, 
e._2)).toList.asJava
-
-    properties.putIndexedFixedProperties(
-      FORMAT_FIELDS,
-      subKeys,
-      subValues)
-    quoteCharacter.foreach(properties.putCharacter(FORMAT_QUOTE_CHARACTER, _))
-    commentPrefix.foreach(properties.putString(FORMAT_COMMENT_PREFIX, _))
-    isIgnoreFirstLine.foreach(properties.putBoolean(FORMAT_IGNORE_FIRST_LINE, 
_))
-    lenient.foreach(properties.putBoolean(FORMAT_IGNORE_PARSE_ERRORS, _))
-
-    properties.asMap()
-  }
-}
-
-/**
-  * Format descriptor for comma-separated values (CSV).
-  */
-object OldCsv {
-
-  /**
-    * Format descriptor for comma-separated values (CSV).
-    *
-    * Note: This descriptor describes Flink's non-standard CSV table 
source/sink. In the future, the
-    * descriptor will be replaced by a proper RFC-compliant version. Use the 
RFC-compliant `Csv`
-    * format in the dedicated `flink-formats/flink-csv` module instead when 
writing to Kafka. Use
-    * the old one for stream/batch filesystem operations for now.
-    *
-    * @deprecated Use the RFC-compliant `Csv` format in the dedicated
-    *            `flink-formats/flink-csv` module instead when writing to 
Kafka.
-    */
-  @deprecated
-  def apply(): OldCsv = new OldCsv()
-
-}
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/OldCsvValidator.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/OldCsvValidator.scala
deleted file mode 100644
index 87c01a2..0000000
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/OldCsvValidator.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors
-
-import org.apache.flink.table.descriptors.OldCsvValidator._
-import org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_TYPE
-
-/**
-  * Validator for [[OldCsv]].
-  *
-  * @deprecated Use the RFC-compliant `Csv` format in the dedicated
-  *            `flink-formats/flink-csv` module instead.
-  */
-@Deprecated
-@deprecated
-class OldCsvValidator extends FormatDescriptorValidator {
-
-  override def validate(properties: DescriptorProperties): Unit = {
-    super.validate(properties)
-    properties.validateValue(FORMAT_TYPE, FORMAT_TYPE_VALUE, false)
-    properties.validateString(FORMAT_FIELD_DELIMITER, true, 1)
-    properties.validateString(FORMAT_LINE_DELIMITER, true, 1)
-    properties.validateString(FORMAT_QUOTE_CHARACTER, true, 1, 1)
-    properties.validateString(FORMAT_COMMENT_PREFIX, true, 1)
-    properties.validateBoolean(FORMAT_IGNORE_FIRST_LINE, true)
-    properties.validateBoolean(FORMAT_IGNORE_PARSE_ERRORS, true)
-    properties.validateTableSchema(FORMAT_FIELDS, false)
-  }
-}
-
-object OldCsvValidator {
-
-  val FORMAT_TYPE_VALUE = "csv"
-  val FORMAT_FIELD_DELIMITER = "format.field-delimiter"
-  val FORMAT_LINE_DELIMITER = "format.line-delimiter"
-  val FORMAT_QUOTE_CHARACTER = "format.quote-character"
-  val FORMAT_COMMENT_PREFIX = "format.comment-prefix"
-  val FORMAT_IGNORE_FIRST_LINE = "format.ignore-first-line"
-  val FORMAT_IGNORE_PARSE_ERRORS = "format.ignore-parse-errors"
-  val FORMAT_FIELDS = "format.fields"
-}
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/CsvAppendTableSinkFactory.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/CsvAppendTableSinkFactory.scala
deleted file mode 100644
index 65a41bb..0000000
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/CsvAppendTableSinkFactory.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sinks
-
-import java.util
-
-import 
org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, 
UPDATE_MODE_VALUE_APPEND}
-import org.apache.flink.table.factories.StreamTableSinkFactory
-import org.apache.flink.types.Row
-
-/**
-  * Factory base for creating configured instances of [[CsvTableSink]] in a 
stream environment.
-  */
-class CsvAppendTableSinkFactory
-  extends CsvTableSinkFactoryBase
-  with StreamTableSinkFactory[Row] {
-
-  override def requiredContext(): util.Map[String, String] = {
-    val context = new util.HashMap[String, String](super.requiredContext())
-    context.put(UPDATE_MODE, UPDATE_MODE_VALUE_APPEND)
-    context
-  }
-
-  override def createStreamTableSink(
-      properties: util.Map[String, String])
-    : StreamTableSink[Row] = {
-    createTableSink(isStreaming = true, properties)
-  }
-}
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.scala
deleted file mode 100644
index 3aa114e..0000000
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.scala
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sinks
-
-import java.util
-
-import org.apache.flink.table.api.TableException
-import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
-import org.apache.flink.table.descriptors.OldCsvValidator._
-import org.apache.flink.table.descriptors.FileSystemValidator._
-import org.apache.flink.table.descriptors.FormatDescriptorValidator._
-import org.apache.flink.table.descriptors.Schema._
-import org.apache.flink.table.descriptors._
-import org.apache.flink.table.factories.TableFactory
-import org.apache.flink.table.util.JavaScalaConversionUtil.toScala
-
-/**
-  * Factory base for creating configured instances of [[CsvTableSink]].
-  */
-abstract class CsvTableSinkFactoryBase extends TableFactory {
-
-  override def requiredContext(): util.Map[String, String] = {
-    val context = new util.HashMap[String, String]()
-    context.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE)
-    context.put(FORMAT_TYPE, FORMAT_TYPE_VALUE)
-    context.put(CONNECTOR_PROPERTY_VERSION, "1")
-    context.put(FORMAT_PROPERTY_VERSION, "1")
-    context
-  }
-
-  override def supportedProperties(): util.List[String] = {
-    val properties = new util.ArrayList[String]()
-    // connector
-    properties.add(CONNECTOR_PATH)
-    // format
-    
properties.add(s"$FORMAT_FIELDS.#.${DescriptorProperties.TABLE_SCHEMA_TYPE}")
-    
properties.add(s"$FORMAT_FIELDS.#.${DescriptorProperties.TABLE_SCHEMA_NAME}")
-    properties.add(FORMAT_FIELD_DELIMITER)
-    properties.add(CONNECTOR_PATH)
-    // schema
-    properties.add(s"$SCHEMA.#.${DescriptorProperties.TABLE_SCHEMA_TYPE}")
-    properties.add(s"$SCHEMA.#.${DescriptorProperties.TABLE_SCHEMA_NAME}")
-    properties
-  }
-
-  protected def createTableSink(
-      isStreaming: Boolean,
-      properties: util.Map[String, String])
-    : CsvTableSink = {
-
-    val params = new DescriptorProperties()
-    params.putProperties(properties)
-
-    // validate
-    new FileSystemValidator().validate(params)
-    new OldCsvValidator().validate(params)
-    new SchemaValidator(isStreaming, false, false).validate(params)
-
-    // build
-    val formatSchema = params.getTableSchema(FORMAT_FIELDS)
-    val tableSchema = params.getTableSchema(SCHEMA)
-
-    if (!formatSchema.equals(tableSchema)) {
-      throw new TableException(
-        "Encodings that differ from the schema are not supported yet for 
CsvTableSink.")
-    }
-
-    val path = params.getString(CONNECTOR_PATH)
-    val fieldDelimiter = 
toScala(params.getOptionalString(FORMAT_FIELD_DELIMITER)).getOrElse(",")
-
-    val csvTableSink = new CsvTableSink(path, fieldDelimiter)
-
-    csvTableSink
-      .configure(formatSchema.getFieldNames, formatSchema.getFieldTypes)
-      .asInstanceOf[CsvTableSink]
-  }
-}
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/CsvAppendTableSourceFactory.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/CsvAppendTableSourceFactory.scala
deleted file mode 100644
index afbe2ea..0000000
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/CsvAppendTableSourceFactory.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources
-
-import java.util
-
-import 
org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, 
UPDATE_MODE_VALUE_APPEND}
-import org.apache.flink.table.factories.StreamTableSourceFactory
-import org.apache.flink.types.Row
-
-/**
-  * Factory for creating configured instances of [[CsvTableSource]] in a 
stream environment.
-  */
-class CsvAppendTableSourceFactory
-  extends CsvTableSourceFactoryBase
-  with StreamTableSourceFactory[Row] {
-
-  override def requiredContext(): util.Map[String, String] = {
-    val context = new util.HashMap[String, String](super.requiredContext())
-    context.put(UPDATE_MODE, UPDATE_MODE_VALUE_APPEND)
-    context
-  }
-
-  override def createStreamTableSource(
-      properties: util.Map[String, String])
-    : StreamTableSource[Row] = {
-    createTableSource(isStreaming = true, properties)
-  }
-}
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactoryBase.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactoryBase.scala
deleted file mode 100644
index a7a48bb..0000000
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactoryBase.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources
-
-import java.util
-
-import org.apache.flink.table.api.TableException
-import 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_PROPERTY_VERSION,
 CONNECTOR_TYPE}
-import org.apache.flink.table.descriptors.OldCsvValidator._
-import org.apache.flink.table.descriptors.FileSystemValidator.{CONNECTOR_PATH, 
CONNECTOR_TYPE_VALUE}
-import 
org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_PROPERTY_VERSION,
 FORMAT_TYPE}
-import org.apache.flink.table.descriptors.Schema.SCHEMA
-import org.apache.flink.table.descriptors._
-import org.apache.flink.table.factories.TableFactory
-import org.apache.flink.table.util.JavaScalaConversionUtil.toScala
-
-/**
-  * Factory base for creating configured instances of [[CsvTableSource]].
-  */
-abstract class CsvTableSourceFactoryBase extends TableFactory {
-
-  override def requiredContext(): util.Map[String, String] = {
-    val context = new util.HashMap[String, String]()
-    context.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE)
-    context.put(FORMAT_TYPE, FORMAT_TYPE_VALUE)
-    context.put(CONNECTOR_PROPERTY_VERSION, "1")
-    context.put(FORMAT_PROPERTY_VERSION, "1")
-    context
-  }
-
-  override def supportedProperties(): util.List[String] = {
-    val properties = new util.ArrayList[String]()
-    // connector
-    properties.add(CONNECTOR_PATH)
-    // format
-    
properties.add(s"$FORMAT_FIELDS.#.${DescriptorProperties.TABLE_SCHEMA_TYPE}")
-    
properties.add(s"$FORMAT_FIELDS.#.${DescriptorProperties.TABLE_SCHEMA_NAME}")
-    properties.add(FORMAT_FIELD_DELIMITER)
-    properties.add(FORMAT_LINE_DELIMITER)
-    properties.add(FORMAT_QUOTE_CHARACTER)
-    properties.add(FORMAT_COMMENT_PREFIX)
-    properties.add(FORMAT_IGNORE_FIRST_LINE)
-    properties.add(FORMAT_IGNORE_PARSE_ERRORS)
-    properties.add(CONNECTOR_PATH)
-    // schema
-    properties.add(s"$SCHEMA.#.${DescriptorProperties.TABLE_SCHEMA_TYPE}")
-    properties.add(s"$SCHEMA.#.${DescriptorProperties.TABLE_SCHEMA_NAME}")
-    properties
-  }
-
-  protected def createTableSource(
-      isStreaming: Boolean,
-      properties: util.Map[String, String])
-    : CsvTableSource = {
-
-    val params = new DescriptorProperties()
-    params.putProperties(properties)
-
-    // validate
-    new FileSystemValidator().validate(params)
-    new OldCsvValidator().validate(params)
-    new SchemaValidator(isStreaming, false, false).validate(params)
-
-    // build
-    val csvTableSourceBuilder = new CsvTableSource.Builder
-
-    val formatSchema = params.getTableSchema(FORMAT_FIELDS)
-    val tableSchema = params.getTableSchema(SCHEMA)
-
-    // the CsvTableSource needs some rework first
-    // for now the schema must be equal to the encoding
-    if (!formatSchema.equals(tableSchema)) {
-      throw new TableException(
-        "Encodings that differ from the schema are not supported yet for 
CsvTableSources.")
-    }
-
-    toScala(params.getOptionalString(CONNECTOR_PATH))
-      .foreach(csvTableSourceBuilder.path)
-    toScala(params.getOptionalString(FORMAT_FIELD_DELIMITER))
-      .foreach(csvTableSourceBuilder.fieldDelimiter)
-    toScala(params.getOptionalString(FORMAT_LINE_DELIMITER))
-      .foreach(csvTableSourceBuilder.lineDelimiter)
-
-    formatSchema.getFieldNames.zip(formatSchema.getFieldTypes).foreach { case 
(name, tpe) =>
-      csvTableSourceBuilder.field(name, tpe)
-    }
-    toScala(params.getOptionalCharacter(FORMAT_QUOTE_CHARACTER))
-      .foreach(csvTableSourceBuilder.quoteCharacter)
-    toScala(params.getOptionalString(FORMAT_COMMENT_PREFIX))
-      .foreach(csvTableSourceBuilder.commentPrefix)
-    toScala(params.getOptionalBoolean(FORMAT_IGNORE_FIRST_LINE)).foreach { 
flag =>
-      if (flag) {
-        csvTableSourceBuilder.ignoreFirstLine()
-      }
-    }
-    toScala(params.getOptionalBoolean(FORMAT_IGNORE_PARSE_ERRORS)).foreach { 
flag =>
-      if (flag) {
-        csvTableSourceBuilder.ignoreParseErrors()
-      }
-    }
-
-    csvTableSourceBuilder.build()
-  }
-}
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/OldCsvTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/OldCsvTest.scala
index 1193184..720300b 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/OldCsvTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/OldCsvTest.scala
@@ -50,7 +50,7 @@ class OldCsvTest extends DescriptorTestBase {
   // 
----------------------------------------------------------------------------------------------
 
   override def descriptors(): util.List[Descriptor] = {
-    val desc1 = OldCsv()
+    val desc1 = new OldCsv()
       .field("field1", "STRING")
       .field("field2", Types.SQL_TIMESTAMP)
       .field("field3", TypeExtractor.createTypeInfo(classOf[Class[_]]))
@@ -59,7 +59,7 @@ class OldCsvTest extends DescriptorTestBase {
         Array[TypeInformation[_]](Types.INT, Types.STRING)))
       .lineDelimiter("^")
 
-    val desc2 = OldCsv()
+    val desc2 = new OldCsv()
       .schema(new TableSchema(
         Array[String]("test", "row"),
         Array[TypeInformation[_]](Types.INT, Types.STRING)))
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala
index 555a030..eaeb582 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala
@@ -59,7 +59,7 @@ class TableDescriptorTest extends TableTestBase {
     val connector = new FileSystem()
       .path("/path/to/csv")
 
-    val format = OldCsv()
+    val format = new OldCsv()
       .field("myfield", Types.STRING)
       .field("myfield2", Types.INT)
       .field("myfield3", Types.MAP(Types.STRING, Types.INT))
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
index b5ada5d..73b427f 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
@@ -68,7 +68,7 @@ object CommonTestData {
     val tempFilePath1 = writeToTempFile(csvRecord1.mkString("\n"), 
"csv-test1", "tmp")
 
     val connDesc1 = new FileSystem().path(tempFilePath1)
-    val formatDesc1 = OldCsv()
+    val formatDesc1 = new OldCsv()
       .field("a", Types.INT)
       .field("b", Types.LONG)
       .field("c", Types.STRING)
@@ -107,7 +107,7 @@ object CommonTestData {
     val tempFilePath2 = writeToTempFile(csvRecord2.mkString("\n"), 
"csv-test2", "tmp")
 
     val connDesc2 = new FileSystem().path(tempFilePath2)
-    val formatDesc2 = OldCsv()
+    val formatDesc2 = new OldCsv()
       .field("d", Types.INT)
       .field("e", Types.LONG)
       .field("f", Types.INT)
@@ -132,7 +132,7 @@ object CommonTestData {
 
     val tempFilePath3 = writeToTempFile("", "csv-test3", "tmp")
     val connDesc3 = new FileSystem().path(tempFilePath3)
-    val formatDesc3 = OldCsv()
+    val formatDesc3 = new OldCsv()
       .field("x", Types.INT)
       .field("y", Types.LONG)
       .field("z", Types.STRING)

Reply via email to