rdblue commented on a change in pull request #1272:
URL: https://github.com/apache/iceberg/pull/1272#discussion_r475998070
##########
File path:
flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
##########
@@ -19,38 +19,436 @@
package org.apache.iceberg.flink.data;
+import java.util.Iterator;
import java.util.List;
-import org.apache.flink.types.Row;
-import org.apache.iceberg.data.parquet.BaseParquetWriter;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.RowType.RowField;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.iceberg.parquet.ParquetValueReaders;
import org.apache.iceberg.parquet.ParquetValueWriter;
import org.apache.iceberg.parquet.ParquetValueWriters;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.DecimalUtil;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
-public class FlinkParquetWriters extends BaseParquetWriter<Row> {
+public class FlinkParquetWriters {
+ private FlinkParquetWriters() {
+ }
- private static final FlinkParquetWriters INSTANCE = new FlinkParquetWriters();
+ @SuppressWarnings("unchecked")
+ public static <T> ParquetValueWriter<T> buildWriter(LogicalType schema, MessageType type) {
+ return (ParquetValueWriter<T>) ParquetWithFlinkSchemaVisitor.visit(schema, type, new WriteBuilder(type));
+ }
- private FlinkParquetWriters() {
+ private static class WriteBuilder extends ParquetWithFlinkSchemaVisitor<ParquetValueWriter<?>> {
+ private final MessageType type;
+
+ WriteBuilder(MessageType type) {
+ this.type = type;
+ }
+
+ @Override
+ public ParquetValueWriter<?> message(RowType sStruct, MessageType message, List<ParquetValueWriter<?>> fields) {
+ return struct(sStruct, message.asGroupType(), fields);
+ }
+
+ @Override
+ public ParquetValueWriter<?> struct(RowType sStruct, GroupType struct,
+ List<ParquetValueWriter<?>> fieldWriters) {
+ List<Type> fields = struct.getFields();
+ List<RowField> flinkFields = sStruct.getFields();
+ List<ParquetValueWriter<?>> writers = Lists.newArrayListWithExpectedSize(fieldWriters.size());
+ List<LogicalType> flinkTypes = Lists.newArrayList();
+ for (int i = 0; i < fields.size(); i += 1) {
+ writers.add(newOption(struct.getType(i), fieldWriters.get(i)));
+ flinkTypes.add(flinkFields.get(i).getType());
+ }
+
+ return new RowDataWriter(writers, flinkTypes);
+ }
+
+ @Override
+ public ParquetValueWriter<?> list(ArrayType sArray, GroupType array, ParquetValueWriter<?> elementWriter) {
+ GroupType repeated = array.getFields().get(0).asGroupType();
+ String[] repeatedPath = currentPath();
+
+ int repeatedD = type.getMaxDefinitionLevel(repeatedPath);
+ int repeatedR = type.getMaxRepetitionLevel(repeatedPath);
+
+ return new ArrayDataWriter<>(repeatedD, repeatedR,
+ newOption(repeated.getType(0), elementWriter),
+ sArray.getElementType());
+ }
+
+ @Override
+ public ParquetValueWriter<?> map(MapType sMap, GroupType map,
+ ParquetValueWriter<?> keyWriter, ParquetValueWriter<?> valueWriter) {
+ GroupType repeatedKeyValue = map.getFields().get(0).asGroupType();
+ String[] repeatedPath = currentPath();
+
+ int repeatedD = type.getMaxDefinitionLevel(repeatedPath);
+ int repeatedR = type.getMaxRepetitionLevel(repeatedPath);
+
+ return new MapDataWriter<>(repeatedD, repeatedR,
+ newOption(repeatedKeyValue.getType(0), keyWriter),
+ newOption(repeatedKeyValue.getType(1), valueWriter),
+ sMap.getKeyType(), sMap.getValueType());
+ }
+
+
+ private ParquetValueWriter<?> newOption(org.apache.parquet.schema.Type fieldType, ParquetValueWriter<?> writer) {
+ int maxD = type.getMaxDefinitionLevel(path(fieldType.getName()));
+ return ParquetValueWriters.option(fieldType, maxD, writer);
+ }
+
+ @Override
+ public ParquetValueWriter<?> primitive(LogicalType sType, PrimitiveType primitive) {
Review comment:
Nit: `s` in `sType` indicates Spark. The equivalent here would be
`fType` or a better name.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]