This is an automated email from the ASF dual-hosted git repository.

jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new 208399fb4 [AMORO-3796] Fixed UUID type in Iceberg tables with partitions & buckets (#3797)
208399fb4 is described below

commit 208399fb44e02147bd97f49181d50905db2f87f1
Author: Mukhutdinov Artur <[email protected]>
AuthorDate: Mon Sep 29 10:43:12 2025 +0300

    [AMORO-3796] Fixed UUID type in Iceberg tables with partitions & buckets (#3797)
    
    * fix(iceberg): UUID
    
    implement automatic replacement of UUID type columns with the FixedType[16]
    
    * fix(iceberg): UUID
    
    add more elegant way to handle the issue with the Iceberg tables and UUID type
    
    * fix(iceberg): UUID
    
    add a little change to `converter` method to always return an instance of `IcebergRecordWrapper` class
    
    ---------
    
    Co-authored-by: Artur.Mukhutdinov <[email protected]>
---
 .../GenericIcebergPartitionedFanoutWriter.java     |   6 +-
 .../apache/iceberg/data/IcebergRecordWrapper.java  | 129 +++++++++++++++++++++
 2 files changed, 132 insertions(+), 3 deletions(-)

diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/io/writer/GenericIcebergPartitionedFanoutWriter.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/writer/GenericIcebergPartitionedFanoutWriter.java
index ae9011781..0b6e3405c 100644
--- a/amoro-format-iceberg/src/main/java/org/apache/amoro/io/writer/GenericIcebergPartitionedFanoutWriter.java
+++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/writer/GenericIcebergPartitionedFanoutWriter.java
@@ -23,7 +23,7 @@ import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
-import org.apache.iceberg.data.InternalRecordWrapper;
+import org.apache.iceberg.data.IcebergRecordWrapper;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.io.FileAppenderFactory;
 import org.apache.iceberg.io.FileIO;
@@ -37,7 +37,7 @@ import org.apache.iceberg.io.PartitionedFanoutWriter;
 public class GenericIcebergPartitionedFanoutWriter extends 
PartitionedFanoutWriter<Record> {
 
   private final PartitionKey partitionKey;
-  private final InternalRecordWrapper wrapper;
+  private final IcebergRecordWrapper wrapper;
 
   public GenericIcebergPartitionedFanoutWriter(
       Schema schema,
@@ -49,7 +49,7 @@ public class GenericIcebergPartitionedFanoutWriter extends PartitionedFanoutWrit
       long targetFileSize) {
     super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
     this.partitionKey = new PartitionKey(spec, schema);
-    this.wrapper = new InternalRecordWrapper(schema.asStruct());
+    this.wrapper = new IcebergRecordWrapper(schema.asStruct());
   }
 
   @Override
diff --git a/amoro-format-iceberg/src/main/java/org/apache/iceberg/data/IcebergRecordWrapper.java b/amoro-format-iceberg/src/main/java/org/apache/iceberg/data/IcebergRecordWrapper.java
new file mode 100644
index 000000000..28108bc1b
--- /dev/null
+++ b/amoro-format-iceberg/src/main/java/org/apache/iceberg/data/IcebergRecordWrapper.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.data;
+
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.DateTimeUtil;
+import org.apache.iceberg.util.UUIDUtil;
+
+import java.lang.reflect.Array;
+import java.nio.ByteBuffer;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.util.function.Function;
+
+/**
+ * This class is a copy of {@link InternalRecordWrapper} that adds proper handling for UUID types in
+ * the <a href="https://github.com/apache/iceberg">Iceberg</a> format.
+ *
+ * <p>It serves as a temporary solution for an issue discussed in <a
+ * href="https://github.com/apache/amoro/pull/3797">this pull request</a>.
+ *
+ * <p>Once the related <a href="https://github.com/apache/iceberg/pull/14208">pull request</a> is
+ * merged, the Iceberg dependency version should be updated accordingly, and this class will be
+ * removed.
+ *
+ * <p>The wrapper is a read-only view over a {@link StructLike}. {@link #wrap(StructLike)} swaps the
+ * viewed record in place, and {@link #get(int, Class)} converts date/time values to day or
+ * microsecond counts, {@code byte[]} fixed values to {@link ByteBuffer}, and {@code byte[]} UUID
+ * values to {@link java.util.UUID}. Instances hold mutable state (the wrapped record) and perform
+ * no synchronization.
+ */
+public class IcebergRecordWrapper implements StructLike {
+  /** Per-field conversion functions; a {@code null} entry means the field is returned as-is. */
+  private final Function<Object, Object>[] transforms;
+
+  /** The record currently viewed; {@code null} until {@link #wrap(StructLike)} is called. */
+  private StructLike wrapped = null;
+
+  /**
+   * Creates a wrapper whose per-field conversions are derived from the field types of {@code
+   * struct}.
+   *
+   * @param struct the struct type describing the records that will be wrapped
+   */
+  @SuppressWarnings("unchecked")
+  public IcebergRecordWrapper(Types.StructType struct) {
+    this(
+        struct.fields().stream()
+            .map(field -> converter(field.type()))
+            .toArray(
+                length -> (Function<Object, Object>[]) Array.newInstance(Function.class, length)));
+  }
+
+  /** Creates a wrapper that shares an already-built array of conversion functions. */
+  private IcebergRecordWrapper(Function<Object, Object>[] transforms) {
+    this.transforms = transforms;
+  }
+
+  /**
+   * Returns the function converting a non-null value of {@code type}, or {@code null} when values of
+   * that type are returned without conversion.
+   */
+  private static Function<Object, Object> converter(Type type) {
+    switch (type.typeId()) {
+      case DATE:
+        return date -> DateTimeUtil.daysFromDate((LocalDate) date);
+      case TIME:
+        return time -> DateTimeUtil.microsFromTime((LocalTime) time);
+      case TIMESTAMP:
+        if (((Types.TimestampType) type).shouldAdjustToUTC()) {
+          return timestamp -> DateTimeUtil.microsFromTimestamptz((OffsetDateTime) timestamp);
+        } else {
+          return timestamp -> DateTimeUtil.microsFromTimestamp((LocalDateTime) timestamp);
+        }
+      case FIXED:
+        return bytes -> ByteBuffer.wrap((byte[]) bytes);
+      case UUID:
+        // A UUID value may arrive as raw bytes or already as a UUID: only byte arrays are
+        // converted, any other value is passed through unchanged.
+        return uuid -> {
+          if (uuid instanceof byte[]) {
+            return UUIDUtil.convert((byte[]) uuid);
+          } else {
+            return uuid;
+          }
+        };
+      case STRUCT:
+        // A single nested wrapper is created per struct field and reused for every value (also by
+        // wrappers made with copyFor, which share this array), so the returned view is only valid
+        // until that field is read again.
+        IcebergRecordWrapper wrapper = new IcebergRecordWrapper(type.asStructType());
+        return struct -> wrapper.wrap((StructLike) struct);
+      default:
+        // No conversion: get() reads the value straight from the wrapped record.
+        return null;
+    }
+  }
+
+  /**
+   * Returns the record currently being viewed.
+   *
+   * @return the wrapped record, or {@code null} if nothing has been wrapped yet
+   */
+  public StructLike get() {
+    return wrapped;
+  }
+
+  /**
+   * Creates a new wrapper viewing {@code record} that shares this wrapper's conversion functions
+   * (including any nested struct wrappers); this wrapper's own record is left untouched.
+   *
+   * @param record the record for the new wrapper to view
+   * @return a new wrapper around {@code record}
+   */
+  public IcebergRecordWrapper copyFor(StructLike record) {
+    return new IcebergRecordWrapper(transforms).wrap(record);
+  }
+
+  /**
+   * Points this wrapper at {@code record}, replacing any previously wrapped record.
+   *
+   * @param record the record to view
+   * @return this wrapper, for call chaining
+   */
+  public IcebergRecordWrapper wrap(StructLike record) {
+    this.wrapped = record;
+    return this;
+  }
+
+  @Override
+  public int size() {
+    return wrapped.size();
+  }
+
+  /**
+   * Reads the field at {@code pos}, applying that field's conversion when one exists. Null values
+   * are returned as {@code null} without invoking the conversion.
+   */
+  @Override
+  public <T> T get(int pos, Class<T> javaClass) {
+    Function<Object, Object> transform = transforms[pos];
+    if (transform == null) {
+      return wrapped.get(pos, javaClass);
+    }
+    Object value = wrapped.get(pos, Object.class);
+    // The conversion functions are not null-safe, so nulls short-circuit here.
+    return value == null ? null : javaClass.cast(transform.apply(value));
+  }
+
+  /** Always throws: this wrapper is a read-only view. */
+  @Override
+  public <T> void set(int pos, T value) {
+    throw new UnsupportedOperationException("Cannot update IcebergRecordWrapper");
+  }
+}

Reply via email to