twthorn commented on code in PR #16826:
URL: https://github.com/apache/iceberg/pull/16826#discussion_r3431775220


##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java:
##########
@@ -255,19 +255,92 @@ private GenericRecord convertToStruct(
                   }
                 }
                 if (!hasSchemaUpdates) {
-                  result.setField(
-                      tableField.name(),
-                      convertValue(
-                          struct.get(recordField),
-                          tableField.type(),
-                          tableField.fieldId(),
-                          schemaUpdateConsumer));
+                  Object recordFieldValue = struct.get(recordField);
+                  // If the value is null and schema evolution is on, then 
evolve the table schema
+                  // from the connect record schema
+                  if (recordFieldValue == null && schemaUpdateConsumer != 
null) {

Review Comment:
   Nice catch, updated so that we evolve the schema regardless of parent 
evolution.



##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java:
##########
@@ -255,19 +255,92 @@ private GenericRecord convertToStruct(
                   }
                 }
                 if (!hasSchemaUpdates) {
-                  result.setField(
-                      tableField.name(),
-                      convertValue(
-                          struct.get(recordField),
-                          tableField.type(),
-                          tableField.fieldId(),
-                          schemaUpdateConsumer));
+                  Object recordFieldValue = struct.get(recordField);
+                  // If the value is null and schema evolution is on, then 
evolve the table schema
+                  // from the connect record schema
+                  if (recordFieldValue == null && schemaUpdateConsumer != 
null) {
+                    evolveSchemaFromConnectSchema(
+                        recordField.schema(),
+                        tableField.type(),
+                        tableField.fieldId(),
+                        schemaUpdateConsumer);
+                  } else {
+                    // If the value is not null, then convert the value (and 
handle any schema
+                    // updates)
+                    result.setField(
+                        tableField.name(),
+                        convertValue(
+                            recordFieldValue,
+                            tableField.type(),
+                            tableField.fieldId(),
+                            schemaUpdateConsumer));
+                  }
                 }
               }
             });
     return result;
   }
 
+  private void evolveSchemaFromConnectSchema(
+      org.apache.kafka.connect.data.Schema recordSchema,
+      Type tableType,
+      int tableFieldId,
+      SchemaUpdate.Consumer schemaUpdateConsumer) {
+    if (recordSchema == null) {
+      return;
+    }
+    switch (recordSchema.type()) {
+      case STRUCT:
+        if (tableType.isStructType()) {
+          StructType structType = tableType.asStructType();
+          for (Field field : recordSchema.fields()) {
+            NestedField nestedField = lookupStructField(field.name(), 
structType, tableFieldId);
+            if (nestedField == null) {
+              String parentFieldName = 
tableSchema.findColumnName(tableFieldId);

Review Comment:
   Made the change for consistency. I will publish a follow-up PR to DRY up the 
code and keep things consistent.



##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java:
##########
@@ -255,19 +255,92 @@ private GenericRecord convertToStruct(
                   }
                 }
                 if (!hasSchemaUpdates) {
-                  result.setField(
-                      tableField.name(),
-                      convertValue(
-                          struct.get(recordField),
-                          tableField.type(),
-                          tableField.fieldId(),
-                          schemaUpdateConsumer));
+                  Object recordFieldValue = struct.get(recordField);
+                  // If the value is null and schema evolution is on, then 
evolve the table schema
+                  // from the connect record schema
+                  if (recordFieldValue == null && schemaUpdateConsumer != 
null) {
+                    evolveSchemaFromConnectSchema(
+                        recordField.schema(),
+                        tableField.type(),
+                        tableField.fieldId(),
+                        schemaUpdateConsumer);
+                  } else {
+                    // If the value is not null, then convert the value (and 
handle any schema
+                    // updates)
+                    result.setField(
+                        tableField.name(),
+                        convertValue(
+                            recordFieldValue,
+                            tableField.type(),
+                            tableField.fieldId(),
+                            schemaUpdateConsumer));
+                  }
                 }
               }
             });
     return result;
   }
 
+  private void evolveSchemaFromConnectSchema(

Review Comment:
   Agreed, I will put out a follow-up PR on this one to align them.



##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java:
##########
@@ -255,19 +255,92 @@ private GenericRecord convertToStruct(
                   }
                 }
                 if (!hasSchemaUpdates) {
-                  result.setField(
-                      tableField.name(),
-                      convertValue(
-                          struct.get(recordField),
-                          tableField.type(),
-                          tableField.fieldId(),
-                          schemaUpdateConsumer));
+                  Object recordFieldValue = struct.get(recordField);
+                  // If the value is null and schema evolution is on, then 
evolve the table schema
+                  // from the connect record schema
+                  if (recordFieldValue == null && schemaUpdateConsumer != 
null) {
+                    evolveSchemaFromConnectSchema(
+                        recordField.schema(),
+                        tableField.type(),
+                        tableField.fieldId(),
+                        schemaUpdateConsumer);
+                  } else {
+                    // If the value is not null, then convert the value (and 
handle any schema
+                    // updates)
+                    result.setField(
+                        tableField.name(),
+                        convertValue(
+                            recordFieldValue,
+                            tableField.type(),
+                            tableField.fieldId(),
+                            schemaUpdateConsumer));
+                  }
                 }
               }
             });
     return result;
   }
 
