slinkydeveloper commented on a change in pull request #17520:
URL: https://github.com/apache/flink/pull/17520#discussion_r733354416
##########
File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFileFormatFactory.java
##########
@@ -20,39 +20,135 @@
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
 import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.BulkDecodingFormat;
 import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.BulkReaderFormatFactory;
 import org.apache.flink.table.factories.BulkWriterFormatFactory;
 import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.filesystem.FileSystemConnectorOptions;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.utils.PartitionPathUtils;

 import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumWriter;

 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;

 import static org.apache.flink.formats.avro.AvroFormatOptions.AVRO_OUTPUT_CODEC;
 /** Avro format factory for file system. */
 @Internal
-public class AvroFileFormatFactory implements BulkWriterFormatFactory {
+public class AvroFileFormatFactory implements BulkReaderFormatFactory, BulkWriterFormatFactory {

     public static final String IDENTIFIER = "avro";

+    @Override
+    public BulkDecodingFormat<RowData> createDecodingFormat(
+            DynamicTableFactory.Context context, ReadableConfig formatOptions) {
+        return new BulkDecodingFormat<RowData>() {
+            @Override
+            public BulkFormat<RowData, FileSourceSplit> createRuntimeDecoder(
+                    DynamicTableSource.Context sourceContext, DataType producedDataType) {
+                RowType physicalRowType =
+                        (RowType)
+                                context.getCatalogTable()
+                                        .getResolvedSchema()
+                                        .toPhysicalRowDataType()
+                                        .getLogicalType();
Review comment:
> The resulting row must fit producedDataType.

I think so. If you want to try it out, just write a test with a table and a SELECT, and make sure the SELECT doesn't pick all the fields of the input table.
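
A minimal sketch of such a test, assuming a filesystem-connector table backed by Avro files (the class name, table name, path, and column names here are hypothetical, purely for illustration):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class AvroProjectionSmokeTest {

    public static void main(String[] args) {
        // Batch mode is enough for a one-shot read of static files.
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // Hypothetical table: point 'path' at a directory of Avro files
        // whose schema matches the declared columns.
        tEnv.executeSql(
                "CREATE TABLE avro_source ("
                        + "  a INT,"
                        + "  b STRING,"
                        + "  c BIGINT"
                        + ") WITH ("
                        + "  'connector' = 'filesystem',"
                        + "  'path' = '/tmp/avro-test',"
                        + "  'format' = 'avro'"
                        + ")");

        // Project only `b`: producedDataType now has a single field while
        // the physical row type still has three, so a reader that shapes
        // its output after the physical schema instead of the projection
        // would be exposed by this query.
        tEnv.executeSql("SELECT b FROM avro_source").print();
    }
}
```

If the runtime decoder ignores producedDataType and emits rows matching the full physical row type, a projected query like the one above should surface the mismatch.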