tsreaper commented on a change in pull request #17520:
URL: https://github.com/apache/flink/pull/17520#discussion_r732707765



##########
File path: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFileFormatFactory.java
##########
@@ -20,39 +20,135 @@
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
 import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.BulkDecodingFormat;
 import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.BulkReaderFormatFactory;
 import org.apache.flink.table.factories.BulkWriterFormatFactory;
 import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.filesystem.FileSystemConnectorOptions;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.utils.PartitionPathUtils;
 
 import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumWriter;
 
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import static 
org.apache.flink.formats.avro.AvroFormatOptions.AVRO_OUTPUT_CODEC;
 
 /** Avro format factory for file system. */
 @Internal
-public class AvroFileFormatFactory implements BulkWriterFormatFactory {
+public class AvroFileFormatFactory implements BulkReaderFormatFactory, 
BulkWriterFormatFactory {
 
     public static final String IDENTIFIER = "avro";
 
+    @Override
+    public BulkDecodingFormat<RowData> createDecodingFormat(
+            DynamicTableFactory.Context context, ReadableConfig formatOptions) 
{
+        return new BulkDecodingFormat<RowData>() {
+            @Override
+            public BulkFormat<RowData, FileSourceSplit> createRuntimeDecoder(
+                    DynamicTableSource.Context sourceContext, DataType 
producedDataType) {
+                RowType physicalRowype =
+                        (RowType)
+                                context.getCatalogTable()
+                                        .getResolvedSchema()
+                                        .toPhysicalRowDataType()
+                                        .getLogicalType();

Review comment:
       Do you mean we should only read the fields which are presented in 
`producedDataType` from avro files and should not read other fields from avro 
files? If yes I shall take a look to determine if this is possible. If not, 
keeping the fields even if they're not in `producedDataType` will cause the 
result of this source to be incorrect. The resulting row must fit 
`producedDataType`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to