+  private void evolveSchemaFromConnectSchema(
+      org.apache.kafka.connect.data.Schema recordSchema,
+      Type tableType,
+      int tableFieldId,
+      SchemaUpdate.Consumer schemaUpdateConsumer) {
+    if (recordSchema == null) {
+      return;
+    }
+    switch (recordSchema.type()) {
+      case STRUCT:
+        if (tableType.isStructType()) {
+          StructType structType = tableType.asStructType();
+          for (Field field : recordSchema.fields()) {
+            NestedField nestedField = lookupStructField(field.name(), 
structType, tableFieldId);
+            if (nestedField == null) {
+              String parentFieldName = 
tableSchema.findColumnName(tableFieldId);
+              Type type = SchemaUtils.toIcebergType(field.schema(), config);
+              schemaUpdateConsumer.addColumn(parentFieldName, field.name(), 
type);
+            } else {
+              PrimitiveType evolveDataType =
+                  SchemaUtils.needsDataTypeUpdate(nestedField.type(), 
field.schema());
+              if (evolveDataType != null) {
+                String fieldName = 
tableSchema.findColumnName(nestedField.fieldId());
+                schemaUpdateConsumer.updateType(fieldName, evolveDataType);
+              }
+              if (nestedField.isRequired() && field.schema().isOptional()) {

Review Comment:
   Agreed, both should use that config; I will handle it in the helper-method 
follow-up PR.



##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java:
##########
@@ -255,19 +255,92 @@ private GenericRecord convertToStruct(
                   }
                 }
                 if (!hasSchemaUpdates) {
-                  result.setField(
-                      tableField.name(),
-                      convertValue(
-                          struct.get(recordField),
-                          tableField.type(),
-                          tableField.fieldId(),
-                          schemaUpdateConsumer));
+                  Object recordFieldValue = struct.get(recordField);
+                  // If the value is null and schema evolution is on, then 
evolve the table schema
+                  // from the connect record schema
+                  if (recordFieldValue == null && schemaUpdateConsumer != 
null) {
+                    evolveSchemaFromConnectSchema(
+                        recordField.schema(),
+                        tableField.type(),
+                        tableField.fieldId(),
+                        schemaUpdateConsumer);
+                  } else {
+                    // If the value is not null, then convert the value (and 
handle any schema
+                    // updates)
+                    result.setField(
+                        tableField.name(),
+                        convertValue(
+                            recordFieldValue,
+                            tableField.type(),
+                            tableField.fieldId(),
+                            schemaUpdateConsumer));
+                  }
                 }
               }
             });
     return result;
   }
 
+  private void evolveSchemaFromConnectSchema(
+      org.apache.kafka.connect.data.Schema recordSchema,
+      Type tableType,
+      int tableFieldId,
+      SchemaUpdate.Consumer schemaUpdateConsumer) {
+    if (recordSchema == null) {
+      return;
+    }
+    switch (recordSchema.type()) {
+      case STRUCT:
+        if (tableType.isStructType()) {
+          StructType structType = tableType.asStructType();
+          for (Field field : recordSchema.fields()) {
+            NestedField nestedField = lookupStructField(field.name(), 
structType, tableFieldId);
+            if (nestedField == null) {
+              String parentFieldName = 
tableSchema.findColumnName(tableFieldId);
+              Type type = SchemaUtils.toIcebergType(field.schema(), config);
+              schemaUpdateConsumer.addColumn(parentFieldName, field.name(), 
type);
+            } else {
+              PrimitiveType evolveDataType =
+                  SchemaUtils.needsDataTypeUpdate(nestedField.type(), 
field.schema());
+              if (evolveDataType != null) {
+                String fieldName = 
tableSchema.findColumnName(nestedField.fieldId());
+                schemaUpdateConsumer.updateType(fieldName, evolveDataType);
+              }
+              if (nestedField.isRequired() && field.schema().isOptional()) {
+                String fieldName = 
tableSchema.findColumnName(nestedField.fieldId());
+                schemaUpdateConsumer.makeOptional(fieldName);
+              }
+              evolveSchemaFromConnectSchema(
+                  field.schema(), nestedField.type(), nestedField.fieldId(), 
schemaUpdateConsumer);
+            }
+          }
+        }
+        break;
+      case ARRAY:
+        if (tableType.isListType()) {
+          ListType listType = tableType.asListType();
+          evolveSchemaFromConnectSchema(
+              recordSchema.valueSchema(),
+              listType.elementType(),
+              listType.elementId(),
+              schemaUpdateConsumer);
+        }
+        break;
+      case MAP:

Review Comment:
   Reproduced the bug with a test, which I have added.



##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java:
##########
@@ -255,19 +255,92 @@ private GenericRecord convertToStruct(
                   }
                 }
                 if (!hasSchemaUpdates) {
-                  result.setField(
-                      tableField.name(),
-                      convertValue(
-                          struct.get(recordField),
-                          tableField.type(),
-                          tableField.fieldId(),
-                          schemaUpdateConsumer));
+                  Object recordFieldValue = struct.get(recordField);
+                  // If the value is null and schema evolution is on, then 
evolve the table schema
+                  // from the connect record schema
+                  if (recordFieldValue == null && schemaUpdateConsumer != 
null) {
+                    evolveSchemaFromConnectSchema(
+                        recordField.schema(),
+                        tableField.type(),
+                        tableField.fieldId(),
+                        schemaUpdateConsumer);

Review Comment:
   Updated to maintain the existing behavior.



##########
kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestRecordConverter.java:
##########
@@ -859,6 +859,268 @@ private void assertTypesAddedFromStruct(Function<String, 
Type> fn) {
     assertThat(fn.apply("ma")).isInstanceOf(MapType.class);
   }
 
+  @Test
+  public void testNestedSchemaEvolutionStructWithNullValue() {

Review Comment:
   Updated to assert that the field value is null.



##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java:
##########
@@ -255,19 +255,92 @@ private GenericRecord convertToStruct(
                   }
                 }
                 if (!hasSchemaUpdates) {
-                  result.setField(
-                      tableField.name(),
-                      convertValue(
-                          struct.get(recordField),
-                          tableField.type(),
-                          tableField.fieldId(),
-                          schemaUpdateConsumer));
+                  Object recordFieldValue = struct.get(recordField);
+                  // If the value is null and schema evolution is on, then 
evolve the table schema
+                  // from the connect record schema
+                  if (recordFieldValue == null && schemaUpdateConsumer != 
null) {
+                    evolveSchemaFromConnectSchema(
+                        recordField.schema(),
+                        tableField.type(),
+                        tableField.fieldId(),
+                        schemaUpdateConsumer);
+                  } else {
+                    // If the value is not null, then convert the value (and 
handle any schema
+                    // updates)
+                    result.setField(
+                        tableField.name(),
+                        convertValue(
+                            recordFieldValue,
+                            tableField.type(),
+                            tableField.fieldId(),
+                            schemaUpdateConsumer));
+                  }
                 }
               }
             });
     return result;
   }
 
