slinkydeveloper commented on a change in pull request #17520:
URL: https://github.com/apache/flink/pull/17520#discussion_r732525418



##########
File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFileFormatFactory.java
##########
@@ -20,39 +20,135 @@
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
 import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.BulkDecodingFormat;
 import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.BulkReaderFormatFactory;
 import org.apache.flink.table.factories.BulkWriterFormatFactory;
 import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.filesystem.FileSystemConnectorOptions;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.utils.PartitionPathUtils;
 
 import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumWriter;
 
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.formats.avro.AvroFormatOptions.AVRO_OUTPUT_CODEC;
 
 /** Avro format factory for file system. */
 @Internal
-public class AvroFileFormatFactory implements BulkWriterFormatFactory {
+public class AvroFileFormatFactory implements BulkReaderFormatFactory, BulkWriterFormatFactory {
 
     public static final String IDENTIFIER = "avro";
 
+    @Override
+    public BulkDecodingFormat<RowData> createDecodingFormat(
+            DynamicTableFactory.Context context, ReadableConfig formatOptions) {
+        return new BulkDecodingFormat<RowData>() {
+            @Override
+            public BulkFormat<RowData, FileSourceSplit> createRuntimeDecoder(
+                    DynamicTableSource.Context sourceContext, DataType producedDataType) {
+                RowType physicalRowType =
+                        (RowType)
+                                context.getCatalogTable()
+                                        .getResolvedSchema()
+                                        .toPhysicalRowDataType()
+                                        .getLogicalType();

Review comment:
       From `FileSystemTableSource`, the `producedDataType` includes:
   
   * projected fields
   * partition fields
   
   For the partition fields, as you said, we should extract them from the
   `FileSourceSplit`. But then we should not extract and convert the
   non-projected fields; we should simply ignore them. Extracting all the
   fields and then returning only the projected ones requires a row copy
   https://github.com/apache/flink/pull/17520/files#diff-0f3083989c5687db08869ff72bcba9c9746c2441e40fd1f0cd75e1eab84b16c8R258
   which we should avoid.
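   Roughly what I mean, as a minimal sketch (the class and the names
   `partitionValuesByProducedIndex`, `physicalIndexByProducedIndex`, and
   `convertAvroValue` are made up for illustration, not code from this PR):
   
   ```java
   import org.apache.flink.table.data.GenericRowData;
   
   import org.apache.avro.generic.GenericRecord;
   
   import java.util.Map;
   
   /** Sketch only: assemble the produced row without an intermediate full-row copy. */
   final class ProjectedAvroRowBuilder {
   
       static GenericRowData toProducedRow(
               GenericRecord record,
               int producedArity,
               // produced-row index -> partition value parsed from the split path
               // (e.g. via PartitionPathUtils)
               Map<Integer, Object> partitionValuesByProducedIndex,
               // produced-row index -> physical field index in the Avro record
               int[] physicalIndexByProducedIndex) {
           GenericRowData row = new GenericRowData(producedArity);
           for (int i = 0; i < producedArity; i++) {
               Object partitionValue = partitionValuesByProducedIndex.get(i);
               if (partitionValue != null) {
                   // Partition fields are restored from the split path,
                   // never read from the Avro record.
                   row.setField(i, partitionValue);
               } else {
                   // Only the projected physical fields are read and
                   // converted; non-projected record fields are never touched.
                   row.setField(i, convertAvroValue(record.get(physicalIndexByProducedIndex[i])));
               }
           }
           return row;
       }
   
       // Placeholder for the per-type Avro -> RowData conversion of the
       // projected fields; the real logic would not be an identity function.
       private static Object convertAvroValue(Object avroValue) {
           return avroValue;
       }
   }
   ```
   
   This way the row is built once, directly in its produced shape.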



