laskoviymishka commented on code in PR #15615:
URL: https://github.com/apache/iceberg/pull/15615#discussion_r3311617542


##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java:
##########
@@ -93,7 +96,39 @@ Table autoCreateTable(String tableName, SinkRecord sample) {
       structType = SchemaUtils.toIcebergType(sample.valueSchema(), config).asStructType();
     }
 
-    org.apache.iceberg.Schema schema = new org.apache.iceberg.Schema(structType.fields());
+    // Get ID columns configuration and map to field IDs
+    List<String> idColumns = config.tableConfig(tableName).idColumns();
+
+    // Create initial schema to get field IDs
+    org.apache.iceberg.Schema initialSchema = new org.apache.iceberg.Schema(structType.fields());
+
+    // Validate and map ID columns to field IDs
+    Set<Integer> identifierFieldIds = Sets.newHashSet();

Review Comment:
   I'd restructure this block. `Schema.validateIdentifierField` already enforces all the constraints we care about (exists, required, primitive, not float/double, parents are required structs), and the `Schema` constructor invokes it for us. The hand-rolled version here only covers the first two, so float/double and non-primitive id columns slip past it and are only caught by the constructor, where they escape as an `IllegalArgumentException` from `Preconditions.checkArgument` rather than the `DataException` the connector framework expects.
   
   Letting the constructor validate, then re-wrapping its `IllegalArgumentException`, gets the missing checks for free. It also collapses the two-pass `initialSchema → finalSchema` flow and the `isEmpty()` ternary at line 128 (the constructor handles an empty set fine, since `Ints.toArray` of an empty set is just an empty `int[]`):
   
   ```java
   List<String> idColumns = config.tableConfig(tableName).idColumns();
   
   org.apache.iceberg.Schema initialSchema = new org.apache.iceberg.Schema(structType.fields());
   Set<Integer> identifierFieldIds =
       idColumns.stream()
           .map(name -> {
             NestedField field = initialSchema.findField(name);
             if (field == null) {
               throw new DataException(
                   String.format(
                       "ID column '%s' not found in schema for table %s. Available columns: %s",
                       name,
                       tableName,
                       initialSchema.columns().stream().map(NestedField::name).collect(Collectors.toList())));
             }
             return field.fieldId();
           })
           .collect(Collectors.toSet());
   
   // The constructor runs Schema.validateIdentifierField on every ID (required,
   // primitive, not float/double, required struct parents).
   org.apache.iceberg.Schema schema;
   try {
     schema = new org.apache.iceberg.Schema(structType.fields(), identifierFieldIds);
   } catch (IllegalArgumentException e) {
     throw new DataException(
         String.format(
             "Invalid identifier column configuration for table %s: %s", tableName, e.getMessage()),
         e);
   }
   ```
   
   This way there is a single validator, so the checks can't drift apart, and the error contract is consistent regardless of which constraint the user violated. wdyt?
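   
   For illustration, a rough JDK-only model of that error contract. `Column`, `validateIdentifier` and the nested `DataException` are hypothetical stand-ins (not Iceberg's `NestedField`/`Schema.validateIdentifierField` or Kafka Connect's `DataException`); the point is only that the rules live in one validator that throws `IllegalArgumentException`, and the connector boundary rewraps it.
   
   ```java
   import java.util.List;
   import java.util.Set;

   // JDK-only model. Column, validateIdentifier and DataException are hypothetical
   // stand-ins for Iceberg's NestedField, Schema.validateIdentifierField and Kafka
   // Connect's DataException; they are not the real classes.
   final class IdentifierContract {

     /** Stand-in for org.apache.kafka.connect.errors.DataException. */
     static final class DataException extends RuntimeException {
       DataException(String message, Throwable cause) {
         super(message, cause);
       }
     }

     /** Minimal column model: name, type name, and whether the column is required. */
     static final class Column {
       final String name;
       final String type;
       final boolean required;

       Column(String name, String type, boolean required) {
         this.name = name;
         this.type = type;
         this.required = required;
       }
     }

     private static final Set<String> NESTED_TYPES = Set.of("struct", "list", "map");

     // The single canonical validator: it throws IllegalArgumentException, as
     // Preconditions.checkArgument does, and is the only place the rules live.
     static void validateIdentifier(Column c) {
       if (NESTED_TYPES.contains(c.type)) {
         throw new IllegalArgumentException(c.name + ": not a primitive type field");
       }
       if (!c.required) {
         throw new IllegalArgumentException(c.name + ": not a required field");
       }
       if (c.type.equals("float") || c.type.equals("double")) {
         throw new IllegalArgumentException(c.name + ": must not be float or double field");
       }
     }

     // Connector boundary: run the canonical validator, then rewrap so every
     // violation reaches the framework as a DataException.
     static void checkIdentifiers(String tableName, List<Column> idColumns) {
       try {
         idColumns.forEach(IdentifierContract::validateIdentifier);
       } catch (IllegalArgumentException e) {
         throw new DataException(
             String.format(
                 "Invalid identifier column configuration for table %s: %s",
                 tableName, e.getMessage()),
             e);
       }
     }
   }
   ```
   
   In the actual change the canonical validator would simply be the `org.apache.iceberg.Schema` constructor, so none of these rules would be copied into the connector.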



##########
kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestIcebergWriterFactory.java:
##########
@@ -96,4 +102,133 @@ public void testAutoCreateTable(boolean partitioned) {
     assertThat(capturedArguments.get(1)).isEqualTo(Namespace.of("foo1", "foo2"));
     assertThat(capturedArguments.get(2)).isEqualTo(Namespace.of("foo1", "foo2", "foo3"));
   }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testAutoCreateTableWithIdentifierFields() {
+    Catalog catalog = mock(Catalog.class, withSettings().extraInterfaces(SupportsNamespaces.class));
+    when(catalog.loadTable(any())).thenThrow(new NoSuchTableException("no such table"));
+
+    TableSinkConfig tableConfig = mock(TableSinkConfig.class);
+    when(tableConfig.partitionBy()).thenReturn(ImmutableList.of());
+    // Configure ID columns
+    when(tableConfig.idColumns()).thenReturn(ImmutableList.of("id", "data"));
+
+    IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+    when(config.autoCreateProps()).thenReturn(ImmutableMap.of("test-prop", "foo1"));
+    when(config.tableConfig(any())).thenReturn(tableConfig);
+
+    // Create a Kafka schema with required fields for identifier columns
+    org.apache.kafka.connect.data.Schema valueSchema =
+        SchemaBuilder.struct()
+            .field("id", org.apache.kafka.connect.data.Schema.INT64_SCHEMA) // required
+            .field("data", org.apache.kafka.connect.data.Schema.STRING_SCHEMA) // required
+            .build();
+
+    Struct value = new Struct(valueSchema).put("id", 123L).put("data", "foo2");
+
+    SinkRecord record = mock(SinkRecord.class);
+    when(record.valueSchema()).thenReturn(valueSchema);
+    when(record.value()).thenReturn(value);
+
+    IcebergWriterFactory factory = new IcebergWriterFactory(catalog, config);
+    factory.autoCreateTable("foo.bar", record);
+
+    ArgumentCaptor<TableIdentifier> identCaptor = ArgumentCaptor.forClass(TableIdentifier.class);
+    ArgumentCaptor<Schema> schemaCaptor = ArgumentCaptor.forClass(Schema.class);
+    ArgumentCaptor<PartitionSpec> specCaptor = ArgumentCaptor.forClass(PartitionSpec.class);
+    ArgumentCaptor<Map<String, String>> propsCaptor = ArgumentCaptor.forClass(Map.class);
+
+    verify(catalog)
+        .createTable(
+            identCaptor.capture(),
+            schemaCaptor.capture(),
+            specCaptor.capture(),
+            propsCaptor.capture());
+
+    Schema schema = schemaCaptor.getValue();
+    assertThat(schema.findField("id").type()).isEqualTo(LongType.get());
+    assertThat(schema.findField("data").type()).isEqualTo(StringType.get());
+
+    // Verify identifier field IDs are set correctly
+    Set<Integer> identifierFieldIds = schema.identifierFieldIds();
+    assertThat(identifierFieldIds)
+        .as("Schema should have identifier field IDs set")
+        .containsExactlyInAnyOrder(
+            schema.findField("id").fieldId(), schema.findField("data").fieldId());
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testAutoCreateTableWithMissingIdentifierField() {
+    Catalog catalog = mock(Catalog.class, withSettings().extraInterfaces(SupportsNamespaces.class));
+    when(catalog.loadTable(any())).thenThrow(new NoSuchTableException("no such table"));
+
+    TableSinkConfig tableConfig = mock(TableSinkConfig.class);
+    when(tableConfig.partitionBy()).thenReturn(ImmutableList.of());
+    // Configure ID column that doesn't exist in the data
+    when(tableConfig.idColumns()).thenReturn(ImmutableList.of("missing_column"));
+
+    IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+    when(config.autoCreateProps()).thenReturn(ImmutableMap.of());
+    when(config.tableConfig(any())).thenReturn(tableConfig);
+
+    // Create schema without the missing column
+    org.apache.kafka.connect.data.Schema valueSchema =
+        SchemaBuilder.struct()
+            .field("id", org.apache.kafka.connect.data.Schema.INT64_SCHEMA)
+            .field("data", org.apache.kafka.connect.data.Schema.STRING_SCHEMA)
+            .build();
+
+    Struct value = new Struct(valueSchema).put("id", 123L).put("data", "foo");
+
+    SinkRecord record = mock(SinkRecord.class);
+    when(record.valueSchema()).thenReturn(valueSchema);
+    when(record.value()).thenReturn(value);
+
+    IcebergWriterFactory factory = new IcebergWriterFactory(catalog, config);
+
+    // Should throw DataException when ID column is not found in schema
+    assertThatThrownBy(() -> factory.autoCreateTable("foo.bar", record))
+        .isInstanceOf(DataException.class)
+        .hasMessageContaining("ID column 'missing_column' not found in schema")
+        .hasMessageContaining("Available columns:");
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testAutoCreateTableWithOptionalIdentifierField() {
+    Catalog catalog = mock(Catalog.class, withSettings().extraInterfaces(SupportsNamespaces.class));
+    when(catalog.loadTable(any())).thenThrow(new NoSuchTableException("no such table"));
+
+    TableSinkConfig tableConfig = mock(TableSinkConfig.class);
+    when(tableConfig.partitionBy()).thenReturn(ImmutableList.of());
+    // Configure ID column
+    when(tableConfig.idColumns()).thenReturn(ImmutableList.of("id"));
+
+    IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+    when(config.autoCreateProps()).thenReturn(ImmutableMap.of());
+    when(config.tableConfig(any())).thenReturn(tableConfig);
+
+    // Create schema with optional identifier field
+    org.apache.kafka.connect.data.Schema valueSchema =
+        SchemaBuilder.struct()
+            .field("id", org.apache.kafka.connect.data.Schema.OPTIONAL_INT64_SCHEMA) // optional!

Review Comment:
   This test covers `schemaForceOptional=false` plus a Kafka-level optional field, which is the easy case the user can fix. The harder case, `schemaForceOptional=true` plus a Kafka-level required field, is the one a real deployment hits, and it's the path that currently produces a misleading error per the inline comment on `IcebergWriterFactory.java:115`. I'd add another test that stubs `config.schemaForceOptional()` to return `true` and asserts whatever the right behavior turns out to be once that thread settles.
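   
   The scenario that test would pin down can be modeled without the connector. In this hypothetical sketch, `icebergRequired` stands in for the requiredness that `SchemaUtils.toIcebergType` produces, assuming `schemaForceOptional=true` makes every converted field optional; none of these names are the connector's real API.
   
   ```java
   import java.util.List;
   import java.util.Map;
   import java.util.stream.Collectors;

   // Hypothetical JDK-only model of the scenario. forceOptional mimics a conversion
   // that marks every field optional; nothing here is the connector's real API.
   final class ForceOptionalScenario {

     /** Iceberg-side requiredness of a Kafka field after conversion. */
     static boolean icebergRequired(boolean kafkaRequired, boolean forceOptional) {
       return kafkaRequired && !forceOptional;
     }

     /**
      * Returns the configured ID columns that exist in the Kafka schema but would end
      * up optional on the Iceberg side, and so be rejected as identifier fields.
      */
     static List<String> rejectedIdColumns(
         Map<String, Boolean> kafkaRequiredByName, List<String> idColumns, boolean forceOptional) {
       return idColumns.stream()
           .filter(kafkaRequiredByName::containsKey)
           .filter(name -> !icebergRequired(kafkaRequiredByName.get(name), forceOptional))
           .collect(Collectors.toList());
     }
   }
   ```
   
   The existing `testAutoCreateTableWithOptionalIdentifierField` corresponds to a Kafka-optional field with the flag off; the missing case is a Kafka-required field with the flag on, which in the real test would mean stubbing `config.schemaForceOptional()` to return `true`.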



##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java:
##########
@@ -93,7 +96,39 @@ Table autoCreateTable(String tableName, SinkRecord sample) {
       structType = SchemaUtils.toIcebergType(sample.valueSchema(), config).asStructType();
     }
 
-    org.apache.iceberg.Schema schema = new org.apache.iceberg.Schema(structType.fields());
+    // Get ID columns configuration and map to field IDs
+    List<String> idColumns = config.tableConfig(tableName).idColumns();
+
+    // Create initial schema to get field IDs
+    org.apache.iceberg.Schema initialSchema = new org.apache.iceberg.Schema(structType.fields());
+
+    // Validate and map ID columns to field IDs
+    Set<Integer> identifierFieldIds = Sets.newHashSet();
+    for (String idColumn : idColumns) {
+      NestedField field = initialSchema.findField(idColumn);
+      if (field == null) {
+        throw new DataException(
+            String.format(
+                "ID column '%s' not found in schema for table %s. Available columns: %s",
+                idColumn, tableName, initialSchema.columns()));
+      }
+      if (field.isOptional()) {

Review Comment:
   This error misleads users when `schemaForceOptional=true` is set. `SchemaUtils.toIcebergType` forces every field optional regardless of the upstream Kafka schema, so the suggestion to "update your Kafka schema" won't help. The real cause is the config flag, but the message doesn't say that.
   
   Either special-case the message when `config.schemaForceOptional()` is true, or reject the combination at config-validation time so the user gets a clean startup error rather than a per-table runtime one. I'd lean toward the latter, since the two flags are semantically incompatible.
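   
   A minimal sketch of the startup-time option, assuming a hypothetical `validate` hook that sees the force-optional flag and the per-table ID columns. The nested `ConfigException` is a stand-in; in the connector it would presumably be Kafka's `org.apache.kafka.common.config.ConfigException`. A real check might also want to consider whether auto-create is enabled; that refinement is left out here.
   
   ```java
   import java.util.List;
   import java.util.Map;

   // Hypothetical startup check. The method and exception are stand-ins, not the
   // connector's real IcebergSinkConfig validation or Kafka's ConfigException.
   final class StartupValidation {

     /** Stand-in for org.apache.kafka.common.config.ConfigException. */
     static final class ConfigException extends RuntimeException {
       ConfigException(String message) {
         super(message);
       }
     }

     /**
      * Fails fast when schemaForceOptional=true is combined with ID columns: forcing
      * every field optional means no configured column can be a required identifier.
      */
     static void validate(boolean schemaForceOptional, Map<String, List<String>> idColumnsByTable) {
       if (!schemaForceOptional) {
         return;
       }
       for (Map.Entry<String, List<String>> entry : idColumnsByTable.entrySet()) {
         if (!entry.getValue().isEmpty()) {
           throw new ConfigException(
               String.format(
                   "Table %s sets id-columns %s, but schemaForceOptional=true makes every "
                       + "auto-created field optional and identifier fields must be required. "
                       + "Disable schemaForceOptional or remove the id-columns setting.",
                   entry.getKey(), entry.getValue()));
         }
       }
     }
   }
   ```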



##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java:
##########
@@ -93,7 +96,39 @@ Table autoCreateTable(String tableName, SinkRecord sample) {
       structType = SchemaUtils.toIcebergType(sample.valueSchema(), config).asStructType();
     }
 
-    org.apache.iceberg.Schema schema = new org.apache.iceberg.Schema(structType.fields());
+    // Get ID columns configuration and map to field IDs
+    List<String> idColumns = config.tableConfig(tableName).idColumns();
+
+    // Create initial schema to get field IDs
+    org.apache.iceberg.Schema initialSchema = new org.apache.iceberg.Schema(structType.fields());
+
+    // Validate and map ID columns to field IDs
+    Set<Integer> identifierFieldIds = Sets.newHashSet();
+    for (String idColumn : idColumns) {
+      NestedField field = initialSchema.findField(idColumn);
+      if (field == null) {
+        throw new DataException(

Review Comment:
   Worth knowing that `RecordUtils.createTableWriter` (line 125) throws `IllegalArgumentException("ID column not found: " + colName)` for this exact misconfiguration on the existing-table path. Same logical error, different exception type, depending on whether the table is auto-created or already exists. Aligning both on `DataException` is the right end state. That's out of scope here, but worth a follow-up note in the PR body so the inconsistency is tracked.
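   
   One way the follow-up could align the two paths is a shared lookup that both call. The sketch below is hypothetical: `requireIdColumn` is not an existing helper, a list of column names stands in for the real schema, and the nested `DataException` stands in for Kafka Connect's class.
   
   ```java
   import java.util.List;

   // Hypothetical shared helper for both the auto-create and existing-table paths.
   // Column names stand in for the real schema; DataException is a stand-in for
   // org.apache.kafka.connect.errors.DataException.
   final class IdColumnLookup {

     static final class DataException extends RuntimeException {
       DataException(String message) {
         super(message);
       }
     }

     /** Returns the column's position (standing in for the resolved field) or throws. */
     static int requireIdColumn(String tableName, List<String> availableColumns, String idColumn) {
       int index = availableColumns.indexOf(idColumn);
       if (index < 0) {
         throw new DataException(
             String.format(
                 "ID column '%s' not found in schema for table %s. Available columns: %s",
                 idColumn, tableName, availableColumns));
       }
       return index;
     }
   }
   ```
   
   With both `autoCreateTable` and `RecordUtils.createTableWriter` routed through one helper like this, a missing ID column would produce the same exception type and message on either path.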



##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java:
##########
@@ -93,7 +96,39 @@ Table autoCreateTable(String tableName, SinkRecord sample) {
       structType = SchemaUtils.toIcebergType(sample.valueSchema(), config).asStructType();
     }
 
-    org.apache.iceberg.Schema schema = new org.apache.iceberg.Schema(structType.fields());
+    // Get ID columns configuration and map to field IDs
+    List<String> idColumns = config.tableConfig(tableName).idColumns();
+
+    // Create initial schema to get field IDs
+    org.apache.iceberg.Schema initialSchema = new org.apache.iceberg.Schema(structType.fields());
+
+    // Validate and map ID columns to field IDs
+    Set<Integer> identifierFieldIds = Sets.newHashSet();
+    for (String idColumn : idColumns) {
+      NestedField field = initialSchema.findField(idColumn);

Review Comment:
   `findField` resolves dotted paths, so `id-columns=user.id` silently stamps the nested leaf as an identifier. The spec permits this when the parents are required structs, but it's a real interop hazard with engine-side equality-delete merge-on-read, and there's no test or doc note suggesting it's intentional.
   
   I'd commit to one direction explicitly: either reject dotted paths here with a clear error, or add a test plus a line in the connector docs that calls out nested identifier support. Right now it reads as accidental.
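   
   If the first option wins, a hypothetical JDK-only model of the check: `topLevel` holds top-level column names and `nestedPaths` holds the dotted names a `findField`-style lookup would also resolve. Checking whether the resolved column is top-level, rather than rejecting any name containing a dot, keeps the error specific to nested fields and would not trip over a top-level column whose own name contains a dot, if such names can reach this code. Against the real API, one candidate for the top-level test is `schema.asStruct().field(field.fieldId()) != null` after `findField`, though that is untested here.
   
   ```java
   import java.util.Set;

   // Hypothetical model of the "reject nested paths" option. topLevel holds top-level
   // column names; nestedPaths holds dotted names of nested leaves that a
   // findField-style lookup would also resolve.
   final class IdPathPolicy {

     /** Stand-in for org.apache.kafka.connect.errors.DataException. */
     static final class DataException extends RuntimeException {
       DataException(String message) {
         super(message);
       }
     }

     static void checkIdColumn(
         String tableName, String name, Set<String> topLevel, Set<String> nestedPaths) {
       if (topLevel.contains(name)) {
         return; // a top-level column, accepted even if its name contains a dot
       }
       if (nestedPaths.contains(name)) {
         throw new DataException(
             String.format(
                 "ID column '%s' for table %s refers to a nested field; only top-level "
                     + "columns are supported as identifier fields",
                 name, tableName));
       }
       throw new DataException(
           String.format("ID column '%s' not found in schema for table %s", name, tableName));
     }
   }
   ```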



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
