danielcweeks commented on code in PR #15209:
URL: https://github.com/apache/iceberg/pull/15209#discussion_r2800310454
##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SchemaUtils.java:
##########
@@ -209,21 +216,200 @@ private static Pair<String, Integer>
transformArgPair(String argsStr) {
return Pair.of(parts.get(0).trim(), Integer.parseInt(parts.get(1).trim()));
}
- static Type toIcebergType(Schema valueSchema, IcebergSinkConfig config) {
- return new SchemaGenerator(config).toIcebergType(valueSchema);
+ static Type toIcebergType(Schema valueSchema, IcebergSinkConfig config,
boolean includeDefaults) {
+ return new SchemaGenerator(config,
includeDefaults).toIcebergType(valueSchema);
}
static Type inferIcebergType(Object value, IcebergSinkConfig config) {
- return new SchemaGenerator(config).inferIcebergType(value);
+ return new SchemaGenerator(config, true).inferIcebergType(value);
+ }
+
+ /**
+ * Converts a Kafka Connect default value to an Iceberg Literal based on the
provided Iceberg
+ * type.
+ *
+ * @param defaultValue the default value from Kafka Connect schema
+ * @param icebergType the target Iceberg type
+ * @return an Iceberg Literal representing the default value, or null if
conversion is not
+ * possible
+ */
+ static Literal<?> convertDefaultValue(Object defaultValue, Type icebergType)
{
+ if (defaultValue == null) {
+ return null;
+ }
+
+ try {
+ TypeID typeId = icebergType.typeId();
+ switch (typeId) {
+ case BOOLEAN:
+ return Expressions.lit((Boolean) defaultValue);
+ case INTEGER:
+ return convertIntegerDefault(defaultValue);
+ case LONG:
+ return convertLongDefault(defaultValue);
+ case FLOAT:
+ return convertFloatDefault(defaultValue);
+ case DOUBLE:
+ return convertDoubleDefault(defaultValue);
+ case STRING:
+ return Expressions.lit(defaultValue.toString());
+ case DECIMAL:
+ return convertDecimalDefault(defaultValue, (DecimalType)
icebergType);
+ case DATE:
+ return convertDateDefault(defaultValue, icebergType);
+ case TIME:
+ return convertTimeDefault(defaultValue, icebergType);
+ case TIMESTAMP:
+ return convertTimestampDefault(defaultValue);
+ case BINARY:
+ case FIXED:
+ return convertBinaryDefault(defaultValue);
+ case UUID:
+ return convertUuidDefault(defaultValue);
+ default:
+ // Nested types (LIST, MAP, STRUCT) cannot have non-null defaults in
Iceberg
Review Comment:
This statement is not accurate. List and Map must default to `null`.
Struct is supported.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]