aokolnychyi commented on a change in pull request #4154:
URL: https://github.com/apache/iceberg/pull/4154#discussion_r812432890
##########
File path: api/src/main/java/org/apache/iceberg/types/TypeUtil.java
##########
@@ -273,7 +273,13 @@ public static Schema assignIncreasingFreshIds(Schema schema) {
    * @throws IllegalArgumentException if a field cannot be found (by name) in the source schema
    */
   public static Schema reassignIds(Schema schema, Schema idSourceSchema) {
-    Types.StructType struct = visit(schema, new ReassignIds(idSourceSchema)).asStructType();
+    Types.StructType struct = visit(schema, new ReassignIds(idSourceSchema, null)).asStructType();
+    return new Schema(struct.fields(), refreshIdentifierFields(struct, schema));
+  }
+
+  public static Schema reassignOrRefreshIds(Schema schema, Schema idSourceSchema) {
+    AtomicInteger highest = new AtomicInteger(schema.highestFieldId());
Review comment:
Can there be a mismatch between the highest field ID in the schema and
`last-column-id` in the metadata? For instance, what if I add a column, drop
it, and then call this method with a new column? Can that cause the same
column ID to be used twice?
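To make the scenario concrete, here is a minimal sketch in plain Java (toy maps, not Iceberg classes; all names are made up) of how seeding a counter from the schema's current highest field ID can hand out an ID that a since-dropped column already used:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Toy model: a "schema" is just name -> field ID. Table metadata would
// track last-column-id = 2 after the drop, but the schema itself no
// longer contains ID 2, so seeding from the schema's highest ID reuses it.
public class FieldIdReuse {
  static int nextIdSeededFromSchema() {
    Map<String, Integer> schema = new LinkedHashMap<>();
    int lastColumnId = 0;
    schema.put("a", ++lastColumnId); // id 1
    schema.put("b", ++lastColumnId); // id 2
    schema.remove("b");              // drop b; last-column-id stays 2

    AtomicInteger highest = new AtomicInteger(
        schema.values().stream().mapToInt(Integer::intValue).max().orElse(0));
    return highest.incrementAndGet(); // 2 again, colliding with dropped b
  }

  public static void main(String[] args) {
    System.out.println("next id = " + nextIdSeededFromSchema());
  }
}
```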
##########
File path: api/src/main/java/org/apache/iceberg/Schema.java
##########
@@ -102,7 +103,7 @@ public Schema(int schemaId, List<NestedField> columns, Map<String, Integer> alia
     this.identifierFieldIds = identifierFieldIds != null ? Ints.toArray(identifierFieldIds) : new int[0];
-    lazyIdToName();
Review comment:
Did we remove this call on purpose? Seems unrelated.
##########
File path: api/src/main/java/org/apache/iceberg/types/ReassignIds.java
##########
@@ -43,6 +45,17 @@ public Type schema(Schema schema, Supplier<Type> future) {
     }
   }
+  private int id(Types.StructType sourceStruct, String name) {
+    if (sourceStruct != null) {
+      Types.NestedField sourceField = sourceStruct.field(name);
+      if (sourceField != null) {
+        return sourceField.fieldId();
+      }
+    }
+
+    return assignId.get();
Review comment:
I see that `assignId` is set to null in `reassignIds`. Can this result
in an NPE when called in `struct`?
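A minimal sketch of the lookup (plain Java, with a map standing in for the source struct; `idChecked` is a made-up guarded variant) showing where the NPE would surface and one way to fail with a clearer error instead:

```java
import java.util.Map;
import java.util.function.Supplier;

// Toy version of the reviewed id() method: look the field up in the
// source struct, else fall back to the assignId supplier.
public class IdLookup {
  static int id(Map<String, Integer> sourceStruct, String name, Supplier<Integer> assignId) {
    if (sourceStruct != null) {
      Integer sourceId = sourceStruct.get(name);
      if (sourceId != null) {
        return sourceId;
      }
    }
    // NPE here if assignId is null (the reassignIds path) and the
    // field is missing from the source schema.
    return assignId.get();
  }

  // Guarded variant: surfaces a descriptive error instead of an NPE.
  static int idChecked(Map<String, Integer> sourceStruct, String name, Supplier<Integer> assignId) {
    if (sourceStruct != null && sourceStruct.containsKey(name)) {
      return sourceStruct.get(name);
    }
    if (assignId == null) {
      throw new IllegalArgumentException("Field missing from source schema: " + name);
    }
    return assignId.get();
  }

  public static void main(String[] args) {
    // Found in the source struct, so the null supplier is never touched.
    System.out.println(id(Map.of("a", 1), "a", null));
  }
}
```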
##########
File path:
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java
##########
@@ -130,8 +131,31 @@ public Write build() {
     Preconditions.checkArgument(handleTimestampWithoutZone || !SparkUtil.hasTimestampWithoutZone(table.schema()),
         SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR);
-    Schema writeSchema = SparkSchemaUtil.convert(table.schema(), dsSchema);
-    TypeUtil.validateWriteSchema(table.schema(), writeSchema, writeConf.checkNullability(), writeConf.checkOrdering());
+    Schema writeSchema;
+    boolean mergeSchema = writeInfo.options().getBoolean("mergeSchema",
+        writeInfo.options().getBoolean("merge-schema", false));
+    if (mergeSchema) {
Review comment:
Will we always attempt to merge the schema if enabled? Even if the
incoming schema doesn't include new columns?
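One possible guard, sketched with flat field-name sets (real Iceberg schemas would also need to compare nested fields; `hasNewColumns` is a made-up helper): take the updateSchema/commit path only when the incoming schema actually adds something.

```java
import java.util.Set;

// Illustrative only: compares top-level field names. Nested structs,
// maps, and lists would need a recursive comparison in practice.
public class MergeGuard {
  static boolean hasNewColumns(Set<String> tableFields, Set<String> incomingFields) {
    return !tableFields.containsAll(incomingFields);
  }

  public static void main(String[] args) {
    // Same columns: no merge needed, even with mergeSchema enabled.
    System.out.println(hasNewColumns(Set.of("id", "data"), Set.of("id", "data")));
    // Incoming adds "ts": merge is warranted.
    System.out.println(hasNewColumns(Set.of("id", "data"), Set.of("id", "data", "ts")));
  }
}
```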
##########
File path:
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java
##########
@@ -130,8 +131,31 @@ public Write build() {
     Preconditions.checkArgument(handleTimestampWithoutZone || !SparkUtil.hasTimestampWithoutZone(table.schema()),
         SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR);
-    Schema writeSchema = SparkSchemaUtil.convert(table.schema(), dsSchema);
-    TypeUtil.validateWriteSchema(table.schema(), writeSchema, writeConf.checkNullability(), writeConf.checkOrdering());
+    Schema writeSchema;
+    boolean mergeSchema = writeInfo.options().getBoolean("mergeSchema",
+        writeInfo.options().getBoolean("merge-schema", false));
+    if (mergeSchema) {
+      // convert the dataset schema and assign fresh ids for new fields
+      Schema newSchema = SparkSchemaUtil.convertWithFreshIds(table.schema(), dsSchema);
+
+      // update the table to get final id assignments and validate the changes
+      UpdateSchema update = table.updateSchema().unionByNameWith(newSchema);
+      Schema mergedSchema = update.apply();
+
+      // reconvert the dsSchema without assignment to use the ids assigned by UpdateSchema
+      writeSchema = SparkSchemaUtil.convert(mergedSchema, dsSchema);
+
+      TypeUtil.validateWriteSchema(
+          mergedSchema, writeSchema, writeConf.checkNullability(), writeConf.checkOrdering());
+
+      // if the validation passed, update the table schema
+      update.commit();
+    } else {
+      writeSchema = SparkSchemaUtil.convert(table.schema(), dsSchema);
Review comment:
nit: What about putting the new logic into a separate method, since `build`
is already complicated?
##########
File path:
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java
##########
@@ -130,8 +131,31 @@ public Write build() {
     Preconditions.checkArgument(handleTimestampWithoutZone || !SparkUtil.hasTimestampWithoutZone(table.schema()),
         SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR);
-    Schema writeSchema = SparkSchemaUtil.convert(table.schema(), dsSchema);
-    TypeUtil.validateWriteSchema(table.schema(), writeSchema, writeConf.checkNullability(), writeConf.checkOrdering());
+    Schema writeSchema;
+    boolean mergeSchema = writeInfo.options().getBoolean("mergeSchema",
Review comment:
What about moving this to `SparkWriteConf`? We already have an instance
of it, and all other configs are parsed through it.
Also, I am not sure about `mergeSchema`: it is the only option name that
uses camel case. I'd say we should either support just `merge-schema`, which
aligns with all other option names, or support camel case for all options
(which could be done in a follow-up PR).
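If both spellings end up being supported, the precedence could at least be resolved in one place such as `SparkWriteConf`. A sketch with a plain map standing in for the Spark options (`mergeSchemaEnabled` is a made-up helper), kebab-case winning when both spellings are present:

```java
import java.util.Map;

// Resolve a boolean write option under both spellings, with the
// kebab-case name taking precedence over the camel-case one.
public class OptionResolution {
  static boolean mergeSchemaEnabled(Map<String, String> options) {
    String value = options.getOrDefault("merge-schema", options.get("mergeSchema"));
    // Boolean.parseBoolean(null) is false, so a missing option disables the feature.
    return Boolean.parseBoolean(value);
  }

  public static void main(String[] args) {
    System.out.println(mergeSchemaEnabled(Map.of("mergeSchema", "true"))); // camel case honored
    System.out.println(mergeSchemaEnabled(Map.of()));                      // absent -> disabled
  }
}
```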
##########
File path: api/src/main/java/org/apache/iceberg/types/TypeUtil.java
##########
@@ -273,7 +273,13 @@ public static Schema assignIncreasingFreshIds(Schema schema) {
    * @throws IllegalArgumentException if a field cannot be found (by name) in the source schema
    */
   public static Schema reassignIds(Schema schema, Schema idSourceSchema) {
-    Types.StructType struct = visit(schema, new ReassignIds(idSourceSchema)).asStructType();
+    Types.StructType struct = visit(schema, new ReassignIds(idSourceSchema, null)).asStructType();
+    return new Schema(struct.fields(), refreshIdentifierFields(struct, schema));
+  }
+
+  public static Schema reassignOrRefreshIds(Schema schema, Schema idSourceSchema) {
+    AtomicInteger highest = new AtomicInteger(schema.highestFieldId());
Review comment:
I guess we assume `UpdateSchema` will take care of the conflicting IDs?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]