aokolnychyi commented on a change in pull request #4154:
URL: https://github.com/apache/iceberg/pull/4154#discussion_r812432890



##########
File path: api/src/main/java/org/apache/iceberg/types/TypeUtil.java
##########
@@ -273,7 +273,13 @@ public static Schema assignIncreasingFreshIds(Schema schema) {
    * @throws IllegalArgumentException if a field cannot be found (by name) in the source schema
    */
   public static Schema reassignIds(Schema schema, Schema idSourceSchema) {
-    Types.StructType struct = visit(schema, new ReassignIds(idSourceSchema)).asStructType();
+    Types.StructType struct = visit(schema, new ReassignIds(idSourceSchema, null)).asStructType();
+    return new Schema(struct.fields(), refreshIdentifierFields(struct, schema));
+  }
+
+  public static Schema reassignOrRefreshIds(Schema schema, Schema idSourceSchema) {
+    AtomicInteger highest = new AtomicInteger(schema.highestFieldId());
+
+  public static Schema reassignOrRefreshIds(Schema schema, Schema 
idSourceSchema) {
+    AtomicInteger highest = new AtomicInteger(schema.highestFieldId());

Review comment:
       Can there be a mismatch between the highest field ID in the schema and `last-column-id` in the metadata? For instance, what if I add a column, then drop it, and then call this method with a new column? Can that cause the same column ID to be used twice?
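For illustration, here is a minimal self-contained sketch of the hazard the question describes (the names and values are hypothetical stand-ins, not Iceberg APIs): seeding a fresh-ID counter from the *current* schema's highest field ID ignores columns that were added and later dropped, so the counter can reissue an ID that `last-column-id` already accounts for.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class IdReuseSketch {
  public static void main(String[] args) {
    // Hypothetical table state: columns a (id 1) and b (id 2) remain;
    // column c (id 3) was added and then dropped, so the metadata's
    // last-column-id is 3 but the schema's highest field ID is only 2.
    List<Integer> currentFieldIds = List.of(1, 2);
    int lastColumnId = 3;

    // Seeding the counter from the schema ignores the dropped column...
    AtomicInteger highest = new AtomicInteger(
        currentFieldIds.stream().mapToInt(Integer::intValue).max().orElse(0));

    // ...so the next "fresh" ID collides with the dropped column's ID.
    int newId = highest.incrementAndGet();
    System.out.println(newId + " == " + lastColumnId + ": " + (newId == lastColumnId));
  }
}
```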

##########
File path: api/src/main/java/org/apache/iceberg/Schema.java
##########
@@ -102,7 +103,7 @@ public Schema(int schemaId, List<NestedField> columns, Map<String, Integer> alia
 
     this.identifierFieldIds = identifierFieldIds != null ? Ints.toArray(identifierFieldIds) : new int[0];
 
-    lazyIdToName();

Review comment:
       Did we remove this call on purpose? Seems unrelated.

##########
File path: api/src/main/java/org/apache/iceberg/types/ReassignIds.java
##########
@@ -43,6 +45,17 @@ public Type schema(Schema schema, Supplier<Type> future) {
     }
   }
 
+  private int id(Types.StructType sourceStruct, String name) {
+    if (sourceStruct != null) {
+      Types.NestedField sourceField = sourceStruct.field(name);
+      if (sourceField != null) {
+        return sourceField.fieldId();
+      }
+    }
+
+    return assignId.get();

Review comment:
       I see that `assignId` is set to null in `reassignIds`. Can this result in an NPE when called in `struct`?
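A minimal sketch of the pattern in question (simplified stand-ins, not the actual `ReassignIds` code): when the supplier is null and the source field lookup misses, the fallback dereference throws.

```java
import java.util.function.Supplier;

public class NullSupplierSketch {
  // Simplified stand-in for the id(...) helper above: return the source
  // field's ID when it is found, otherwise fall back to the supplier.
  static int id(Integer sourceFieldId, Supplier<Integer> assignId) {
    if (sourceFieldId != null) {
      return sourceFieldId;
    }
    // With assignId == null (as when constructed via reassignIds), this throws.
    return assignId.get();
  }

  public static void main(String[] args) {
    System.out.println(id(7, null)); // source field found: supplier never touched
    try {
      id(null, null);
    } catch (NullPointerException e) {
      System.out.println("NPE when the field is missing and assignId is null");
    }
  }
}
```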

##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java
##########
@@ -130,8 +131,31 @@ public Write build() {
     Preconditions.checkArgument(handleTimestampWithoutZone || !SparkUtil.hasTimestampWithoutZone(table.schema()),
         SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR);
 
-    Schema writeSchema = SparkSchemaUtil.convert(table.schema(), dsSchema);
-    TypeUtil.validateWriteSchema(table.schema(), writeSchema, writeConf.checkNullability(), writeConf.checkOrdering());
+    Schema writeSchema;
+    boolean mergeSchema = writeInfo.options().getBoolean("mergeSchema",
+        writeInfo.options().getBoolean("merge-schema", false));
+    if (mergeSchema) {

Review comment:
       Will we always attempt to merge the schema if enabled? Even if the incoming schema doesn't include new columns?
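As a hedged sketch of the guard the question hints at (plain column-name sets instead of Iceberg schemas; `hasNewColumns` is a hypothetical helper): the merge could be skipped when every incoming column is already known to the table.

```java
import java.util.Set;

public class MergeGuardSketch {
  // Hypothetical guard: the incoming dataset only warrants a schema merge
  // when it carries columns the table does not already have.
  static boolean hasNewColumns(Set<String> tableColumns, Set<String> dsColumns) {
    return !tableColumns.containsAll(dsColumns);
  }

  public static void main(String[] args) {
    Set<String> table = Set.of("id", "data");
    System.out.println(hasNewColumns(table, Set.of("id", "data")));       // false: nothing to merge
    System.out.println(hasNewColumns(table, Set.of("id", "data", "ts"))); // true: "ts" is new
  }
}
```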

##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java
##########
@@ -130,8 +131,31 @@ public Write build() {
     Preconditions.checkArgument(handleTimestampWithoutZone || !SparkUtil.hasTimestampWithoutZone(table.schema()),
         SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR);
 
-    Schema writeSchema = SparkSchemaUtil.convert(table.schema(), dsSchema);
-    TypeUtil.validateWriteSchema(table.schema(), writeSchema, writeConf.checkNullability(), writeConf.checkOrdering());
+    Schema writeSchema;
+    boolean mergeSchema = writeInfo.options().getBoolean("mergeSchema",
+        writeInfo.options().getBoolean("merge-schema", false));
+    if (mergeSchema) {
+      // convert the dataset schema and assign fresh ids for new fields
+      Schema newSchema = SparkSchemaUtil.convertWithFreshIds(table.schema(), dsSchema);
+
+      // update the table to get final id assignments and validate the changes
+      UpdateSchema update = table.updateSchema().unionByNameWith(newSchema);
+      Schema mergedSchema = update.apply();
+
+      // reconvert the dsSchema without assignment to use the ids assigned by UpdateSchema
+      writeSchema = SparkSchemaUtil.convert(mergedSchema, dsSchema);
+
+      TypeUtil.validateWriteSchema(
+          mergedSchema, writeSchema, writeConf.checkNullability(), writeConf.checkOrdering());
+
+      // if the validation passed, update the table schema
+      update.commit();
+    } else {
+      writeSchema = SparkSchemaUtil.convert(table.schema(), dsSchema);

Review comment:
       nit: What about putting the new logic into a separate method, since `build` is already complicated?
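For concreteness, the extraction might look like the following sketch. The method name `mergeSchemaAndValidate` is hypothetical; the body is just the merge branch from the diff above, so it is not runnable on its own.

```java
// Hypothetical refactor: move the merge branch out of build().
private Schema mergeSchemaAndValidate(StructType dsSchema) {
  // convert the dataset schema and assign fresh ids for new fields
  Schema newSchema = SparkSchemaUtil.convertWithFreshIds(table.schema(), dsSchema);

  // update the table to get final id assignments and validate the changes
  UpdateSchema update = table.updateSchema().unionByNameWith(newSchema);
  Schema mergedSchema = update.apply();

  // reconvert the dsSchema without assignment to use the ids assigned by UpdateSchema
  Schema writeSchema = SparkSchemaUtil.convert(mergedSchema, dsSchema);

  TypeUtil.validateWriteSchema(
      mergedSchema, writeSchema, writeConf.checkNullability(), writeConf.checkOrdering());

  // if the validation passed, update the table schema
  update.commit();
  return writeSchema;
}
```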

##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java
##########
@@ -130,8 +131,31 @@ public Write build() {
     Preconditions.checkArgument(handleTimestampWithoutZone || !SparkUtil.hasTimestampWithoutZone(table.schema()),
         SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR);
 
-    Schema writeSchema = SparkSchemaUtil.convert(table.schema(), dsSchema);
-    TypeUtil.validateWriteSchema(table.schema(), writeSchema, writeConf.checkNullability(), writeConf.checkOrdering());
+    Schema writeSchema;
+    boolean mergeSchema = writeInfo.options().getBoolean("mergeSchema",

Review comment:
       What about moving this to `SparkWriteConf`? We already have an instance of it, and all other configs are parsed through it.
   
   Also, I am not sure about `mergeSchema`. It is the only option name that uses camel case. I'd say we should either support just `merge-schema`, which aligns with all other option names, or support camel case for all options (which could be done in a follow-up PR).
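As a sketch of the naming concern (a plain `Map` stands in for the Spark write options; the helper name is hypothetical): supporting both spellings while preferring the kebab-case one could look like this.

```java
import java.util.Map;

public class OptionNameSketch {
  // Hypothetical parser: prefer "merge-schema" (consistent with the other
  // option names), but still honor the camel-case "mergeSchema" spelling.
  static boolean mergeSchemaEnabled(Map<String, String> options) {
    String value = options.getOrDefault("merge-schema",
        options.getOrDefault("mergeSchema", "false"));
    return Boolean.parseBoolean(value);
  }

  public static void main(String[] args) {
    System.out.println(mergeSchemaEnabled(Map.of("merge-schema", "true"))); // true
    System.out.println(mergeSchemaEnabled(Map.of("mergeSchema", "true")));  // true
    System.out.println(mergeSchemaEnabled(Map.of()));                       // false
  }
}
```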

##########
File path: api/src/main/java/org/apache/iceberg/types/TypeUtil.java
##########
@@ -273,7 +273,13 @@ public static Schema assignIncreasingFreshIds(Schema schema) {
    * @throws IllegalArgumentException if a field cannot be found (by name) in the source schema
    */
   public static Schema reassignIds(Schema schema, Schema idSourceSchema) {
-    Types.StructType struct = visit(schema, new ReassignIds(idSourceSchema)).asStructType();
+    Types.StructType struct = visit(schema, new ReassignIds(idSourceSchema, null)).asStructType();
+    return new Schema(struct.fields(), refreshIdentifierFields(struct, schema));
+  }
+
+  public static Schema reassignOrRefreshIds(Schema schema, Schema idSourceSchema) {
+    AtomicInteger highest = new AtomicInteger(schema.highestFieldId());

Review comment:
       I guess we assume `UpdateSchema` will take care of the conflicting IDs?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


