antonireus commented on code in PR #15159:
URL: https://github.com/apache/iceberg/pull/15159#discussion_r2929800999
##########
api/src/main/java/org/apache/iceberg/types/TypeUtil.java:
##########
@@ -423,14 +423,45 @@ public static Type find(Type type, Predicate<Type> predicate) {
return visit(type, new FindTypeVisitor(predicate));
}
+ /**
+ * @deprecated will be removed in 2.0.0, use {@link #isPromotionAllowed(Type, Type.PrimitiveType,
+ * Integer, boolean)} instead. This method does not take advantage of table format or source
+ * id references
+ */
+ @Deprecated
public static boolean isPromotionAllowed(Type from, Type.PrimitiveType to) {
+ return TypeUtil.isPromotionAllowed(from, to, 2, false);
+ }
+
+ private static boolean handleDateType(
+ Type.PrimitiveType to, Integer formatVersion, boolean sourceIdReference) {
+ if (formatVersion < 3) {
+ return false;
+ } else if (sourceIdReference) {
+ return false;
+ } else if (to.typeId() == Type.TypeID.TIMESTAMP) {
+ // Timezone types cannot be promoted.
+ Types.TimestampType toTs = (Types.TimestampType) to;
+ return Types.TimestampType.withoutZone().equals(toTs);
+ } else if (to.typeId() == Type.TypeID.TIMESTAMP_NANO) {
+ // Timezone types cannot be promoted.
+ Types.TimestampNanoType toTs = (Types.TimestampNanoType) to;
+ return Types.TimestampNanoType.withoutZone().equals(toTs);
Review Comment:
Same suggestion here: it's simpler to check whether the type is adjusted to UTC.
```suggestion
return !toTs.shouldAdjustToUTC();
```
##########
core/src/main/java/org/apache/iceberg/SchemaUpdate.java:
##########
@@ -66,27 +66,48 @@ class SchemaUpdate implements UpdateSchema {
private boolean allowIncompatibleChanges = false;
private Set<String> identifierFieldNames;
private boolean caseSensitive = true;
+ private final int formatVersion;
SchemaUpdate(TableOperations ops) {
this(ops, ops.current());
}
+ /** For testing only. */
+ SchemaUpdate(TableMetadata base) {
+ this(null, base, base.schema(), base.lastColumnId(), base.formatVersion());
+ }
+
/** For testing only. */
SchemaUpdate(Schema schema, int lastColumnId) {
- this(null, null, schema, lastColumnId);
+ this(null, null, schema, lastColumnId, TableProperties.DEFAULT_FORMAT_VERSION);
Review Comment:
Use `TableMetadata.DEFAULT_TABLE_FORMAT_VERSION` here.
##########
core/src/main/java/org/apache/iceberg/SchemaUpdate.java:
##########
@@ -66,27 +66,48 @@ class SchemaUpdate implements UpdateSchema {
private boolean allowIncompatibleChanges = false;
private Set<String> identifierFieldNames;
private boolean caseSensitive = true;
+ private final int formatVersion;
SchemaUpdate(TableOperations ops) {
this(ops, ops.current());
}
+ /** For testing only. */
+ SchemaUpdate(TableMetadata base) {
+ this(null, base, base.schema(), base.lastColumnId(), base.formatVersion());
+ }
+
/** For testing only. */
SchemaUpdate(Schema schema, int lastColumnId) {
- this(null, null, schema, lastColumnId);
+ this(null, null, schema, lastColumnId, TableProperties.DEFAULT_FORMAT_VERSION);
}
- private SchemaUpdate(TableOperations ops, TableMetadata base) {
- this(ops, base, base.schema(), base.lastColumnId());
+ /** For testing only. */
+ SchemaUpdate(Schema schema, int lastColumnId, int formatVersion) {
+ this(null, null, schema, lastColumnId, formatVersion);
}
- private SchemaUpdate(TableOperations ops, TableMetadata base, Schema schema, int lastColumnId) {
+ private SchemaUpdate(TableOperations ops, TableMetadata base) {
+ this(
+ ops,
+ base,
+ base.schema(),
+ base.lastColumnId(),
+ PropertyUtil.propertyAsInt(
+ base.properties(),
+ TableProperties.FORMAT_VERSION,
+ TableProperties.DEFAULT_FORMAT_VERSION));
Review Comment:
Could just use `base.formatVersion()` instead.
##########
api/src/main/java/org/apache/iceberg/types/TypeUtil.java:
##########
@@ -423,14 +423,45 @@ public static Type find(Type type, Predicate<Type> predicate) {
return visit(type, new FindTypeVisitor(predicate));
}
+ /**
+ * @deprecated will be removed in 2.0.0, use {@link #isPromotionAllowed(Type, Type.PrimitiveType,
+ * Integer, boolean)} instead. This method does not take advantage of table format or source
+ * id references
+ */
+ @Deprecated
public static boolean isPromotionAllowed(Type from, Type.PrimitiveType to) {
+ return TypeUtil.isPromotionAllowed(from, to, 2, false);
+ }
+
+ private static boolean handleDateType(
+ Type.PrimitiveType to, Integer formatVersion, boolean sourceIdReference) {
+ if (formatVersion < 3) {
+ return false;
+ } else if (sourceIdReference) {
+ return false;
+ } else if (to.typeId() == Type.TypeID.TIMESTAMP) {
+ // Timezone types cannot be promoted.
+ Types.TimestampType toTs = (Types.TimestampType) to;
+ return Types.TimestampType.withoutZone().equals(toTs);
+ } else if (to.typeId() == Type.TypeID.TIMESTAMP_NANO) {
+ // Timezone types cannot be promoted.
+ Types.TimestampNanoType toTs = (Types.TimestampNanoType) to;
+ return Types.TimestampNanoType.withoutZone().equals(toTs);
+ }
+ return false;
+ }
+
+ public static boolean isPromotionAllowed(
+ Type from, Type.PrimitiveType to, Integer formatVersion, boolean sourceIdReference) {
Review Comment:
Unnecessary boxing: `formatVersion` could just be an `int`.
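A minimal sketch of `handleDateType` with `formatVersion` taken as a primitive `int`, also folding in the suggestion to test the UTC-adjustment flag instead of comparing against `withoutZone()`. To stay self-contained it uses hypothetical stand-ins: `TypeId` for `Type.TypeID` and `TargetType` for `Type.PrimitiveType`, whose `adjustToUtc` field plays the role of `shouldAdjustToUTC()`.

```java
// Hypothetical, self-contained model of the suggested handleDateType logic (not Iceberg's API).
final class DatePromotionSketch {

  // Stand-in for Iceberg's Type.TypeID; only the ids this sketch needs.
  enum TypeId { DATE, TIMESTAMP, TIMESTAMP_NANO, STRING }

  // Stand-in for Type.PrimitiveType; adjustToUtc plays the role of shouldAdjustToUTC().
  static final class TargetType {
    final TypeId typeId;
    final boolean adjustToUtc;

    TargetType(TypeId typeId, boolean adjustToUtc) {
      this.typeId = typeId;
      this.adjustToUtc = adjustToUtc;
    }
  }

  // formatVersion is a primitive int: no boxing, and no NullPointerException on `< 3`.
  static boolean handleDateType(TargetType to, int formatVersion, boolean sourceIdReference) {
    if (formatVersion < 3 || sourceIdReference) {
      // Needs format v3+, and the field must not be referenced as a source id.
      return false;
    }
    switch (to.typeId) {
      case TIMESTAMP:
      case TIMESTAMP_NANO:
        // Only timestamps without zone (not adjusted to UTC) are valid targets.
        return !to.adjustToUtc;
      default:
        return false;
    }
  }

  public static void main(String[] args) {
    System.out.println(handleDateType(new TargetType(TypeId.TIMESTAMP, false), 3, false)); // true
    System.out.println(handleDateType(new TargetType(TypeId.TIMESTAMP, true), 3, false)); // false
    System.out.println(handleDateType(new TargetType(TypeId.TIMESTAMP_NANO, false), 2, false)); // false
  }
}
```

With an `int` parameter a caller cannot pass `null`, whereas an `Integer` would throw a `NullPointerException` when unboxed for `formatVersion < 3`.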
##########
core/src/main/java/org/apache/iceberg/SchemaUpdate.java:
##########
@@ -281,8 +302,17 @@ public UpdateSchema updateColumn(String name, Type.PrimitiveType newType) {
return this;
}
+ // If field is listed in source-ids, we need to flag it for promoting date -> timestamp.
+ List<PartitionField> partitionFields =
+ this.base != null
+ ? this.base.spec().getFieldsBySourceId(field.fieldId())
+ : Lists.newArrayList();
+
+ boolean isBucketPartitioned =
+ partitionFields.stream().anyMatch(pf -> pf.transform().toString().startsWith("bucket["));
Review Comment:
Reading the spec, it seems that not only `bucket` would produce different values, but probably also `identity`, since the value of a date is the number of days since epoch while for a timestamp it is the number of micros or nanos since epoch. On the other hand, I guess `truncate` would be the same.
I would rename the boolean to something like `hasIncompatiblePartitionTransform` and match both the identity and bucket transforms.
The bucket check also seems a bit brittle: why not check whether the transform is an instance of `Bucket`?
##########
parquet/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java:
##########
@@ -82,6 +86,24 @@ static Function<Object, Object> converterFromParquet(
} else if (icebergType.typeId() == Type.TypeID.DOUBLE
&& parquetType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.FLOAT) {
return value -> ((Float) fromParquet.apply(value)).doubleValue();
+ } else if (icebergType.typeId() == Type.TypeID.TIMESTAMP
+ && parquetType.getOriginalType() == org.apache.parquet.schema.OriginalType.DATE) {
+ LogicalTypeAnnotation logicalType = parquetType.getLogicalTypeAnnotation();
+ if (logicalType instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation
+ && ((LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) logicalType)
+ .isAdjustedToUTC()) {
+ return fromParquet;
+ }
+ return value -> (long) ((Integer) fromParquet.apply(value)) * TimeUnit.DAYS.toMicros(1);
+ } else if (icebergType.typeId() == Type.TypeID.TIMESTAMP_NANO
+ && parquetType.getOriginalType() == org.apache.parquet.schema.OriginalType.DATE) {
+ LogicalTypeAnnotation logicalType = parquetType.getLogicalTypeAnnotation();
+ if (logicalType instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation
+ && ((LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) logicalType)
+ .isAdjustedToUTC()) {
+ return fromParquet;
+ }
Review Comment:
Same unneeded check here: a Parquet DATE column can never carry a timestamp logical type annotation.
##########
core/src/main/java/org/apache/iceberg/schema/UnionByNameVisitor.java:
##########
@@ -53,7 +56,17 @@ private UnionByNameVisitor(UpdateSchema api, Schema partnerSchema, boolean caseS
* @param newSchema a new schema to compare with the existing
*/
public static void visit(UpdateSchema api, Schema existingSchema, Schema newSchema) {
Review Comment:
This method doesn't seem to be used; the only `visit` method actually called seems to be the one coming from `SchemaUpdate`, so maybe the defaults are not needed here at all.
##########
core/src/test/java/org/apache/iceberg/TestSchemaUnionByFieldName.java:
##########
@@ -413,6 +413,28 @@ public void testTypePromoteDecimalToFixedScaleWithWiderPrecision() {
assertThat(applied.asStruct()).isEqualTo(newSchema.asStruct());
}
+ @Test
+ // date -> Can promote to timestamp
+ public void testTypePromoteDateToTimestamp() {
+ Schema currentSchema = new Schema(required(1, "aCol", DateType.get()));
+ Schema newSchema = new Schema(required(1, "aCol", TimestampType.withoutZone()));
+
+ Schema applied = new SchemaUpdate(currentSchema, 1).unionByNameWith(newSchema).apply();
Review Comment:
Shouldn't this test fail, since it uses the default `formatVersion` of 2?
##########
core/src/main/java/org/apache/iceberg/schema/UnionByNameVisitor.java:
##########
@@ -53,7 +56,17 @@ private UnionByNameVisitor(UpdateSchema api, Schema partnerSchema, boolean caseS
* @param newSchema a new schema to compare with the existing
*/
public static void visit(UpdateSchema api, Schema existingSchema, Schema newSchema) {
- visit(api, existingSchema, newSchema, true);
+ visit(api, existingSchema, newSchema, true, 2);
Review Comment:
Should use `TableMetadata.DEFAULT_TABLE_FORMAT_VERSION`, if a default is needed at all.
##########
core/src/main/java/org/apache/iceberg/TableProperties.java:
##########
@@ -41,6 +41,8 @@ private TableProperties() {}
*/
public static final String FORMAT_VERSION = "format-version";
+ public static final int DEFAULT_FORMAT_VERSION = 2;
Review Comment:
This constant already exists as `TableMetadata.DEFAULT_TABLE_FORMAT_VERSION`, but it is package-private. I guess it would be better to make that one `public`.
##########
core/src/main/java/org/apache/iceberg/schema/UnionByNameVisitor.java:
##########
@@ -53,7 +56,17 @@ private UnionByNameVisitor(UpdateSchema api, Schema partnerSchema, boolean caseS
* @param newSchema a new schema to compare with the existing
*/
public static void visit(UpdateSchema api, Schema existingSchema, Schema newSchema) {
- visit(api, existingSchema, newSchema, true);
+ visit(api, existingSchema, newSchema, true, 2);
+ }
+
+ public static void visit(
+ UpdateSchema api, Schema existingSchema, Schema newSchema, boolean caseSensitive) {
+ visit(api, existingSchema, newSchema, caseSensitive, 2);
Review Comment:
Should use `TableMetadata.DEFAULT_TABLE_FORMAT_VERSION`.
##########
parquet/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java:
##########
@@ -23,10 +23,13 @@
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
+import java.util.UUID;
Review Comment:
Unused import.
##########
api/src/main/java/org/apache/iceberg/types/TypeUtil.java:
##########
@@ -423,14 +423,45 @@ public static Type find(Type type, Predicate<Type> predicate) {
return visit(type, new FindTypeVisitor(predicate));
}
+ /**
+ * @deprecated will be removed in 2.0.0, use {@link #isPromotionAllowed(Type, Type.PrimitiveType,
+ * Integer, boolean)} instead. This method does not take advantage of table format or source
+ * id references
+ */
+ @Deprecated
public static boolean isPromotionAllowed(Type from, Type.PrimitiveType to) {
+ return TypeUtil.isPromotionAllowed(from, to, 2, false);
+ }
+
+ private static boolean handleDateType(
+ Type.PrimitiveType to, Integer formatVersion, boolean sourceIdReference) {
+ if (formatVersion < 3) {
+ return false;
+ } else if (sourceIdReference) {
+ return false;
+ } else if (to.typeId() == Type.TypeID.TIMESTAMP) {
+ // Timezone types cannot be promoted.
+ Types.TimestampType toTs = (Types.TimestampType) to;
+ return Types.TimestampType.withoutZone().equals(toTs);
Review Comment:
Would be easier to just check whether it's adjusted to UTC:
```suggestion
return !toTs.shouldAdjustToUTC();
```
##########
parquet/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java:
##########
@@ -82,6 +86,24 @@ static Function<Object, Object> converterFromParquet(
} else if (icebergType.typeId() == Type.TypeID.DOUBLE
&& parquetType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.FLOAT) {
return value -> ((Float) fromParquet.apply(value)).doubleValue();
+ } else if (icebergType.typeId() == Type.TypeID.TIMESTAMP
+ && parquetType.getOriginalType() == org.apache.parquet.schema.OriginalType.DATE) {
+ LogicalTypeAnnotation logicalType = parquetType.getLogicalTypeAnnotation();
+ if (logicalType instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation
+ && ((LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) logicalType)
+ .isAdjustedToUTC()) {
+ return fromParquet;
+ }
Review Comment:
This check seems unnecessary: `parquetType.getOriginalType()` is derived from the logical type annotation, so if it returns DATE, the logical type can never be a timestamp annotation. I would drop this logical type check.
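Without the annotation check, each branch reduces to a unit conversion of the INT32 day count that Parquet stores for DATE. A minimal sketch, where `dateToTimestamp` is a hypothetical helper and `Function.identity()` stands in for the real `fromParquet` converter; using `Math.multiplyExact` is an added choice so that dates outside the `long` nanosecond range (roughly years 1677 to 2262) fail loudly instead of overflowing.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

final class DateConversionSketch {

  // Hypothetical helper: wraps a reader-side converter so an INT32 day count
  // becomes a long in the target unit (micros for timestamp, nanos for timestamp_ns).
  static Function<Object, Object> dateToTimestamp(
      Function<Object, Object> fromParquet, TimeUnit unit) {
    long perDay = unit.convert(1, TimeUnit.DAYS); // micros or nanos per day
    // multiplyExact throws ArithmeticException instead of silently wrapping on overflow.
    return value -> Math.multiplyExact((long) (Integer) fromParquet.apply(value), perDay);
  }

  public static void main(String[] args) {
    Function<Object, Object> fromParquet = Function.identity(); // stand-in for the Parquet reader
    System.out.println(dateToTimestamp(fromParquet, TimeUnit.MICROSECONDS).apply(1)); // 86400000000
    System.out.println(dateToTimestamp(fromParquet, TimeUnit.NANOSECONDS).apply(1)); // 86400000000000
  }
}
```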