This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 7f499788a40 Add Iceberg Schema Support for PassThroughLogicalType (#36870)
7f499788a40 is described below
commit 7f499788a40256dad2d4231eac097caac7793040
Author: Tarun Annapareddy <[email protected]>
AuthorDate: Fri Nov 21 07:51:46 2025 -0800
Add Iceberg Schema Support for PassThroughLogicalType (#36870)
* Add support for more schema types in Iceberg IO
* clean up test
* Add SQL type
* Apply spotless
---
.../beam/sdk/schemas/logicaltypes/SqlTypes.java | 4 +
.../apache/beam/sdk/io/iceberg/IcebergUtils.java | 16 +++-
.../beam/sdk/io/iceberg/IcebergUtilsTest.java | 103 +++++++++++++++++++++
3 files changed, 121 insertions(+), 2 deletions(-)
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/SqlTypes.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/SqlTypes.java
index c8af8d03333..62b1c3c6ee3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/SqlTypes.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/SqlTypes.java
@@ -21,6 +21,7 @@ import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
+import java.util.UUID;
import org.apache.beam.sdk.schemas.Schema.LogicalType;
import org.apache.beam.sdk.values.Row;
@@ -40,4 +41,7 @@ public class SqlTypes {
/** Beam LogicalType corresponding to TIMESTAMP type. */
public static final LogicalType<Instant, Row> TIMESTAMP = new MicrosInstant();
+
+ /** Beam LogicalType corresponding to UUID type. */
+ public static final LogicalType<UUID, Row> UUID = new UuidLogicalType();
}
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java
index 0c2bc71c6f8..4b448a2e08c 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java
@@ -34,6 +34,8 @@ import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.logicaltypes.FixedPrecisionNumeric;
+import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.values.Row;
@@ -71,6 +73,7 @@ public class IcebergUtils {
.put(SqlTypes.DATE.getIdentifier(), Types.DateType.get())
.put(SqlTypes.TIME.getIdentifier(), Types.TimeType.get())
.put(SqlTypes.DATETIME.getIdentifier(), Types.TimestampType.withoutZone())
+ .put(SqlTypes.UUID.getIdentifier(), Types.UUIDType.get())
.build();
private static Schema.FieldType icebergTypeToBeamFieldType(final Type type) {
@@ -175,8 +178,17 @@ public class IcebergUtils {
return new TypeAndMaxId(
--nestedFieldId,
BEAM_TYPES_TO_ICEBERG_TYPES.get(beamType.getTypeName()));
} else if (beamType.getTypeName().isLogicalType()) {
- String logicalTypeIdentifier =
- checkArgumentNotNull(beamType.getLogicalType()).getIdentifier();
+ Schema.LogicalType<?, ?> logicalType = checkArgumentNotNull(beamType.getLogicalType());
+ if (logicalType instanceof FixedPrecisionNumeric) {
+ Row args = Preconditions.checkArgumentNotNull(logicalType.getArgument());
+ Integer precision = Preconditions.checkArgumentNotNull(args.getInt32("precision"));
+ Integer scale = Preconditions.checkArgumentNotNull(args.getInt32("scale"));
+ return new TypeAndMaxId(--nestedFieldId, Types.DecimalType.of(precision, scale));
+ }
+ if (logicalType instanceof PassThroughLogicalType) {
+ return beamFieldTypeToIcebergFieldType(logicalType.getBaseType(), nestedFieldId);
+ }
+ String logicalTypeIdentifier = logicalType.getIdentifier();
@Nullable Type type = BEAM_LOGICAL_TYPES_TO_ICEBERG_TYPES.get(logicalTypeIdentifier);
if (type == null) {
throw new RuntimeException("Unsupported Beam logical type " + logicalTypeIdentifier);
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java
index 115a6790919..c9026522dba 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java
@@ -35,7 +35,12 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.logicaltypes.FixedPrecisionNumeric;
+import org.apache.beam.sdk.schemas.logicaltypes.FixedString;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
+import org.apache.beam.sdk.schemas.logicaltypes.UuidLogicalType;
+import org.apache.beam.sdk.schemas.logicaltypes.VariableBytes;
+import org.apache.beam.sdk.schemas.logicaltypes.VariableString;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
@@ -937,5 +942,103 @@ public class IcebergUtilsTest {
assertEquals(BEAM_SCHEMA_STRUCT, convertedBeamSchema);
}
+
+ static final Schema BEAM_SCHEMA_JDBC_ALL_TYPES =
+ Schema.builder()
+ .addField("array_field", Schema.FieldType.array(Schema.FieldType.STRING)) // from ARRAY
+ .addField("bigint_field", Schema.FieldType.INT64) // from BIGINT
+ .addField(
+ "binary_field",
+ Schema.FieldType.logicalType(VariableBytes.of("BINARY", 10))) // from BINARY
+ .addField("bit_field", Schema.FieldType.BOOLEAN) // from BIT
+ .addField("boolean_field", Schema.FieldType.BOOLEAN) // from BOOLEAN
+ .addField(
+ "char_field", Schema.FieldType.logicalType(FixedString.of("CHAR", 10))) // from CHAR
+ .addField("date_field", Schema.FieldType.logicalType(SqlTypes.DATE)) // from DATE
+ .addField("decimal_field", Schema.FieldType.DECIMAL) // from DECIMAL
+ .addField("double_field", Schema.FieldType.DOUBLE) // from DOUBLE
+ .addField("float_field", Schema.FieldType.DOUBLE) // from FLOAT
+ .addField("integer_field", Schema.FieldType.INT32) // from INTEGER
+ .addField(
+ "longnvarchar_field",
+ Schema.FieldType.logicalType(
+ VariableString.of("LONGNVARCHAR", 100))) // from LONGNVARCHAR
+ .addField(
+ "longvarbinary_field",
+ Schema.FieldType.logicalType(
+ VariableBytes.of("LONGVARBINARY", 100))) // from LONGVARBINARY
+ .addField(
+ "longvarchar_field",
+ Schema.FieldType.logicalType(
+ VariableString.of("LONGVARCHAR", 100))) // from LONGVARCHAR
+ .addField(
+ "nchar_field",
+ Schema.FieldType.logicalType(FixedString.of("NCHAR", 10))) // from NCHAR
+ .addField(
+ "numeric_field",
+ Schema.FieldType.logicalType(FixedPrecisionNumeric.of(10, 5))) // from NUMERIC
+ .addField(
+ "nvarchar_field",
+ Schema.FieldType.logicalType(VariableString.of("NVARCHAR", 100))) // from NVARCHAR
+ .addField("real_field", Schema.FieldType.FLOAT) // from REAL
+ .addField("smallint_field", Schema.FieldType.INT16) // from SMALLINT
+ .addField("time_field", Schema.FieldType.logicalType(SqlTypes.TIME)) // from TIME
+ .addField(
+ "timestamp_field",
+ Schema.FieldType.logicalType(SqlTypes.DATETIME)) // from TIMESTAMP
+ .addField(
+ "timestamp_with_timezone_field",
+ Schema.FieldType.DATETIME) // from TIMESTAMP_WITH_TIMEZONE
+ .addField("tinyint_field", Schema.FieldType.BYTE) // from TINYINT
+ .addField(
+ "varbinary_field",
+ Schema.FieldType.logicalType(VariableBytes.of("VARBINARY", 100))) // from VARBINARY
+ .addField(
+ "varchar_field",
+ Schema.FieldType.logicalType(VariableString.of("VARCHAR", 100))) // from VARCHAR
+ .addField("blob_field", Schema.FieldType.BYTES) // from BLOB
+ .addField("clob_field", Schema.FieldType.STRING) // from CLOB
+ .addField(
+ "uuid_field", Schema.FieldType.logicalType(new UuidLogicalType())) // from UUID
+ .build();
+
+ static final org.apache.iceberg.Schema ICEBERG_SCHEMA_JDBC_ALL_TYPES =
+ new org.apache.iceberg.Schema(
+ required(1, "array_field", Types.ListType.ofRequired(29, Types.StringType.get())),
+ required(2, "bigint_field", Types.LongType.get()),
+ required(3, "binary_field", Types.BinaryType.get()),
+ required(4, "bit_field", Types.BooleanType.get()),
+ required(5, "boolean_field", Types.BooleanType.get()),
+ required(6, "char_field", Types.StringType.get()),
+ required(7, "date_field", Types.DateType.get()),
+ required(8, "decimal_field", Types.StringType.get()),
+ required(9, "double_field", Types.DoubleType.get()),
+ required(10, "float_field", Types.DoubleType.get()),
+ required(11, "integer_field", Types.IntegerType.get()),
+ required(12, "longnvarchar_field", Types.StringType.get()),
+ required(13, "longvarbinary_field", Types.BinaryType.get()),
+ required(14, "longvarchar_field", Types.StringType.get()),
+ required(15, "nchar_field", Types.StringType.get()),
+ required(16, "numeric_field", Types.DecimalType.of(10, 5)),
+ required(17, "nvarchar_field", Types.StringType.get()),
+ required(18, "real_field", Types.FloatType.get()),
+ required(19, "smallint_field", Types.StringType.get()),
+ required(20, "time_field", Types.TimeType.get()),
+ required(21, "timestamp_field", Types.TimestampType.withoutZone()),
+ required(22, "timestamp_with_timezone_field", Types.TimestampType.withZone()),
+ required(23, "tinyint_field", Types.StringType.get()),
+ required(24, "varbinary_field", Types.BinaryType.get()),
+ required(25, "varchar_field", Types.StringType.get()),
+ required(26, "blob_field", Types.BinaryType.get()),
+ required(27, "clob_field", Types.StringType.get()),
+ required(28, "uuid_field", Types.UUIDType.get()));
+
+ @Test
+ public void testJdbcBeamSchemaToIcebergSchema() {
+ org.apache.iceberg.Schema convertedIcebergSchema =
+ IcebergUtils.beamSchemaToIcebergSchema(BEAM_SCHEMA_JDBC_ALL_TYPES);
+
+ assertTrue(convertedIcebergSchema.sameSchema(ICEBERG_SCHEMA_JDBC_ALL_TYPES));
+ }
}
}