This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b63482  [FLINK-24575][table-runtime] Migrate TestRowDataCsvInputFormat to DeserializationSchemaFactory
7b63482 is described below

commit 7b6348251f2c89fcd14b1b965872b1256cbc4807
Author: slinkydeveloper <[email protected]>
AuthorDate: Mon Oct 18 17:30:01 2021 +0200

    [FLINK-24575][table-runtime] Migrate TestRowDataCsvInputFormat to DeserializationSchemaFactory
    
    Signed-off-by: slinkydeveloper <[email protected]>
    
    This closes #17515.
---
 .../flink/table/types/utils/TypeConversions.java   |  16 +-
 .../filesystem/TestCsvDeserializationSchema.java   | 151 +++++++++++++++++
 .../filesystem/TestCsvFileSystemFormatFactory.java |  43 +++--
 .../filesystem/TestRowDataCsvInputFormat.java      | 181 ---------------------
 4 files changed, 191 insertions(+), 200 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/TypeConversions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/TypeConversions.java
index 5bf78af..ff4542f 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/TypeConversions.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/TypeConversions.java
@@ -38,7 +38,9 @@ public final class TypeConversions {
 
     /**
      * @deprecated Please don't use this method anymore. It will be removed soon and we should not
-     *     make the removal more painful.
+     *     make the removal more painful. Sources and sinks should use the method available in
+     *     context to convert, within the planner you should use either {@code InternalTypeInfo} or
+     *     {@code ExternalTypeInfo} depending on the use case.
      */
     @Deprecated
     public static DataType fromLegacyInfoToDataType(TypeInformation<?> typeInfo) {
@@ -47,7 +49,9 @@ public final class TypeConversions {
 
     /**
      * @deprecated Please don't use this method anymore. It will be removed soon and we should not
-     *     make the removal more painful.
+     *     make the removal more painful. Sources and sinks should use the method available in
+     *     context to convert, within the planner you should use either {@code InternalTypeInfo} or
+     *     {@code ExternalTypeInfo} depending on the use case.
      */
     @Deprecated
     public static DataType[] fromLegacyInfoToDataType(TypeInformation<?>[] typeInfo) {
@@ -58,7 +62,9 @@ public final class TypeConversions {
 
     /**
      * @deprecated Please don't use this method anymore. It will be removed soon and we should not
-     *     make the removal more painful.
+     *     make the removal more painful. Sources and sinks should use the method available in
+     *     context to convert, within the planner you should use either {@code InternalTypeInfo} or
+     *     {@code ExternalTypeInfo} depending on the use case.
      */
     @Deprecated
     public static TypeInformation<?> fromDataTypeToLegacyInfo(DataType dataType) {
@@ -67,7 +73,9 @@ public final class TypeConversions {
 
     /**
      * @deprecated Please don't use this method anymore. It will be removed soon and we should not
-     *     make the removal more painful.
+     *     make the removal more painful. Sources and sinks should use the method available in
+     *     context to convert, within the planner you should use either {@code InternalTypeInfo} or
+     *     {@code ExternalTypeInfo} depending on the use case.
      */
     @Deprecated
     public static TypeInformation<?>[] fromDataTypeToLegacyInfo(DataType[] dataType) {
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestCsvDeserializationSchema.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestCsvDeserializationSchema.java
new file mode 100644
index 0000000..aae8e4c
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestCsvDeserializationSchema.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.conversion.DataStructureConverter;
+import org.apache.flink.table.data.conversion.DataStructureConverters;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.List;
+
+/**
+ * The {@link DeserializationSchema} that output {@link RowData}.
+ *
+ * <p>NOTE: This is meant only for testing purpose and doesn't provide a feature complete stable csv
+ * parser! If you need a feature complete CSV parser, check out the flink-csv package.
+ */
+public class TestCsvDeserializationSchema implements DeserializationSchema<RowData> {
+
+    @SuppressWarnings("rawtypes")
+    private final DataStructureConverter[] csvRowToRowDataConverters;
+
+    private final TypeInformation<RowData> typeInfo;
+    private final int fieldCount;
+    private final List<DataType> fieldTypes;
+
+    private transient FieldParser<?>[] fieldParsers;
+
+    public TestCsvDeserializationSchema(DataType dataType) {
+        this.fieldTypes = DataType.getFieldDataTypes(dataType);
+        this.fieldCount = fieldTypes.size();
+
+        this.csvRowToRowDataConverters =
+                fieldTypes.stream()
+                        .map(DataStructureConverters::getConverter)
+                        .toArray(DataStructureConverter[]::new);
+
+        this.typeInfo = InternalTypeInfo.of((RowType) dataType.getLogicalType());
+
+        initFieldParsers();
+    }
+
+    @Override
+    public void open(InitializationContext context) throws Exception {
+        initFieldParsers();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public RowData deserialize(byte[] message) throws IOException {
+        GenericRowData row = new GenericRowData(fieldCount);
+        int startIndex = 0;
+        for (int i = 0; i < fieldCount; i++) {
+            startIndex =
+                    this.fieldParsers[i].resetErrorStateAndParse(
+                            message, startIndex, message.length, new byte[] {','}, null);
+            row.setField(
+                    i, csvRowToRowDataConverters[i].toInternal(fieldParsers[i].getLastResult()));
+        }
+        return row;
+    }
+
+    @Override
+    public boolean isEndOfStream(RowData nextElement) {
+        return false;
+    }
+
+    @Override
+    public TypeInformation<RowData> getProducedType() {
+        return typeInfo;
+    }
+
+    private void initFieldParsers() {
+        this.fieldParsers = new FieldParser<?>[fieldCount];
+        for (int i = 0; i < fieldTypes.size(); i++) {
+            DataType fieldType = fieldTypes.get(i);
+            Class<? extends FieldParser<?>> parserType =
+                    FieldParser.getParserForType(
+                            logicalTypeRootToFieldParserClass(
+                                    fieldType.getLogicalType().getTypeRoot()));
+            if (parserType == null) {
+                throw new RuntimeException("No parser available for type '" + fieldType + "'.");
+            }
+
+            FieldParser<?> p = InstantiationUtil.instantiate(parserType, FieldParser.class);
+
+            this.fieldParsers[i] = p;
+        }
+    }
+
+    private Class<?> logicalTypeRootToFieldParserClass(LogicalTypeRoot root) {
+        switch (root) {
+            case CHAR:
+            case VARCHAR:
+                return String.class;
+            case BOOLEAN:
+                return Boolean.class;
+            case DECIMAL:
+                return BigDecimal.class;
+            case TINYINT:
+                return Byte.class;
+            case SMALLINT:
+                return Short.class;
+            case INTEGER:
+                return Integer.class;
+            case BIGINT:
+                return Long.class;
+            case FLOAT:
+                return Float.class;
+            case DOUBLE:
+                return Double.class;
+            case DATE:
+                return java.sql.Date.class;
+            case TIME_WITHOUT_TIME_ZONE:
+                return java.sql.Time.class;
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITH_TIME_ZONE:
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                return java.sql.Timestamp.class;
+            default:
+                throw new RuntimeException(
+                        "The provided type " + root + " is not supported by the testcsv format");
+        }
+    }
+}
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestCsvFileSystemFormatFactory.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestCsvFileSystemFormatFactory.java
index 8f8cd34..5595b87 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestCsvFileSystemFormatFactory.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestCsvFileSystemFormatFactory.java
@@ -18,18 +18,20 @@
 
 package org.apache.flink.table.filesystem;
 
-import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
 import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.util.DataFormatConverters;
 import org.apache.flink.table.factories.BulkWriterFormatFactory;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
 import org.apache.flink.table.factories.DynamicTableFactory;
-import org.apache.flink.table.factories.FileSystemFormatFactory;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
@@ -46,9 +48,14 @@ import java.util.Set;
 import static org.apache.flink.api.java.io.CsvOutputFormat.DEFAULT_FIELD_DELIMITER;
 import static org.apache.flink.api.java.io.CsvOutputFormat.DEFAULT_LINE_DELIMITER;
 
-/** Test csv {@link FileSystemFormatFactory}. */
+/**
+ * Factory for csv test format.
+ *
+ * <p>NOTE: This is meant only for testing purpose and doesn't provide a feature complete stable csv
+ * parser! If you need a feature complete CSV parser, check out the flink-csv package.
+ */
 public class TestCsvFileSystemFormatFactory
-        implements FileSystemFormatFactory, BulkWriterFormatFactory {
+        implements DeserializationFormatFactory, BulkWriterFormatFactory {
 
     @Override
     public String factoryIdentifier() {
@@ -65,17 +72,6 @@ public class TestCsvFileSystemFormatFactory
         return new HashSet<>();
     }
 
-    @Override
-    public InputFormat<RowData, ?> createReader(ReaderContext context) {
-        return new TestRowDataCsvInputFormat(
-                context.getPaths(),
-                context.getSchema(),
-                context.getPartitionKeys(),
-                context.getDefaultPartName(),
-                context.getProjectFields(),
-                context.getPushedDownLimit());
-    }
-
     private static void writeCsvToStream(DataType[] types, RowData rowData, OutputStream stream)
             throws IOException {
         LogicalType[] fieldTypes =
@@ -119,6 +115,23 @@ public class TestCsvFileSystemFormatFactory
         };
     }
 
+    @Override
+    public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
+            DynamicTableFactory.Context context, ReadableConfig formatOptions) {
+        return new DecodingFormat<DeserializationSchema<RowData>>() {
+            @Override
+            public DeserializationSchema<RowData> createRuntimeDecoder(
+                    DynamicTableSource.Context context, DataType physicalDataType) {
+                return new TestCsvDeserializationSchema(physicalDataType);
+            }
+
+            @Override
+            public ChangelogMode getChangelogMode() {
+                return ChangelogMode.insertOnly();
+            }
+        };
+    }
+
     private static class CsvBulkWriter implements BulkWriter<RowData> {
 
         private final DataType[] types;
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestRowDataCsvInputFormat.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestRowDataCsvInputFormat.java
deleted file mode 100644
index 49699ec..0000000
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestRowDataCsvInputFormat.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.filesystem;
-
-import org.apache.flink.api.common.io.FileInputFormat;
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.io.RowCsvInputFormat;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.data.util.DataFormatConverters;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.LogicalTypeRoot;
-import org.apache.flink.table.utils.PartitionPathUtils;
-import org.apache.flink.types.Row;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot;
-
-/** The {@link InputFormat} that output {@link RowData}. */
-public class TestRowDataCsvInputFormat extends FileInputFormat<RowData> {
-
-    private final List<String> partitionKeys;
-    private final String defaultPartValue;
-    private final int[] selectFields;
-    private final long limit;
-    private final RowCsvInputFormat inputFormat;
-    private final List<DataType> fieldTypes;
-    private final List<String> fieldNames;
-    private final List<DataFormatConverters.DataFormatConverter> csvSelectConverters;
-    private final int[] csvFieldMapping;
-
-    private transient Row csvRow;
-    private transient GenericRowData row;
-    private transient long emitted;
-
-    public TestRowDataCsvInputFormat(
-            Path[] paths,
-            TableSchema schema,
-            List<String> partitionKeys,
-            String defaultPartValue,
-            int[] selectFields,
-            long limit) {
-        this.partitionKeys = partitionKeys;
-        this.defaultPartValue = defaultPartValue;
-        this.selectFields = selectFields;
-        this.limit = limit;
-        this.fieldTypes = Arrays.asList(schema.getFieldDataTypes());
-        this.fieldNames = Arrays.asList(schema.getFieldNames());
-
-        List<String> csvFieldNames =
-                fieldNames.stream()
-                        .filter(name -> !partitionKeys.contains(name))
-                        .collect(Collectors.toList());
-
-        List<String> selectFieldNames =
-                Arrays.stream(selectFields).mapToObj(fieldNames::get).collect(Collectors.toList());
-        List<String> csvSelectFieldNames =
-                selectFieldNames.stream()
-                        .filter(name -> !partitionKeys.contains(name))
-                        .collect(Collectors.toList());
-        List<DataType> csvSelectTypes =
-                csvSelectFieldNames.stream()
-                        .map(name -> fieldTypes.get(fieldNames.indexOf(name)))
-                        .collect(Collectors.toList());
-        RowTypeInfo rowType = (RowTypeInfo) schema.toRowType();
-        TypeInformation<?>[] fieldTypeInfos = rowType.getFieldTypes();
-        TypeInformation<?>[] csvSelectTypeInfos =
-                csvSelectFieldNames.stream()
-                        .map(name -> fieldTypeInfos[fieldNames.indexOf(name)])
-                        .toArray(TypeInformation<?>[]::new);
-        this.csvSelectConverters =
-                csvSelectTypes.stream()
-                        .map(DataFormatConverters::getConverterForDataType)
-                        .collect(Collectors.toList());
-        int[] csvSelectFields =
-                csvSelectFieldNames.stream().mapToInt(csvFieldNames::indexOf).toArray();
-        this.inputFormat = new RowCsvInputFormat(null, csvSelectTypeInfos, csvSelectFields);
-        this.inputFormat.setFilePaths(paths);
-
-        this.csvFieldMapping =
-                csvSelectFieldNames.stream().mapToInt(selectFieldNames::indexOf).toArray();
-        this.emitted = 0;
-    }
-
-    @Override
-    public void configure(Configuration parameters) {
-        inputFormat.configure(parameters);
-    }
-
-    @Override
-    public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException {
-        return inputFormat.createInputSplits(minNumSplits);
-    }
-
-    @Override
-    public void open(FileInputSplit split) throws IOException {
-        inputFormat.open(split);
-        Path path = split.getPath();
-        LinkedHashMap<String, String> partSpec =
-                PartitionPathUtils.extractPartitionSpecFromPath(path);
-        this.row = new GenericRowData(selectFields.length);
-        for (int i = 0; i < selectFields.length; i++) {
-            int selectField = selectFields[i];
-            String name = fieldNames.get(selectField);
-            if (partitionKeys.contains(name)) {
-                String value = partSpec.get(name);
-                value = defaultPartValue.equals(value) ? null : value;
-                this.row.setField(i, convertStringToInternal(value, fieldTypes.get(selectField)));
-            }
-        }
-        this.csvRow = new Row(csvSelectConverters.size());
-    }
-
-    private Object convertStringToInternal(String value, DataType dataType) {
-        final LogicalType logicalType = dataType.getLogicalType();
-        if (hasRoot(logicalType, LogicalTypeRoot.INTEGER)) {
-            return Integer.parseInt(value);
-        } else if (hasRoot(logicalType, LogicalTypeRoot.BIGINT)) {
-            return Long.parseLong(value);
-        } else if (hasRoot(logicalType, LogicalTypeRoot.CHAR)
-                || hasRoot(logicalType, LogicalTypeRoot.VARCHAR)) {
-            return StringData.fromString(value);
-        } else {
-            throw new UnsupportedOperationException(
-                    "Unsupported partition type: " + logicalType.getTypeRoot().name());
-        }
-    }
-
-    @Override
-    public boolean reachedEnd() {
-        return emitted >= limit || inputFormat.reachedEnd();
-    }
-
-    @Override
-    public RowData nextRecord(RowData reuse) throws IOException {
-        Row csvRow = inputFormat.nextRecord(this.csvRow);
-        if (csvRow == null) {
-            return null;
-        }
-        for (int i = 0; i < csvSelectConverters.size(); i++) {
-            row.setField(
-                    csvFieldMapping[i], csvSelectConverters.get(i).toInternal(csvRow.getField(i)));
-        }
-        emitted++;
-        return row;
-    }
-
-    @Override
-    public void close() throws IOException {
-        inputFormat.close();
-    }
-}

Reply via email to