This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new d9e4bd9 [FLINK-13404][table-api] Port csv descriptors & factories to
flink-table-api-java-bridge
d9e4bd9 is described below
commit d9e4bd9c1c9de7cabb00de281b6efe4bc3ae8ea1
Author: godfreyhe <[email protected]>
AuthorDate: Wed Jul 24 20:07:26 2019 +0800
[FLINK-13404][table-api] Port csv descriptors & factories to
flink-table-api-java-bridge
This closes #9219
---
flink-formats/flink-csv/pom.xml | 14 --
.../org/apache/flink/table/descriptors/OldCsv.java | 195 ++++++++++++++++++++
.../flink/table/descriptors/OldCsvValidator.java | 54 ++++++
.../table/sinks/CsvAppendTableSinkFactory.java | 48 +++++
.../table/sinks/CsvBatchTableSinkFactory.java} | 27 ++-
.../flink/table/sinks/CsvTableSinkFactoryBase.java | 106 +++++++++++
.../table/sources/CsvAppendTableSourceFactory.java | 49 +++++
.../table/sources/CsvBatchTableSourceFactory.java} | 27 ++-
.../table/sources/CsvTableSourceFactoryBase.java | 136 ++++++++++++++
.../org.apache.flink.table.factories.TableFactory | 4 +-
.../org.apache.flink.table.factories.TableFactory | 4 -
.../apache/flink/table/descriptors/OldCsv.scala | 200 ---------------------
.../flink/table/descriptors/OldCsvValidator.scala | 57 ------
.../table/sinks/CsvAppendTableSinkFactory.scala | 45 -----
.../table/sinks/CsvTableSinkFactoryBase.scala | 93 ----------
.../sources/CsvAppendTableSourceFactory.scala | 45 -----
.../table/sources/CsvTableSourceFactoryBase.scala | 120 -------------
.../flink/table/descriptors/OldCsvTest.scala | 4 +-
.../table/descriptors/TableDescriptorTest.scala | 2 +-
.../flink/table/runtime/utils/CommonTestData.scala | 6 +-
20 files changed, 621 insertions(+), 615 deletions(-)
diff --git a/flink-formats/flink-csv/pom.xml b/flink-formats/flink-csv/pom.xml
index 2c067aa..77fb461 100644
--- a/flink-formats/flink-csv/pom.xml
+++ b/flink-formats/flink-csv/pom.xml
@@ -69,20 +69,6 @@ under the License.
<type>test-jar</type>
</dependency>
- <!-- TODO This could be dropped if we change
CsvRowFormatFactoryTest -->
- <dependency>
- <groupId>org.apache.flink</groupId>
-
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
-
- <!-- TODO This could be dropped if we change
CsvRowFormatFactoryTest -->
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-compiler</artifactId>
- <scope>test</scope>
- </dependency>
</dependencies>
<profiles>
diff --git
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/OldCsv.java
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/OldCsv.java
new file mode 100644
index 0000000..f7cdc9f
--- /dev/null
+++
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/OldCsv.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.utils.TypeStringUtils;
+
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static
org.apache.flink.table.descriptors.OldCsvValidator.FORMAT_COMMENT_PREFIX;
+import static org.apache.flink.table.descriptors.OldCsvValidator.FORMAT_FIELDS;
+import static
org.apache.flink.table.descriptors.OldCsvValidator.FORMAT_FIELD_DELIMITER;
+import static
org.apache.flink.table.descriptors.OldCsvValidator.FORMAT_IGNORE_FIRST_LINE;
+import static
org.apache.flink.table.descriptors.OldCsvValidator.FORMAT_IGNORE_PARSE_ERRORS;
+import static
org.apache.flink.table.descriptors.OldCsvValidator.FORMAT_LINE_DELIMITER;
+import static
org.apache.flink.table.descriptors.OldCsvValidator.FORMAT_QUOTE_CHARACTER;
+import static
org.apache.flink.table.descriptors.OldCsvValidator.FORMAT_TYPE_VALUE;
+
+/**
+ * Format descriptor for comma-separated values (CSV).
+ *
+ * <p>Note: This descriptor describes Flink's non-standard CSV table
source/sink. In the future, the
+ * descriptor will be replaced by a proper RFC-compliant version. Use the
RFC-compliant {@code Csv}
+ * format in the dedicated flink-formats/flink-csv module instead when writing
to Kafka.
+ * Use the old one for stream/batch filesystem operations for now.
+ *
+ * @deprecated Use the RFC-compliant {@code Csv} format in the dedicated
+ * flink-formats/flink-csv module instead when writing to Kafka.
+ */
+@Deprecated
+@PublicEvolving
+public class OldCsv extends FormatDescriptor {
+
+ private Optional<String> fieldDelim = Optional.empty();
+ private Optional<String> lineDelim = Optional.empty();
+ private Map<String, String> schema = new LinkedHashMap<>();
+ private Optional<Character> quoteCharacter = Optional.empty();
+ private Optional<String> commentPrefix = Optional.empty();
+ private Optional<Boolean> isIgnoreFirstLine = Optional.empty();
+ private Optional<Boolean> lenient = Optional.empty();
+
+ public OldCsv() {
+ super(FORMAT_TYPE_VALUE, 1);
+ }
+
+ /**
+ * Sets the field delimiter, "," by default.
+ *
+ * @param delim the field delimiter
+ */
+ public OldCsv fieldDelimiter(String delim) {
+ this.fieldDelim = Optional.of(delim);
+ return this;
+ }
+
+ /**
+ * Sets the line delimiter, "\n" by default.
+ *
+ * @param delim the line delimiter
+ */
+ public OldCsv lineDelimiter(String delim) {
+ this.lineDelim = Optional.of(delim);
+ return this;
+ }
+
+ /**
+ * Sets the format schema with field names and the types. Required.
+ * The table schema must not contain nested fields.
+ *
+ * <p>This method overwrites existing fields added with [[field()]].
+ *
+ * @param schema the table schema
+ */
+ public OldCsv schema(TableSchema schema) {
+ this.schema.clear();
+ for (int i = 0; i < schema.getFieldCount(); ++i) {
+ field(schema.getFieldNames()[i],
schema.getFieldTypes()[i]);
+ }
+ return this;
+ }
+
+ /**
+ * Adds a format field with the field name and the type information.
Required.
+ * This method can be called multiple times. The call order of this
method defines
+ * also the order of the fields in the format.
+ *
+ * @param fieldName the field name
+ * @param fieldType the type information of the field
+ */
+ public OldCsv field(String fieldName, TypeInformation<?> fieldType) {
+ field(fieldName, TypeStringUtils.writeTypeInfo(fieldType));
+ return this;
+ }
+
+ /**
+ * Adds a format field with the field name and the type string.
Required.
+ * This method can be called multiple times. The call order of this
method defines
+ * also the order of the fields in the format.
+ *
+ * @param fieldName the field name
+ * @param fieldType the type string of the field
+ */
+ public OldCsv field(String fieldName, String fieldType) {
+ if (schema.containsKey(fieldName)) {
+ throw new ValidationException("Duplicate field name " +
fieldName + ".");
+ }
+ schema.put(fieldName, fieldType);
+ return this;
+ }
+
+ /**
+ * Sets a quote character for String values, null by default.
+ *
+ * @param quote the quote character
+ */
+ public OldCsv quoteCharacter(Character quote) {
+ this.quoteCharacter = Optional.of(quote);
+ return this;
+ }
+
+ /**
+ * Sets a prefix to indicate comments, null by default.
+ *
+ * @param prefix the prefix to indicate comments
+ */
+ public OldCsv commentPrefix(String prefix) {
+ this.commentPrefix = Optional.of(prefix);
+ return this;
+ }
+
+ /**
+ * Ignore the first line. Not skip the first line by default.
+ */
+ public OldCsv ignoreFirstLine() {
+ this.isIgnoreFirstLine = Optional.of(true);
+ return this;
+ }
+
+ /**
+ * Skip records with parse error instead to fail. Throw an exception by
default.
+ */
+ public OldCsv ignoreParseErrors() {
+ this.lenient = Optional.of(true);
+ return this;
+ }
+
+ @Override
+ protected Map<String, String> toFormatProperties() {
+ DescriptorProperties properties = new DescriptorProperties();
+
+ fieldDelim.ifPresent(s ->
properties.putString(FORMAT_FIELD_DELIMITER, s));
+ lineDelim.ifPresent(s ->
properties.putString(FORMAT_LINE_DELIMITER, s));
+
+ List<String> subKeys = Arrays.asList(
+ DescriptorProperties.TABLE_SCHEMA_NAME,
+ DescriptorProperties.TABLE_SCHEMA_TYPE);
+
+ List<List<String>> subValues = schema.entrySet().stream()
+ .map(e -> Arrays.asList(e.getKey(),
e.getValue()))
+ .collect(Collectors.toList());
+
+ properties.putIndexedFixedProperties(FORMAT_FIELDS, subKeys,
subValues);
+
+ quoteCharacter.ifPresent(character ->
properties.putCharacter(FORMAT_QUOTE_CHARACTER, character));
+ commentPrefix.ifPresent(s ->
properties.putString(FORMAT_COMMENT_PREFIX, s));
+ isIgnoreFirstLine.ifPresent(aBoolean ->
properties.putBoolean(FORMAT_IGNORE_FIRST_LINE, aBoolean));
+ lenient.ifPresent(aBoolean ->
properties.putBoolean(FORMAT_IGNORE_PARSE_ERRORS, aBoolean));
+
+ return properties.asMap();
+ }
+}
diff --git
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/OldCsvValidator.java
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/OldCsvValidator.java
new file mode 100644
index 0000000..1264494
--- /dev/null
+++
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/OldCsvValidator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Validator for {@link OldCsv}.
+ *
+ * @deprecated Use the RFC-compliant {@code Csv} format in the dedicated
+ * flink-formats/flink-csv module instead.
+ */
+@Deprecated
+@Internal
+public class OldCsvValidator extends FormatDescriptorValidator {
+
+ public static final String FORMAT_TYPE_VALUE = "csv";
+ public static final String FORMAT_FIELD_DELIMITER =
"format.field-delimiter";
+ public static final String FORMAT_LINE_DELIMITER =
"format.line-delimiter";
+ public static final String FORMAT_QUOTE_CHARACTER =
"format.quote-character";
+ public static final String FORMAT_COMMENT_PREFIX =
"format.comment-prefix";
+ public static final String FORMAT_IGNORE_FIRST_LINE =
"format.ignore-first-line";
+ public static final String FORMAT_IGNORE_PARSE_ERRORS =
"format.ignore-parse-errors";
+ public static final String FORMAT_FIELDS = "format.fields";
+
+ @Override
+ public void validate(DescriptorProperties properties) {
+ super.validate(properties);
+ properties.validateValue(FORMAT_TYPE, FORMAT_TYPE_VALUE, false);
+ properties.validateString(FORMAT_FIELD_DELIMITER, true, 1);
+ properties.validateString(FORMAT_LINE_DELIMITER, true, 1);
+ properties.validateString(FORMAT_QUOTE_CHARACTER, true, 1, 1);
+ properties.validateString(FORMAT_COMMENT_PREFIX, true, 1);
+ properties.validateBoolean(FORMAT_IGNORE_FIRST_LINE, true);
+ properties.validateBoolean(FORMAT_IGNORE_PARSE_ERRORS, true);
+ properties.validateTableSchema(FORMAT_FIELDS, false);
+ }
+}
diff --git
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvAppendTableSinkFactory.java
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvAppendTableSinkFactory.java
new file mode 100644
index 0000000..968eb9c
--- /dev/null
+++
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvAppendTableSinkFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.factories.StreamTableSinkFactory;
+import org.apache.flink.types.Row;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static
org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE;
+import static
org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_APPEND;
+
+/**
+ * Factory base for creating configured instances of {@link CsvTableSink} in a
stream environment.
+ */
+@PublicEvolving
+public class CsvAppendTableSinkFactory extends CsvTableSinkFactoryBase
implements StreamTableSinkFactory<Row> {
+
+ @Override
+ public Map<String, String> requiredContext() {
+ Map<String, String> context = new
HashMap<>(super.requiredContext());
+ context.put(UPDATE_MODE, UPDATE_MODE_VALUE_APPEND);
+ return context;
+ }
+
+ @Override
+ public StreamTableSink<Row> createStreamTableSink(Map<String, String>
properties) {
+ return createTableSink(true, properties);
+ }
+}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/CsvBatchTableSinkFactory.scala
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvBatchTableSinkFactory.java
similarity index 58%
rename from
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/CsvBatchTableSinkFactory.scala
rename to
flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvBatchTableSinkFactory.java
index 2687ed2..19a9a21 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/CsvBatchTableSinkFactory.scala
+++
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvBatchTableSinkFactory.java
@@ -16,23 +16,22 @@
* limitations under the License.
*/
-package org.apache.flink.table.sinks
+package org.apache.flink.table.sinks;
-import java.util
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.factories.BatchTableSinkFactory;
+import org.apache.flink.types.Row;
-import org.apache.flink.table.factories.BatchTableSinkFactory
-import org.apache.flink.types.Row
+import java.util.Map;
/**
- * Factory base for creating configured instances of [[CsvTableSink]] in a
batch environment.
- */
-class CsvBatchTableSinkFactory
- extends CsvTableSinkFactoryBase
- with BatchTableSinkFactory[Row] {
+ * Factory base for creating configured instances of {@link CsvTableSink} in a
batch environment.
+ */
+@PublicEvolving
+public class CsvBatchTableSinkFactory extends CsvTableSinkFactoryBase
implements BatchTableSinkFactory<Row> {
- override def createBatchTableSink(
- properties: util.Map[String, String])
- : BatchTableSink[Row] = {
- createTableSink(isStreaming = false, properties)
- }
+ @Override
+ public BatchTableSink<Row> createBatchTableSink(Map<String, String>
properties) {
+ return createTableSink(false, properties);
+ }
}
diff --git
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.java
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.java
new file mode 100644
index 0000000..9c83b4a
--- /dev/null
+++
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.FileSystemValidator;
+import org.apache.flink.table.descriptors.OldCsvValidator;
+import org.apache.flink.table.descriptors.SchemaValidator;
+import org.apache.flink.table.factories.TableFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
+import static
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
+import static
org.apache.flink.table.descriptors.FileSystemValidator.CONNECTOR_PATH;
+import static
org.apache.flink.table.descriptors.FileSystemValidator.CONNECTOR_TYPE_VALUE;
+import static
org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_PROPERTY_VERSION;
+import static
org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_TYPE;
+import static org.apache.flink.table.descriptors.OldCsvValidator.FORMAT_FIELDS;
+import static
org.apache.flink.table.descriptors.OldCsvValidator.FORMAT_FIELD_DELIMITER;
+import static
org.apache.flink.table.descriptors.OldCsvValidator.FORMAT_TYPE_VALUE;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+
+/**
+ * Factory base for creating configured instances of {@link CsvTableSink}.
+ */
+@Internal
+public abstract class CsvTableSinkFactoryBase implements TableFactory {
+
+ @Override
+ public Map<String, String> requiredContext() {
+ Map<String, String> context = new HashMap<>();
+ context.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE);
+ context.put(FORMAT_TYPE, FORMAT_TYPE_VALUE);
+ context.put(CONNECTOR_PROPERTY_VERSION, "1");
+ context.put(FORMAT_PROPERTY_VERSION, "1");
+ return context;
+ }
+
+ public List<String> supportedProperties() {
+ List<String> properties = new ArrayList<>();
+ // connector
+ properties.add(CONNECTOR_PATH);
+ // format
+ properties.add(FORMAT_FIELDS + ".#." +
DescriptorProperties.TABLE_SCHEMA_TYPE);
+ properties.add(FORMAT_FIELDS + ".#." +
DescriptorProperties.TABLE_SCHEMA_NAME);
+ properties.add(FORMAT_FIELD_DELIMITER);
+ properties.add(CONNECTOR_PATH);
+ // schema
+ properties.add(SCHEMA + ".#." +
DescriptorProperties.TABLE_SCHEMA_TYPE);
+ properties.add(SCHEMA + ".#." +
DescriptorProperties.TABLE_SCHEMA_NAME);
+ return properties;
+ }
+
+ protected CsvTableSink createTableSink(
+ Boolean isStreaming,
+ Map<String, String> properties) {
+
+ DescriptorProperties params = new DescriptorProperties();
+ params.putProperties(properties);
+
+ // validate
+ new FileSystemValidator().validate(params);
+ new OldCsvValidator().validate(params);
+ new SchemaValidator(isStreaming, false, false).validate(params);
+
+ // build
+ TableSchema formatSchema = params.getTableSchema(FORMAT_FIELDS);
+ TableSchema tableSchema = params.getTableSchema(SCHEMA);
+
+ if (!formatSchema.equals(tableSchema)) {
+ throw new TableException(
+ "Encodings that differ from the schema
are not supported yet for CsvTableSink.");
+ }
+
+ String path = params.getString(CONNECTOR_PATH);
+ String fieldDelimiter =
params.getOptionalString(FORMAT_FIELD_DELIMITER).orElse(",");
+
+ CsvTableSink csvTableSink = new CsvTableSink(path,
fieldDelimiter);
+
+ return (CsvTableSink)
csvTableSink.configure(formatSchema.getFieldNames(),
formatSchema.getFieldTypes());
+ }
+
+}
diff --git
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvAppendTableSourceFactory.java
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvAppendTableSourceFactory.java
new file mode 100644
index 0000000..6857dd8
--- /dev/null
+++
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvAppendTableSourceFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.factories.StreamTableSourceFactory;
+import org.apache.flink.types.Row;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static
org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE;
+import static
org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_APPEND;
+
+/**
+ * Factory for creating configured instances of {@link CsvTableSource} in a
stream environment.
+ */
+@PublicEvolving
+public class CsvAppendTableSourceFactory extends CsvTableSourceFactoryBase
implements StreamTableSourceFactory<Row> {
+
+ @Override
+ public Map<String, String> requiredContext() {
+ Map<String, String> context = new
HashMap<>(super.requiredContext());
+ context.put(UPDATE_MODE, UPDATE_MODE_VALUE_APPEND);
+ return context;
+ }
+
+ @Override
+ public StreamTableSource<Row> createStreamTableSource(Map<String,
String> properties) {
+ return createTableSource(true, properties);
+ }
+
+}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/CsvBatchTableSourceFactory.scala
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvBatchTableSourceFactory.java
similarity index 57%
rename from
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/CsvBatchTableSourceFactory.scala
rename to
flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvBatchTableSourceFactory.java
index 9d8fa40..a7a2b4d 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/CsvBatchTableSourceFactory.scala
+++
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvBatchTableSourceFactory.java
@@ -16,23 +16,22 @@
* limitations under the License.
*/
-package org.apache.flink.table.sources
+package org.apache.flink.table.sources;
-import java.util
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.factories.BatchTableSourceFactory;
+import org.apache.flink.types.Row;
-import org.apache.flink.table.factories.BatchTableSourceFactory
-import org.apache.flink.types.Row
+import java.util.Map;
/**
- * Factory for creating configured instances of [[CsvTableSource]] in a batch
environment.
- */
-class CsvBatchTableSourceFactory
- extends CsvTableSourceFactoryBase
- with BatchTableSourceFactory[Row] {
+ * Factory for creating configured instances of {@link CsvTableSource} in a
batch environment.
+ */
+@PublicEvolving
+public class CsvBatchTableSourceFactory extends CsvTableSourceFactoryBase
implements BatchTableSourceFactory<Row> {
- override def createBatchTableSource(
- properties: util.Map[String, String])
- : BatchTableSource[Row] = {
- createTableSource(isStreaming = false, properties)
- }
+ @Override
+ public BatchTableSource<Row> createBatchTableSource(Map<String, String>
properties) {
+ return createTableSource(false, properties);
+ }
}
diff --git
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSourceFactoryBase.java
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSourceFactoryBase.java
new file mode 100644
index 0000000..4dc8e40
--- /dev/null
+++
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSourceFactoryBase.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.FileSystemValidator;
+import org.apache.flink.table.descriptors.OldCsvValidator;
+import org.apache.flink.table.descriptors.SchemaValidator;
+import org.apache.flink.table.factories.TableFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
+import static
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
+import static
org.apache.flink.table.descriptors.FileSystemValidator.CONNECTOR_PATH;
+import static
org.apache.flink.table.descriptors.FileSystemValidator.CONNECTOR_TYPE_VALUE;
+import static
org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_PROPERTY_VERSION;
+import static
org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_TYPE;
+import static
org.apache.flink.table.descriptors.OldCsvValidator.FORMAT_COMMENT_PREFIX;
+import static org.apache.flink.table.descriptors.OldCsvValidator.FORMAT_FIELDS;
+import static
org.apache.flink.table.descriptors.OldCsvValidator.FORMAT_FIELD_DELIMITER;
+import static
org.apache.flink.table.descriptors.OldCsvValidator.FORMAT_IGNORE_FIRST_LINE;
+import static
org.apache.flink.table.descriptors.OldCsvValidator.FORMAT_IGNORE_PARSE_ERRORS;
+import static
org.apache.flink.table.descriptors.OldCsvValidator.FORMAT_LINE_DELIMITER;
+import static
org.apache.flink.table.descriptors.OldCsvValidator.FORMAT_QUOTE_CHARACTER;
+import static
org.apache.flink.table.descriptors.OldCsvValidator.FORMAT_TYPE_VALUE;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+
+/**
+ * Factory base for creating configured instances of {@link CsvTableSource}.
+ */
+@Internal
+public abstract class CsvTableSourceFactoryBase implements TableFactory {
+
+ @Override
+ public Map<String, String> requiredContext() {
+ Map<String, String> context = new HashMap<>();
+ context.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE);
+ context.put(FORMAT_TYPE, FORMAT_TYPE_VALUE);
+ context.put(CONNECTOR_PROPERTY_VERSION, "1");
+ context.put(FORMAT_PROPERTY_VERSION, "1");
+ return context;
+ }
+
+ public List<String> supportedProperties() {
+ List<String> properties = new ArrayList<>();
+ // connector
+ properties.add(CONNECTOR_PATH);
+ // format
+ properties.add(FORMAT_FIELDS + ".#." +
DescriptorProperties.TABLE_SCHEMA_TYPE);
+ properties.add(FORMAT_FIELDS + ".#." +
DescriptorProperties.TABLE_SCHEMA_NAME);
+ properties.add(FORMAT_FIELD_DELIMITER);
+ properties.add(FORMAT_LINE_DELIMITER);
+ properties.add(FORMAT_QUOTE_CHARACTER);
+ properties.add(FORMAT_COMMENT_PREFIX);
+ properties.add(FORMAT_IGNORE_FIRST_LINE);
+ properties.add(FORMAT_IGNORE_PARSE_ERRORS);
+ properties.add(CONNECTOR_PATH);
+ // schema
+ properties.add(SCHEMA + ".#." +
DescriptorProperties.TABLE_SCHEMA_TYPE);
+ properties.add(SCHEMA + ".#." +
DescriptorProperties.TABLE_SCHEMA_NAME);
+ return properties;
+ }
+
+ protected CsvTableSource createTableSource(
+ Boolean isStreaming,
+ Map<String, String> properties) {
+
+ DescriptorProperties params = new DescriptorProperties();
+ params.putProperties(properties);
+
+ // validate
+ new FileSystemValidator().validate(params);
+ new OldCsvValidator().validate(params);
+ new SchemaValidator(isStreaming, false, false).validate(params);
+
+ // build
+ CsvTableSource.Builder csvTableSourceBuilder = new
CsvTableSource.Builder();
+
+ TableSchema formatSchema = params.getTableSchema(FORMAT_FIELDS);
+ TableSchema tableSchema = params.getTableSchema(SCHEMA);
+
+ // the CsvTableSource needs some rework first
+ // for now the schema must be equal to the encoding
+ if (!formatSchema.equals(tableSchema)) {
+ throw new TableException(
+ "Encodings that differ from the schema
are not supported yet for CsvTableSources.");
+ }
+
+
params.getOptionalString(CONNECTOR_PATH).ifPresent(csvTableSourceBuilder::path);
+
params.getOptionalString(FORMAT_FIELD_DELIMITER).ifPresent(csvTableSourceBuilder::fieldDelimiter);
+
params.getOptionalString(FORMAT_LINE_DELIMITER).ifPresent(csvTableSourceBuilder::lineDelimiter);
+
+ for (int i = 0; i < formatSchema.getFieldCount(); ++i) {
+
csvTableSourceBuilder.field(formatSchema.getFieldNames()[i],
formatSchema.getFieldTypes()[i]);
+ }
+
params.getOptionalCharacter(FORMAT_QUOTE_CHARACTER).ifPresent(csvTableSourceBuilder::quoteCharacter);
+
params.getOptionalString(FORMAT_COMMENT_PREFIX).ifPresent(csvTableSourceBuilder::commentPrefix);
+
params.getOptionalBoolean(FORMAT_IGNORE_FIRST_LINE).ifPresent(flag -> {
+ if (flag) {
+ csvTableSourceBuilder.ignoreFirstLine();
+ }
+ });
+
+
params.getOptionalBoolean(FORMAT_IGNORE_PARSE_ERRORS).ifPresent(flag -> {
+ if (flag) {
+ csvTableSourceBuilder.ignoreParseErrors();
+ }
+ });
+
+ return csvTableSourceBuilder.build();
+ }
+
+}
diff --git
a/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
b/flink-table/flink-table-api-java-bridge/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
similarity index 85%
copy from
flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
copy to
flink-table/flink-table-api-java-bridge/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
index 6bd2ea3..6b06ee6 100644
---
a/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++
b/flink-table/flink-table-api-java-bridge/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -16,6 +16,4 @@
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
-org.apache.flink.table.sinks.CsvAppendTableSinkFactory
-org.apache.flink.table.planner.StreamPlannerFactory
-org.apache.flink.table.executor.StreamExecutorFactory
+org.apache.flink.table.sinks.CsvAppendTableSinkFactory
\ No newline at end of file
diff --git
a/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
b/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
index 6bd2ea3..58dbb51 100644
---
a/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++
b/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -13,9 +13,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.flink.table.sources.CsvBatchTableSourceFactory
-org.apache.flink.table.sources.CsvAppendTableSourceFactory
-org.apache.flink.table.sinks.CsvBatchTableSinkFactory
-org.apache.flink.table.sinks.CsvAppendTableSinkFactory
org.apache.flink.table.planner.StreamPlannerFactory
org.apache.flink.table.executor.StreamExecutorFactory
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/OldCsv.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/OldCsv.scala
deleted file mode 100644
index 031e757..0000000
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/OldCsv.scala
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors
-
-import java.util
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.{TableSchema, ValidationException}
-import org.apache.flink.table.descriptors.OldCsvValidator._
-import org.apache.flink.table.utils.TypeStringUtils
-
-import scala.collection.mutable
-import scala.collection.JavaConverters._
-
-/**
- * Format descriptor for comma-separated values (CSV).
- *
- * Note: This descriptor describes Flink's non-standard CSV table
source/sink. In the future, the
- * descriptor will be replaced by a proper RFC-compliant version. Use the
RFC-compliant `Csv`
- * format in the dedicated `flink-formats/flink-csv` module instead when
writing to Kafka. Use the
- * old one for stream/batch filesystem operations for now.
- *
- * @deprecated Use the RFC-compliant `Csv` format in the dedicated
- * `flink-formats/flink-csv` module instead when writing to Kafka.
- */
-@Deprecated
-@deprecated
-class OldCsv extends FormatDescriptor(FORMAT_TYPE_VALUE, 1) {
-
- private var fieldDelim: Option[String] = None
- private var lineDelim: Option[String] = None
- private val schema: mutable.LinkedHashMap[String, String] =
- mutable.LinkedHashMap[String, String]()
- private var quoteCharacter: Option[Character] = None
- private var commentPrefix: Option[String] = None
- private var isIgnoreFirstLine: Option[Boolean] = None
- private var lenient: Option[Boolean] = None
-
- /**
- * Sets the field delimiter, "," by default.
- *
- * @param delim the field delimiter
- */
- def fieldDelimiter(delim: String): OldCsv = {
- this.fieldDelim = Some(delim)
- this
- }
-
- /**
- * Sets the line delimiter, "\n" by default.
- *
- * @param delim the line delimiter
- */
- def lineDelimiter(delim: String): OldCsv = {
- this.lineDelim = Some(delim)
- this
- }
-
- /**
- * Sets the format schema with field names and the types. Required.
- * The table schema must not contain nested fields.
- *
- * This method overwrites existing fields added with [[field()]].
- *
- * @param schema the table schema
- */
- def schema(schema: TableSchema): OldCsv = {
- this.schema.clear()
- schema.getFieldNames.zip(schema.getFieldTypes).foreach { case (n, t) =>
- field(n, t)
- }
- this
- }
-
- /**
- * Adds a format field with the field name and the type information.
Required.
- * This method can be called multiple times. The call order of this method
defines
- * also the order of the fields in the format.
- *
- * @param fieldName the field name
- * @param fieldType the type information of the field
- */
- def field(fieldName: String, fieldType: TypeInformation[_]): OldCsv = {
- field(fieldName, TypeStringUtils.writeTypeInfo(fieldType))
- this
- }
-
- /**
- * Adds a format field with the field name and the type string. Required.
- * This method can be called multiple times. The call order of this method
defines
- * also the order of the fields in the format.
- *
- * @param fieldName the field name
- * @param fieldType the type string of the field
- */
- def field(fieldName: String, fieldType: String): OldCsv = {
- if (schema.contains(fieldName)) {
- throw new ValidationException(s"Duplicate field name $fieldName.")
- }
- schema += (fieldName -> fieldType)
- this
- }
-
- /**
- * Sets a quote character for String values, null by default.
- *
- * @param quote the quote character
- */
- def quoteCharacter(quote: Character): OldCsv = {
- this.quoteCharacter = Option(quote)
- this
- }
-
- /**
- * Sets a prefix to indicate comments, null by default.
- *
- * @param prefix the prefix to indicate comments
- */
- def commentPrefix(prefix: String): OldCsv = {
- this.commentPrefix = Option(prefix)
- this
- }
-
- /**
- * Ignore the first line. Not skip the first line by default.
- */
- def ignoreFirstLine(): OldCsv = {
- this.isIgnoreFirstLine = Some(true)
- this
- }
-
- /**
- * Skip records with parse error instead to fail. Throw an exception by
default.
- */
- def ignoreParseErrors(): OldCsv = {
- this.lenient = Some(true)
- this
- }
-
- override protected def toFormatProperties: util.Map[String, String] = {
- val properties = new DescriptorProperties()
-
- fieldDelim.foreach(properties.putString(FORMAT_FIELD_DELIMITER, _))
- lineDelim.foreach(properties.putString(FORMAT_LINE_DELIMITER, _))
-
- val subKeys = util.Arrays.asList(
- DescriptorProperties.TABLE_SCHEMA_NAME,
- DescriptorProperties.TABLE_SCHEMA_TYPE)
-
- val subValues = schema.map(e => util.Arrays.asList(e._1,
e._2)).toList.asJava
-
- properties.putIndexedFixedProperties(
- FORMAT_FIELDS,
- subKeys,
- subValues)
- quoteCharacter.foreach(properties.putCharacter(FORMAT_QUOTE_CHARACTER, _))
- commentPrefix.foreach(properties.putString(FORMAT_COMMENT_PREFIX, _))
- isIgnoreFirstLine.foreach(properties.putBoolean(FORMAT_IGNORE_FIRST_LINE,
_))
- lenient.foreach(properties.putBoolean(FORMAT_IGNORE_PARSE_ERRORS, _))
-
- properties.asMap()
- }
-}
-
-/**
- * Format descriptor for comma-separated values (CSV).
- */
-object OldCsv {
-
- /**
- * Format descriptor for comma-separated values (CSV).
- *
- * Note: This descriptor describes Flink's non-standard CSV table
source/sink. In the future, the
- * descriptor will be replaced by a proper RFC-compliant version. Use the
RFC-compliant `Csv`
- * format in the dedicated `flink-formats/flink-csv` module instead when
writing to Kafka. Use
- * the old one for stream/batch filesystem operations for now.
- *
- * @deprecated Use the RFC-compliant `Csv` format in the dedicated
- * `flink-formats/flink-csv` module instead when writing to
Kafka.
- */
- @deprecated
- def apply(): OldCsv = new OldCsv()
-
-}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/OldCsvValidator.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/OldCsvValidator.scala
deleted file mode 100644
index 87c01a2..0000000
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/OldCsvValidator.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors
-
-import org.apache.flink.table.descriptors.OldCsvValidator._
-import org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_TYPE
-
-/**
- * Validator for [[OldCsv]].
- *
- * @deprecated Use the RFC-compliant `Csv` format in the dedicated
- * `flink-formats/flink-csv` module instead.
- */
-@Deprecated
-@deprecated
-class OldCsvValidator extends FormatDescriptorValidator {
-
- override def validate(properties: DescriptorProperties): Unit = {
- super.validate(properties)
- properties.validateValue(FORMAT_TYPE, FORMAT_TYPE_VALUE, false)
- properties.validateString(FORMAT_FIELD_DELIMITER, true, 1)
- properties.validateString(FORMAT_LINE_DELIMITER, true, 1)
- properties.validateString(FORMAT_QUOTE_CHARACTER, true, 1, 1)
- properties.validateString(FORMAT_COMMENT_PREFIX, true, 1)
- properties.validateBoolean(FORMAT_IGNORE_FIRST_LINE, true)
- properties.validateBoolean(FORMAT_IGNORE_PARSE_ERRORS, true)
- properties.validateTableSchema(FORMAT_FIELDS, false)
- }
-}
-
-object OldCsvValidator {
-
- val FORMAT_TYPE_VALUE = "csv"
- val FORMAT_FIELD_DELIMITER = "format.field-delimiter"
- val FORMAT_LINE_DELIMITER = "format.line-delimiter"
- val FORMAT_QUOTE_CHARACTER = "format.quote-character"
- val FORMAT_COMMENT_PREFIX = "format.comment-prefix"
- val FORMAT_IGNORE_FIRST_LINE = "format.ignore-first-line"
- val FORMAT_IGNORE_PARSE_ERRORS = "format.ignore-parse-errors"
- val FORMAT_FIELDS = "format.fields"
-}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/CsvAppendTableSinkFactory.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/CsvAppendTableSinkFactory.scala
deleted file mode 100644
index 65a41bb..0000000
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/CsvAppendTableSinkFactory.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sinks
-
-import java.util
-
-import
org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE,
UPDATE_MODE_VALUE_APPEND}
-import org.apache.flink.table.factories.StreamTableSinkFactory
-import org.apache.flink.types.Row
-
-/**
- * Factory base for creating configured instances of [[CsvTableSink]] in a
stream environment.
- */
-class CsvAppendTableSinkFactory
- extends CsvTableSinkFactoryBase
- with StreamTableSinkFactory[Row] {
-
- override def requiredContext(): util.Map[String, String] = {
- val context = new util.HashMap[String, String](super.requiredContext())
- context.put(UPDATE_MODE, UPDATE_MODE_VALUE_APPEND)
- context
- }
-
- override def createStreamTableSink(
- properties: util.Map[String, String])
- : StreamTableSink[Row] = {
- createTableSink(isStreaming = true, properties)
- }
-}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.scala
deleted file mode 100644
index 3aa114e..0000000
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.scala
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sinks
-
-import java.util
-
-import org.apache.flink.table.api.TableException
-import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
-import org.apache.flink.table.descriptors.OldCsvValidator._
-import org.apache.flink.table.descriptors.FileSystemValidator._
-import org.apache.flink.table.descriptors.FormatDescriptorValidator._
-import org.apache.flink.table.descriptors.Schema._
-import org.apache.flink.table.descriptors._
-import org.apache.flink.table.factories.TableFactory
-import org.apache.flink.table.util.JavaScalaConversionUtil.toScala
-
-/**
- * Factory base for creating configured instances of [[CsvTableSink]].
- */
-abstract class CsvTableSinkFactoryBase extends TableFactory {
-
- override def requiredContext(): util.Map[String, String] = {
- val context = new util.HashMap[String, String]()
- context.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE)
- context.put(FORMAT_TYPE, FORMAT_TYPE_VALUE)
- context.put(CONNECTOR_PROPERTY_VERSION, "1")
- context.put(FORMAT_PROPERTY_VERSION, "1")
- context
- }
-
- override def supportedProperties(): util.List[String] = {
- val properties = new util.ArrayList[String]()
- // connector
- properties.add(CONNECTOR_PATH)
- // format
-
properties.add(s"$FORMAT_FIELDS.#.${DescriptorProperties.TABLE_SCHEMA_TYPE}")
-
properties.add(s"$FORMAT_FIELDS.#.${DescriptorProperties.TABLE_SCHEMA_NAME}")
- properties.add(FORMAT_FIELD_DELIMITER)
- properties.add(CONNECTOR_PATH)
- // schema
- properties.add(s"$SCHEMA.#.${DescriptorProperties.TABLE_SCHEMA_TYPE}")
- properties.add(s"$SCHEMA.#.${DescriptorProperties.TABLE_SCHEMA_NAME}")
- properties
- }
-
- protected def createTableSink(
- isStreaming: Boolean,
- properties: util.Map[String, String])
- : CsvTableSink = {
-
- val params = new DescriptorProperties()
- params.putProperties(properties)
-
- // validate
- new FileSystemValidator().validate(params)
- new OldCsvValidator().validate(params)
- new SchemaValidator(isStreaming, false, false).validate(params)
-
- // build
- val formatSchema = params.getTableSchema(FORMAT_FIELDS)
- val tableSchema = params.getTableSchema(SCHEMA)
-
- if (!formatSchema.equals(tableSchema)) {
- throw new TableException(
- "Encodings that differ from the schema are not supported yet for
CsvTableSink.")
- }
-
- val path = params.getString(CONNECTOR_PATH)
- val fieldDelimiter =
toScala(params.getOptionalString(FORMAT_FIELD_DELIMITER)).getOrElse(",")
-
- val csvTableSink = new CsvTableSink(path, fieldDelimiter)
-
- csvTableSink
- .configure(formatSchema.getFieldNames, formatSchema.getFieldTypes)
- .asInstanceOf[CsvTableSink]
- }
-}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/CsvAppendTableSourceFactory.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/CsvAppendTableSourceFactory.scala
deleted file mode 100644
index afbe2ea..0000000
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/CsvAppendTableSourceFactory.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources
-
-import java.util
-
-import
org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE,
UPDATE_MODE_VALUE_APPEND}
-import org.apache.flink.table.factories.StreamTableSourceFactory
-import org.apache.flink.types.Row
-
-/**
- * Factory for creating configured instances of [[CsvTableSource]] in a
stream environment.
- */
-class CsvAppendTableSourceFactory
- extends CsvTableSourceFactoryBase
- with StreamTableSourceFactory[Row] {
-
- override def requiredContext(): util.Map[String, String] = {
- val context = new util.HashMap[String, String](super.requiredContext())
- context.put(UPDATE_MODE, UPDATE_MODE_VALUE_APPEND)
- context
- }
-
- override def createStreamTableSource(
- properties: util.Map[String, String])
- : StreamTableSource[Row] = {
- createTableSource(isStreaming = true, properties)
- }
-}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactoryBase.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactoryBase.scala
deleted file mode 100644
index a7a48bb..0000000
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactoryBase.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources
-
-import java.util
-
-import org.apache.flink.table.api.TableException
-import
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_PROPERTY_VERSION,
CONNECTOR_TYPE}
-import org.apache.flink.table.descriptors.OldCsvValidator._
-import org.apache.flink.table.descriptors.FileSystemValidator.{CONNECTOR_PATH,
CONNECTOR_TYPE_VALUE}
-import
org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_PROPERTY_VERSION,
FORMAT_TYPE}
-import org.apache.flink.table.descriptors.Schema.SCHEMA
-import org.apache.flink.table.descriptors._
-import org.apache.flink.table.factories.TableFactory
-import org.apache.flink.table.util.JavaScalaConversionUtil.toScala
-
-/**
- * Factory base for creating configured instances of [[CsvTableSource]].
- */
-abstract class CsvTableSourceFactoryBase extends TableFactory {
-
- override def requiredContext(): util.Map[String, String] = {
- val context = new util.HashMap[String, String]()
- context.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE)
- context.put(FORMAT_TYPE, FORMAT_TYPE_VALUE)
- context.put(CONNECTOR_PROPERTY_VERSION, "1")
- context.put(FORMAT_PROPERTY_VERSION, "1")
- context
- }
-
- override def supportedProperties(): util.List[String] = {
- val properties = new util.ArrayList[String]()
- // connector
- properties.add(CONNECTOR_PATH)
- // format
-
properties.add(s"$FORMAT_FIELDS.#.${DescriptorProperties.TABLE_SCHEMA_TYPE}")
-
properties.add(s"$FORMAT_FIELDS.#.${DescriptorProperties.TABLE_SCHEMA_NAME}")
- properties.add(FORMAT_FIELD_DELIMITER)
- properties.add(FORMAT_LINE_DELIMITER)
- properties.add(FORMAT_QUOTE_CHARACTER)
- properties.add(FORMAT_COMMENT_PREFIX)
- properties.add(FORMAT_IGNORE_FIRST_LINE)
- properties.add(FORMAT_IGNORE_PARSE_ERRORS)
- properties.add(CONNECTOR_PATH)
- // schema
- properties.add(s"$SCHEMA.#.${DescriptorProperties.TABLE_SCHEMA_TYPE}")
- properties.add(s"$SCHEMA.#.${DescriptorProperties.TABLE_SCHEMA_NAME}")
- properties
- }
-
- protected def createTableSource(
- isStreaming: Boolean,
- properties: util.Map[String, String])
- : CsvTableSource = {
-
- val params = new DescriptorProperties()
- params.putProperties(properties)
-
- // validate
- new FileSystemValidator().validate(params)
- new OldCsvValidator().validate(params)
- new SchemaValidator(isStreaming, false, false).validate(params)
-
- // build
- val csvTableSourceBuilder = new CsvTableSource.Builder
-
- val formatSchema = params.getTableSchema(FORMAT_FIELDS)
- val tableSchema = params.getTableSchema(SCHEMA)
-
- // the CsvTableSource needs some rework first
- // for now the schema must be equal to the encoding
- if (!formatSchema.equals(tableSchema)) {
- throw new TableException(
- "Encodings that differ from the schema are not supported yet for
CsvTableSources.")
- }
-
- toScala(params.getOptionalString(CONNECTOR_PATH))
- .foreach(csvTableSourceBuilder.path)
- toScala(params.getOptionalString(FORMAT_FIELD_DELIMITER))
- .foreach(csvTableSourceBuilder.fieldDelimiter)
- toScala(params.getOptionalString(FORMAT_LINE_DELIMITER))
- .foreach(csvTableSourceBuilder.lineDelimiter)
-
- formatSchema.getFieldNames.zip(formatSchema.getFieldTypes).foreach { case
(name, tpe) =>
- csvTableSourceBuilder.field(name, tpe)
- }
- toScala(params.getOptionalCharacter(FORMAT_QUOTE_CHARACTER))
- .foreach(csvTableSourceBuilder.quoteCharacter)
- toScala(params.getOptionalString(FORMAT_COMMENT_PREFIX))
- .foreach(csvTableSourceBuilder.commentPrefix)
- toScala(params.getOptionalBoolean(FORMAT_IGNORE_FIRST_LINE)).foreach {
flag =>
- if (flag) {
- csvTableSourceBuilder.ignoreFirstLine()
- }
- }
- toScala(params.getOptionalBoolean(FORMAT_IGNORE_PARSE_ERRORS)).foreach {
flag =>
- if (flag) {
- csvTableSourceBuilder.ignoreParseErrors()
- }
- }
-
- csvTableSourceBuilder.build()
- }
-}
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/OldCsvTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/OldCsvTest.scala
index 1193184..720300b 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/OldCsvTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/OldCsvTest.scala
@@ -50,7 +50,7 @@ class OldCsvTest extends DescriptorTestBase {
//
----------------------------------------------------------------------------------------------
override def descriptors(): util.List[Descriptor] = {
- val desc1 = OldCsv()
+ val desc1 = new OldCsv()
.field("field1", "STRING")
.field("field2", Types.SQL_TIMESTAMP)
.field("field3", TypeExtractor.createTypeInfo(classOf[Class[_]]))
@@ -59,7 +59,7 @@ class OldCsvTest extends DescriptorTestBase {
Array[TypeInformation[_]](Types.INT, Types.STRING)))
.lineDelimiter("^")
- val desc2 = OldCsv()
+ val desc2 = new OldCsv()
.schema(new TableSchema(
Array[String]("test", "row"),
Array[TypeInformation[_]](Types.INT, Types.STRING)))
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala
index 555a030..eaeb582 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala
@@ -59,7 +59,7 @@ class TableDescriptorTest extends TableTestBase {
val connector = new FileSystem()
.path("/path/to/csv")
- val format = OldCsv()
+ val format = new OldCsv()
.field("myfield", Types.STRING)
.field("myfield2", Types.INT)
.field("myfield3", Types.MAP(Types.STRING, Types.INT))
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
index b5ada5d..73b427f 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
@@ -68,7 +68,7 @@ object CommonTestData {
val tempFilePath1 = writeToTempFile(csvRecord1.mkString("\n"),
"csv-test1", "tmp")
val connDesc1 = new FileSystem().path(tempFilePath1)
- val formatDesc1 = OldCsv()
+ val formatDesc1 = new OldCsv()
.field("a", Types.INT)
.field("b", Types.LONG)
.field("c", Types.STRING)
@@ -107,7 +107,7 @@ object CommonTestData {
val tempFilePath2 = writeToTempFile(csvRecord2.mkString("\n"),
"csv-test2", "tmp")
val connDesc2 = new FileSystem().path(tempFilePath2)
- val formatDesc2 = OldCsv()
+ val formatDesc2 = new OldCsv()
.field("d", Types.INT)
.field("e", Types.LONG)
.field("f", Types.INT)
@@ -132,7 +132,7 @@ object CommonTestData {
val tempFilePath3 = writeToTempFile("", "csv-test3", "tmp")
val connDesc3 = new FileSystem().path(tempFilePath3)
- val formatDesc3 = OldCsv()
+ val formatDesc3 = new OldCsv()
.field("x", Types.INT)
.field("y", Types.LONG)
.field("z", Types.STRING)