laskoviymishka commented on code in PR #15615:
URL: https://github.com/apache/iceberg/pull/15615#discussion_r3320386390


##########
kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestIcebergWriterFactory.java:
##########
@@ -96,4 +102,178 @@ public void testAutoCreateTable(boolean partitioned) {
     assertThat(capturedArguments.get(1)).isEqualTo(Namespace.of("foo1", 
"foo2"));
     assertThat(capturedArguments.get(2)).isEqualTo(Namespace.of("foo1", 
"foo2", "foo3"));
   }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testAutoCreateTableWithIdentifierFields() {
+    Catalog catalog = mock(Catalog.class, 
withSettings().extraInterfaces(SupportsNamespaces.class));
+    when(catalog.loadTable(any())).thenThrow(new NoSuchTableException("no such 
table"));
+
+    TableSinkConfig tableConfig = mock(TableSinkConfig.class);
+    when(tableConfig.partitionBy()).thenReturn(ImmutableList.of());
+    // Configure ID columns
+    when(tableConfig.idColumns()).thenReturn(ImmutableList.of("id", "data"));
+
+    IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+    when(config.autoCreateProps()).thenReturn(ImmutableMap.of("test-prop", 
"foo1"));
+    when(config.tableConfig(any())).thenReturn(tableConfig);
+
+    // Create a Kafka schema with required fields for identifier columns
+    org.apache.kafka.connect.data.Schema valueSchema =
+        SchemaBuilder.struct()
+            .field("id", org.apache.kafka.connect.data.Schema.INT64_SCHEMA) // 
required
+            .field("data", org.apache.kafka.connect.data.Schema.STRING_SCHEMA) 
// required
+            .build();
+
+    Struct value = new Struct(valueSchema).put("id", 123L).put("data", "foo2");
+
+    SinkRecord record = mock(SinkRecord.class);
+    when(record.valueSchema()).thenReturn(valueSchema);
+    when(record.value()).thenReturn(value);
+
+    IcebergWriterFactory factory = new IcebergWriterFactory(catalog, config);
+    factory.autoCreateTable("foo.bar", record);
+
+    ArgumentCaptor<TableIdentifier> identCaptor = 
ArgumentCaptor.forClass(TableIdentifier.class);
+    ArgumentCaptor<Schema> schemaCaptor = 
ArgumentCaptor.forClass(Schema.class);
+    ArgumentCaptor<PartitionSpec> specCaptor = 
ArgumentCaptor.forClass(PartitionSpec.class);
+    ArgumentCaptor<Map<String, String>> propsCaptor = 
ArgumentCaptor.forClass(Map.class);
+
+    verify(catalog)
+        .createTable(
+            identCaptor.capture(),
+            schemaCaptor.capture(),
+            specCaptor.capture(),
+            propsCaptor.capture());
+
+    Schema schema = schemaCaptor.getValue();
+    assertThat(schema.findField("id").type()).isEqualTo(LongType.get());
+    assertThat(schema.findField("data").type()).isEqualTo(StringType.get());
+
+    // Verify identifier field IDs are set correctly
+    Set<Integer> identifierFieldIds = schema.identifierFieldIds();
+    assertThat(identifierFieldIds)
+        .as("Schema should have identifier field IDs set")
+        .containsExactlyInAnyOrder(
+            schema.findField("id").fieldId(), 
schema.findField("data").fieldId());
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testAutoCreateTableWithMissingIdentifierField() {
+    Catalog catalog = mock(Catalog.class, 
withSettings().extraInterfaces(SupportsNamespaces.class));
+    when(catalog.loadTable(any())).thenThrow(new NoSuchTableException("no such 
table"));
+
+    TableSinkConfig tableConfig = mock(TableSinkConfig.class);
+    when(tableConfig.partitionBy()).thenReturn(ImmutableList.of());
+    // Configure ID column that doesn't exist in the data
+    
when(tableConfig.idColumns()).thenReturn(ImmutableList.of("missing_column"));
+
+    IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+    when(config.autoCreateProps()).thenReturn(ImmutableMap.of());
+    when(config.tableConfig(any())).thenReturn(tableConfig);
+
+    // Create schema without the missing column
+    org.apache.kafka.connect.data.Schema valueSchema =
+        SchemaBuilder.struct()
+            .field("id", org.apache.kafka.connect.data.Schema.INT64_SCHEMA)
+            .field("data", org.apache.kafka.connect.data.Schema.STRING_SCHEMA)
+            .build();
+
+    Struct value = new Struct(valueSchema).put("id", 123L).put("data", "foo");
+
+    SinkRecord record = mock(SinkRecord.class);
+    when(record.valueSchema()).thenReturn(valueSchema);
+    when(record.value()).thenReturn(value);
+
+    IcebergWriterFactory factory = new IcebergWriterFactory(catalog, config);
+
+    // Should throw DataException when ID column is not found in schema
+    assertThatThrownBy(() -> factory.autoCreateTable("foo.bar", record))
+        .isInstanceOf(DataException.class)
+        .hasMessageContaining("ID column 'missing_column' not found in schema")
+        .hasMessageContaining("Available columns:");
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testAutoCreateTableWithNestedIdentifierField() {
+    Catalog catalog = mock(Catalog.class, 
withSettings().extraInterfaces(SupportsNamespaces.class));
+    when(catalog.loadTable(any())).thenThrow(new NoSuchTableException("no such 
table"));
+
+    TableSinkConfig tableConfig = mock(TableSinkConfig.class);
+    when(tableConfig.partitionBy()).thenReturn(ImmutableList.of());
+    // Dotted path references the required leaf inside a required struct
+    when(tableConfig.idColumns()).thenReturn(ImmutableList.of("meta.id"));
+
+    IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+    when(config.autoCreateProps()).thenReturn(ImmutableMap.of());
+    when(config.tableConfig(any())).thenReturn(tableConfig);
+
+    org.apache.kafka.connect.data.Schema metaSchema =
+        SchemaBuilder.struct()
+            .field("id", org.apache.kafka.connect.data.Schema.INT64_SCHEMA)
+            .build();
+    org.apache.kafka.connect.data.Schema valueSchema =
+        SchemaBuilder.struct()
+            .field("meta", metaSchema)
+            .field("data", org.apache.kafka.connect.data.Schema.STRING_SCHEMA)
+            .build();
+
+    Struct metaValue = new Struct(metaSchema).put("id", 42L);
+    Struct value = new Struct(valueSchema).put("meta", metaValue).put("data", 
"foo");
+
+    SinkRecord record = mock(SinkRecord.class);
+    when(record.valueSchema()).thenReturn(valueSchema);
+    when(record.value()).thenReturn(value);
+
+    IcebergWriterFactory factory = new IcebergWriterFactory(catalog, config);
+    factory.autoCreateTable("foo.bar", record);
+
+    ArgumentCaptor<Schema> schemaCaptor = 
ArgumentCaptor.forClass(Schema.class);
+    verify(catalog).createTable(any(), schemaCaptor.capture(), any(), any());
+
+    Schema schema = schemaCaptor.getValue();
+    int nestedLeafId = schema.findField("meta.id").fieldId();
+    assertThat(schema.identifierFieldIds())
+        .as("Nested leaf referenced by dotted id-column should be stamped as 
identifier")
+        .containsExactly(nestedLeafId);
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testAutoCreateTableWithOptionalIdentifierField() {
+    Catalog catalog = mock(Catalog.class, 
withSettings().extraInterfaces(SupportsNamespaces.class));
+    when(catalog.loadTable(any())).thenThrow(new NoSuchTableException("no such 
table"));
+
+    TableSinkConfig tableConfig = mock(TableSinkConfig.class);
+    when(tableConfig.partitionBy()).thenReturn(ImmutableList.of());
+    // Configure ID column
+    when(tableConfig.idColumns()).thenReturn(ImmutableList.of("id"));
+
+    IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+    when(config.autoCreateProps()).thenReturn(ImmutableMap.of());
+    when(config.tableConfig(any())).thenReturn(tableConfig);
+
+    // Create schema with optional identifier field
+    org.apache.kafka.connect.data.Schema valueSchema =
+        SchemaBuilder.struct()
+            .field("id", 
org.apache.kafka.connect.data.Schema.OPTIONAL_INT64_SCHEMA) // optional!
+            .field("data", org.apache.kafka.connect.data.Schema.STRING_SCHEMA)
+            .build();
+
+    Struct value = new Struct(valueSchema).put("id", 123L).put("data", "foo");
+
+    SinkRecord record = mock(SinkRecord.class);
+    when(record.valueSchema()).thenReturn(valueSchema);
+    when(record.value()).thenReturn(value);
+
+    IcebergWriterFactory factory = new IcebergWriterFactory(catalog, config);
+
+    // Should throw DataException when ID column is optional
+    assertThatThrownBy(() -> factory.autoCreateTable("foo.bar", record))
+        .isInstanceOf(DataException.class)
+        .hasMessageContaining("Invalid identifier column configuration for 
table foo.bar")
+        .hasMessageContaining("not a required field");

Review Comment:
   I'd drop the `"not a required field"` assertion — that string is owned by 
Iceberg's `Preconditions.checkArgument` inside `Schema`, and pinning the test 
to it couples us to internal phrasing that can shift. The `"Invalid identifier 
column configuration for table foo.bar"` substring on the line above is the 
connector-owned wrapper and is enough on its own.



##########
kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestIcebergWriterFactory.java:
##########
@@ -96,4 +102,178 @@ public void testAutoCreateTable(boolean partitioned) {
     assertThat(capturedArguments.get(1)).isEqualTo(Namespace.of("foo1", 
"foo2"));
     assertThat(capturedArguments.get(2)).isEqualTo(Namespace.of("foo1", 
"foo2", "foo3"));
   }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testAutoCreateTableWithIdentifierFields() {
+    Catalog catalog = mock(Catalog.class, 
withSettings().extraInterfaces(SupportsNamespaces.class));
+    when(catalog.loadTable(any())).thenThrow(new NoSuchTableException("no such 
table"));
+
+    TableSinkConfig tableConfig = mock(TableSinkConfig.class);
+    when(tableConfig.partitionBy()).thenReturn(ImmutableList.of());
+    // Configure ID columns
+    when(tableConfig.idColumns()).thenReturn(ImmutableList.of("id", "data"));
+
+    IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+    when(config.autoCreateProps()).thenReturn(ImmutableMap.of("test-prop", 
"foo1"));
+    when(config.tableConfig(any())).thenReturn(tableConfig);
+
+    // Create a Kafka schema with required fields for identifier columns
+    org.apache.kafka.connect.data.Schema valueSchema =
+        SchemaBuilder.struct()
+            .field("id", org.apache.kafka.connect.data.Schema.INT64_SCHEMA) // 
required
+            .field("data", org.apache.kafka.connect.data.Schema.STRING_SCHEMA) 
// required
+            .build();
+
+    Struct value = new Struct(valueSchema).put("id", 123L).put("data", "foo2");
+
+    SinkRecord record = mock(SinkRecord.class);
+    when(record.valueSchema()).thenReturn(valueSchema);
+    when(record.value()).thenReturn(value);
+
+    IcebergWriterFactory factory = new IcebergWriterFactory(catalog, config);
+    factory.autoCreateTable("foo.bar", record);
+
+    ArgumentCaptor<TableIdentifier> identCaptor = 
ArgumentCaptor.forClass(TableIdentifier.class);
+    ArgumentCaptor<Schema> schemaCaptor = 
ArgumentCaptor.forClass(Schema.class);
+    ArgumentCaptor<PartitionSpec> specCaptor = 
ArgumentCaptor.forClass(PartitionSpec.class);
+    ArgumentCaptor<Map<String, String>> propsCaptor = 
ArgumentCaptor.forClass(Map.class);
+
+    verify(catalog)
+        .createTable(
+            identCaptor.capture(),
+            schemaCaptor.capture(),
+            specCaptor.capture(),
+            propsCaptor.capture());
+
+    Schema schema = schemaCaptor.getValue();
+    assertThat(schema.findField("id").type()).isEqualTo(LongType.get());
+    assertThat(schema.findField("data").type()).isEqualTo(StringType.get());
+
+    // Verify identifier field IDs are set correctly
+    Set<Integer> identifierFieldIds = schema.identifierFieldIds();
+    assertThat(identifierFieldIds)
+        .as("Schema should have identifier field IDs set")
+        .containsExactlyInAnyOrder(
+            schema.findField("id").fieldId(), 
schema.findField("data").fieldId());
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testAutoCreateTableWithMissingIdentifierField() {
+    Catalog catalog = mock(Catalog.class, 
withSettings().extraInterfaces(SupportsNamespaces.class));
+    when(catalog.loadTable(any())).thenThrow(new NoSuchTableException("no such 
table"));
+
+    TableSinkConfig tableConfig = mock(TableSinkConfig.class);
+    when(tableConfig.partitionBy()).thenReturn(ImmutableList.of());
+    // Configure ID column that doesn't exist in the data
+    
when(tableConfig.idColumns()).thenReturn(ImmutableList.of("missing_column"));
+
+    IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+    when(config.autoCreateProps()).thenReturn(ImmutableMap.of());
+    when(config.tableConfig(any())).thenReturn(tableConfig);
+
+    // Create schema without the missing column
+    org.apache.kafka.connect.data.Schema valueSchema =
+        SchemaBuilder.struct()
+            .field("id", org.apache.kafka.connect.data.Schema.INT64_SCHEMA)
+            .field("data", org.apache.kafka.connect.data.Schema.STRING_SCHEMA)
+            .build();
+
+    Struct value = new Struct(valueSchema).put("id", 123L).put("data", "foo");
+
+    SinkRecord record = mock(SinkRecord.class);
+    when(record.valueSchema()).thenReturn(valueSchema);
+    when(record.value()).thenReturn(value);
+
+    IcebergWriterFactory factory = new IcebergWriterFactory(catalog, config);
+
+    // Should throw DataException when ID column is not found in schema
+    assertThatThrownBy(() -> factory.autoCreateTable("foo.bar", record))
+        .isInstanceOf(DataException.class)
+        .hasMessageContaining("ID column 'missing_column' not found in schema")
+        .hasMessageContaining("Available columns:");
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")

Review Comment:
   The `@SuppressWarnings("unchecked")` here, on 
`testAutoCreateTableWithMissingIdentifierField` (line 162), and on 
`testAutoCreateTableWithOptionalIdentifierField` (line 244) doesn't have 
anything to suppress — none of them do the `ArgumentCaptor.forClass(Map.class)` 
dance that `testAutoCreateTableWithIdentifierFields` at line 107 does. I'd 
remove it from these three and leave it only on the one that actually needs it.



##########
kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/TestIntegration.java:
##########
@@ -136,6 +138,47 @@ public void testIcebergSinkAutoCreate(String branch) {
     assertThat(spec.isPartitioned()).isEqualTo(useSchema);
   }
 
+  @ParameterizedTest
+  @NullSource

Review Comment:
   I'd pair this with `@ValueSource(strings = "test_branch")` like the sibling 
tests do — right now `branch` is always null and `useSchema = true` is 
hardcoded, so the `if (useSchema)` branch is permanently taken and the `// one 
of the tests` comment doesn't really mean anything. Either parameterize over 
the branch and restore `boolean useSchema = branch == null;`, or drop the dead 
`if` entirely.



##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java:
##########
@@ -93,7 +96,41 @@ Table autoCreateTable(String tableName, SinkRecord sample) {
       structType = SchemaUtils.toIcebergType(sample.valueSchema(), 
config).asStructType();
     }
 
-    org.apache.iceberg.Schema schema = new 
org.apache.iceberg.Schema(structType.fields());
+    // Get ID columns configuration and map to field IDs
+    List<String> idColumns = config.tableConfig(tableName).idColumns();
+
+    org.apache.iceberg.Schema initialSchema = new 
org.apache.iceberg.Schema(structType.fields());
+
+    Set<Integer> identifierFieldIds =
+        idColumns.stream()
+            .map(
+                name -> {
+                  NestedField field = initialSchema.findField(name);
+                  if (field == null) {
+                    throw new DataException(
+                        String.format(
+                            "ID column '%s' not found in schema for table %s. 
Available columns: %s",
+                            name,
+                            tableName,
+                            initialSchema.columns().stream()
+                                .map(NestedField::name)
+                                .collect(Collectors.toList())));
+                  }
+                  return field.fieldId();
+                })
+            .collect(Collectors.toSet());
+
+    org.apache.iceberg.Schema schema;
+    try {
+      schema = new org.apache.iceberg.Schema(structType.fields(), 
identifierFieldIds);
+    } catch (IllegalArgumentException e) {

Review Comment:
   This is the main holdout from last round. When 
`iceberg.tables.schema-force-optional=true` collides with `idColumns`, the 
rewrapped message is `"Invalid identifier column configuration for table 
foo.bar: Cannot add field X as an identifier field: not a required field"` — 
correct in principle, but nothing in there points at `schema-force-optional` as 
the cause. I'd detect the force-optional case in the catch and append a hint:
   
   ```java
   } catch (IllegalArgumentException e) {
     String hint = config.schemaForceOptional()
         ? " (hint: iceberg.tables.schema-force-optional=true forces all fields 
optional, which conflicts with identifier columns)"
         : "";
     throw new DataException(
         String.format("Invalid identifier column configuration for table %s: 
%s%s", tableName, e.getMessage(), hint), e);
   }
   ```
   
   wdyt?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to