wombatu-kun commented on code in PR #16598:
URL: https://github.com/apache/iceberg/pull/16598#discussion_r3371550471


##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java:
##########
@@ -93,7 +96,55 @@ Table autoCreateTable(String tableName, SinkRecord sample) {
       structType = SchemaUtils.toIcebergType(sample.valueSchema(), config).asStructType();
     }
 
-    org.apache.iceberg.Schema schema = new org.apache.iceberg.Schema(structType.fields());
+    List<String> idColumns = config.tableConfig(tableName).idColumns();
+
+    if (!idColumns.isEmpty() && config.schemaForceOptional()) {
+      throw new DataException(
+          String.format(
+              "iceberg.tables.schema-force-optional is enabled for table %s but id-columns are configured. "
+                  + "schema-force-optional marks every field optional, which is incompatible with identifier fields that must be required. "
+                  + "Disable schema-force-optional or remove the id-columns configuration.",
+              tableName));
+    }
+
+    org.apache.iceberg.Schema initialSchema = new org.apache.iceberg.Schema(structType.fields());
+    Set<Integer> identifierFieldIds =
+        idColumns.stream()
+            .map(
+                name -> {
+                  if (name.contains(".")) {
+                    throw new DataException(
+                        String.format(
+                            "ID column '%s' for table %s must be a top-level column name, not a dotted path. "
+                                + "Nested identifier fields are not supported by the connector.",
+                            name, tableName));
+                  }
+                  NestedField field = initialSchema.findField(name);
+                  if (field == null) {
+                    throw new DataException(
+                        String.format(
+                            "ID column '%s' not found in schema for table %s. Available columns: %s",
+                            name,
+                            tableName,
+                            initialSchema.columns().stream()
+                                .map(NestedField::name)
+                                .collect(Collectors.toList())));
+                  }
+                  return field.fieldId();
+                })
+            .collect(Collectors.toSet());
+
+    org.apache.iceberg.Schema schema;
+    try {
+      schema = new org.apache.iceberg.Schema(structType.fields(), identifierFieldIds);
+    } catch (IllegalArgumentException e) {

Review Comment:
   The try/catch converting validateIdentifierField's IllegalArgumentException
to a DataException is the centerpiece of this PR's error-contract change, but
nothing tests it. This is exactly the gap @laskoviymishka raised on #15615:
float/double or non-primitive id-columns escaping as IllegalArgumentException
out of Preconditions.checkArgument instead of the DataException the connector
framework expects. The three new tests never reach the catch:
testAutoCreateTableRejectsIdColumnsWithSchemaForceOptional fails at the
schema-force-optional guard above, and the RecordUtils test exercises the
existing-table path, which never builds a Schema with identifier fields. The
catch is reachable in practice with schema-force-optional=false and an optional
or double-typed id-column. Add an autoCreateTable test that configures such an
id-column and asserts DataException. The dotted-path (line 115) and
missing-column (line 123) branches are also uncovered.
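A minimal, self-contained sketch of the coverage being asked for, under stated assumptions: it does not call the real `IcebergWriterFactory.autoCreateTable`, whose test setup isn't shown here. `DataException` and `Field` are simplified stand-ins for Kafka Connect's `DataException` and Iceberg's `NestedField`; `resolveIdentifierFieldIds` and `throwsDataException` are hypothetical helpers; and the local `validateIdentifierField` only approximates the rules Iceberg enforces (primitive, required, not float/double) by throwing `IllegalArgumentException`. What it illustrates is the contract: double-typed, optional, non-primitive, dotted-path, and missing-column id-columns should all surface as `DataException`.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of the id-column error contract; DataException and Field are
// simplified stand-ins, not the Kafka Connect or Iceberg classes.
public class IdColumnValidationSketch {

  /** Stand-in for org.apache.kafka.connect.errors.DataException. */
  public static class DataException extends RuntimeException {
    public DataException(String message) {
      super(message);
    }

    public DataException(String message, Throwable cause) {
      super(message, cause);
    }
  }

  /** Simplified stand-in for a top-level Iceberg NestedField. */
  public static final class Field {
    final int id;
    final String name;
    final String type; // e.g. "long", "string", "double", "struct"
    final boolean required;

    public Field(int id, String name, String type, boolean required) {
      this.id = id;
      this.name = name;
      this.type = type;
      this.required = required;
    }
  }

  // Approximation of Iceberg's identifier-field rules (primitive, required, not float/double).
  // Like the real Preconditions.checkArgument calls, it throws IllegalArgumentException.
  static void validateIdentifierField(Field f) {
    if (f.type.equals("struct") || f.type.equals("list") || f.type.equals("map")) {
      throw new IllegalArgumentException(f.name + " is not a primitive type field");
    }
    if (!f.required) {
      throw new IllegalArgumentException(f.name + " is not a required field");
    }
    if (f.type.equals("float") || f.type.equals("double")) {
      throw new IllegalArgumentException(f.name + " must not be a float or double field");
    }
  }

  // Hypothetical helper mirroring the diff's flow: resolve names first, then validate
  // and translate any IllegalArgumentException into DataException.
  public static Set<Integer> resolveIdentifierFieldIds(
      String table, List<Field> columns, List<String> idColumns) {
    List<Field> resolved = new ArrayList<>();
    for (String name : idColumns) {
      if (name.contains(".")) {
        throw new DataException("ID column '" + name + "' for table " + table + " must be top-level");
      }
      Field match = columns.stream().filter(c -> c.name.equals(name)).findFirst().orElse(null);
      if (match == null) {
        throw new DataException("ID column '" + name + "' not found in schema for table " + table);
      }
      resolved.add(match);
    }
    Set<Integer> ids = new LinkedHashSet<>();
    try {
      for (Field f : resolved) {
        validateIdentifierField(f); // plays the role of the Schema(fields, identifierFieldIds) check
        ids.add(f.id);
      }
    } catch (IllegalArgumentException e) {
      throw new DataException("Invalid id-columns for table " + table + ": " + e.getMessage(), e);
    }
    return ids;
  }

  // True if the call fails with DataException; any other exception (such as an unwrapped
  // IllegalArgumentException) propagates and fails the caller loudly.
  public static boolean throwsDataException(Runnable call) {
    try {
      call.run();
      return false;
    } catch (DataException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    List<Field> cols = List.of(new Field(1, "id", "long", true), new Field(2, "score", "double", true));
    System.out.println(resolveIdentifierFieldIds("db.t", cols, List.of("id"))); // prints [1]
    System.out.println(throwsDataException(() -> resolveIdentifierFieldIds("db.t", cols, List.of("score")))); // prints true
  }
}
```

In the connector's own suite, the equivalent would presumably drive `autoCreateTable` with one configured id-column per branch and assert the exception type (for example with AssertJ's `assertThatThrownBy(...).isInstanceOf(DataException.class)`); that wiring is an assumption, not something taken from the PR.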



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
