rdblue commented on a change in pull request #1237:
URL: https://github.com/apache/iceberg/pull/1237#discussion_r460562343
##########
File path: flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
##########
@@ -19,38 +19,439 @@
package org.apache.iceberg.flink.data;
+import java.math.BigDecimal;
+import java.util.Iterator;
import java.util.List;
-import org.apache.flink.types.Row;
-import org.apache.iceberg.data.parquet.BaseParquetWriter;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.RowType.RowField;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.iceberg.parquet.ParquetValueReaders;
import org.apache.iceberg.parquet.ParquetValueWriter;
import org.apache.iceberg.parquet.ParquetValueWriters;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
-public class FlinkParquetWriters extends BaseParquetWriter<Row> {
+public class FlinkParquetWriters {
+ private FlinkParquetWriters() {
+ }
- private static final FlinkParquetWriters INSTANCE = new FlinkParquetWriters();
+ @SuppressWarnings("unchecked")
+ public static <T> ParquetValueWriter<T> buildWriter(LogicalType schema, MessageType type) {
+ return (ParquetValueWriter<T>) ParquetWithFlinkSchemaVisitor.visit(schema, type, new WriteBuilder(type));
+ }
- private FlinkParquetWriters() {
+ private static class WriteBuilder extends ParquetWithFlinkSchemaVisitor<ParquetValueWriter<?>> {
+ private final MessageType type;
+
+ WriteBuilder(MessageType type) {
+ this.type = type;
+ }
+
+ @Override
+ public ParquetValueWriter<?> message(RowType sStruct, MessageType message, List<ParquetValueWriter<?>> fields) {
+ return struct(sStruct, message.asGroupType(), fields);
+ }
+
+ @Override
+ public ParquetValueWriter<?> struct(RowType sStruct, GroupType struct,
+ List<ParquetValueWriter<?>> fieldWriters) {
+ List<Type> fields = struct.getFields();
+ List<RowField> flinkFields = sStruct.getFields();
+ List<ParquetValueWriter<?>> writers = Lists.newArrayListWithExpectedSize(fieldWriters.size());
+ List<LogicalType> flinkTypes = Lists.newArrayList();
+ for (int i = 0; i < fields.size(); i += 1) {
+ writers.add(newOption(struct.getType(i), fieldWriters.get(i)));
+ flinkTypes.add(flinkFields.get(i).getType());
+ }
+
+ return new RowDataWriter(writers, flinkTypes);
+ }
+
+ @Override
+ public ParquetValueWriter<?> list(ArrayType sArray, GroupType array, ParquetValueWriter<?> elementWriter) {
+ GroupType repeated = array.getFields().get(0).asGroupType();
+ String[] repeatedPath = currentPath();
+
+ int repeatedD = type.getMaxDefinitionLevel(repeatedPath);
+ int repeatedR = type.getMaxRepetitionLevel(repeatedPath);
+
+ return new ArrayDataWriter<>(repeatedD, repeatedR,
+ newOption(repeated.getType(0), elementWriter),
+ sArray.getElementType());
+ }
+
+ @Override
+ public ParquetValueWriter<?> map(MapType sMap, GroupType map,
+ ParquetValueWriter<?> keyWriter, ParquetValueWriter<?> valueWriter) {
+ GroupType repeatedKeyValue = map.getFields().get(0).asGroupType();
+ String[] repeatedPath = currentPath();
+
+ int repeatedD = type.getMaxDefinitionLevel(repeatedPath);
+ int repeatedR = type.getMaxRepetitionLevel(repeatedPath);
+
+ return new MapDataWriter<>(repeatedD, repeatedR,
+ newOption(repeatedKeyValue.getType(0), keyWriter),
+ newOption(repeatedKeyValue.getType(1), valueWriter),
+ sMap.getKeyType(), sMap.getValueType());
+ }
+
+
+ private ParquetValueWriter<?> newOption(org.apache.parquet.schema.Type fieldType, ParquetValueWriter<?> writer) {
+ int maxD = type.getMaxDefinitionLevel(path(fieldType.getName()));
+ return ParquetValueWriters.option(fieldType, maxD, writer);
+ }
+
+ @Override
+ public ParquetValueWriter<?> primitive(LogicalType sType, PrimitiveType primitive) {
+ ColumnDescriptor desc = type.getColumnDescription(currentPath());
+
+ if (primitive.getOriginalType() != null) {
+ switch (primitive.getOriginalType()) {
+ case ENUM:
+ case JSON:
+ case UTF8:
+ return strings(desc);
+ case DATE:
+ case INT_8:
+ case INT_16:
+ case INT_32:
+ case TIME_MICROS:
Review comment:
Like the read path, time needs to be handled separately to convert from millis to micros: Flink represents time-of-day as an int holding milliseconds, while `TIME_MICROS` columns store microseconds, so letting this case fall through to the plain int writer would produce values off by a factor of 1000. A sketch of a dedicated writer is below.
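A minimal sketch of what that could look like, mirroring the read path. This is illustrative only: `timeMicros` and `TimeMicrosWriter` are hypothetical names, not part of this PR, and it assumes `ParquetValueWriters.PrimitiveWriter` (with its protected `column` field) as the base class, like the other primitive writers in this file.

```java
// Hypothetical: have `case TIME_MICROS:` return timeMicros(desc) instead of
// falling through to the plain int writer.
private static ParquetValueWriters.PrimitiveWriter<Integer> timeMicros(ColumnDescriptor desc) {
  return new TimeMicrosWriter(desc);
}

private static class TimeMicrosWriter extends ParquetValueWriters.PrimitiveWriter<Integer> {
  private TimeMicrosWriter(ColumnDescriptor desc) {
    super(desc);
  }

  @Override
  public void write(int repetitionLevel, Integer value) {
    // Flink hands us millis-of-day as an int; Parquet TIME_MICROS stores
    // micros-of-day as a long, so widen and scale by 1000 before writing.
    long micros = value.longValue() * 1000;
    column.writeLong(repetitionLevel, micros);
  }
}
```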
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]