openinx commented on a change in pull request #1173:
URL: https://github.com/apache/iceberg/pull/1173#discussion_r450714611
##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkTypeVisitor.java
##########
@@ -19,65 +19,63 @@
package org.apache.iceberg.flink;
-import java.util.List;
-import java.util.Map;
-import org.apache.flink.table.types.AtomicDataType;
-import org.apache.flink.table.types.CollectionDataType;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.FieldsDataType;
-import org.apache.flink.table.types.KeyValueDataType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeVisitor;
+import org.apache.flink.table.types.logical.NullType;
+import org.apache.flink.table.types.logical.RawType;
+import org.apache.flink.table.types.logical.StructuredType;
+import org.apache.flink.table.types.logical.SymbolType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
-public class FlinkTypeVisitor<T> {
+public abstract class FlinkTypeVisitor<T> implements LogicalTypeVisitor<T> {
Review comment:
@JingsongLi I'm curious that what's the difference between the flink
style `LogicalTypeVisitor` and iceberg style visitor... Currently, all of the
visitor are iceberg style, I'm not quite sure that what's the benifits to
convert it to flink style visitor ...
##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkTypeVisitor.java
##########
@@ -19,65 +19,63 @@
package org.apache.iceberg.flink;
-import java.util.List;
-import java.util.Map;
-import org.apache.flink.table.types.AtomicDataType;
-import org.apache.flink.table.types.CollectionDataType;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.FieldsDataType;
-import org.apache.flink.table.types.KeyValueDataType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeVisitor;
+import org.apache.flink.table.types.logical.NullType;
+import org.apache.flink.table.types.logical.RawType;
+import org.apache.flink.table.types.logical.StructuredType;
+import org.apache.flink.table.types.logical.SymbolType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
-public class FlinkTypeVisitor<T> {
+public abstract class FlinkTypeVisitor<T> implements LogicalTypeVisitor<T> {
Review comment:
BTW, seems this `FlinkTypeVisitor` can be package access (I forget to
check the access before).
##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkTypeToType.java
##########
@@ -64,86 +63,136 @@ private int getNextId() {
}
@Override
- public Type fields(FieldsDataType fields, List<Type> types) {
- List<Types.NestedField> newFields =
Lists.newArrayListWithExpectedSize(types.size());
- boolean isRoot = root == fields;
+ public Type visit(CharType charType) {
+ return Types.StringType.get();
+ }
- List<RowType.RowField> rowFields = ((RowType)
fields.getLogicalType()).getFields();
- Preconditions.checkArgument(rowFields.size() == types.size(), "fields list
and types list should have same size.");
+ @Override
+ public Type visit(VarCharType varCharType) {
+ return Types.StringType.get();
+ }
- for (int i = 0; i < rowFields.size(); i++) {
- int id = isRoot ? i : getNextId();
+ @Override
+ public Type visit(BooleanType booleanType) {
+ return Types.BooleanType.get();
+ }
- RowType.RowField field = rowFields.get(i);
- String name = field.getName();
- String comment = field.getDescription().orElse(null);
+ @Override
+ public Type visit(BinaryType binaryType) {
+ return Types.FixedType.ofLength(binaryType.getLength());
+ }
- if (field.getType().isNullable()) {
- newFields.add(Types.NestedField.optional(id, name, types.get(i),
comment));
- } else {
- newFields.add(Types.NestedField.required(id, name, types.get(i),
comment));
- }
- }
+ @Override
+ public Type visit(VarBinaryType varBinaryType) {
+ return Types.BinaryType.get();
+ }
- return Types.StructType.of(newFields);
+ @Override
+ public Type visit(DecimalType decimalType) {
+ return Types.DecimalType.of(decimalType.getPrecision(),
decimalType.getScale());
+ }
+
+ @Override
+ public Type visit(TinyIntType tinyIntType) {
+ return Types.IntegerType.get();
+ }
+
+ @Override
+ public Type visit(SmallIntType smallIntType) {
+ return Types.IntegerType.get();
+ }
+
+ @Override
+ public Type visit(IntType intType) {
+ return Types.IntegerType.get();
+ }
+
+ @Override
+ public Type visit(BigIntType bigIntType) {
+ return Types.LongType.get();
+ }
+
+ @Override
+ public Type visit(FloatType floatType) {
+ return Types.FloatType.get();
+ }
+
+ @Override
+ public Type visit(DoubleType doubleType) {
+ return Types.DoubleType.get();
+ }
+
+ @Override
+ public Type visit(DateType dateType) {
+ return Types.DateType.get();
+ }
+
+ @Override
+ public Type visit(TimeType timeType) {
+ return Types.TimeType.get();
}
@Override
- public Type collection(CollectionDataType collection, Type elementType) {
- if (collection.getElementDataType().getLogicalType().isNullable()) {
+ public Type visit(TimestampType timestampType) {
+ return Types.TimestampType.withoutZone();
+ }
+
+ @Override
+ public Type visit(LocalZonedTimestampType localZonedTimestampType) {
+ return Types.TimestampType.withZone();
+ }
+
+ @Override
+ public Type visit(ArrayType arrayType) {
+ Type elementType = arrayType.getElementType().accept(this);
+ if (arrayType.getElementType().isNullable()) {
return Types.ListType.ofOptional(getNextId(), elementType);
} else {
return Types.ListType.ofRequired(getNextId(), elementType);
}
}
@Override
- public Type map(KeyValueDataType map, Type keyType, Type valueType) {
+ public Type visit(MultisetType multisetType) {
+ Type elementType = multisetType.getElementType().accept(this);
+ return Types.MapType.ofRequired(getNextId(), getNextId(), elementType,
Types.IntegerType.get());
Review comment:
Sounds good that we've extended support the flink `multiset` data type .
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]