wombatu-kun commented on code in PR #16598:
URL: https://github.com/apache/iceberg/pull/16598#discussion_r3371550471
##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java:
##########
@@ -93,7 +96,55 @@ Table autoCreateTable(String tableName, SinkRecord sample) {
structType = SchemaUtils.toIcebergType(sample.valueSchema(), config).asStructType();
}
- org.apache.iceberg.Schema schema = new org.apache.iceberg.Schema(structType.fields());
+ List<String> idColumns = config.tableConfig(tableName).idColumns();
+
+ if (!idColumns.isEmpty() && config.schemaForceOptional()) {
+ throw new DataException(
+ String.format(
+ "iceberg.tables.schema-force-optional is enabled for table %s but id-columns are configured. "
+ + "schema-force-optional marks every field optional, which is incompatible with identifier fields that must be required. "
+ + "Disable schema-force-optional or remove the id-columns configuration.",
+ tableName));
+ }
+
+ org.apache.iceberg.Schema initialSchema = new org.apache.iceberg.Schema(structType.fields());
+ Set<Integer> identifierFieldIds =
+ idColumns.stream()
+ .map(
+ name -> {
+ if (name.contains(".")) {
+ throw new DataException(
+ String.format(
+ "ID column '%s' for table %s must be a top-level column name, not a dotted path. "
+ + "Nested identifier fields are not supported by the connector.",
+ name, tableName));
+ }
+ NestedField field = initialSchema.findField(name);
+ if (field == null) {
+ throw new DataException(
+ String.format(
+ "ID column '%s' not found in schema for table %s. Available columns: %s",
+ name,
+ tableName,
+ initialSchema.columns().stream()
+ .map(NestedField::name)
+ .collect(Collectors.toList())));
+ }
+ return field.fieldId();
+ })
+ .collect(Collectors.toSet());
+
+ org.apache.iceberg.Schema schema;
+ try {
+ schema = new org.apache.iceberg.Schema(structType.fields(), identifierFieldIds);
+ } catch (IllegalArgumentException e) {
Review Comment:
The try/catch that converts validateIdentifierField's IllegalArgumentException
to a DataException is the centerpiece of this PR's error-contract change, but
nothing tests it. This is exactly the gap @laskoviymishka raised on #15615:
float/double or non-primitive id-columns escape as IllegalArgumentException
from Preconditions.checkArgument instead of the DataException the connector
framework expects.

The three new tests stop earlier:
testAutoCreateTableRejectsIdColumnsWithSchemaForceOptional throws on the
schema-force-optional guard above, and the RecordUtils test exercises the
existing-table path, which never builds a Schema with identifier fields.

The catch is reachable in practice with schema-force-optional=false and an
optional or double-typed id-column. Add an autoCreateTable test that configures
such an id-column and asserts DataException. The dotted-path (line 115) and
missing-column (line 123) branches are also uncovered.
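
For illustration, here is a minimal, dependency-free sketch of the contract
such a test would pin down. Everything in it is a hypothetical stand-in, not
the connector's code: DataException, Column, validateIdentifierField and
resolveIdColumns only model the behaviour described above (optional or
float/double identifier columns fail validation with IllegalArgumentException,
which the caller rethrows as DataException). A real test would go through
autoCreateTable with the connector's own config and SinkRecord fixtures.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class IdColumnContractSketch {

  // Hypothetical stand-in for org.apache.kafka.connect.errors.DataException.
  static class DataException extends RuntimeException {
    DataException(String message, Throwable cause) {
      super(message, cause);
    }
  }

  // Hypothetical stand-in for a top-level schema column.
  static final class Column {
    final String name;
    final String type;
    final boolean required;

    Column(String name, String type, boolean required) {
      this.name = name;
      this.type = type;
      this.required = required;
    }
  }

  // Simplified model of identifier-field validation (assumption): optional and
  // float/double columns are rejected with IllegalArgumentException.
  static void validateIdentifierField(Column column) {
    if (!column.required) {
      throw new IllegalArgumentException(
          "Identifier field " + column.name + " must be required");
    }
    if (column.type.equals("float") || column.type.equals("double")) {
      throw new IllegalArgumentException(
          "Identifier field " + column.name + " must not be float or double");
    }
  }

  // Mirrors the shape of the try/catch under review: a validation failure is
  // rethrown as DataException, keeping the original exception as the cause.
  static Set<String> resolveIdColumns(
      String tableName, List<Column> columns, List<String> idColumns) {
    Set<String> resolved = new LinkedHashSet<>();
    try {
      for (String idColumn : idColumns) {
        Column match = null;
        for (Column column : columns) {
          if (column.name.equals(idColumn)) {
            match = column;
          }
        }
        if (match == null) {
          // Missing columns are reported as DataException directly.
          throw new DataException(
              "ID column '" + idColumn + "' not found for table " + tableName, null);
        }
        validateIdentifierField(match);
        resolved.add(match.name);
      }
    } catch (IllegalArgumentException e) {
      throw new DataException(
          "Invalid id-columns for table " + tableName + ": " + e.getMessage(), e);
    }
    return resolved;
  }

  public static void main(String[] args) {
    List<Column> columns =
        Arrays.asList(
            new Column("id", "long", true),
            new Column("score", "double", true),
            new Column("note", "string", false));

    // The two cases the comment calls reachable: double-typed and optional.
    for (String idColumn : Arrays.asList("score", "note")) {
      try {
        resolveIdColumns("db.tbl", columns, Arrays.asList(idColumn));
        throw new AssertionError("expected DataException for " + idColumn);
      } catch (DataException e) {
        System.out.println(idColumn + " rejected: " + e.getMessage());
      }
    }
  }
}
```

The assertion worth keeping in the real test is the exception type seen by the
caller: DataException, with the IllegalArgumentException preserved as its cause.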
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]