openinx commented on a change in pull request #1272:
URL: https://github.com/apache/iceberg/pull/1272#discussion_r463045086
##########
File path: flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
##########
@@ -19,64 +19,723 @@
package org.apache.iceberg.flink.data;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.time.Instant;
import java.util.List;
-import org.apache.flink.types.Row;
+import java.util.Map;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
import org.apache.iceberg.Schema;
-import org.apache.iceberg.data.parquet.BaseParquetReaders;
+import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.iceberg.parquet.ParquetValueReader;
import org.apache.iceberg.parquet.ParquetValueReaders;
+import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
-public class FlinkParquetReaders extends BaseParquetReaders<Row> {
+public class FlinkParquetReaders {
+  private FlinkParquetReaders() {
+  }
-  private static final FlinkParquetReaders INSTANCE = new FlinkParquetReaders();
+  public static ParquetValueReader<RowData> buildReader(Schema expectedSchema, MessageType fileSchema) {
+    return buildReader(expectedSchema, fileSchema, ImmutableMap.of());
+  }
-  private FlinkParquetReaders() {
+  @SuppressWarnings("unchecked")
+  public static ParquetValueReader<RowData> buildReader(Schema expectedSchema,
+                                                        MessageType fileSchema,
+                                                        Map<Integer, ?> idToConstant) {
+    ReadBuilder builder = new ReadBuilder(fileSchema, idToConstant);
+    if (ParquetSchemaUtil.hasIds(fileSchema)) {
+      return (ParquetValueReader<RowData>)
+          TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema, builder);
+    } else {
+      return (ParquetValueReader<RowData>)
+          TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
+              new FallbackReadBuilder(builder));
+    }
+  }
+
+  private static class FallbackReadBuilder extends TypeWithSchemaVisitor<ParquetValueReader<?>> {
+    private MessageType type;
+    private final TypeWithSchemaVisitor<ParquetValueReader<?>> builder;
+
+    FallbackReadBuilder(TypeWithSchemaVisitor<ParquetValueReader<?>> builder) {
+      this.builder = builder;
+    }
+
+    @Override
+    public ParquetValueReader<?> message(Types.StructType expected, MessageType message,
+                                         List<ParquetValueReader<?>> fieldReaders) {
+      // the top level matches by ID, but the remaining IDs are missing
+      this.type = message;
+      return builder.struct(expected, message, fieldReaders);
+    }
+
+    @Override
+    public ParquetValueReader<?> struct(Types.StructType ignored, GroupType struct,
+                                        List<ParquetValueReader<?>> fieldReaders) {
+      // the expected struct is ignored because nested fields are never found when the
Review comment:
nit: the comment is incomplete? It ends mid-sentence at "when the".
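Judging from the parallel comment in `message(...)` above and from the body below (fields are paired with readers by position), my guess at the intended ending would be something like:

```java
// the expected struct is ignored because nested fields are never found when the
// file schema has no field IDs; fields are matched up by position instead
```

The exact wording should come from the author, of course.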
##########
File path: flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
##########
@@ -19,64 +19,723 @@
package org.apache.iceberg.flink.data;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.time.Instant;
import java.util.List;
-import org.apache.flink.types.Row;
+import java.util.Map;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
import org.apache.iceberg.Schema;
-import org.apache.iceberg.data.parquet.BaseParquetReaders;
+import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.iceberg.parquet.ParquetValueReader;
import org.apache.iceberg.parquet.ParquetValueReaders;
+import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
-public class FlinkParquetReaders extends BaseParquetReaders<Row> {
+public class FlinkParquetReaders {
+  private FlinkParquetReaders() {
+  }
-  private static final FlinkParquetReaders INSTANCE = new FlinkParquetReaders();
+  public static ParquetValueReader<RowData> buildReader(Schema expectedSchema, MessageType fileSchema) {
+    return buildReader(expectedSchema, fileSchema, ImmutableMap.of());
+  }
-  private FlinkParquetReaders() {
+  @SuppressWarnings("unchecked")
+  public static ParquetValueReader<RowData> buildReader(Schema expectedSchema,
+                                                        MessageType fileSchema,
+                                                        Map<Integer, ?> idToConstant) {
+    ReadBuilder builder = new ReadBuilder(fileSchema, idToConstant);
+    if (ParquetSchemaUtil.hasIds(fileSchema)) {
+      return (ParquetValueReader<RowData>)
+          TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema, builder);
+    } else {
+      return (ParquetValueReader<RowData>)
+          TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
+              new FallbackReadBuilder(builder));
+    }
+  }
+
+  private static class FallbackReadBuilder extends TypeWithSchemaVisitor<ParquetValueReader<?>> {
+    private MessageType type;
+    private final TypeWithSchemaVisitor<ParquetValueReader<?>> builder;
+
+    FallbackReadBuilder(TypeWithSchemaVisitor<ParquetValueReader<?>> builder) {
+      this.builder = builder;
+    }
+
+    @Override
+    public ParquetValueReader<?> message(Types.StructType expected, MessageType message,
+                                         List<ParquetValueReader<?>> fieldReaders) {
+      // the top level matches by ID, but the remaining IDs are missing
+      this.type = message;
+      return builder.struct(expected, message, fieldReaders);
+    }
+
+    @Override
+    public ParquetValueReader<?> struct(Types.StructType ignored, GroupType struct,
+                                        List<ParquetValueReader<?>> fieldReaders) {
+      // the expected struct is ignored because nested fields are never found when the
+      List<ParquetValueReader<?>> newFields = Lists.newArrayListWithExpectedSize(
+          fieldReaders.size());
+      List<Type> types = Lists.newArrayListWithExpectedSize(fieldReaders.size());
+      List<Type> fields = struct.getFields();
+      for (int i = 0; i < fields.size(); i += 1) {
+        Type fieldType = fields.get(i);
+        int fieldD = type.getMaxDefinitionLevel(path(fieldType.getName())) - 1;
+        newFields.add(ParquetValueReaders.option(fieldType, fieldD, fieldReaders.get(i)));
+        types.add(fieldType);
+      }
+
+      return new RowDataReader(types, newFields);
+    }
+  }
+
+  private static class ReadBuilder extends TypeWithSchemaVisitor<ParquetValueReader<?>> {
+    private final MessageType type;
+    private final Map<Integer, ?> idToConstant;
+
+    ReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
+      this.type = type;
+      this.idToConstant = idToConstant;
+    }
+
+    @Override
+    public ParquetValueReader<?> message(Types.StructType expected, MessageType message,
+                                         List<ParquetValueReader<?>> fieldReaders) {
+      return struct(expected, message.asGroupType(), fieldReaders);
+    }
+
+    @Override
+    public ParquetValueReader<?> struct(Types.StructType expected, GroupType struct,
+                                        List<ParquetValueReader<?>> fieldReaders) {
+      // match the expected struct's order
+      Map<Integer, ParquetValueReader<?>> readersById = Maps.newHashMap();
+      Map<Integer, Type> typesById = Maps.newHashMap();
+      List<Type> fields = struct.getFields();
+      for (int i = 0; i < fields.size(); i += 1) {
+        Type fieldType = fields.get(i);
+        int fieldD = type.getMaxDefinitionLevel(path(fieldType.getName())) - 1;
+        if (fieldType.getId() != null) {
+          int id = fieldType.getId().intValue();
+          readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReaders.get(i)));
+          typesById.put(id, fieldType);
+        }
+      }
+
+      List<Types.NestedField> expectedFields = expected != null ?
+          expected.fields() : ImmutableList.of();
+      List<ParquetValueReader<?>> reorderedFields = Lists.newArrayListWithExpectedSize(
+          expectedFields.size());
+      List<Type> types = Lists.newArrayListWithExpectedSize(expectedFields.size());
+      for (Types.NestedField field : expectedFields) {
+        int id = field.fieldId();
+        if (idToConstant.containsKey(id)) {
+          // containsKey is used because the constant may be null
+          reorderedFields.add(ParquetValueReaders.constant(idToConstant.get(id)));
+          types.add(null);
+        } else {
+          ParquetValueReader<?> reader = readersById.get(id);
+          if (reader != null) {
+            reorderedFields.add(reader);
+            types.add(typesById.get(id));
+          } else {
+            reorderedFields.add(ParquetValueReaders.nulls());
+            types.add(null);
+          }
+        }
+      }
+
+      return new RowDataReader(types, reorderedFields);
+    }
+
+    @Override
+    public ParquetValueReader<?> list(Types.ListType expectedList, GroupType array,
+                                      ParquetValueReader<?> elementReader) {
+      GroupType repeated = array.getFields().get(0).asGroupType();
+      String[] repeatedPath = currentPath();
+
+      int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
+      int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;
+
+      Type elementType = repeated.getType(0);
+      int elementD = type.getMaxDefinitionLevel(path(elementType.getName())) - 1;
+
+      return new ArrayReader<>(repeatedD, repeatedR, ParquetValueReaders.option(elementType, elementD, elementReader));
+    }
+
+    @Override
+    public ParquetValueReader<?> map(Types.MapType expectedMap, GroupType map,
+                                     ParquetValueReader<?> keyReader,
+                                     ParquetValueReader<?> valueReader) {
+      GroupType repeatedKeyValue = map.getFields().get(0).asGroupType();
+      String[] repeatedPath = currentPath();
+
+      int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
+      int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;
+
+      Type keyType = repeatedKeyValue.getType(0);
+      int keyD = type.getMaxDefinitionLevel(path(keyType.getName())) - 1;
+      Type valueType = repeatedKeyValue.getType(1);
+      int valueD = type.getMaxDefinitionLevel(path(valueType.getName())) - 1;
+
+      return new MapReader<>(repeatedD, repeatedR,
+          ParquetValueReaders.option(keyType, keyD, keyReader),
+          ParquetValueReaders.option(valueType, valueD, valueReader));
+    }
+
+    @Override
+    public ParquetValueReader<?> primitive(org.apache.iceberg.types.Type.PrimitiveType expected,
+                                           PrimitiveType primitive) {
+      ColumnDescriptor desc = type.getColumnDescription(currentPath());
+
+      if (primitive.getOriginalType() != null) {
+        switch (primitive.getOriginalType()) {
+          case ENUM:
+          case JSON:
+          case UTF8:
+            return new StringReader(desc);
+          case INT_8:
+          case INT_16:
+          case INT_32:
+          case DATE:
+            if (expected != null && expected.typeId() == Types.LongType.get().typeId()) {
+              return new ParquetValueReaders.IntAsLongReader(desc);
+            } else {
+              return new ParquetValueReaders.UnboxedReader<>(desc);
+            }
+          case TIME_MICROS:
+            return new TimeMillisReader(desc);
Review comment:
Q: is there a problem here? The original type is `TIME_MICROS`, while the reader is named `TimeMillisReader`.
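If the name describes the output rather than the input, the reader is presumably converting Parquet's INT64 micros into the int millis-of-day that Flink's `RowData` uses for TIME. A minimal sketch of that conversion, assuming Iceberg's `ParquetValueReaders.PrimitiveReader` base class (the class name is hypothetical):

```java
private static class LossyMicrosToMillisTimeReader extends ParquetValueReaders.PrimitiveReader<Integer> {
  LossyMicrosToMillisTimeReader(ColumnDescriptor desc) {
    super(desc);
  }

  @Override
  public Integer read(Integer reuse) {
    // Flink represents TIME as an int of millis-of-day, so sub-millisecond
    // precision is dropped here
    return (int) (column.nextLong() / 1000);
  }
}
```

If that is the intent, a name that mentions both units would avoid the confusion.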
##########
File path: flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
##########
@@ -19,64 +19,723 @@
package org.apache.iceberg.flink.data;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.time.Instant;
import java.util.List;
-import org.apache.flink.types.Row;
+import java.util.Map;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
import org.apache.iceberg.Schema;
-import org.apache.iceberg.data.parquet.BaseParquetReaders;
+import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.iceberg.parquet.ParquetValueReader;
import org.apache.iceberg.parquet.ParquetValueReaders;
+import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
-public class FlinkParquetReaders extends BaseParquetReaders<Row> {
+public class FlinkParquetReaders {
+  private FlinkParquetReaders() {
+  }
-  private static final FlinkParquetReaders INSTANCE = new FlinkParquetReaders();
+  public static ParquetValueReader<RowData> buildReader(Schema expectedSchema, MessageType fileSchema) {
+    return buildReader(expectedSchema, fileSchema, ImmutableMap.of());
+  }
-  private FlinkParquetReaders() {
+  @SuppressWarnings("unchecked")
+  public static ParquetValueReader<RowData> buildReader(Schema expectedSchema,
+                                                        MessageType fileSchema,
+                                                        Map<Integer, ?> idToConstant) {
+    ReadBuilder builder = new ReadBuilder(fileSchema, idToConstant);
+    if (ParquetSchemaUtil.hasIds(fileSchema)) {
+      return (ParquetValueReader<RowData>)
+          TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema, builder);
+    } else {
+      return (ParquetValueReader<RowData>)
+          TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
+              new FallbackReadBuilder(builder));
+    }
+  }
+
+  private static class FallbackReadBuilder extends TypeWithSchemaVisitor<ParquetValueReader<?>> {
+    private MessageType type;
+    private final TypeWithSchemaVisitor<ParquetValueReader<?>> builder;
+
+    FallbackReadBuilder(TypeWithSchemaVisitor<ParquetValueReader<?>> builder) {
+      this.builder = builder;
+    }
+
+    @Override
+    public ParquetValueReader<?> message(Types.StructType expected, MessageType message,
+                                         List<ParquetValueReader<?>> fieldReaders) {
+      // the top level matches by ID, but the remaining IDs are missing
+      this.type = message;
+      return builder.struct(expected, message, fieldReaders);
+    }
+
+    @Override
+    public ParquetValueReader<?> struct(Types.StructType ignored, GroupType struct,
+                                        List<ParquetValueReader<?>> fieldReaders) {
+      // the expected struct is ignored because nested fields are never found when the
+      List<ParquetValueReader<?>> newFields = Lists.newArrayListWithExpectedSize(
+          fieldReaders.size());
+      List<Type> types = Lists.newArrayListWithExpectedSize(fieldReaders.size());
+      List<Type> fields = struct.getFields();
+      for (int i = 0; i < fields.size(); i += 1) {
+        Type fieldType = fields.get(i);
+        int fieldD = type.getMaxDefinitionLevel(path(fieldType.getName())) - 1;
+        newFields.add(ParquetValueReaders.option(fieldType, fieldD, fieldReaders.get(i)));
+        types.add(fieldType);
+      }
+
+      return new RowDataReader(types, newFields);
+    }
+  }
+
+  private static class ReadBuilder extends TypeWithSchemaVisitor<ParquetValueReader<?>> {
+    private final MessageType type;
+    private final Map<Integer, ?> idToConstant;
+
+    ReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
+      this.type = type;
+      this.idToConstant = idToConstant;
+    }
+
+    @Override
+    public ParquetValueReader<?> message(Types.StructType expected, MessageType message,
+                                         List<ParquetValueReader<?>> fieldReaders) {
+      return struct(expected, message.asGroupType(), fieldReaders);
+    }
+
+    @Override
+    public ParquetValueReader<?> struct(Types.StructType expected, GroupType struct,
+                                        List<ParquetValueReader<?>> fieldReaders) {
+      // match the expected struct's order
+      Map<Integer, ParquetValueReader<?>> readersById = Maps.newHashMap();
+      Map<Integer, Type> typesById = Maps.newHashMap();
+      List<Type> fields = struct.getFields();
+      for (int i = 0; i < fields.size(); i += 1) {
+        Type fieldType = fields.get(i);
+        int fieldD = type.getMaxDefinitionLevel(path(fieldType.getName())) - 1;
+        if (fieldType.getId() != null) {
+          int id = fieldType.getId().intValue();
+          readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReaders.get(i)));
+          typesById.put(id, fieldType);
+        }
+      }
+
+      List<Types.NestedField> expectedFields = expected != null ?
+          expected.fields() : ImmutableList.of();
+      List<ParquetValueReader<?>> reorderedFields = Lists.newArrayListWithExpectedSize(
+          expectedFields.size());
+      List<Type> types = Lists.newArrayListWithExpectedSize(expectedFields.size());
+      for (Types.NestedField field : expectedFields) {
+        int id = field.fieldId();
+        if (idToConstant.containsKey(id)) {
+          // containsKey is used because the constant may be null
+          reorderedFields.add(ParquetValueReaders.constant(idToConstant.get(id)));
+          types.add(null);
+        } else {
+          ParquetValueReader<?> reader = readersById.get(id);
+          if (reader != null) {
+            reorderedFields.add(reader);
+            types.add(typesById.get(id));
+          } else {
+            reorderedFields.add(ParquetValueReaders.nulls());
+            types.add(null);
+          }
+        }
+      }
+
+      return new RowDataReader(types, reorderedFields);
+    }
+
+    @Override
+    public ParquetValueReader<?> list(Types.ListType expectedList, GroupType array,
+                                      ParquetValueReader<?> elementReader) {
+      GroupType repeated = array.getFields().get(0).asGroupType();
+      String[] repeatedPath = currentPath();
+
+      int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
+      int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;
+
+      Type elementType = repeated.getType(0);
+      int elementD = type.getMaxDefinitionLevel(path(elementType.getName())) - 1;
+
+      return new ArrayReader<>(repeatedD, repeatedR, ParquetValueReaders.option(elementType, elementD, elementReader));
+    }
+
+    @Override
+    public ParquetValueReader<?> map(Types.MapType expectedMap, GroupType map,
+                                     ParquetValueReader<?> keyReader,
+                                     ParquetValueReader<?> valueReader) {
+      GroupType repeatedKeyValue = map.getFields().get(0).asGroupType();
+      String[] repeatedPath = currentPath();
+
+      int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
+      int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;
+
+      Type keyType = repeatedKeyValue.getType(0);
+      int keyD = type.getMaxDefinitionLevel(path(keyType.getName())) - 1;
+      Type valueType = repeatedKeyValue.getType(1);
+      int valueD = type.getMaxDefinitionLevel(path(valueType.getName())) - 1;
+
+      return new MapReader<>(repeatedD, repeatedR,
+          ParquetValueReaders.option(keyType, keyD, keyReader),
+          ParquetValueReaders.option(valueType, valueD, valueReader));
+    }
+
+    @Override
+    public ParquetValueReader<?> primitive(org.apache.iceberg.types.Type.PrimitiveType expected,
+                                           PrimitiveType primitive) {
+      ColumnDescriptor desc = type.getColumnDescription(currentPath());
+
+      if (primitive.getOriginalType() != null) {
+        switch (primitive.getOriginalType()) {
+          case ENUM:
+          case JSON:
+          case UTF8:
+            return new StringReader(desc);
+          case INT_8:
+          case INT_16:
+          case INT_32:
+          case DATE:
+            if (expected != null && expected.typeId() == Types.LongType.get().typeId()) {
+              return new ParquetValueReaders.IntAsLongReader(desc);
+            } else {
+              return new ParquetValueReaders.UnboxedReader<>(desc);
+            }
+          case TIME_MICROS:
+            return new TimeMillisReader(desc);
+          case INT_64:
+            return new ParquetValueReaders.UnboxedReader<>(desc);
+          case TIMESTAMP_MICROS:
+            return new TimestampMicroReader(desc);
+          case DECIMAL:
+            DecimalLogicalTypeAnnotation decimal = (DecimalLogicalTypeAnnotation) primitive.getLogicalTypeAnnotation();
+            switch (primitive.getPrimitiveTypeName()) {
+              case BINARY:
+              case FIXED_LEN_BYTE_ARRAY:
+                return new BinaryDecimalReader(desc, decimal.getScale());
+              case INT64:
+                return new LongDecimalReader(desc, decimal.getPrecision(), decimal.getScale());
+              case INT32:
+                return new IntegerDecimalReader(desc, decimal.getPrecision(), decimal.getScale());
+              default:
+                throw new UnsupportedOperationException(
+                    "Unsupported base type for decimal: " + primitive.getPrimitiveTypeName());
+            }
+          case BSON:
+            return new ParquetValueReaders.ByteArrayReader(desc);
+          default:
+            throw new UnsupportedOperationException(
+                "Unsupported logical type: " + primitive.getOriginalType());
+        }
+      }
+
+      switch (primitive.getPrimitiveTypeName()) {
+        case FIXED_LEN_BYTE_ARRAY:
+        case BINARY:
+          return new ParquetValueReaders.ByteArrayReader(desc);
+        case INT32:
+          if (expected != null && expected.typeId() == org.apache.iceberg.types.Type.TypeID.LONG) {
+            return new ParquetValueReaders.IntAsLongReader(desc);
+          } else {
+            return new ParquetValueReaders.UnboxedReader<>(desc);
+          }
+        case FLOAT:
+          if (expected != null && expected.typeId() == org.apache.iceberg.types.Type.TypeID.DOUBLE) {
+            return new ParquetValueReaders.FloatAsDoubleReader(desc);
+          } else {
+            return new ParquetValueReaders.UnboxedReader<>(desc);
+          }
+        case BOOLEAN:
+        case INT64:
+        case DOUBLE:
+          return new ParquetValueReaders.UnboxedReader<>(desc);
+        default:
+          throw new UnsupportedOperationException("Unsupported type: " + primitive);
+      }
+    }
+
+    protected MessageType type() {
Review comment:
Will any subclass of `ReadBuilder` need to access the message type?
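From the hunk above, `FallbackReadBuilder` extends `TypeWithSchemaVisitor` rather than `ReadBuilder`, so as posted nothing subclasses `ReadBuilder`. If that stays true, the accessor could be narrowed, e.g. (a sketch, not something this PR does):

```java
// nothing outside ReadBuilder uses it, so this could be private,
// or the field could be read directly
private MessageType type() {
  return type;
}
```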
##########
File path: flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
##########
@@ -19,38 +19,457 @@
package org.apache.iceberg.flink.data;
+import java.math.BigDecimal;
+import java.util.Iterator;
import java.util.List;
-import org.apache.flink.types.Row;
-import org.apache.iceberg.data.parquet.BaseParquetWriter;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.RowType.RowField;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.iceberg.parquet.ParquetValueReaders;
import org.apache.iceberg.parquet.ParquetValueWriter;
import org.apache.iceberg.parquet.ParquetValueWriters;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
-public class FlinkParquetWriters extends BaseParquetWriter<Row> {
+public class FlinkParquetWriters {
+  private FlinkParquetWriters() {
+  }
-  private static final FlinkParquetWriters INSTANCE = new FlinkParquetWriters();
+  @SuppressWarnings("unchecked")
+  public static <T> ParquetValueWriter<T> buildWriter(LogicalType schema, MessageType type) {
+    return (ParquetValueWriter<T>) ParquetWithFlinkSchemaVisitor.visit(schema, type, new WriteBuilder(type));
+  }
-  private FlinkParquetWriters() {
+  private static class WriteBuilder extends ParquetWithFlinkSchemaVisitor<ParquetValueWriter<?>> {
+    private final MessageType type;
+
+    WriteBuilder(MessageType type) {
+      this.type = type;
+    }
+
+    @Override
+    public ParquetValueWriter<?> message(RowType sStruct, MessageType message, List<ParquetValueWriter<?>> fields) {
+      return struct(sStruct, message.asGroupType(), fields);
+    }
+
+    @Override
+    public ParquetValueWriter<?> struct(RowType sStruct, GroupType struct,
+                                        List<ParquetValueWriter<?>> fieldWriters) {
+      List<Type> fields = struct.getFields();
+      List<RowField> flinkFields = sStruct.getFields();
+      List<ParquetValueWriter<?>> writers = Lists.newArrayListWithExpectedSize(fieldWriters.size());
+      List<LogicalType> flinkTypes = Lists.newArrayList();
+      for (int i = 0; i < fields.size(); i += 1) {
+        writers.add(newOption(struct.getType(i), fieldWriters.get(i)));
+        flinkTypes.add(flinkFields.get(i).getType());
+      }
+
+      return new RowDataWriter(writers, flinkTypes);
+    }
+
+    @Override
+    public ParquetValueWriter<?> list(ArrayType sArray, GroupType array, ParquetValueWriter<?> elementWriter) {
+      GroupType repeated = array.getFields().get(0).asGroupType();
+      String[] repeatedPath = currentPath();
+
+      int repeatedD = type.getMaxDefinitionLevel(repeatedPath);
+      int repeatedR = type.getMaxRepetitionLevel(repeatedPath);
+
+      return new ArrayDataWriter<>(repeatedD, repeatedR,
+          newOption(repeated.getType(0), elementWriter),
+          sArray.getElementType());
+    }
+
+    @Override
+    public ParquetValueWriter<?> map(MapType sMap, GroupType map,
+                                     ParquetValueWriter<?> keyWriter, ParquetValueWriter<?> valueWriter) {
+      GroupType repeatedKeyValue = map.getFields().get(0).asGroupType();
+      String[] repeatedPath = currentPath();
+
+      int repeatedD = type.getMaxDefinitionLevel(repeatedPath);
+      int repeatedR = type.getMaxRepetitionLevel(repeatedPath);
+
+      return new MapDataWriter<>(repeatedD, repeatedR,
+          newOption(repeatedKeyValue.getType(0), keyWriter),
+          newOption(repeatedKeyValue.getType(1), valueWriter),
+          sMap.getKeyType(), sMap.getValueType());
+    }
+
+
+    private ParquetValueWriter<?> newOption(org.apache.parquet.schema.Type fieldType, ParquetValueWriter<?> writer) {
+      int maxD = type.getMaxDefinitionLevel(path(fieldType.getName()));
+      return ParquetValueWriters.option(fieldType, maxD, writer);
+    }
+
+    @Override
+    public ParquetValueWriter<?> primitive(LogicalType sType, PrimitiveType primitive) {
+      ColumnDescriptor desc = type.getColumnDescription(currentPath());
+
+      if (primitive.getOriginalType() != null) {
+        switch (primitive.getOriginalType()) {
+          case ENUM:
+          case JSON:
+          case UTF8:
+            return strings(desc);
+          case DATE:
+          case INT_8:
+          case INT_16:
+          case INT_32:
+            return ints(sType, desc);
+          case INT_64:
+            return ParquetValueWriters.longs(desc);
+          case TIME_MICROS:
+            return timeMicros(desc);
+          case TIMESTAMP_MICROS:
+            return timestamps(desc);
+          case DECIMAL:
+            DecimalLogicalTypeAnnotation decimal = (DecimalLogicalTypeAnnotation) primitive.getLogicalTypeAnnotation();
+            switch (primitive.getPrimitiveTypeName()) {
+              case INT32:
+                return decimalAsInteger(desc, decimal.getPrecision(), decimal.getScale());
+              case INT64:
+                return decimalAsLong(desc, decimal.getPrecision(), decimal.getScale());
+              case BINARY:
+              case FIXED_LEN_BYTE_ARRAY:
+                return decimalAsFixed(desc, decimal.getPrecision(), decimal.getScale());
+              default:
+                throw new UnsupportedOperationException(
+                    "Unsupported base type for decimal: " + primitive.getPrimitiveTypeName());
+            }
+          case BSON:
+            return byteArrays(desc);
+          default:
+            throw new UnsupportedOperationException(
+                "Unsupported logical type: " + primitive.getOriginalType());
+        }
+      }
+
+      switch (primitive.getPrimitiveTypeName()) {
+        case FIXED_LEN_BYTE_ARRAY:
+        case BINARY:
+          return byteArrays(desc);
+        case BOOLEAN:
+          return ParquetValueWriters.booleans(desc);
+        case INT32:
+          return ints(sType, desc);
+        case INT64:
+          return ParquetValueWriters.longs(desc);
+        case FLOAT:
+          return ParquetValueWriters.floats(desc);
+        case DOUBLE:
+          return ParquetValueWriters.doubles(desc);
+        default:
+          throw new UnsupportedOperationException("Unsupported type: " + primitive);
+      }
+    }
+  }
+
+  private static ParquetValueWriters.PrimitiveWriter<?> ints(LogicalType type, ColumnDescriptor desc) {
+    if (type instanceof TinyIntType) {
+      return ParquetValueWriters.tinyints(desc);
+    } else if (type instanceof SmallIntType) {
+      return ParquetValueWriters.shorts(desc);
+    }
+    return ParquetValueWriters.ints(desc);
+  }
+
+  private static ParquetValueWriters.PrimitiveWriter<StringData> strings(ColumnDescriptor desc) {
+    return new StringDataWriter(desc);
+  }
+
+  private static ParquetValueWriters.PrimitiveWriter<Integer> timeMicros(ColumnDescriptor desc) {
+    return new TimeMicrosWriter(desc);
+  }
+
+  private static ParquetValueWriters.PrimitiveWriter<DecimalData> decimalAsInteger(ColumnDescriptor desc,
+                                                                                   int precision, int scale) {
+    return new IntegerDecimalWriter(desc, precision, scale);
+  }
+  private static ParquetValueWriters.PrimitiveWriter<DecimalData> decimalAsLong(ColumnDescriptor desc,
+                                                                                int precision, int scale) {
+    return new LongDecimalWriter(desc, precision, scale);
+  }
+
+  private static ParquetValueWriters.PrimitiveWriter<DecimalData> decimalAsFixed(ColumnDescriptor desc,
+                                                                                 int precision, int scale) {
+    return new FixedDecimalWriter(desc, precision, scale);
+  }
+
+  private static ParquetValueWriters.PrimitiveWriter<TimestampData> timestamps(ColumnDescriptor desc) {
+    return new TimestampDataWriter(desc);
+  }
+
+  private static ParquetValueWriters.PrimitiveWriter<byte[]> byteArrays(ColumnDescriptor desc) {
+    return new ByteArrayWriter(desc);
+  }
+
+  private static class StringDataWriter extends ParquetValueWriters.PrimitiveWriter<StringData> {
+    private StringDataWriter(ColumnDescriptor desc) {
+      super(desc);
+    }
+
+    @Override
+    public void write(int repetitionLevel, StringData value) {
+      column.writeBinary(repetitionLevel, Binary.fromReusedByteArray(value.toBytes()));
+    }
+  }
+
+  private static class TimeMicrosWriter extends ParquetValueWriters.PrimitiveWriter<Integer> {
+    private TimeMicrosWriter(ColumnDescriptor desc) {
+      super(desc);
+    }
+
+    @Override
+    public void write(int repetitionLevel, Integer value) {
+      long micros = Long.valueOf(value) * 1000;
+      column.writeLong(repetitionLevel, micros);
+    }
+  }
+
+  private static class IntegerDecimalWriter extends ParquetValueWriters.PrimitiveWriter<DecimalData> {
+    private final int precision;
+    private final int scale;
+
+    private IntegerDecimalWriter(ColumnDescriptor desc, int precision, int scale) {
+      super(desc);
+      this.precision = precision;
+      this.scale = scale;
+    }
+
+    @Override
+    public void write(int repetitionLevel, DecimalData decimal) {
+      Preconditions.checkArgument(decimal.scale() == scale,
+          "Cannot write value as decimal(%s,%s), wrong scale: %s", precision, scale, decimal);
+      Preconditions.checkArgument(decimal.precision() <= precision,
Review comment:
It seems the upper bound of the precision for `IntegerDecimalWriter` is 9? Could we add a `precision <= 9` assertion?
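A sketch of the tightened precondition in the constructor, assuming the Parquet format rule that a decimal stored as INT32 has at most precision 9:

```java
private IntegerDecimalWriter(ColumnDescriptor desc, int precision, int scale) {
  super(desc);
  // per the Parquet format spec, an INT32-backed decimal must have precision <= 9
  Preconditions.checkArgument(precision <= 9,
      "Cannot write decimal(%s,%s) as an int, precision is too large", precision, scale);
  this.precision = precision;
  this.scale = scale;
}
```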
##########
File path: flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
##########
@@ -19,38 +19,457 @@
package org.apache.iceberg.flink.data;
+import java.math.BigDecimal;
+import java.util.Iterator;
import java.util.List;
-import org.apache.flink.types.Row;
-import org.apache.iceberg.data.parquet.BaseParquetWriter;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.RowType.RowField;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.iceberg.parquet.ParquetValueReaders;
import org.apache.iceberg.parquet.ParquetValueWriter;
import org.apache.iceberg.parquet.ParquetValueWriters;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
-public class FlinkParquetWriters extends BaseParquetWriter<Row> {
+public class FlinkParquetWriters {
+  private FlinkParquetWriters() {
+  }
-  private static final FlinkParquetWriters INSTANCE = new FlinkParquetWriters();
+  @SuppressWarnings("unchecked")
+  public static <T> ParquetValueWriter<T> buildWriter(LogicalType schema, MessageType type) {
+    return (ParquetValueWriter<T>) ParquetWithFlinkSchemaVisitor.visit(schema, type, new WriteBuilder(type));
+  }
-  private FlinkParquetWriters() {
+  private static class WriteBuilder extends ParquetWithFlinkSchemaVisitor<ParquetValueWriter<?>> {
+    private final MessageType type;
+
+    WriteBuilder(MessageType type) {
+      this.type = type;
+    }
+
+    @Override
+    public ParquetValueWriter<?> message(RowType sStruct, MessageType message, List<ParquetValueWriter<?>> fields) {
+      return struct(sStruct, message.asGroupType(), fields);
+    }
+
+    @Override
+    public ParquetValueWriter<?> struct(RowType sStruct, GroupType struct,
+                                        List<ParquetValueWriter<?>> fieldWriters) {
+      List<Type> fields = struct.getFields();
+      List<RowField> flinkFields = sStruct.getFields();
+      List<ParquetValueWriter<?>> writers = Lists.newArrayListWithExpectedSize(fieldWriters.size());
+      List<LogicalType> flinkTypes = Lists.newArrayList();
+      for (int i = 0; i < fields.size(); i += 1) {
+        writers.add(newOption(struct.getType(i), fieldWriters.get(i)));
+        flinkTypes.add(flinkFields.get(i).getType());
+      }
+
+      return new RowDataWriter(writers, flinkTypes);
+    }
+
+    @Override
+    public ParquetValueWriter<?> list(ArrayType sArray, GroupType array, ParquetValueWriter<?> elementWriter) {
+      GroupType repeated = array.getFields().get(0).asGroupType();
+      String[] repeatedPath = currentPath();
+
+      int repeatedD = type.getMaxDefinitionLevel(repeatedPath);
+      int repeatedR = type.getMaxRepetitionLevel(repeatedPath);
+
+      return new ArrayDataWriter<>(repeatedD, repeatedR,
+          newOption(repeated.getType(0), elementWriter),
+          sArray.getElementType());
+    }
+
+    @Override
+    public ParquetValueWriter<?> map(MapType sMap, GroupType map,
+                                     ParquetValueWriter<?> keyWriter, ParquetValueWriter<?> valueWriter) {
+      GroupType repeatedKeyValue = map.getFields().get(0).asGroupType();
+      String[] repeatedPath = currentPath();
+
+      int repeatedD = type.getMaxDefinitionLevel(repeatedPath);
+      int repeatedR = type.getMaxRepetitionLevel(repeatedPath);
+
+      return new MapDataWriter<>(repeatedD, repeatedR,
+          newOption(repeatedKeyValue.getType(0), keyWriter),
+          newOption(repeatedKeyValue.getType(1), valueWriter),
+          sMap.getKeyType(), sMap.getValueType());
+    }
+
+
+    private ParquetValueWriter<?> newOption(org.apache.parquet.schema.Type fieldType, ParquetValueWriter<?> writer) {
+      int maxD = type.getMaxDefinitionLevel(path(fieldType.getName()));
+      return ParquetValueWriters.option(fieldType, maxD, writer);
+    }
+
+    @Override
+    public ParquetValueWriter<?> primitive(LogicalType sType, PrimitiveType primitive) {
+      ColumnDescriptor desc = type.getColumnDescription(currentPath());
+
+      if (primitive.getOriginalType() != null) {
+        switch (primitive.getOriginalType()) {
+          case ENUM:
+          case JSON:
+          case UTF8:
+            return strings(desc);
+          case DATE:
+          case INT_8:
+          case INT_16:
+          case INT_32:
+            return ints(sType, desc);
+          case INT_64:
+            return ParquetValueWriters.longs(desc);
+          case TIME_MICROS:
+            return timeMicros(desc);
+          case TIMESTAMP_MICROS:
+            return timestamps(desc);
+          case DECIMAL:
+            DecimalLogicalTypeAnnotation decimal = (DecimalLogicalTypeAnnotation) primitive.getLogicalTypeAnnotation();
+            switch (primitive.getPrimitiveTypeName()) {
+              case INT32:
+                return decimalAsInteger(desc, decimal.getPrecision(), decimal.getScale());
+              case INT64:
+                return decimalAsLong(desc, decimal.getPrecision(), decimal.getScale());
+              case BINARY:
+              case FIXED_LEN_BYTE_ARRAY:
+                return decimalAsFixed(desc, decimal.getPrecision(), decimal.getScale());
+              default:
+                throw new UnsupportedOperationException(
+                    "Unsupported base type for decimal: " + primitive.getPrimitiveTypeName());
+            }
+          case BSON:
+            return byteArrays(desc);
+          default:
+            throw new UnsupportedOperationException(
+                "Unsupported logical type: " + primitive.getOriginalType());
+        }
+      }
+
+      switch (primitive.getPrimitiveTypeName()) {
+        case FIXED_LEN_BYTE_ARRAY:
+        case BINARY:
+          return byteArrays(desc);
+        case BOOLEAN:
+          return ParquetValueWriters.booleans(desc);
+        case INT32:
+          return ints(sType, desc);
+        case INT64:
+          return ParquetValueWriters.longs(desc);
+        case FLOAT:
+          return ParquetValueWriters.floats(desc);
+        case DOUBLE:
+          return ParquetValueWriters.doubles(desc);
+        default:
+          throw new UnsupportedOperationException("Unsupported type: " + primitive);
+      }
+    }
+  }
+
+  private static ParquetValueWriters.PrimitiveWriter<?> ints(LogicalType type, ColumnDescriptor desc) {
+    if (type instanceof TinyIntType) {
+      return ParquetValueWriters.tinyints(desc);
+    } else if (type instanceof SmallIntType) {
+      return ParquetValueWriters.shorts(desc);
+    }
+    return ParquetValueWriters.ints(desc);
+  }
+
+  private static ParquetValueWriters.PrimitiveWriter<StringData> strings(ColumnDescriptor desc) {
+    return new StringDataWriter(desc);
+  }
+
+  private static ParquetValueWriters.PrimitiveWriter<Integer> timeMicros(ColumnDescriptor desc) {
+    return new TimeMicrosWriter(desc);
+  }
+
+  private static ParquetValueWriters.PrimitiveWriter<DecimalData> decimalAsInteger(ColumnDescriptor desc,
+                                                                                   int precision, int scale) {
+    return new IntegerDecimalWriter(desc, precision, scale);
+  }
+  private static ParquetValueWriters.PrimitiveWriter<DecimalData> decimalAsLong(ColumnDescriptor desc,
+                                                                                int precision, int scale) {
+    return new LongDecimalWriter(desc, precision, scale);
+  }
+
+  private static ParquetValueWriters.PrimitiveWriter<DecimalData> decimalAsFixed(ColumnDescriptor desc,
+                                                                                 int precision, int scale) {
+    return new FixedDecimalWriter(desc, precision, scale);
+  }
+
+  private static ParquetValueWriters.PrimitiveWriter<TimestampData> timestamps(ColumnDescriptor desc) {
+    return new TimestampDataWriter(desc);
+  }
+
+  private static ParquetValueWriters.PrimitiveWriter<byte[]> byteArrays(ColumnDescriptor desc) {
+    return new ByteArrayWriter(desc);
+  }
+
+  private static class StringDataWriter extends ParquetValueWriters.PrimitiveWriter<StringData> {
+    private StringDataWriter(ColumnDescriptor desc) {
+      super(desc);
+    }
+
+    @Override
+    public void write(int repetitionLevel, StringData value) {
+      column.writeBinary(repetitionLevel, Binary.fromReusedByteArray(value.toBytes()));
+    }
+  }
+
+  private static class TimeMicrosWriter extends ParquetValueWriters.PrimitiveWriter<Integer> {
+    private TimeMicrosWriter(ColumnDescriptor desc) {
+      super(desc);
+    }
+
+    @Override
+    public void write(int repetitionLevel, Integer value) {
+      long micros = Long.valueOf(value) * 1000;
+      column.writeLong(repetitionLevel, micros);
+    }
+  }
+
+  private static class IntegerDecimalWriter extends ParquetValueWriters.PrimitiveWriter<DecimalData> {
+    private final int precision;
+    private final int scale;
+
+    private IntegerDecimalWriter(ColumnDescriptor desc, int precision, int scale) {
+      super(desc);
+      this.precision = precision;
+      this.scale = scale;
+    }
+
+    @Override
+    public void write(int repetitionLevel, DecimalData decimal) {
+      Preconditions.checkArgument(decimal.scale() == scale,
+          "Cannot write value as decimal(%s,%s), wrong scale: %s", precision, scale, decimal);
+      Preconditions.checkArgument(decimal.precision() <= precision,
+          "Cannot write value as decimal(%s,%s), too large: %s", precision, scale, decimal);
+
+      column.writeInteger(repetitionLevel, (int) decimal.toUnscaledLong());
+    }
+  }
+
+  private static class LongDecimalWriter extends ParquetValueWriters.PrimitiveWriter<DecimalData> {
+    private final int precision;
+    private final int scale;
+
+    private LongDecimalWriter(ColumnDescriptor desc, int precision, int scale) {
+      super(desc);
+      this.precision = precision;
+      this.scale = scale;
+    }
+
+    @Override
+    public void write(int repetitionLevel, DecimalData decimal) {
+      Preconditions.checkArgument(decimal.scale() == scale,
+          "Cannot write value as decimal(%s,%s), wrong scale: %s", precision, scale, decimal);
+      Preconditions.checkArgument(decimal.precision() <= precision,
Review comment:
Also, could we add a `precision <= 18` assertion here?
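The analogous check for this writer, assuming the precision <= 18 cap that the Parquet format spec puts on INT64-backed decimals:

```java
// per the Parquet format spec, an INT64-backed decimal must have precision <= 18
Preconditions.checkArgument(precision <= 18,
    "Cannot write decimal(%s,%s) as a long, precision is too large", precision, scale);
```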
##########
File path: flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
##########
@@ -19,64 +19,723 @@
package org.apache.iceberg.flink.data;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.time.Instant;
import java.util.List;
-import org.apache.flink.types.Row;
+import java.util.Map;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
import org.apache.iceberg.Schema;
-import org.apache.iceberg.data.parquet.BaseParquetReaders;
+import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.iceberg.parquet.ParquetValueReader;
import org.apache.iceberg.parquet.ParquetValueReaders;
+import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
-public class FlinkParquetReaders extends BaseParquetReaders<Row> {
+public class FlinkParquetReaders {
Review comment:
This class doesn't seem to need to be `public`: only FlinkParquetReader accesses these readers, and I don't think any other class needs to reach them.
##########
File path: flink/src/test/java/org/apache/iceberg/flink/data/RandomData.java
##########
@@ -88,20 +105,187 @@ public Row next() {
};
}
+  private static Iterable<Record> generateIcebergGenerics(Schema schema, int numRecords,
Review comment:
It seems this could share common code with `RandomGenericData#generate`? For example, change `RandomGenericData#generate` to return an `Iterable<Record>`.
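A rough sketch of the suggested reuse; the lazy-iterator shape mirrors this file, and `RandomRecordGenerator` stands in for whatever visitor `RandomGenericData` uses internally (the names are hypothetical):

```java
// in RandomGenericData: produce records lazily so both the generic tests and
// the Flink tests can iterate without materializing a full list
public static Iterable<Record> generate(Schema schema, int numRecords, long seed) {
  return () -> new Iterator<Record>() {
    private final RandomRecordGenerator generator = new RandomRecordGenerator(seed);
    private int count = 0;

    @Override
    public boolean hasNext() {
      return count < numRecords;
    }

    @Override
    public Record next() {
      if (!hasNext()) {
        throw new NoSuchElementException();
      }
      count += 1;
      return (Record) TypeUtil.visit(schema, generator);
    }
  };
}
```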
##########
File path: flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
##########
@@ -19,38 +19,457 @@
package org.apache.iceberg.flink.data;
+import java.math.BigDecimal;
+import java.util.Iterator;
import java.util.List;
-import org.apache.flink.types.Row;
-import org.apache.iceberg.data.parquet.BaseParquetWriter;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.RowType.RowField;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.iceberg.parquet.ParquetValueReaders;
import org.apache.iceberg.parquet.ParquetValueWriter;
import org.apache.iceberg.parquet.ParquetValueWriters;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
-public class FlinkParquetWriters extends BaseParquetWriter<Row> {
+public class FlinkParquetWriters {
+  private FlinkParquetWriters() {
+  }
-  private static final FlinkParquetWriters INSTANCE = new FlinkParquetWriters();
+  @SuppressWarnings("unchecked")
+  public static <T> ParquetValueWriter<T> buildWriter(LogicalType schema, MessageType type) {
+    return (ParquetValueWriter<T>) ParquetWithFlinkSchemaVisitor.visit(schema, type, new WriteBuilder(type));
+  }
-  private FlinkParquetWriters() {
+  private static class WriteBuilder extends ParquetWithFlinkSchemaVisitor<ParquetValueWriter<?>> {
+    private final MessageType type;
+
+    WriteBuilder(MessageType type) {
+      this.type = type;
+    }
+
+    @Override
+    public ParquetValueWriter<?> message(RowType sStruct, MessageType message, List<ParquetValueWriter<?>> fields) {
+      return struct(sStruct, message.asGroupType(), fields);
+    }
+
+    @Override
+    public ParquetValueWriter<?> struct(RowType sStruct, GroupType struct,
+                                        List<ParquetValueWriter<?>> fieldWriters) {
+      List<Type> fields = struct.getFields();
+      List<RowField> flinkFields = sStruct.getFields();
+      List<ParquetValueWriter<?>> writers = Lists.newArrayListWithExpectedSize(fieldWriters.size());
+      List<LogicalType> flinkTypes = Lists.newArrayList();
+      for (int i = 0; i < fields.size(); i += 1) {
+        writers.add(newOption(struct.getType(i), fieldWriters.get(i)));
+        flinkTypes.add(flinkFields.get(i).getType());
+      }
+
+      return new RowDataWriter(writers, flinkTypes);
+    }
+
+    @Override
+    public ParquetValueWriter<?> list(ArrayType sArray, GroupType array, ParquetValueWriter<?> elementWriter) {
+      GroupType repeated = array.getFields().get(0).asGroupType();
+      String[] repeatedPath = currentPath();
+
+      int repeatedD = type.getMaxDefinitionLevel(repeatedPath);
+      int repeatedR = type.getMaxRepetitionLevel(repeatedPath);
+
+      return new ArrayDataWriter<>(repeatedD, repeatedR,
+          newOption(repeated.getType(0), elementWriter),
+          sArray.getElementType());
+    }
+
+    @Override
+    public ParquetValueWriter<?> map(MapType sMap, GroupType map,
+                                     ParquetValueWriter<?> keyWriter, ParquetValueWriter<?> valueWriter) {
+      GroupType repeatedKeyValue = map.getFields().get(0).asGroupType();
+      String[] repeatedPath = currentPath();
+
+      int repeatedD = type.getMaxDefinitionLevel(repeatedPath);
+      int repeatedR = type.getMaxRepetitionLevel(repeatedPath);
+
+      return new MapDataWriter<>(repeatedD, repeatedR,
+          newOption(repeatedKeyValue.getType(0), keyWriter),
+          newOption(repeatedKeyValue.getType(1), valueWriter),
+          sMap.getKeyType(), sMap.getValueType());
+    }
+
+
+    private ParquetValueWriter<?> newOption(org.apache.parquet.schema.Type fieldType, ParquetValueWriter<?> writer) {
+      int maxD = type.getMaxDefinitionLevel(path(fieldType.getName()));
+      return ParquetValueWriters.option(fieldType, maxD, writer);
+    }
+
+    @Override
+    public ParquetValueWriter<?> primitive(LogicalType sType, PrimitiveType primitive) {
+      ColumnDescriptor desc = type.getColumnDescription(currentPath());
+
+      if (primitive.getOriginalType() != null) {
+        switch (primitive.getOriginalType()) {
+          case ENUM:
+          case JSON:
+          case UTF8:
+            return strings(desc);
+          case DATE:
+          case INT_8:
+          case INT_16:
+          case INT_32:
+            return ints(sType, desc);
+          case INT_64:
+            return ParquetValueWriters.longs(desc);
+          case TIME_MICROS:
+            return timeMicros(desc);
+          case TIMESTAMP_MICROS:
+            return timestamps(desc);
+          case DECIMAL:
+            DecimalLogicalTypeAnnotation decimal = (DecimalLogicalTypeAnnotation) primitive.getLogicalTypeAnnotation();
+            switch (primitive.getPrimitiveTypeName()) {
+              case INT32:
+                return decimalAsInteger(desc, decimal.getPrecision(), decimal.getScale());
+              case INT64:
+                return decimalAsLong(desc, decimal.getPrecision(), decimal.getScale());
+              case BINARY:
+              case FIXED_LEN_BYTE_ARRAY:
+                return decimalAsFixed(desc, decimal.getPrecision(), decimal.getScale());
+              default:
+                throw new UnsupportedOperationException(
+                    "Unsupported base type for decimal: " + primitive.getPrimitiveTypeName());
+            }
+          case BSON:
+            return byteArrays(desc);
+          default:
+            throw new UnsupportedOperationException(
+                "Unsupported logical type: " + primitive.getOriginalType());
+        }
+      }
+
+      switch (primitive.getPrimitiveTypeName()) {
+        case FIXED_LEN_BYTE_ARRAY:
+        case BINARY:
+          return byteArrays(desc);
+        case BOOLEAN:
+          return ParquetValueWriters.booleans(desc);
+        case INT32:
+          return ints(sType, desc);
+        case INT64:
+          return ParquetValueWriters.longs(desc);
+        case FLOAT:
+          return ParquetValueWriters.floats(desc);
+        case DOUBLE:
+          return ParquetValueWriters.doubles(desc);
+        default:
+          throw new UnsupportedOperationException("Unsupported type: " + primitive);
+      }
+    }
+  }
+
+  private static ParquetValueWriters.PrimitiveWriter<?> ints(LogicalType type, ColumnDescriptor desc) {
+    if (type instanceof TinyIntType) {
+      return ParquetValueWriters.tinyints(desc);
+    } else if (type instanceof SmallIntType) {
+      return ParquetValueWriters.shorts(desc);
+    }
+    return ParquetValueWriters.ints(desc);
+  }
+
+  private static ParquetValueWriters.PrimitiveWriter<StringData> strings(ColumnDescriptor desc) {
+    return new StringDataWriter(desc);
+  }
+
+  private static ParquetValueWriters.PrimitiveWriter<Integer> timeMicros(ColumnDescriptor desc) {
+    return new TimeMicrosWriter(desc);
+  }
+
+  private static ParquetValueWriters.PrimitiveWriter<DecimalData> decimalAsInteger(ColumnDescriptor desc,
+                                                                                   int precision, int scale) {
+    return new IntegerDecimalWriter(desc, precision, scale);
+  }
+  private static ParquetValueWriters.PrimitiveWriter<DecimalData> decimalAsLong(ColumnDescriptor desc,
+                                                                                int precision, int scale) {
+    return new LongDecimalWriter(desc, precision, scale);
+  }
+
+  private static ParquetValueWriters.PrimitiveWriter<DecimalData> decimalAsFixed(ColumnDescriptor desc,
+                                                                                 int precision, int scale) {
+    return new FixedDecimalWriter(desc, precision, scale);
+  }
+
+  private static ParquetValueWriters.PrimitiveWriter<TimestampData> timestamps(ColumnDescriptor desc) {
+    return new TimestampDataWriter(desc);
+  }
+
+  private static ParquetValueWriters.PrimitiveWriter<byte[]> byteArrays(ColumnDescriptor desc) {
+    return new ByteArrayWriter(desc);
+  }
+
+  private static class StringDataWriter extends ParquetValueWriters.PrimitiveWriter<StringData> {
+    private StringDataWriter(ColumnDescriptor desc) {
+      super(desc);
+    }
+
+    @Override
+    public void write(int repetitionLevel, StringData value) {
+      column.writeBinary(repetitionLevel, Binary.fromReusedByteArray(value.toBytes()));
+    }
+  }
+
+  private static class TimeMicrosWriter extends ParquetValueWriters.PrimitiveWriter<Integer> {
Review comment:
The reader is named `TimeMillisReader` while the writer is `TimeMicrosWriter`; could they be named symmetrically?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]