This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f499788a40 Add Iceberg Schema Support for PassThroughLogicalType (#36870)
7f499788a40 is described below

commit 7f499788a40256dad2d4231eac097caac7793040
Author: Tarun Annapareddy <[email protected]>
AuthorDate: Fri Nov 21 07:51:46 2025 -0800

    Add Iceberg Schema Support for PassThroughLogicalType (#36870)
    
    * Add support for more schema types in Iceberg IO
    
    * clean up test
    
    * Add SQL type
    
    * Apply spotless
---
 .../beam/sdk/schemas/logicaltypes/SqlTypes.java    |   4 +
 .../apache/beam/sdk/io/iceberg/IcebergUtils.java   |  16 +++-
 .../beam/sdk/io/iceberg/IcebergUtilsTest.java      | 103 +++++++++++++++++++++
 3 files changed, 121 insertions(+), 2 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/SqlTypes.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/SqlTypes.java
index c8af8d03333..62b1c3c6ee3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/SqlTypes.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/SqlTypes.java
@@ -21,6 +21,7 @@ import java.time.Instant;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
+import java.util.UUID;
 import org.apache.beam.sdk.schemas.Schema.LogicalType;
 import org.apache.beam.sdk.values.Row;
 
@@ -40,4 +41,7 @@ public class SqlTypes {
 
   /** Beam LogicalType corresponding to TIMESTAMP type. */
   public static final LogicalType<Instant, Row> TIMESTAMP = new 
MicrosInstant();
+
+  /** Beam LogicalType corresponding to UUID type. */
+  public static final LogicalType<UUID, Row> UUID = new UuidLogicalType();
 }
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java
index 0c2bc71c6f8..4b448a2e08c 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java
@@ -34,6 +34,8 @@ import java.util.Optional;
 import java.util.UUID;
 import java.util.stream.Collectors;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.logicaltypes.FixedPrecisionNumeric;
+import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType;
 import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
 import org.apache.beam.sdk.util.Preconditions;
 import org.apache.beam.sdk.values.Row;
@@ -71,6 +73,7 @@ public class IcebergUtils {
           .put(SqlTypes.DATE.getIdentifier(), Types.DateType.get())
           .put(SqlTypes.TIME.getIdentifier(), Types.TimeType.get())
           .put(SqlTypes.DATETIME.getIdentifier(), 
Types.TimestampType.withoutZone())
+          .put(SqlTypes.UUID.getIdentifier(), Types.UUIDType.get())
           .build();
 
   private static Schema.FieldType icebergTypeToBeamFieldType(final Type type) {
@@ -175,8 +178,17 @@ public class IcebergUtils {
       return new TypeAndMaxId(
           --nestedFieldId, 
BEAM_TYPES_TO_ICEBERG_TYPES.get(beamType.getTypeName()));
     } else if (beamType.getTypeName().isLogicalType()) {
-      String logicalTypeIdentifier =
-          checkArgumentNotNull(beamType.getLogicalType()).getIdentifier();
+      Schema.LogicalType<?, ?> logicalType = 
checkArgumentNotNull(beamType.getLogicalType());
+      if (logicalType instanceof FixedPrecisionNumeric) {
+        Row args = 
Preconditions.checkArgumentNotNull(logicalType.getArgument());
+        Integer precision = 
Preconditions.checkArgumentNotNull(args.getInt32("precision"));
+        Integer scale = 
Preconditions.checkArgumentNotNull(args.getInt32("scale"));
+        return new TypeAndMaxId(--nestedFieldId, 
Types.DecimalType.of(precision, scale));
+      }
+      if (logicalType instanceof PassThroughLogicalType) {
+        return beamFieldTypeToIcebergFieldType(logicalType.getBaseType(), 
nestedFieldId);
+      }
+      String logicalTypeIdentifier = logicalType.getIdentifier();
       @Nullable Type type = 
BEAM_LOGICAL_TYPES_TO_ICEBERG_TYPES.get(logicalTypeIdentifier);
       if (type == null) {
         throw new RuntimeException("Unsupported Beam logical type " + 
logicalTypeIdentifier);
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java
index 115a6790919..c9026522dba 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java
@@ -35,7 +35,12 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.logicaltypes.FixedPrecisionNumeric;
+import org.apache.beam.sdk.schemas.logicaltypes.FixedString;
 import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
+import org.apache.beam.sdk.schemas.logicaltypes.UuidLogicalType;
+import org.apache.beam.sdk.schemas.logicaltypes.VariableBytes;
+import org.apache.beam.sdk.schemas.logicaltypes.VariableString;
 import org.apache.beam.sdk.values.Row;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
@@ -937,5 +942,103 @@ public class IcebergUtilsTest {
 
       assertEquals(BEAM_SCHEMA_STRUCT, convertedBeamSchema);
     }
+
+    static final Schema BEAM_SCHEMA_JDBC_ALL_TYPES =
+        Schema.builder()
+            .addField("array_field", 
Schema.FieldType.array(Schema.FieldType.STRING)) // from ARRAY
+            .addField("bigint_field", Schema.FieldType.INT64) // from BIGINT
+            .addField(
+                "binary_field",
+                Schema.FieldType.logicalType(VariableBytes.of("BINARY", 10))) 
// from BINARY
+            .addField("bit_field", Schema.FieldType.BOOLEAN) // from BIT
+            .addField("boolean_field", Schema.FieldType.BOOLEAN) // from 
BOOLEAN
+            .addField(
+                "char_field", 
Schema.FieldType.logicalType(FixedString.of("CHAR", 10))) // from CHAR
+            .addField("date_field", 
Schema.FieldType.logicalType(SqlTypes.DATE)) // from DATE
+            .addField("decimal_field", Schema.FieldType.DECIMAL) // from 
DECIMAL
+            .addField("double_field", Schema.FieldType.DOUBLE) // from DOUBLE
+            .addField("float_field", Schema.FieldType.DOUBLE) // from FLOAT
+            .addField("integer_field", Schema.FieldType.INT32) // from INTEGER
+            .addField(
+                "longnvarchar_field",
+                Schema.FieldType.logicalType(
+                    VariableString.of("LONGNVARCHAR", 100))) // from 
LONGNVARCHAR
+            .addField(
+                "longvarbinary_field",
+                Schema.FieldType.logicalType(
+                    VariableBytes.of("LONGVARBINARY", 100))) // from 
LONGVARBINARY
+            .addField(
+                "longvarchar_field",
+                Schema.FieldType.logicalType(
+                    VariableString.of("LONGVARCHAR", 100))) // from LONGVARCHAR
+            .addField(
+                "nchar_field",
+                Schema.FieldType.logicalType(FixedString.of("NCHAR", 10))) // 
from NCHAR
+            .addField(
+                "numeric_field",
+                Schema.FieldType.logicalType(FixedPrecisionNumeric.of(10, 5))) 
// from NUMERIC
+            .addField(
+                "nvarchar_field",
+                Schema.FieldType.logicalType(VariableString.of("NVARCHAR", 
100))) // from NVARCHAR
+            .addField("real_field", Schema.FieldType.FLOAT) // from REAL
+            .addField("smallint_field", Schema.FieldType.INT16) // from 
SMALLINT
+            .addField("time_field", 
Schema.FieldType.logicalType(SqlTypes.TIME)) // from TIME
+            .addField(
+                "timestamp_field",
+                Schema.FieldType.logicalType(SqlTypes.DATETIME)) // from 
TIMESTAMP
+            .addField(
+                "timestamp_with_timezone_field",
+                Schema.FieldType.DATETIME) // from TIMESTAMP_WITH_TIMEZONE
+            .addField("tinyint_field", Schema.FieldType.BYTE) // from TINYINT
+            .addField(
+                "varbinary_field",
+                Schema.FieldType.logicalType(VariableBytes.of("VARBINARY", 
100))) // from VARBINARY
+            .addField(
+                "varchar_field",
+                Schema.FieldType.logicalType(VariableString.of("VARCHAR", 
100))) // from VARCHAR
+            .addField("blob_field", Schema.FieldType.BYTES) // from BLOB
+            .addField("clob_field", Schema.FieldType.STRING) // from CLOB
+            .addField(
+                "uuid_field", Schema.FieldType.logicalType(new 
UuidLogicalType())) // from UUID
+            .build();
+
+    static final org.apache.iceberg.Schema ICEBERG_SCHEMA_JDBC_ALL_TYPES =
+        new org.apache.iceberg.Schema(
+            required(1, "array_field", Types.ListType.ofRequired(29, 
Types.StringType.get())),
+            required(2, "bigint_field", Types.LongType.get()),
+            required(3, "binary_field", Types.BinaryType.get()),
+            required(4, "bit_field", Types.BooleanType.get()),
+            required(5, "boolean_field", Types.BooleanType.get()),
+            required(6, "char_field", Types.StringType.get()),
+            required(7, "date_field", Types.DateType.get()),
+            required(8, "decimal_field", Types.StringType.get()),
+            required(9, "double_field", Types.DoubleType.get()),
+            required(10, "float_field", Types.DoubleType.get()),
+            required(11, "integer_field", Types.IntegerType.get()),
+            required(12, "longnvarchar_field", Types.StringType.get()),
+            required(13, "longvarbinary_field", Types.BinaryType.get()),
+            required(14, "longvarchar_field", Types.StringType.get()),
+            required(15, "nchar_field", Types.StringType.get()),
+            required(16, "numeric_field", Types.DecimalType.of(10, 5)),
+            required(17, "nvarchar_field", Types.StringType.get()),
+            required(18, "real_field", Types.FloatType.get()),
+            required(19, "smallint_field", Types.StringType.get()),
+            required(20, "time_field", Types.TimeType.get()),
+            required(21, "timestamp_field", Types.TimestampType.withoutZone()),
+            required(22, "timestamp_with_timezone_field", 
Types.TimestampType.withZone()),
+            required(23, "tinyint_field", Types.StringType.get()),
+            required(24, "varbinary_field", Types.BinaryType.get()),
+            required(25, "varchar_field", Types.StringType.get()),
+            required(26, "blob_field", Types.BinaryType.get()),
+            required(27, "clob_field", Types.StringType.get()),
+            required(28, "uuid_field", Types.UUIDType.get()));
+
+    @Test
+    public void testJdbcBeamSchemaToIcebergSchema() {
+      org.apache.iceberg.Schema convertedIcebergSchema =
+          IcebergUtils.beamSchemaToIcebergSchema(BEAM_SCHEMA_JDBC_ALL_TYPES);
+
+      
assertTrue(convertedIcebergSchema.sameSchema(ICEBERG_SCHEMA_JDBC_ALL_TYPES));
+    }
   }
 }

Reply via email to