laskoviymishka commented on code in PR #15615:
URL: https://github.com/apache/iceberg/pull/15615#discussion_r3311617542
##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java:
##########
@@ -93,7 +96,39 @@ Table autoCreateTable(String tableName, SinkRecord sample) {
structType = SchemaUtils.toIcebergType(sample.valueSchema(), config).asStructType();
}
- org.apache.iceberg.Schema schema = new org.apache.iceberg.Schema(structType.fields());
+ // Get ID columns configuration and map to field IDs
+ List<String> idColumns = config.tableConfig(tableName).idColumns();
+
+ // Create initial schema to get field IDs
+ org.apache.iceberg.Schema initialSchema = new org.apache.iceberg.Schema(structType.fields());
+
+ // Validate and map ID columns to field IDs
+ Set<Integer> identifierFieldIds = Sets.newHashSet();
Review Comment:
I'd restructure this block — `Schema.validateIdentifierField` already
enforces all the constraints we care about (exists, required, primitive, not
float/double, parents are required structs), and the `Schema` constructor
invokes it for us. The hand-rolled version here only covers the first two, so
float/double and non-primitive id columns slip past and escape as
`IllegalArgumentException` out of `Preconditions.checkArgument` rather than the
`DataException` the connector framework expects.
Letting the constructor validate, then re-wrapping its
`IllegalArgumentException`, gets the missing checks for free. It also collapses
the two-pass `initialSchema → finalSchema` and the `isEmpty()` ternary at line
128, since the constructor accepts an empty set fine (`Ints.toArray` on an empty
set just returns an empty `int[]`):
```java
List<String> idColumns = config.tableConfig(tableName).idColumns();
Schema initialSchema = new Schema(structType.fields());

Set<Integer> identifierFieldIds =
    idColumns.stream()
        .map(
            name -> {
              NestedField field = initialSchema.findField(name);
              if (field == null) {
                throw new DataException(
                    String.format(
                        "ID column '%s' not found in schema for table %s. Available columns: %s",
                        name,
                        tableName,
                        initialSchema.columns().stream()
                            .map(NestedField::name)
                            .collect(Collectors.toList())));
              }
              return field.fieldId();
            })
        .collect(Collectors.toSet());

Schema schema;
try {
  schema = new Schema(structType.fields(), identifierFieldIds);
} catch (IllegalArgumentException e) {
  throw new DataException(
      String.format(
          "Invalid identifier column configuration for table %s: %s", tableName, e.getMessage()),
      e);
}
```
This way the two validators can't drift apart, and the error contract is
consistent regardless of which constraint the user violated. wdyt?
##########
kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestIcebergWriterFactory.java:
##########
@@ -96,4 +102,133 @@ public void testAutoCreateTable(boolean partitioned) {
assertThat(capturedArguments.get(1)).isEqualTo(Namespace.of("foo1", "foo2"));
assertThat(capturedArguments.get(2)).isEqualTo(Namespace.of("foo1", "foo2", "foo3"));
}
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testAutoCreateTableWithIdentifierFields() {
+ Catalog catalog = mock(Catalog.class, withSettings().extraInterfaces(SupportsNamespaces.class));
+ when(catalog.loadTable(any())).thenThrow(new NoSuchTableException("no such table"));
+
+ TableSinkConfig tableConfig = mock(TableSinkConfig.class);
+ when(tableConfig.partitionBy()).thenReturn(ImmutableList.of());
+ // Configure ID columns
+ when(tableConfig.idColumns()).thenReturn(ImmutableList.of("id", "data"));
+
+ IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+ when(config.autoCreateProps()).thenReturn(ImmutableMap.of("test-prop", "foo1"));
+ when(config.tableConfig(any())).thenReturn(tableConfig);
+
+ // Create a Kafka schema with required fields for identifier columns
+ org.apache.kafka.connect.data.Schema valueSchema =
+ SchemaBuilder.struct()
+ .field("id", org.apache.kafka.connect.data.Schema.INT64_SCHEMA) // required
+ .field("data", org.apache.kafka.connect.data.Schema.STRING_SCHEMA) // required
+ .build();
+
+ Struct value = new Struct(valueSchema).put("id", 123L).put("data", "foo2");
+
+ SinkRecord record = mock(SinkRecord.class);
+ when(record.valueSchema()).thenReturn(valueSchema);
+ when(record.value()).thenReturn(value);
+
+ IcebergWriterFactory factory = new IcebergWriterFactory(catalog, config);
+ factory.autoCreateTable("foo.bar", record);
+
+ ArgumentCaptor<TableIdentifier> identCaptor = ArgumentCaptor.forClass(TableIdentifier.class);
+ ArgumentCaptor<Schema> schemaCaptor = ArgumentCaptor.forClass(Schema.class);
+ ArgumentCaptor<PartitionSpec> specCaptor = ArgumentCaptor.forClass(PartitionSpec.class);
+ ArgumentCaptor<Map<String, String>> propsCaptor = ArgumentCaptor.forClass(Map.class);
+
+ verify(catalog)
+ .createTable(
+ identCaptor.capture(),
+ schemaCaptor.capture(),
+ specCaptor.capture(),
+ propsCaptor.capture());
+
+ Schema schema = schemaCaptor.getValue();
+ assertThat(schema.findField("id").type()).isEqualTo(LongType.get());
+ assertThat(schema.findField("data").type()).isEqualTo(StringType.get());
+
+ // Verify identifier field IDs are set correctly
+ Set<Integer> identifierFieldIds = schema.identifierFieldIds();
+ assertThat(identifierFieldIds)
+ .as("Schema should have identifier field IDs set")
+ .containsExactlyInAnyOrder(
+ schema.findField("id").fieldId(), schema.findField("data").fieldId());
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testAutoCreateTableWithMissingIdentifierField() {
+ Catalog catalog = mock(Catalog.class, withSettings().extraInterfaces(SupportsNamespaces.class));
+ when(catalog.loadTable(any())).thenThrow(new NoSuchTableException("no such table"));
+
+ TableSinkConfig tableConfig = mock(TableSinkConfig.class);
+ when(tableConfig.partitionBy()).thenReturn(ImmutableList.of());
+ // Configure ID column that doesn't exist in the data
+ when(tableConfig.idColumns()).thenReturn(ImmutableList.of("missing_column"));
+
+ IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+ when(config.autoCreateProps()).thenReturn(ImmutableMap.of());
+ when(config.tableConfig(any())).thenReturn(tableConfig);
+
+ // Create schema without the missing column
+ org.apache.kafka.connect.data.Schema valueSchema =
+ SchemaBuilder.struct()
+ .field("id", org.apache.kafka.connect.data.Schema.INT64_SCHEMA)
+ .field("data", org.apache.kafka.connect.data.Schema.STRING_SCHEMA)
+ .build();
+
+ Struct value = new Struct(valueSchema).put("id", 123L).put("data", "foo");
+
+ SinkRecord record = mock(SinkRecord.class);
+ when(record.valueSchema()).thenReturn(valueSchema);
+ when(record.value()).thenReturn(value);
+
+ IcebergWriterFactory factory = new IcebergWriterFactory(catalog, config);
+
+ // Should throw DataException when ID column is not found in schema
+ assertThatThrownBy(() -> factory.autoCreateTable("foo.bar", record))
+ .isInstanceOf(DataException.class)
+ .hasMessageContaining("ID column 'missing_column' not found in schema")
+ .hasMessageContaining("Available columns:");
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testAutoCreateTableWithOptionalIdentifierField() {
+ Catalog catalog = mock(Catalog.class, withSettings().extraInterfaces(SupportsNamespaces.class));
+ when(catalog.loadTable(any())).thenThrow(new NoSuchTableException("no such table"));
+
+ TableSinkConfig tableConfig = mock(TableSinkConfig.class);
+ when(tableConfig.partitionBy()).thenReturn(ImmutableList.of());
+ // Configure ID column
+ when(tableConfig.idColumns()).thenReturn(ImmutableList.of("id"));
+
+ IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+ when(config.autoCreateProps()).thenReturn(ImmutableMap.of());
+ when(config.tableConfig(any())).thenReturn(tableConfig);
+
+ // Create schema with optional identifier field
+ org.apache.kafka.connect.data.Schema valueSchema =
+ SchemaBuilder.struct()
+ .field("id", org.apache.kafka.connect.data.Schema.OPTIONAL_INT64_SCHEMA) // optional!
Review Comment:
This test covers `schemaForceOptional=false` plus a Kafka-level optional
field, which is the easy case the user can fix. The harder case —
`schemaForceOptional=true` plus a Kafka-level required field — is the one a
real deployment hits, and it's the path that currently produces a misleading
error per the inline on `IcebergWriterFactory.java:115`. I'd add a third test
that stubs `config.schemaForceOptional()` to `true` and asserts whatever the
right behavior turns out to be after that thread settles.
##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java:
##########
@@ -93,7 +96,39 @@ Table autoCreateTable(String tableName, SinkRecord sample) {
structType = SchemaUtils.toIcebergType(sample.valueSchema(), config).asStructType();
}
- org.apache.iceberg.Schema schema = new org.apache.iceberg.Schema(structType.fields());
+ // Get ID columns configuration and map to field IDs
+ List<String> idColumns = config.tableConfig(tableName).idColumns();
+
+ // Create initial schema to get field IDs
+ org.apache.iceberg.Schema initialSchema = new org.apache.iceberg.Schema(structType.fields());
+
+ // Validate and map ID columns to field IDs
+ Set<Integer> identifierFieldIds = Sets.newHashSet();
+ for (String idColumn : idColumns) {
+ NestedField field = initialSchema.findField(idColumn);
+ if (field == null) {
+ throw new DataException(
+ String.format(
+ "ID column '%s' not found in schema for table %s. Available columns: %s",
+ idColumn, tableName, initialSchema.columns()));
+ }
+ if (field.isOptional()) {
Review Comment:
This error misleads users when `schemaForceOptional=true` is set —
`SchemaUtils.toIcebergType` forces every field optional regardless of the
upstream Kafka schema, so the suggestion to "update your Kafka schema" won't
help. The real cause is the config flag, but the message doesn't say that.
Either special-case the message when `config.schemaForceOptional()` is true,
or reject the combination at config-validation time so the user gets a clean
startup error rather than a per-table runtime one. I'd lean toward the latter
since the two flags are semantically incompatible.
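
A rough sketch of the config-time option, assuming the check can see the force-optional flag and the per-table id columns in one place. The class name, method name, and map parameter below are hypothetical stand-ins, not existing connector API. The real connector would more likely throw Kafka's `ConfigException`; `IllegalArgumentException` is used only to keep the sketch dependency-free:

```java
import java.util.List;
import java.util.Map;

/** Hypothetical startup-time check; names are illustrative, not connector API. */
final class IdColumnsConfigCheck {
  private IdColumnsConfigCheck() {}

  /**
   * Rejects configurations that force every field optional while also
   * declaring identifier columns, since identifier fields must be required.
   *
   * @param schemaForceOptional stand-in for config.schemaForceOptional()
   * @param idColumnsByTable stand-in for each table's idColumns() list
   */
  static void validate(boolean schemaForceOptional, Map<String, List<String>> idColumnsByTable) {
    if (!schemaForceOptional) {
      return; // nothing to reject; field optionality follows the Kafka schema
    }
    for (Map.Entry<String, List<String>> entry : idColumnsByTable.entrySet()) {
      if (!entry.getValue().isEmpty()) {
        // Name the config flag as the cause, not the upstream Kafka schema.
        throw new IllegalArgumentException(
            String.format(
                "Table %s sets id columns %s, but schemaForceOptional=true makes every "
                    + "field optional and identifier fields must be required",
                entry.getKey(), entry.getValue()));
      }
    }
  }
}
```

One caveat with this shape: it would also reject the combination for tables that already exist, where auto-create never runs, so the final check may need to be narrower than this.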
##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java:
##########
@@ -93,7 +96,39 @@ Table autoCreateTable(String tableName, SinkRecord sample) {
structType = SchemaUtils.toIcebergType(sample.valueSchema(), config).asStructType();
}
- org.apache.iceberg.Schema schema = new org.apache.iceberg.Schema(structType.fields());
+ // Get ID columns configuration and map to field IDs
+ List<String> idColumns = config.tableConfig(tableName).idColumns();
+
+ // Create initial schema to get field IDs
+ org.apache.iceberg.Schema initialSchema = new org.apache.iceberg.Schema(structType.fields());
+
+ // Validate and map ID columns to field IDs
+ Set<Integer> identifierFieldIds = Sets.newHashSet();
+ for (String idColumn : idColumns) {
+ NestedField field = initialSchema.findField(idColumn);
+ if (field == null) {
+ throw new DataException(
Review Comment:
Worth knowing that `RecordUtils.createTableWriter` (line 125) throws
`IllegalArgumentException("ID column not found: " + colName)` for this exact
misconfiguration on the existing-table path. Same logical error, different
exception type, depending on whether the table is auto-created or already
exists. Aligning both on `DataException` is the right end state — out of scope
here, but worth a follow-up note in the PR body so the inconsistency is tracked.
##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java:
##########
@@ -93,7 +96,39 @@ Table autoCreateTable(String tableName, SinkRecord sample) {
structType = SchemaUtils.toIcebergType(sample.valueSchema(), config).asStructType();
}
- org.apache.iceberg.Schema schema = new org.apache.iceberg.Schema(structType.fields());
+ // Get ID columns configuration and map to field IDs
+ List<String> idColumns = config.tableConfig(tableName).idColumns();
+
+ // Create initial schema to get field IDs
+ org.apache.iceberg.Schema initialSchema = new org.apache.iceberg.Schema(structType.fields());
+
+ // Validate and map ID columns to field IDs
+ Set<Integer> identifierFieldIds = Sets.newHashSet();
+ for (String idColumn : idColumns) {
+ NestedField field = initialSchema.findField(idColumn);
Review Comment:
`findField` resolves dotted paths, so `id-columns=user.id` silently stamps
the nested leaf as an identifier. The spec permits this when parents are
required structs, but it's a real interop hazard with engine-side
equality-delete merge-on-read, and there's no test or doc note suggesting it's
intentional.
I'd commit to one direction explicitly — either reject dotted paths here
with a clear error, or add a test plus a line in the connector docs that calls
out nested identifier support. Right now it reads as accidental.
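
If the "reject" direction wins, a minimal sketch could look like the following. It assumes the set of top-level column names is already available, for example collected from `initialSchema.columns()`. The class and method names are hypothetical, and the real code would throw `DataException` rather than `IllegalArgumentException`. Checking membership in the top-level names, rather than scanning for a `.`, avoids rejecting a top-level column whose name happens to contain a dot:

```java
import java.util.List;
import java.util.Set;

/** Hypothetical guard; names are illustrative, not connector API. */
final class TopLevelIdColumns {
  private TopLevelIdColumns() {}

  /**
   * Fails if any configured id column is not a top-level column, which also
   * rules out dotted paths that would otherwise resolve to nested leaves.
   *
   * @param topLevelColumns names of the schema's top-level columns (assumed input)
   */
  static void requireTopLevel(
      String tableName, List<String> idColumns, Set<String> topLevelColumns) {
    for (String name : idColumns) {
      if (!topLevelColumns.contains(name)) {
        throw new IllegalArgumentException(
            String.format(
                "ID column '%s' for table %s is not a top-level column. Available columns: %s",
                name, tableName, topLevelColumns));
      }
    }
  }
}
```

With top-level columns `id` and `user`, `id-columns=id` passes and `id-columns=user.id` fails with a message naming the offending column.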