This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 7b63482 [FLINK-24575][table-runtime] Migrate
TestRowDataCsvInputFormat to DeserializationSchemaFactory
7b63482 is described below
commit 7b6348251f2c89fcd14b1b965872b1256cbc4807
Author: slinkydeveloper <[email protected]>
AuthorDate: Mon Oct 18 17:30:01 2021 +0200
[FLINK-24575][table-runtime] Migrate TestRowDataCsvInputFormat to
DeserializationSchemaFactory
Signed-off-by: slinkydeveloper <[email protected]>
This closes #17515.
---
.../flink/table/types/utils/TypeConversions.java | 16 +-
.../filesystem/TestCsvDeserializationSchema.java | 151 +++++++++++++++++
.../filesystem/TestCsvFileSystemFormatFactory.java | 43 +++--
.../filesystem/TestRowDataCsvInputFormat.java | 181 ---------------------
4 files changed, 191 insertions(+), 200 deletions(-)
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/TypeConversions.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/TypeConversions.java
index 5bf78af..ff4542f 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/TypeConversions.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/TypeConversions.java
@@ -38,7 +38,9 @@ public final class TypeConversions {
/**
* @deprecated Please don't use this method anymore. It will be removed
soon and we should not
- * make the removal more painful.
+ * make the removal more painful. Sources and sinks should use the
method available in
+ * context to convert, within the planner you should use either {@code
InternalTypeInfo} or
+ * {@code ExternalTypeInfo} depending on the use case.
*/
@Deprecated
public static DataType fromLegacyInfoToDataType(TypeInformation<?>
typeInfo) {
@@ -47,7 +49,9 @@ public final class TypeConversions {
/**
* @deprecated Please don't use this method anymore. It will be removed
soon and we should not
- * make the removal more painful.
+ * make the removal more painful. Sources and sinks should use the
method available in
+ * context to convert, within the planner you should use either {@code
InternalTypeInfo} or
+ * {@code ExternalTypeInfo} depending on the use case.
*/
@Deprecated
public static DataType[] fromLegacyInfoToDataType(TypeInformation<?>[]
typeInfo) {
@@ -58,7 +62,9 @@ public final class TypeConversions {
/**
* @deprecated Please don't use this method anymore. It will be removed
soon and we should not
- * make the removal more painful.
+ * make the removal more painful. Sources and sinks should use the
method available in
+ * context to convert, within the planner you should use either {@code
InternalTypeInfo} or
+ * {@code ExternalTypeInfo} depending on the use case.
*/
@Deprecated
public static TypeInformation<?> fromDataTypeToLegacyInfo(DataType
dataType) {
@@ -67,7 +73,9 @@ public final class TypeConversions {
/**
* @deprecated Please don't use this method anymore. It will be removed
soon and we should not
- * make the removal more painful.
+ * make the removal more painful. Sources and sinks should use the
method available in
+ * context to convert, within the planner you should use either {@code
InternalTypeInfo} or
+ * {@code ExternalTypeInfo} depending on the use case.
*/
@Deprecated
public static TypeInformation<?>[] fromDataTypeToLegacyInfo(DataType[]
dataType) {
diff --git
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestCsvDeserializationSchema.java
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestCsvDeserializationSchema.java
new file mode 100644
index 0000000..aae8e4c
--- /dev/null
+++
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestCsvDeserializationSchema.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.conversion.DataStructureConverter;
+import org.apache.flink.table.data.conversion.DataStructureConverters;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.List;
+
+/**
+ * The {@link DeserializationSchema} that outputs {@link RowData}.
+ *
+ * <p>NOTE: This is meant only for testing purposes and doesn't provide a
feature-complete stable csv
+ * parser! If you need a feature-complete CSV parser, check out the flink-csv
package.
+ */
+public class TestCsvDeserializationSchema implements
DeserializationSchema<RowData> {
+
+ @SuppressWarnings("rawtypes")
+ private final DataStructureConverter[] csvRowToRowDataConverters;
+
+ private final TypeInformation<RowData> typeInfo;
+ private final int fieldCount;
+ private final List<DataType> fieldTypes;
+
+ private transient FieldParser<?>[] fieldParsers;
+
+ public TestCsvDeserializationSchema(DataType dataType) {
+ this.fieldTypes = DataType.getFieldDataTypes(dataType);
+ this.fieldCount = fieldTypes.size();
+
+ this.csvRowToRowDataConverters =
+ fieldTypes.stream()
+ .map(DataStructureConverters::getConverter)
+ .toArray(DataStructureConverter[]::new);
+
+ this.typeInfo = InternalTypeInfo.of((RowType)
dataType.getLogicalType());
+
+ initFieldParsers();
+ }
+
+ @Override
+ public void open(InitializationContext context) throws Exception {
+ initFieldParsers();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public RowData deserialize(byte[] message) throws IOException {
+ GenericRowData row = new GenericRowData(fieldCount);
+ int startIndex = 0;
+ for (int i = 0; i < fieldCount; i++) {
+ startIndex =
+ this.fieldParsers[i].resetErrorStateAndParse(
+ message, startIndex, message.length, new byte[]
{','}, null);
+ row.setField(
+ i,
csvRowToRowDataConverters[i].toInternal(fieldParsers[i].getLastResult()));
+ }
+ return row;
+ }
+
+ @Override
+ public boolean isEndOfStream(RowData nextElement) {
+ return false;
+ }
+
+ @Override
+ public TypeInformation<RowData> getProducedType() {
+ return typeInfo;
+ }
+
+ private void initFieldParsers() {
+ this.fieldParsers = new FieldParser<?>[fieldCount];
+ for (int i = 0; i < fieldTypes.size(); i++) {
+ DataType fieldType = fieldTypes.get(i);
+ Class<? extends FieldParser<?>> parserType =
+ FieldParser.getParserForType(
+ logicalTypeRootToFieldParserClass(
+ fieldType.getLogicalType().getTypeRoot()));
+ if (parserType == null) {
+ throw new RuntimeException("No parser available for type '" +
fieldType + "'.");
+ }
+
+ FieldParser<?> p = InstantiationUtil.instantiate(parserType,
FieldParser.class);
+
+ this.fieldParsers[i] = p;
+ }
+ }
+
+ private Class<?> logicalTypeRootToFieldParserClass(LogicalTypeRoot root) {
+ switch (root) {
+ case CHAR:
+ case VARCHAR:
+ return String.class;
+ case BOOLEAN:
+ return Boolean.class;
+ case DECIMAL:
+ return BigDecimal.class;
+ case TINYINT:
+ return Byte.class;
+ case SMALLINT:
+ return Short.class;
+ case INTEGER:
+ return Integer.class;
+ case BIGINT:
+ return Long.class;
+ case FLOAT:
+ return Float.class;
+ case DOUBLE:
+ return Double.class;
+ case DATE:
+ return java.sql.Date.class;
+ case TIME_WITHOUT_TIME_ZONE:
+ return java.sql.Time.class;
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ case TIMESTAMP_WITH_TIME_ZONE:
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ return java.sql.Timestamp.class;
+ default:
+ throw new RuntimeException(
+ "The provided type " + root + " is not supported by
the testcsv format");
+ }
+ }
+}
diff --git
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestCsvFileSystemFormatFactory.java
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestCsvFileSystemFormatFactory.java
index 8f8cd34..5595b87 100644
---
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestCsvFileSystemFormatFactory.java
+++
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestCsvFileSystemFormatFactory.java
@@ -18,18 +18,20 @@
package org.apache.flink.table.filesystem;
-import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
import org.apache.flink.table.factories.BulkWriterFormatFactory;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableFactory;
-import org.apache.flink.table.factories.FileSystemFormatFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
@@ -46,9 +48,14 @@ import java.util.Set;
import static
org.apache.flink.api.java.io.CsvOutputFormat.DEFAULT_FIELD_DELIMITER;
import static
org.apache.flink.api.java.io.CsvOutputFormat.DEFAULT_LINE_DELIMITER;
-/** Test csv {@link FileSystemFormatFactory}. */
+/**
+ * Factory for the csv test format.
+ *
+ * <p>NOTE: This is meant only for testing purposes and doesn't provide a
feature-complete stable csv
+ * parser! If you need a feature-complete CSV parser, check out the flink-csv
package.
+ */
public class TestCsvFileSystemFormatFactory
- implements FileSystemFormatFactory, BulkWriterFormatFactory {
+ implements DeserializationFormatFactory, BulkWriterFormatFactory {
@Override
public String factoryIdentifier() {
@@ -65,17 +72,6 @@ public class TestCsvFileSystemFormatFactory
return new HashSet<>();
}
- @Override
- public InputFormat<RowData, ?> createReader(ReaderContext context) {
- return new TestRowDataCsvInputFormat(
- context.getPaths(),
- context.getSchema(),
- context.getPartitionKeys(),
- context.getDefaultPartName(),
- context.getProjectFields(),
- context.getPushedDownLimit());
- }
-
private static void writeCsvToStream(DataType[] types, RowData rowData,
OutputStream stream)
throws IOException {
LogicalType[] fieldTypes =
@@ -119,6 +115,23 @@ public class TestCsvFileSystemFormatFactory
};
}
+ @Override
+ public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
+ DynamicTableFactory.Context context, ReadableConfig formatOptions)
{
+ return new DecodingFormat<DeserializationSchema<RowData>>() {
+ @Override
+ public DeserializationSchema<RowData> createRuntimeDecoder(
+ DynamicTableSource.Context context, DataType
physicalDataType) {
+ return new TestCsvDeserializationSchema(physicalDataType);
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ return ChangelogMode.insertOnly();
+ }
+ };
+ }
+
private static class CsvBulkWriter implements BulkWriter<RowData> {
private final DataType[] types;
diff --git
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestRowDataCsvInputFormat.java
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestRowDataCsvInputFormat.java
deleted file mode 100644
index 49699ec..0000000
---
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestRowDataCsvInputFormat.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.filesystem;
-
-import org.apache.flink.api.common.io.FileInputFormat;
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.io.RowCsvInputFormat;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.data.util.DataFormatConverters;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.LogicalTypeRoot;
-import org.apache.flink.table.utils.PartitionPathUtils;
-import org.apache.flink.types.Row;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.stream.Collectors;
-
-import static
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot;
-
-/** The {@link InputFormat} that output {@link RowData}. */
-public class TestRowDataCsvInputFormat extends FileInputFormat<RowData> {
-
- private final List<String> partitionKeys;
- private final String defaultPartValue;
- private final int[] selectFields;
- private final long limit;
- private final RowCsvInputFormat inputFormat;
- private final List<DataType> fieldTypes;
- private final List<String> fieldNames;
- private final List<DataFormatConverters.DataFormatConverter>
csvSelectConverters;
- private final int[] csvFieldMapping;
-
- private transient Row csvRow;
- private transient GenericRowData row;
- private transient long emitted;
-
- public TestRowDataCsvInputFormat(
- Path[] paths,
- TableSchema schema,
- List<String> partitionKeys,
- String defaultPartValue,
- int[] selectFields,
- long limit) {
- this.partitionKeys = partitionKeys;
- this.defaultPartValue = defaultPartValue;
- this.selectFields = selectFields;
- this.limit = limit;
- this.fieldTypes = Arrays.asList(schema.getFieldDataTypes());
- this.fieldNames = Arrays.asList(schema.getFieldNames());
-
- List<String> csvFieldNames =
- fieldNames.stream()
- .filter(name -> !partitionKeys.contains(name))
- .collect(Collectors.toList());
-
- List<String> selectFieldNames =
-
Arrays.stream(selectFields).mapToObj(fieldNames::get).collect(Collectors.toList());
- List<String> csvSelectFieldNames =
- selectFieldNames.stream()
- .filter(name -> !partitionKeys.contains(name))
- .collect(Collectors.toList());
- List<DataType> csvSelectTypes =
- csvSelectFieldNames.stream()
- .map(name -> fieldTypes.get(fieldNames.indexOf(name)))
- .collect(Collectors.toList());
- RowTypeInfo rowType = (RowTypeInfo) schema.toRowType();
- TypeInformation<?>[] fieldTypeInfos = rowType.getFieldTypes();
- TypeInformation<?>[] csvSelectTypeInfos =
- csvSelectFieldNames.stream()
- .map(name -> fieldTypeInfos[fieldNames.indexOf(name)])
- .toArray(TypeInformation<?>[]::new);
- this.csvSelectConverters =
- csvSelectTypes.stream()
- .map(DataFormatConverters::getConverterForDataType)
- .collect(Collectors.toList());
- int[] csvSelectFields =
-
csvSelectFieldNames.stream().mapToInt(csvFieldNames::indexOf).toArray();
- this.inputFormat = new RowCsvInputFormat(null, csvSelectTypeInfos,
csvSelectFields);
- this.inputFormat.setFilePaths(paths);
-
- this.csvFieldMapping =
-
csvSelectFieldNames.stream().mapToInt(selectFieldNames::indexOf).toArray();
- this.emitted = 0;
- }
-
- @Override
- public void configure(Configuration parameters) {
- inputFormat.configure(parameters);
- }
-
- @Override
- public FileInputSplit[] createInputSplits(int minNumSplits) throws
IOException {
- return inputFormat.createInputSplits(minNumSplits);
- }
-
- @Override
- public void open(FileInputSplit split) throws IOException {
- inputFormat.open(split);
- Path path = split.getPath();
- LinkedHashMap<String, String> partSpec =
- PartitionPathUtils.extractPartitionSpecFromPath(path);
- this.row = new GenericRowData(selectFields.length);
- for (int i = 0; i < selectFields.length; i++) {
- int selectField = selectFields[i];
- String name = fieldNames.get(selectField);
- if (partitionKeys.contains(name)) {
- String value = partSpec.get(name);
- value = defaultPartValue.equals(value) ? null : value;
- this.row.setField(i, convertStringToInternal(value,
fieldTypes.get(selectField)));
- }
- }
- this.csvRow = new Row(csvSelectConverters.size());
- }
-
- private Object convertStringToInternal(String value, DataType dataType) {
- final LogicalType logicalType = dataType.getLogicalType();
- if (hasRoot(logicalType, LogicalTypeRoot.INTEGER)) {
- return Integer.parseInt(value);
- } else if (hasRoot(logicalType, LogicalTypeRoot.BIGINT)) {
- return Long.parseLong(value);
- } else if (hasRoot(logicalType, LogicalTypeRoot.CHAR)
- || hasRoot(logicalType, LogicalTypeRoot.VARCHAR)) {
- return StringData.fromString(value);
- } else {
- throw new UnsupportedOperationException(
- "Unsupported partition type: " +
logicalType.getTypeRoot().name());
- }
- }
-
- @Override
- public boolean reachedEnd() {
- return emitted >= limit || inputFormat.reachedEnd();
- }
-
- @Override
- public RowData nextRecord(RowData reuse) throws IOException {
- Row csvRow = inputFormat.nextRecord(this.csvRow);
- if (csvRow == null) {
- return null;
- }
- for (int i = 0; i < csvSelectConverters.size(); i++) {
- row.setField(
- csvFieldMapping[i],
csvSelectConverters.get(i).toInternal(csvRow.getField(i)));
- }
- emitted++;
- return row;
- }
-
- @Override
- public void close() throws IOException {
- inputFormat.close();
- }
-}