+  private void evolveSchemaFromConnectSchema(
+      org.apache.kafka.connect.data.Schema recordSchema,
+      Type tableType,
+      int tableFieldId,
+      SchemaUpdate.Consumer schemaUpdateConsumer) {
+    if (recordSchema == null) {
+      return;
+    }
+    switch (recordSchema.type()) {
+      case STRUCT:
+        if (tableType.isStructType()) {
+          StructType structType = tableType.asStructType();
+          for (Field field : recordSchema.fields()) {
+            NestedField nestedField = lookupStructField(field.name(), 
structType, tableFieldId);
+            if (nestedField == null) {
+              String parentFieldName = 
tableSchema.findColumnName(tableFieldId);
+              Type type = SchemaUtils.toIcebergType(field.schema(), config);
+              schemaUpdateConsumer.addColumn(parentFieldName, field.name(), 
type);
+            } else {
+              PrimitiveType evolveDataType =
+                  SchemaUtils.needsDataTypeUpdate(nestedField.type(), 
field.schema());
+              if (evolveDataType != null) {
+                String fieldName = 
tableSchema.findColumnName(nestedField.fieldId());
+                schemaUpdateConsumer.updateType(fieldName, evolveDataType);
+              }
+              if (nestedField.isRequired() && field.schema().isOptional()) {
+                String fieldName = 
tableSchema.findColumnName(nestedField.fieldId());
+                schemaUpdateConsumer.makeOptional(fieldName);
+              }
+              evolveSchemaFromConnectSchema(
+                  field.schema(), nestedField.type(), nestedField.fieldId(), 
schemaUpdateConsumer);
+            }
+          }
+        }
+        break;
+      case ARRAY:
+        if (tableType.isListType()) {
+          ListType listType = tableType.asListType();
+          evolveSchemaFromConnectSchema(
+              recordSchema.valueSchema(),
+              listType.elementType(),
+              listType.elementId(),
+              schemaUpdateConsumer);
+        }
+        break;
+      case MAP:
+        if (tableType.isMapType()) {
+          MapType mapType = tableType.asMapType();
+          evolveSchemaFromConnectSchema(
+              recordSchema.valueSchema(),

Review Comment:
   Reproduced with a test and fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to