This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6bb090751093e4f9f8a05c80857af11381f88599 Author: slinkydeveloper <[email protected]> AuthorDate: Wed Nov 24 15:07:12 2021 +0100 [FLINK-24687][table-runtime] Refactored test csv format to be independent of planner (except ScanRuntimeProviderContext.INSTANCE::createDataStructureConverter) and to implement SerializationSchema rather than BulkWriterFormatFactory. Moved to a specific package Signed-off-by: slinkydeveloper <[email protected]> --- .../table/filesystem/FileSystemTableSink.java | 5 +- .../testcsv}/TestCsvDeserializationSchema.java | 35 +++++---- .../testcsv/TestCsvFormatFactory.java} | 89 +++++----------------- .../testcsv/TestCsvSerializationSchema.java | 58 ++++++++++++++ .../org.apache.flink.table.factories.Factory | 2 +- 5 files changed, 104 insertions(+), 85 deletions(-) diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java index 2e9af35..bbd7425 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java @@ -343,7 +343,10 @@ public class FileSystemTableSink extends AbstractFileSystemTable @Override public DynamicTableSource.DataStructureConverter createDataStructureConverter( DataType producedDataType) { - throw new TableException("Compaction reader not support DataStructure converter."); + // This method cannot be implemented without changing the + // DynamicTableSink.DataStructureConverter interface + throw new UnsupportedOperationException( + "Compaction reader not support DataStructure converter."); } }; } diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestCsvDeserializationSchema.java 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/formats/testcsv/TestCsvDeserializationSchema.java similarity index 83% rename from flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestCsvDeserializationSchema.java rename to flink-table/flink-table-runtime/src/test/java/org/apache/flink/formats/testcsv/TestCsvDeserializationSchema.java index dbec987..1569e0d 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestCsvDeserializationSchema.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/formats/testcsv/TestCsvDeserializationSchema.java @@ -16,24 +16,22 @@ * limitations under the License. */ -package org.apache.flink.table.filesystem; +package org.apache.flink.formats.testcsv; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; -import org.apache.flink.table.data.conversion.DataStructureConverter; -import org.apache.flink.table.data.conversion.DataStructureConverters; -import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.LogicalTypeRoot; -import org.apache.flink.table.types.logical.RowType; import org.apache.flink.types.parser.FieldParser; import org.apache.flink.util.InstantiationUtil; import java.io.IOException; import java.math.BigDecimal; import java.util.List; +import java.util.function.Function; /** * The {@link DeserializationSchema} that output {@link RowData}. @@ -41,7 +39,7 @@ import java.util.List; * <p>NOTE: This is meant only for testing purpose and doesn't provide a feature complete stable csv * parser! If you need a feature complete CSV parser, check out the flink-csv package. 
*/ -public class TestCsvDeserializationSchema implements DeserializationSchema<RowData> { +class TestCsvDeserializationSchema implements DeserializationSchema<RowData> { private final List<DataType> physicalFieldTypes; private final int physicalFieldCount; @@ -49,20 +47,34 @@ public class TestCsvDeserializationSchema implements DeserializationSchema<RowDa private final TypeInformation<RowData> typeInfo; private final int[] indexMapping; - @SuppressWarnings("rawtypes") - private transient DataStructureConverter[] csvRowToRowDataConverters; + private final DynamicTableSource.DataStructureConverter[] csvRowToRowDataConverters; private transient FieldParser<?>[] fieldParsers; - public TestCsvDeserializationSchema(DataType physicalDataType, List<String> orderedCsvColumns) { + public TestCsvDeserializationSchema( + DataType physicalDataType, + TypeInformation<RowData> typeInfo, + List<String> orderedCsvColumns, + Function<DataType, DynamicTableSource.DataStructureConverter> converterFactory) { this.physicalFieldTypes = DataType.getFieldDataTypes(physicalDataType); this.physicalFieldCount = physicalFieldTypes.size(); - this.typeInfo = InternalTypeInfo.of((RowType) physicalDataType.getLogicalType()); + this.typeInfo = typeInfo; List<String> physicalFieldNames = DataType.getFieldNames(physicalDataType); this.indexMapping = orderedCsvColumns.stream().mapToInt(physicalFieldNames::indexOf).toArray(); + // Init data converters + int csvRowLength = indexMapping.length; + this.csvRowToRowDataConverters = + new DynamicTableSource.DataStructureConverter[csvRowLength]; + for (int csvColumn = 0; csvColumn < csvRowLength; csvColumn++) { + if (indexMapping[csvColumn] != -1) { + DataType fieldType = physicalFieldTypes.get(indexMapping[csvColumn]); + this.csvRowToRowDataConverters[csvColumn] = converterFactory.apply(fieldType); + } + } + initFieldParsers(); } @@ -103,7 +115,6 @@ public class TestCsvDeserializationSchema implements DeserializationSchema<RowDa private void 
initFieldParsers() { int csvRowLength = indexMapping.length; this.fieldParsers = new FieldParser<?>[csvRowLength]; - this.csvRowToRowDataConverters = new DataStructureConverter[csvRowLength]; for (int csvColumn = 0; csvColumn < csvRowLength; csvColumn++) { if (indexMapping[csvColumn] == -1) { // The output type doesn't include this field, so just assign a string parser to @@ -125,8 +136,6 @@ public class TestCsvDeserializationSchema implements DeserializationSchema<RowDa FieldParser<?> p = InstantiationUtil.instantiate(parserType, FieldParser.class); this.fieldParsers[csvColumn] = p; - this.csvRowToRowDataConverters[csvColumn] = - DataStructureConverters.getConverter(fieldType); } } diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestCsvFileSystemFormatFactory.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/formats/testcsv/TestCsvFormatFactory.java similarity index 54% rename from flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestCsvFileSystemFormatFactory.java rename to flink-table/flink-table-runtime/src/test/java/org/apache/flink/formats/testcsv/TestCsvFormatFactory.java index 5bae8f5..fa564c4 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestCsvFileSystemFormatFactory.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/formats/testcsv/TestCsvFormatFactory.java @@ -16,10 +16,10 @@ * limitations under the License. 
*/ -package org.apache.flink.table.filesystem; +package org.apache.flink.formats.testcsv; -import org.apache.flink.api.common.serialization.BulkWriter; import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.table.connector.ChangelogMode; @@ -30,34 +30,23 @@ import org.apache.flink.table.connector.format.ProjectableDecodingFormat; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.data.RowData; -import org.apache.flink.table.data.util.DataFormatConverters; -import org.apache.flink.table.factories.BulkWriterFormatFactory; import org.apache.flink.table.factories.DeserializationFormatFactory; import org.apache.flink.table.factories.DynamicTableFactory; +import org.apache.flink.table.factories.SerializationFormatFactory; +import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext; import org.apache.flink.table.types.DataType; -import org.apache.flink.table.types.logical.LogicalType; -import org.apache.flink.table.types.logical.RowType; -import org.apache.flink.table.types.utils.TypeConversions; -import org.apache.flink.types.Row; -import java.io.IOException; -import java.io.OutputStream; -import java.nio.charset.StandardCharsets; -import java.util.Arrays; import java.util.HashSet; import java.util.Set; -import static org.apache.flink.api.java.io.CsvOutputFormat.DEFAULT_FIELD_DELIMITER; -import static org.apache.flink.api.java.io.CsvOutputFormat.DEFAULT_LINE_DELIMITER; - /** * Factory for csv test format. * * <p>NOTE: This is meant only for testing purpose and doesn't provide a feature complete stable csv * parser! If you need a feature complete CSV parser, check out the flink-csv package. 
*/ -public class TestCsvFileSystemFormatFactory - implements DeserializationFormatFactory, BulkWriterFormatFactory { +public class TestCsvFormatFactory + implements DeserializationFormatFactory, SerializationFormatFactory { @Override public String factoryIdentifier() { @@ -74,40 +63,17 @@ public class TestCsvFileSystemFormatFactory return new HashSet<>(); } - private static void writeCsvToStream(DataType[] types, RowData rowData, OutputStream stream) - throws IOException { - LogicalType[] fieldTypes = - Arrays.stream(types).map(DataType::getLogicalType).toArray(LogicalType[]::new); - DataFormatConverters.DataFormatConverter converter = - DataFormatConverters.getConverterForDataType( - TypeConversions.fromLogicalToDataType(RowType.of(fieldTypes))); - - Row row = (Row) converter.toExternal(rowData); - StringBuilder builder = new StringBuilder(); - Object o; - for (int i = 0; i < row.getArity(); i++) { - if (i > 0) { - builder.append(DEFAULT_FIELD_DELIMITER); - } - if ((o = row.getField(i)) != null) { - builder.append(o); - } - } - String str = builder.toString(); - stream.write(str.getBytes(StandardCharsets.UTF_8)); - stream.write(DEFAULT_LINE_DELIMITER.getBytes(StandardCharsets.UTF_8)); - } - @Override - public EncodingFormat<BulkWriter.Factory<RowData>> createEncodingFormat( + public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat( DynamicTableFactory.Context context, ReadableConfig formatOptions) { - return new EncodingFormat<BulkWriter.Factory<RowData>>() { + return new EncodingFormat<SerializationSchema<RowData>>() { @Override - public BulkWriter.Factory<RowData> createRuntimeEncoder( + public SerializationSchema<RowData> createRuntimeEncoder( DynamicTableSink.Context context, DataType consumedDataType) { - return out -> - new CsvBulkWriter( - consumedDataType.getChildren().toArray(new DataType[0]), out); + DynamicTableSink.DataStructureConverter converter = + context.createDataStructureConverter(consumedDataType); + + return new 
TestCsvSerializationSchema(converter); } @Override @@ -129,7 +95,12 @@ public class TestCsvFileSystemFormatFactory DataType projectedPhysicalDataType = Projection.of(projections).project(physicalDataType); return new TestCsvDeserializationSchema( - projectedPhysicalDataType, DataType.getFieldNames(physicalDataType)); + projectedPhysicalDataType, + context.createTypeInformation(projectedPhysicalDataType), + DataType.getFieldNames(physicalDataType), + // Check out the FileSystemTableSink#createSourceContext for more details on + // why we need this + ScanRuntimeProviderContext.INSTANCE::createDataStructureConverter); } @Override @@ -138,26 +109,4 @@ public class TestCsvFileSystemFormatFactory } }; } - - private static class CsvBulkWriter implements BulkWriter<RowData> { - - private final DataType[] types; - private final OutputStream stream; - - private CsvBulkWriter(DataType[] types, OutputStream stream) { - this.types = types; - this.stream = stream; - } - - @Override - public void addElement(RowData element) throws IOException { - writeCsvToStream(types, element, stream); - } - - @Override - public void flush() {} - - @Override - public void finish() {} - } } diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/formats/testcsv/TestCsvSerializationSchema.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/formats/testcsv/TestCsvSerializationSchema.java new file mode 100644 index 0000000..9fc6800 --- /dev/null +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/formats/testcsv/TestCsvSerializationSchema.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.testcsv; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.data.RowData; +import org.apache.flink.types.Row; + +import java.nio.charset.StandardCharsets; + +import static org.apache.flink.api.java.io.CsvOutputFormat.DEFAULT_FIELD_DELIMITER; + +/** @see TestCsvFormatFactory */ +class TestCsvSerializationSchema implements SerializationSchema<RowData> { + + private final DynamicTableSink.DataStructureConverter converter; + + public TestCsvSerializationSchema(DynamicTableSink.DataStructureConverter converter) { + this.converter = converter; + } + + @Override + public void open(InitializationContext context) throws Exception {} + + @Override + public byte[] serialize(RowData element) { + Row row = (Row) converter.toExternal(element); + StringBuilder builder = new StringBuilder(); + Object o; + for (int i = 0; i < row.getArity(); i++) { + if (i > 0) { + builder.append(DEFAULT_FIELD_DELIMITER); + } + if ((o = row.getField(i)) != null) { + builder.append(o); + } + } + String str = builder.toString(); + return str.getBytes(StandardCharsets.UTF_8); + } +} diff --git a/flink-table/flink-table-runtime/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-table/flink-table-runtime/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory index 7487804..0d3371a 100644 --- 
a/flink-table/flink-table-runtime/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/flink-table/flink-table-runtime/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -13,4 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -org.apache.flink.table.filesystem.TestCsvFileSystemFormatFactory +org.apache.flink.formats.testcsv.TestCsvFormatFactory
