This is an automated email from the ASF dual-hosted git repository.
jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new 208399fb4 [AMORO-3796] Fixed UUID type in Iceberg tables with partitions & buckets (#3797)
208399fb4 is described below
commit 208399fb44e02147bd97f49181d50905db2f87f1
Author: Mukhutdinov Artur <[email protected]>
AuthorDate: Mon Sep 29 10:43:12 2025 +0300
[AMORO-3796] Fixed UUID type in Iceberg tables with partitions & buckets (#3797)
* fix(iceberg): UUID
implement automatic replacement of UUID-type columns with FixedType[16]
* fix(iceberg): UUID
add a more elegant way to handle the issue with Iceberg tables and the UUID
type
* fix(iceberg): UUID
make a small change to the `converter` method so that nested struct fields are
always wrapped in an `IcebergRecordWrapper` instance
---------
Co-authored-by: Artur.Mukhutdinov <[email protected]>
---
.../GenericIcebergPartitionedFanoutWriter.java | 6 +-
.../apache/iceberg/data/IcebergRecordWrapper.java | 129 +++++++++++++++++++++
2 files changed, 132 insertions(+), 3 deletions(-)
diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/io/writer/GenericIcebergPartitionedFanoutWriter.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/writer/GenericIcebergPartitionedFanoutWriter.java
index ae9011781..0b6e3405c 100644
--- a/amoro-format-iceberg/src/main/java/org/apache/amoro/io/writer/GenericIcebergPartitionedFanoutWriter.java
+++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/writer/GenericIcebergPartitionedFanoutWriter.java
@@ -23,7 +23,7 @@ import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
-import org.apache.iceberg.data.InternalRecordWrapper;
+import org.apache.iceberg.data.IcebergRecordWrapper;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.FileIO;
@@ -37,7 +37,7 @@ import org.apache.iceberg.io.PartitionedFanoutWriter;
public class GenericIcebergPartitionedFanoutWriter extends
PartitionedFanoutWriter<Record> {
private final PartitionKey partitionKey;
- private final InternalRecordWrapper wrapper;
+ private final IcebergRecordWrapper wrapper;
public GenericIcebergPartitionedFanoutWriter(
Schema schema,
@@ -49,7 +49,7 @@ public class GenericIcebergPartitionedFanoutWriter extends
PartitionedFanoutWrit
long targetFileSize) {
super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
this.partitionKey = new PartitionKey(spec, schema);
- this.wrapper = new InternalRecordWrapper(schema.asStruct());
+ this.wrapper = new IcebergRecordWrapper(schema.asStruct());
}
@Override
diff --git a/amoro-format-iceberg/src/main/java/org/apache/iceberg/data/IcebergRecordWrapper.java b/amoro-format-iceberg/src/main/java/org/apache/iceberg/data/IcebergRecordWrapper.java
new file mode 100644
index 000000000..28108bc1b
--- /dev/null
+++ b/amoro-format-iceberg/src/main/java/org/apache/iceberg/data/IcebergRecordWrapper.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.data;
+
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.DateTimeUtil;
+import org.apache.iceberg.util.UUIDUtil;
+
+import java.lang.reflect.Array;
+import java.nio.ByteBuffer;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.util.function.Function;
+
+/**
+ * This class is a copy of {@link InternalRecordWrapper} that adds proper
handling for UUID types in
+ * the <a href="https://github.com/apache/iceberg">Iceberg</a> format.
+ *
+ * <p>It serves as a temporary solution for an issue discussed in <a
+ * href="https://github.com/apache/amoro/pull/3797">this pull request</a>.
+ *
+ * <p>Once the related <a
href="https://github.com/apache/iceberg/pull/14208">pull request</a> is
+ * merged, the Iceberg dependency version should be updated accordingly, and
this class will be
+ * removed.
+ */
+public class IcebergRecordWrapper implements StructLike {
+ private final Function<Object, Object>[] transforms;
+ private StructLike wrapped = null;
+
+ @SuppressWarnings("unchecked")
+ public IcebergRecordWrapper(Types.StructType struct) {
+ this(
+ struct.fields().stream()
+ .map(field -> converter(field.type()))
+ .toArray(
+ length -> (Function<Object, Object>[])
Array.newInstance(Function.class, length)));
+ }
+
+ private IcebergRecordWrapper(Function<Object, Object>[] transforms) {
+ this.transforms = transforms;
+ }
+
+ private static Function<Object, Object> converter(Type type) {
+ switch (type.typeId()) {
+ case DATE:
+ return date -> DateTimeUtil.daysFromDate((LocalDate) date);
+ case TIME:
+ return time -> DateTimeUtil.microsFromTime((LocalTime) time);
+ case TIMESTAMP:
+ if (((Types.TimestampType) type).shouldAdjustToUTC()) {
+ return timestamp ->
DateTimeUtil.microsFromTimestamptz((OffsetDateTime) timestamp);
+ } else {
+ return timestamp -> DateTimeUtil.microsFromTimestamp((LocalDateTime)
timestamp);
+ }
+ case FIXED:
+ return bytes -> ByteBuffer.wrap((byte[]) bytes);
+ case UUID:
+ return uuid -> {
+ if (uuid instanceof byte[]) {
+ return UUIDUtil.convert((byte[]) uuid);
+ } else {
+ return uuid;
+ }
+ };
+ case STRUCT:
+ IcebergRecordWrapper wrapper = new
IcebergRecordWrapper(type.asStructType());
+ return struct -> wrapper.wrap((StructLike) struct);
+ default:
+ }
+ return null;
+ }
+
+ public StructLike get() {
+ return wrapped;
+ }
+
+ public IcebergRecordWrapper copyFor(StructLike record) {
+ return new IcebergRecordWrapper(transforms).wrap(record);
+ }
+
+ public IcebergRecordWrapper wrap(StructLike record) {
+ this.wrapped = record;
+ return this;
+ }
+
+ @Override
+ public int size() {
+ return wrapped.size();
+ }
+
+ @Override
+ public <T> T get(int pos, Class<T> javaClass) {
+ if (transforms[pos] != null) {
+ Object value = wrapped.get(pos, Object.class);
+ if (value == null) {
+ // transforms function don't allow to handle null values, so just
return null here.
+ return null;
+ } else {
+ return javaClass.cast(transforms[pos].apply(value));
+ }
+ }
+ return wrapped.get(pos, javaClass);
+ }
+
+ @Override
+ public <T> void set(int pos, T value) {
+ throw new UnsupportedOperationException("Cannot update
IcebergRecordWrapper");
+ }
+}