tsreaper commented on code in PR #180:
URL: https://github.com/apache/flink-table-store/pull/180#discussion_r910557571


##########
flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroFileFormat.java:
##########
@@ -19,31 +19,161 @@
 package org.apache.flink.table.store.format.avro;
 
 import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.connector.file.table.format.BulkDecodingFormat;
-import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.formats.avro.AbstractAvroBulkFormat;
+import org.apache.flink.formats.avro.AvroBuilder;
+import org.apache.flink.formats.avro.AvroToRowDataConverters;
+import org.apache.flink.formats.avro.AvroWriterFactory;
+import org.apache.flink.formats.avro.RowDataToAvroConverters;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
+import org.apache.flink.table.connector.Projection;
+import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.file.format.FileFormat;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.store.format.FileFormat;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.function.Function;
+
+import static org.apache.flink.formats.avro.AvroFormatOptions.AVRO_OUTPUT_CODEC;
 
 /** Avro {@link FileFormat}. */
 public class AvroFileFormat extends FileFormat {
 
-    private final org.apache.flink.formats.avro.AvroFileFormatFactory factory;
     private final ReadableConfig formatOptions;
 
     public AvroFileFormat(ReadableConfig formatOptions) {
         super(org.apache.flink.formats.avro.AvroFileFormatFactory.IDENTIFIER);
-        this.factory = new org.apache.flink.formats.avro.AvroFileFormatFactory();
         this.formatOptions = formatOptions;
     }
 
     @Override
-    protected BulkDecodingFormat<RowData> getDecodingFormat() {
-        return factory.createDecodingFormat(null, formatOptions); // context is useless
+    public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
+            RowType type, int[][] projection, List<ResolvedExpression> filters) {
+        // avro is a file format that keeps schemas in file headers,
+        // if the schema given to the reader is not equal to the schema in header,
+        // reader will automatically map the fields and give back records with our desired
+        // schema
+        //
+        // for detailed discussion see comments in https://github.com/apache/flink/pull/18657
+        LogicalType producedType = Projection.of(projection).project(type);
+        return new AvroGenericRecordBulkFormat(
+                (RowType) producedType.copy(false), InternalTypeInfo.of(producedType));
     }
 
     @Override
-    protected EncodingFormat<BulkWriter.Factory<RowData>> getEncodingFormat() {
-        return factory.createEncodingFormat(null, formatOptions); // context is useless
+    public BulkWriter.Factory<RowData> createWriterFactory(RowType type) {

Review Comment:
   Add comments stating that this implementation is copied from Flink.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to