laskoviymishka commented on code in PR #16826:
URL: https://github.com/apache/iceberg/pull/16826#discussion_r3424815257
##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java:
##########
@@ -255,19 +255,92 @@ private GenericRecord convertToStruct(
}
}
if (!hasSchemaUpdates) {
- result.setField(
- tableField.name(),
- convertValue(
- struct.get(recordField),
- tableField.type(),
- tableField.fieldId(),
- schemaUpdateConsumer));
+ Object recordFieldValue = struct.get(recordField);
+ // If the value is null and schema evolution is on, then evolve the table schema
+ // from the connect record schema
+ if (recordFieldValue == null && schemaUpdateConsumer != null) {
Review Comment:
The null-value evolution only fires inside the `if (!hasSchemaUpdates)`
block, so it's suppressed exactly when a field needs both a top-level update
and a nested walk.
If `tableField` itself needs a type promotion or required→optional change,
`hasSchemaUpdates` is true and this whole branch is skipped — so a record whose
field is a null struct carrying a new sub-column (table field FLOAT→DOUBLE,
value null, nested schema gained a column) never reaches
`evolveSchemaFromConnectSchema` and the addColumn is lost for that cycle. The
value path has the same shape, but the null path makes it much likelier to hit
in production: a CDC delete nulls every non-PK field precisely when schemas are
drifting, so there's no guarantee a later non-null record recovers the missed
update.
Lift the null-value call out of the `!hasSchemaUpdates` gate so it always
runs when the value is null and a consumer is present. It only queues updates
and never touches `result`, so it's safe to run regardless of
`hasSchemaUpdates`. This is the one blocker for me.
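Here is a minimal sketch of the suggested restructuring. It uses a hypothetical, simplified model rather than the real `RecordConverter` code: `NullPathGate.actionsFor` returns the names of the steps that would run for one field instead of calling `evolveSchemaFromConnectSchema` or `convertValue`. The null-path evolution is decided outside the `hasSchemaUpdates` gate, and the write to `result` stays inside it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the per-field branch in convertToStruct: the strings
// stand in for the real calls so the control flow can be checked in isolation.
final class NullPathGate {

  private NullPathGate() {}

  static List<String> actionsFor(
      Object recordFieldValue, boolean hasSchemaUpdates, boolean consumerPresent) {
    List<String> actions = new ArrayList<>();
    boolean nullWithConsumer = recordFieldValue == null && consumerPresent;

    // Lifted out of the !hasSchemaUpdates gate: this step only queues schema
    // updates and never writes to the result record, so it is safe to run
    // even when the top-level field itself needs an update.
    if (nullWithConsumer) {
      actions.add("evolveSchemaFromConnectSchema");
    }

    // Writing to the result record stays behind the original gate.
    if (!hasSchemaUpdates && !nullWithConsumer) {
      actions.add("setField(convertValue)");
    }
    return actions;
  }
}
```

Take the case where the top-level field needs an update and the value is null. `actionsFor(null, true, true)` still returns `[evolveSchemaFromConnectSchema]`, whereas the PR as written runs nothing for it.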
##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java:
##########
@@ -255,19 +255,92 @@ private GenericRecord convertToStruct(
}
}
if (!hasSchemaUpdates) {
- result.setField(
- tableField.name(),
- convertValue(
- struct.get(recordField),
- tableField.type(),
- tableField.fieldId(),
- schemaUpdateConsumer));
+ Object recordFieldValue = struct.get(recordField);
+ // If the value is null and schema evolution is on, then evolve the table schema
+ // from the connect record schema
+ if (recordFieldValue == null && schemaUpdateConsumer != null) {
+ evolveSchemaFromConnectSchema(
+ recordField.schema(),
+ tableField.type(),
+ tableField.fieldId(),
+ schemaUpdateConsumer);
+ } else {
+ // If the value is not null, then convert the value (and handle any schema
+ // updates)
+ result.setField(
+ tableField.name(),
+ convertValue(
+ recordFieldValue,
+ tableField.type(),
+ tableField.fieldId(),
+ schemaUpdateConsumer));
+ }
}
}
});
return result;
}
+ private void evolveSchemaFromConnectSchema(
+ org.apache.kafka.connect.data.Schema recordSchema,
+ Type tableType,
+ int tableFieldId,
+ SchemaUpdate.Consumer schemaUpdateConsumer) {
+ if (recordSchema == null) {
+ return;
+ }
+ switch (recordSchema.type()) {
+ case STRUCT:
+ if (tableType.isStructType()) {
+ StructType structType = tableType.asStructType();
+ for (Field field : recordSchema.fields()) {
+ NestedField nestedField = lookupStructField(field.name(), structType, tableFieldId);
+ if (nestedField == null) {
+ String parentFieldName = tableSchema.findColumnName(tableFieldId);
+ Type type = SchemaUtils.toIcebergType(field.schema(), config);
+ schemaUpdateConsumer.addColumn(parentFieldName, field.name(), type);
+ } else {
+ PrimitiveType evolveDataType =
+ SchemaUtils.needsDataTypeUpdate(nestedField.type(), field.schema());
+ if (evolveDataType != null) {
+ String fieldName = tableSchema.findColumnName(nestedField.fieldId());
+ schemaUpdateConsumer.updateType(fieldName, evolveDataType);
+ }
+ if (nestedField.isRequired() && field.schema().isOptional()) {
+ String fieldName = tableSchema.findColumnName(nestedField.fieldId());
+ schemaUpdateConsumer.makeOptional(fieldName);
+ }
+ evolveSchemaFromConnectSchema(
+ field.schema(), nestedField.type(), nestedField.fieldId(), schemaUpdateConsumer);
+ }
+ }
+ }
+ break;
+ case ARRAY:
+ if (tableType.isListType()) {
+ ListType listType = tableType.asListType();
+ evolveSchemaFromConnectSchema(
+ recordSchema.valueSchema(),
+ listType.elementType(),
+ listType.elementId(),
+ schemaUpdateConsumer);
+ }
+ break;
+ case MAP:
Review Comment:
This walks the map value but never the key, so new fields inside a struct
key are dropped on the null path. Iceberg allows struct map keys, and the
existing `convertMapValue` processes both key and value through `convertValue`
— so this is an asymmetry the null path introduces versus the value path it
mirrors.
Not blocking, but since you're in here I'd add the parallel key recursion:
```java
evolveSchemaFromConnectSchema(
recordSchema.keySchema(),
mapType.keyType(),
mapType.keyId(),
schemaUpdateConsumer);
```
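The following hedged sketch shows where the key recursion fits. It runs the traversal on a hypothetical `MapNode` tree that stands in for both the Connect schema and the Iceberg type. The real method works on `org.apache.kafka.connect.data.Schema` and Iceberg `Type`. The sketch only collects the paths of missing struct fields, as the `addColumn` branch would. The `kind` comparison is a simplification of the `isStructType()`/`isListType()`/`isMapType()` checks.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, heavily simplified schema node used for both the Connect side
// and the table side, so only the traversal shape is shown.
final class MapNode {
  enum Kind { PRIMITIVE, STRUCT, ARRAY, MAP }

  final Kind kind;
  final Map<String, MapNode> fields = new LinkedHashMap<>();
  MapNode key;   // MAP key schema
  MapNode value; // ARRAY element or MAP value schema

  private MapNode(Kind kind) {
    this.kind = kind;
  }

  static MapNode primitive() {
    return new MapNode(Kind.PRIMITIVE);
  }

  static MapNode struct() {
    return new MapNode(Kind.STRUCT);
  }

  static MapNode array(MapNode element) {
    MapNode node = new MapNode(Kind.ARRAY);
    node.value = element;
    return node;
  }

  static MapNode map(MapNode key, MapNode value) {
    MapNode node = new MapNode(Kind.MAP);
    node.key = key;
    node.value = value;
    return node;
  }

  MapNode with(String name, MapNode child) {
    fields.put(name, child);
    return this;
  }
}

final class MapKeyWalk {
  private MapKeyWalk() {}

  // Collects the paths of struct fields that exist in the record schema but
  // not in the table schema (the addColumn candidates).
  static void evolve(MapNode recordSchema, MapNode tableSchema, String path, List<String> added) {
    // Same early return as the top-of-method null check; it also covers a
    // schema-less map whose key or value schema is null.
    if (recordSchema == null || tableSchema == null || recordSchema.kind != tableSchema.kind) {
      return;
    }
    switch (recordSchema.kind) {
      case STRUCT:
        for (Map.Entry<String, MapNode> entry : recordSchema.fields.entrySet()) {
          MapNode existing = tableSchema.fields.get(entry.getKey());
          String childPath = path + "." + entry.getKey();
          if (existing == null) {
            added.add(childPath);
          } else {
            evolve(entry.getValue(), existing, childPath, added);
          }
        }
        break;
      case ARRAY:
        evolve(recordSchema.value, tableSchema.value, path + ".element", added);
        break;
      case MAP:
        // Walk the key as well as the value, mirroring convertMapValue.
        evolve(recordSchema.key, tableSchema.key, path + ".key", added);
        evolve(recordSchema.value, tableSchema.value, path + ".value", added);
        break;
      default:
        break;
    }
  }
}
```

Suppose a struct key gains a `region` field. It is reported as `m.key.region`. Without the key call it would be missed.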
##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java:
##########
@@ -255,19 +255,92 @@ private GenericRecord convertToStruct(
}
}
if (!hasSchemaUpdates) {
- result.setField(
- tableField.name(),
- convertValue(
- struct.get(recordField),
- tableField.type(),
- tableField.fieldId(),
- schemaUpdateConsumer));
+ Object recordFieldValue = struct.get(recordField);
+ // If the value is null and schema evolution is on, then evolve the table schema
+ // from the connect record schema
+ if (recordFieldValue == null && schemaUpdateConsumer != null) {
+ evolveSchemaFromConnectSchema(
+ recordField.schema(),
+ tableField.type(),
+ tableField.fieldId(),
+ schemaUpdateConsumer);
+ } else {
+ // If the value is not null, then convert the value (and handle any schema
+ // updates)
+ result.setField(
+ tableField.name(),
+ convertValue(
+ recordFieldValue,
+ tableField.type(),
+ tableField.fieldId(),
+ schemaUpdateConsumer));
+ }
}
}
});
return result;
}
+ private void evolveSchemaFromConnectSchema(
+ org.apache.kafka.connect.data.Schema recordSchema,
+ Type tableType,
+ int tableFieldId,
+ SchemaUpdate.Consumer schemaUpdateConsumer) {
+ if (recordSchema == null) {
+ return;
+ }
+ switch (recordSchema.type()) {
+ case STRUCT:
+ if (tableType.isStructType()) {
+ StructType structType = tableType.asStructType();
+ for (Field field : recordSchema.fields()) {
+ NestedField nestedField = lookupStructField(field.name(), structType, tableFieldId);
+ if (nestedField == null) {
+ String parentFieldName = tableSchema.findColumnName(tableFieldId);
+ Type type = SchemaUtils.toIcebergType(field.schema(), config);
+ schemaUpdateConsumer.addColumn(parentFieldName, field.name(), type);
+ } else {
+ PrimitiveType evolveDataType =
+ SchemaUtils.needsDataTypeUpdate(nestedField.type(), field.schema());
+ if (evolveDataType != null) {
+ String fieldName = tableSchema.findColumnName(nestedField.fieldId());
+ schemaUpdateConsumer.updateType(fieldName, evolveDataType);
+ }
+ if (nestedField.isRequired() && field.schema().isOptional()) {
Review Comment:
This `makeOptional` guard doesn't consult `config.schemaForceOptional()`, so
with that flag set an existing required field won't be made optional through
the null path. The value path has the identical limitation, so this is
pre-existing and consistent rather than a regression — flagging so it's a
deliberate choice, not an oversight. If you want them aligned,
`config.schemaForceOptional() || field.schema().isOptional()` in both guards
does it. Fine for a follow-up.
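The aligned guard could look like the predicate below. It is sketched as a hypothetical standalone method whose three booleans stand in for `nestedField.isRequired()`, `field.schema().isOptional()` and `config.schemaForceOptional()`. Calling the same predicate from both the value path and the null path would keep the two in step.

```java
// Hypothetical standalone form of the makeOptional guard, with the
// schemaForceOptional flag folded in as suggested.
final class MakeOptionalGuard {
  private MakeOptionalGuard() {}

  static boolean shouldMakeOptional(
      boolean tableFieldRequired, boolean connectFieldOptional, boolean forceOptional) {
    // Only a currently required table field can be relaxed; it is relaxed when
    // the Connect field is optional or when the config forces optional fields.
    return tableFieldRequired && (forceOptional || connectFieldOptional);
  }
}
```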
##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java:
##########
@@ -255,19 +255,92 @@ private GenericRecord convertToStruct(
}
}
if (!hasSchemaUpdates) {
- result.setField(
- tableField.name(),
- convertValue(
- struct.get(recordField),
- tableField.type(),
- tableField.fieldId(),
- schemaUpdateConsumer));
+ Object recordFieldValue = struct.get(recordField);
+ // If the value is null and schema evolution is on, then evolve the table schema
+ // from the connect record schema
+ if (recordFieldValue == null && schemaUpdateConsumer != null) {
+ evolveSchemaFromConnectSchema(
+ recordField.schema(),
+ tableField.type(),
+ tableField.fieldId(),
+ schemaUpdateConsumer);
+ } else {
+ // If the value is not null, then convert the value (and handle any schema
+ // updates)
+ result.setField(
+ tableField.name(),
+ convertValue(
+ recordFieldValue,
+ tableField.type(),
+ tableField.fieldId(),
+ schemaUpdateConsumer));
+ }
}
}
});
return result;
}
+ private void evolveSchemaFromConnectSchema(
+ org.apache.kafka.connect.data.Schema recordSchema,
+ Type tableType,
+ int tableFieldId,
+ SchemaUpdate.Consumer schemaUpdateConsumer) {
+ if (recordSchema == null) {
+ return;
+ }
+ switch (recordSchema.type()) {
+ case STRUCT:
+ if (tableType.isStructType()) {
+ StructType structType = tableType.asStructType();
+ for (Field field : recordSchema.fields()) {
+ NestedField nestedField = lookupStructField(field.name(), structType, tableFieldId);
+ if (nestedField == null) {
+ String parentFieldName = tableSchema.findColumnName(tableFieldId);
+ Type type = SchemaUtils.toIcebergType(field.schema(), config);
+ schemaUpdateConsumer.addColumn(parentFieldName, field.name(), type);
+ } else {
+ PrimitiveType evolveDataType =
+ SchemaUtils.needsDataTypeUpdate(nestedField.type(), field.schema());
+ if (evolveDataType != null) {
+ String fieldName = tableSchema.findColumnName(nestedField.fieldId());
+ schemaUpdateConsumer.updateType(fieldName, evolveDataType);
+ }
+ if (nestedField.isRequired() && field.schema().isOptional()) {
+ String fieldName = tableSchema.findColumnName(nestedField.fieldId());
+ schemaUpdateConsumer.makeOptional(fieldName);
+ }
+ evolveSchemaFromConnectSchema(
+ field.schema(), nestedField.type(), nestedField.fieldId(), schemaUpdateConsumer);
+ }
+ }
+ }
+ break;
+ case ARRAY:
+ if (tableType.isListType()) {
+ ListType listType = tableType.asListType();
+ evolveSchemaFromConnectSchema(
+ recordSchema.valueSchema(),
+ listType.elementType(),
+ listType.elementId(),
+ schemaUpdateConsumer);
+ }
+ break;
+ case MAP:
+ if (tableType.isMapType()) {
+ MapType mapType = tableType.asMapType();
+ evolveSchemaFromConnectSchema(
+ recordSchema.valueSchema(),
Review Comment:
For a schema-less map `recordSchema.valueSchema()` can be null, and
`SchemaUtils.toIcebergType(null)` down the STRUCT recursion can NPE. The method
already guards `recordSchema == null` at the top — the same guard on the
recursion arg here (and on the ARRAY case above) keeps it consistent. Not
blocking.
##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java:
##########
@@ -255,19 +255,92 @@ private GenericRecord convertToStruct(
}
}
if (!hasSchemaUpdates) {
- result.setField(
- tableField.name(),
- convertValue(
- struct.get(recordField),
- tableField.type(),
- tableField.fieldId(),
- schemaUpdateConsumer));
+ Object recordFieldValue = struct.get(recordField);
+ // If the value is null and schema evolution is on, then evolve the table schema
+ // from the connect record schema
+ if (recordFieldValue == null && schemaUpdateConsumer != null) {
+ evolveSchemaFromConnectSchema(
+ recordField.schema(),
+ tableField.type(),
+ tableField.fieldId(),
+ schemaUpdateConsumer);
+ } else {
+ // If the value is not null, then convert the value (and handle any schema
+ // updates)
+ result.setField(
+ tableField.name(),
+ convertValue(
+ recordFieldValue,
+ tableField.type(),
+ tableField.fieldId(),
+ schemaUpdateConsumer));
+ }
}
}
});
return result;
}
+ private void evolveSchemaFromConnectSchema(
+ org.apache.kafka.connect.data.Schema recordSchema,
+ Type tableType,
+ int tableFieldId,
+ SchemaUpdate.Consumer schemaUpdateConsumer) {
+ if (recordSchema == null) {
+ return;
+ }
+ switch (recordSchema.type()) {
+ case STRUCT:
+ if (tableType.isStructType()) {
+ StructType structType = tableType.asStructType();
+ for (Field field : recordSchema.fields()) {
+ NestedField nestedField = lookupStructField(field.name(), structType, tableFieldId);
+ if (nestedField == null) {
+ String parentFieldName = tableSchema.findColumnName(tableFieldId);
Review Comment:
`findColumnName` can return null, and there's no `tableFieldId < 0 ? null`
guard like `convertToStruct(Map)` has. If `parentFieldName` comes back null,
`addColumn(null, field.name(), type)` adds the column at the root instead of
nested — silent schema corruption rather than a failure.
It's unreachable in the current call graph since `tableFieldId` is always a
valid field id here, so not blocking, but it's a latent trap for any future
caller. I'd mirror the existing `structFieldId < 0` guard.
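Here is a sketch of the suggested guard. It uses a plain `Map<Integer, String>` in place of `tableSchema.findColumnName`, and the class and method names are hypothetical. A negative id keeps the intentional root-level `null`, mirroring the existing guard. Throwing on an unresolved non-negative id is an added assumption. It makes the failure loud instead of letting a column silently land at the root.

```java
import java.util.Map;

// Hypothetical sketch: columnNamesById stands in for tableSchema.findColumnName.
final class ParentNameGuard {
  private ParentNameGuard() {}

  static String parentFieldName(Map<Integer, String> columnNamesById, int tableFieldId) {
    if (tableFieldId < 0) {
      // Root-level struct: a null parent is intentional here, mirroring the
      // existing structFieldId < 0 guard.
      return null;
    }
    String name = columnNamesById.get(tableFieldId);
    if (name == null) {
      // Assumption: failing is preferable to passing null to addColumn, which
      // would add the column at the root instead of under the parent.
      throw new IllegalStateException("No column found for field id " + tableFieldId);
    }
    return name;
  }
}
```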
##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java:
##########
@@ -255,19 +255,92 @@ private GenericRecord convertToStruct(
}
}
if (!hasSchemaUpdates) {
- result.setField(
- tableField.name(),
- convertValue(
- struct.get(recordField),
- tableField.type(),
- tableField.fieldId(),
- schemaUpdateConsumer));
+ Object recordFieldValue = struct.get(recordField);
+ // If the value is null and schema evolution is on, then evolve the table schema
+ // from the connect record schema
+ if (recordFieldValue == null && schemaUpdateConsumer != null) {
+ evolveSchemaFromConnectSchema(
+ recordField.schema(),
+ tableField.type(),
+ tableField.fieldId(),
+ schemaUpdateConsumer);
+ } else {
+ // If the value is not null, then convert the value (and handle any schema
+ // updates)
+ result.setField(
+ tableField.name(),
+ convertValue(
+ recordFieldValue,
+ tableField.type(),
+ tableField.fieldId(),
+ schemaUpdateConsumer));
+ }
}
}
});
return result;
}
+ private void evolveSchemaFromConnectSchema(
Review Comment:
This duplicates the three evolution ops (addColumn/updateType/makeOptional)
that already live in the value path around the `convertToStruct` switch — the
rules now live in two places and any future change has to land in both. The
minimal-fix framing is defensible and I wouldn't block on extracting a shared
helper, but a short comment here pointing at the value-path counterpart would
save the next person from forgetting the second site.
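If a shared helper is ever extracted, one possible shape is sketched below. It is hypothetical: types are reduced to strings, `Sink` stands in for `SchemaUpdate.Consumer`, and `promotedType` stands in for the result of `SchemaUtils.needsDataTypeUpdate`. Only the two existing-field operations are shown. The `addColumn` branch could be folded in the same way.

```java
// Hypothetical helper that both the value path and the null path could call,
// so the updateType/makeOptional rules live in one place.
final class FieldEvolution {
  interface Sink {
    void updateType(String column, String newType);

    void makeOptional(String column);
  }

  private FieldEvolution() {}

  static void applyExistingFieldUpdates(
      String column,
      String promotedType,
      boolean tableFieldRequired,
      boolean connectFieldOptional,
      Sink sink) {
    // promotedType is null when no type promotion is needed.
    if (promotedType != null) {
      sink.updateType(column, promotedType);
    }
    if (tableFieldRequired && connectFieldOptional) {
      sink.makeOptional(column);
    }
  }
}
```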
##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java:
##########
@@ -255,19 +255,92 @@ private GenericRecord convertToStruct(
}
}
if (!hasSchemaUpdates) {
- result.setField(
- tableField.name(),
- convertValue(
- struct.get(recordField),
- tableField.type(),
- tableField.fieldId(),
- schemaUpdateConsumer));
+ Object recordFieldValue = struct.get(recordField);
+ // If the value is null and schema evolution is on, then evolve the table schema
+ // from the connect record schema
+ if (recordFieldValue == null && schemaUpdateConsumer != null) {
+ evolveSchemaFromConnectSchema(
+ recordField.schema(),
+ tableField.type(),
+ tableField.fieldId(),
+ schemaUpdateConsumer);
Review Comment:
Before this change the null branch still called `result.setField(name,
convertValue(null, ...))`, which set the field to null. Now it skips `setField`
entirely. `GenericRecord` defaults fields to null so the observable result is
the same today, but the two paths diverge and it's fragile if the record impl
ever changes its defaults. I'd call `result.setField(tableField.name(), null)`
after the evolve call to keep the behavior identical. Not blocking.
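The difference only shows up with a record implementation that does not default fields to null. The hypothetical `StrictRecord` below models that case: an unset field is absent, while a field set to null is present. `NullBranch.handleNull` shows the suggested order. It queues the schema updates first, then writes the null explicitly.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical record that does NOT default fields to null: an unset field is
// absent, while a field explicitly set to null is present with a null value.
final class StrictRecord {
  private final Map<String, Object> values = new LinkedHashMap<>();

  void setField(String name, Object value) {
    values.put(name, value);
  }

  boolean isSet(String name) {
    return values.containsKey(name);
  }

  Object getField(String name) {
    return values.get(name);
  }
}

final class NullBranch {
  private NullBranch() {}

  // Suggested null branch: queue schema updates first, then write the null
  // explicitly so the output matches the pre-change setField(name, convertValue(null, ...)).
  static void handleNull(StrictRecord result, String fieldName, Runnable evolveSchema) {
    evolveSchema.run();
    result.setField(fieldName, null);
  }
}
```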
##########
kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestRecordConverter.java:
##########
@@ -859,6 +859,268 @@ private void assertTypesAddedFromStruct(Function<String, Type> fn) {
assertThat(fn.apply("ma")).isInstanceOf(MapType.class);
}
+ @Test
+ public void testNestedSchemaEvolutionStructWithNullValue() {
Review Comment:
All six new tests assert only on `consumer.addColumns()` (and
updateTypes/makeOptionals) and never on the Record returned by `convert`. That
leaves the `result.setField(name, null)` omission unverified — none of these
would fail if the field were left unset versus set to null.
I'd have at least this test capture the returned Record and assert the
null-valued field is actually null on it, so the output-value contract is
pinned and not just the schema-update side effect. Worth adding alongside the
blocker fix — a test for a field that needs both a top-level update and a null
nested struct would also pin the case that's currently dropped.