This is an automated email from the ASF dual-hosted git repository.

xuba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ff304cf8 [AMORO-3356] Include ResourceSerde interface and impl with 
Simple/Kryo/Java Serialize (#3571)
7ff304cf8 is described below

commit 7ff304cf8cda3ec95954ec9ea97328e5ce541e0f
Author: ConradJam <[email protected]>
AuthorDate: Thu May 29 22:18:05 2025 +0800

    [AMORO-3356] Include ResourceSerde interface and impl with Simple/Kryo/Java 
Serialize (#3571)
    
    * [AMORO-3356] Include ResourceSerde interface and impl with 
SimpleSerialize and KryoSerialize
    
    * fix up
    
    * fix up
    
    ---------
    
    Co-authored-by: Xu Bai <[email protected]>
---
 .../apache/amoro/serialization/JavaSerializer.java |  52 +++++++
 .../KryoSerializer.java}                           |  91 ++-----------
 .../apache/amoro/serialization/ResourceSerde.java  |  42 ++++++
 .../amoro/serialization/SimpleSerializer.java      |  56 ++++++++
 .../org/apache/amoro/utils/SerializationUtil.java  | 151 +++------------------
 .../apache/amoro/utils/map/SimpleSpillableMap.java |  35 +++--
 .../utils/map/StructLikeWrapperSerializer.java     |   8 +-
 7 files changed, 205 insertions(+), 230 deletions(-)

diff --git 
a/amoro-common/src/main/java/org/apache/amoro/serialization/JavaSerializer.java 
b/amoro-common/src/main/java/org/apache/amoro/serialization/JavaSerializer.java
new file mode 100644
index 000000000..7496ab4fb
--- /dev/null
+++ 
b/amoro-common/src/main/java/org/apache/amoro/serialization/JavaSerializer.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.serialization;
+
+import static 
org.apache.amoro.shade.guava32.com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.Serializable;
+
+/**
+ * JavaSerializer Use KryoSerialize to serialize/deserialize java objects
+ *
+ * @param <R>
+ */
+@SuppressWarnings("rawtypes")
+public class JavaSerializer<R extends Serializable> implements 
ResourceSerde<R> {
+
+  private static final KryoSerializer kryoSerialize = new KryoSerializer<>();
+
+  public static final JavaSerializer INSTANT = new JavaSerializer<>();
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public byte[] serializeResource(R resource) {
+    checkNotNull(resource);
+    return kryoSerialize.serializeResource(resource);
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public R deserializeResource(byte[] input) {
+    if (input == null) {
+      return null;
+    }
+    return (R) kryoSerialize.deserializeResource(input);
+  }
+}
diff --git 
a/amoro-common/src/main/java/org/apache/amoro/utils/SerializationUtil.java 
b/amoro-common/src/main/java/org/apache/amoro/serialization/KryoSerializer.java
similarity index 59%
copy from 
amoro-common/src/main/java/org/apache/amoro/utils/SerializationUtil.java
copy to 
amoro-common/src/main/java/org/apache/amoro/serialization/KryoSerializer.java
index 930e25acc..cc80db303 100644
--- a/amoro-common/src/main/java/org/apache/amoro/utils/SerializationUtil.java
+++ 
b/amoro-common/src/main/java/org/apache/amoro/serialization/KryoSerializer.java
@@ -16,9 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.amoro.utils;
-
-import static 
org.apache.amoro.shade.guava32.com.google.common.base.Preconditions.checkNotNull;
+package org.apache.amoro.serialization;
 
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.Serializer;
@@ -27,59 +25,26 @@ import com.esotericsoftware.kryo.io.Output;
 import org.apache.avro.util.Utf8;
 import org.objenesis.strategy.StdInstantiatorStrategy;
 
-import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.io.Serializable;
-import java.nio.ByteBuffer;
 
-@SuppressWarnings({"unchecked", "rawtypes"})
-public class SerializationUtil {
+public class KryoSerializer<R> implements ResourceSerde<R> {
 
   private static final ThreadLocal<KryoSerializerInstance> KRYO_SERIALIZER =
       ThreadLocal.withInitial(KryoSerializerInstance::new);
 
-  public static ByteBuffer simpleSerialize(Object obj) {
-    try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
-      try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
-        oos.writeObject(obj);
-        oos.flush();
-        return ByteBuffer.wrap(bos.toByteArray());
-      }
-    } catch (IOException e) {
-      throw new IllegalArgumentException("serialization error of " + obj, e);
-    }
-  }
-
-  public static <T> T simpleDeserialize(byte[] bytes) {
-    if (bytes == null) {
-      return null;
-    }
-    try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes)) {
-      try (ObjectInputStream ois = new ObjectInputStream(bis)) {
-        return (T) ois.readObject();
-      }
-    } catch (IOException | ClassNotFoundException e) {
-      throw new IllegalArgumentException("deserialization error ", e);
-    }
-  }
-
-  public static byte[] kryoSerialize(final Object obj) {
-    return KRYO_SERIALIZER.get().serialize(obj);
+  @Override
+  public byte[] serializeResource(R resource) {
+    return KRYO_SERIALIZER.get().serialize(resource);
   }
 
+  @Override
   @SuppressWarnings("unchecked")
-  public static <T> T kryoDeserialize(final byte[] objectData) {
-    if (objectData == null) {
-      throw new NullPointerException("The byte[] must not be null");
+  public R deserializeResource(byte[] input) {
+    if (input == null) {
+      throw new NullPointerException("The bytes[] Input must not be null");
     }
-    return (T) KRYO_SERIALIZER.get().deserialize(objectData);
-  }
-
-  public static <T> SimpleSerializer<T> createJavaSimpleSerializer() {
-    return JavaSerializer.INSTANT;
+    return (R) KRYO_SERIALIZER.get().deserialize(input);
   }
 
   private static class KryoSerializerInstance implements Serializable {
@@ -88,8 +53,8 @@ public class SerializationUtil {
     private final ByteArrayOutputStream outputStream;
 
     KryoSerializerInstance() {
-      KryoInstantiator kryoInstantiator = new KryoInstantiator();
-      kryo = kryoInstantiator.newKryo();
+      KryoInstantiation kryoInstantiation = new KryoInstantiation();
+      kryo = kryoInstantiation.newKryo();
       outputStream = new 
ByteArrayOutputStream(KRYO_SERIALIZER_INITIAL_BUFFER_SIZE);
       kryo.setRegistrationRequired(false);
     }
@@ -108,7 +73,7 @@ public class SerializationUtil {
     }
   }
 
-  private static class KryoInstantiator implements Serializable {
+  private static class KryoInstantiation implements Serializable {
 
     public Kryo newKryo() {
       Kryo kryo = new Kryo();
@@ -132,45 +97,19 @@ public class SerializationUtil {
 
   private static class AvroUtf8Serializer extends Serializer<Utf8> {
 
-    @SuppressWarnings("unchecked")
     @Override
+    @SuppressWarnings("unchecked")
     public void write(Kryo kryo, Output output, Utf8 utf8String) {
       Serializer<byte[]> bytesSerializer = 
kryo.getDefaultSerializer(byte[].class);
       bytesSerializer.write(kryo, output, utf8String.getBytes());
     }
 
-    @SuppressWarnings("unchecked")
     @Override
+    @SuppressWarnings("unchecked")
     public Utf8 read(Kryo kryo, Input input, Class<Utf8> type) {
       Serializer<byte[]> bytesSerializer = 
kryo.getDefaultSerializer(byte[].class);
       byte[] bytes = bytesSerializer.read(kryo, input, byte[].class);
       return new Utf8(bytes);
     }
   }
-
-  public interface SimpleSerializer<T> {
-
-    byte[] serialize(T t);
-
-    T deserialize(byte[] bytes);
-  }
-
-  public static class JavaSerializer<T extends Serializable> implements 
SimpleSerializer<T> {
-
-    public static final JavaSerializer INSTANT = new JavaSerializer<>();
-
-    @Override
-    public byte[] serialize(T t) {
-      checkNotNull(t);
-      return SerializationUtil.kryoSerialize(t);
-    }
-
-    @Override
-    public T deserialize(byte[] bytes) {
-      if (bytes == null) {
-        return null;
-      }
-      return SerializationUtil.kryoDeserialize(bytes);
-    }
-  }
 }
diff --git 
a/amoro-common/src/main/java/org/apache/amoro/serialization/ResourceSerde.java 
b/amoro-common/src/main/java/org/apache/amoro/serialization/ResourceSerde.java
new file mode 100644
index 000000000..7c780675b
--- /dev/null
+++ 
b/amoro-common/src/main/java/org/apache/amoro/serialization/ResourceSerde.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.serialization;
+
+/**
+ * ResourceSerde interface
+ *
+ * @param <R> resource serde with obj
+ */
+public interface ResourceSerde<R> {
+
+  /**
+   * serialize resource
+   *
+   * @param resource input object
+   */
+  byte[] serializeResource(R resource);
+
+  /**
+   * deserialize resource
+   *
+   * @param input bytes
+   * @return output deserialize obj
+   */
+  R deserializeResource(byte[] input);
+}
diff --git 
a/amoro-common/src/main/java/org/apache/amoro/serialization/SimpleSerializer.java
 
b/amoro-common/src/main/java/org/apache/amoro/serialization/SimpleSerializer.java
new file mode 100644
index 000000000..4ebfc56a2
--- /dev/null
+++ 
b/amoro-common/src/main/java/org/apache/amoro/serialization/SimpleSerializer.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.serialization;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+public class SimpleSerializer<R> implements ResourceSerde<R> {
+
+  @Override
+  public byte[] serializeResource(R resource) {
+    try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
+      try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
+        oos.writeObject(resource);
+        oos.flush();
+        return bos.toByteArray();
+      }
+    } catch (IOException e) {
+      throw new IllegalArgumentException("serialization error of " + resource, 
e);
+    }
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public R deserializeResource(byte[] input) {
+    if (input == null) {
+      return null;
+    }
+    try (ByteArrayInputStream bis = new ByteArrayInputStream(input)) {
+      try (ObjectInputStream ois = new ObjectInputStream(bis)) {
+        return (R) ois.readObject();
+      }
+    } catch (IOException | ClassNotFoundException e) {
+      throw new IllegalArgumentException("deserialization error ", e);
+    }
+  }
+}
diff --git 
a/amoro-common/src/main/java/org/apache/amoro/utils/SerializationUtil.java 
b/amoro-common/src/main/java/org/apache/amoro/utils/SerializationUtil.java
index 930e25acc..5da2ca4bb 100644
--- a/amoro-common/src/main/java/org/apache/amoro/utils/SerializationUtil.java
+++ b/amoro-common/src/main/java/org/apache/amoro/utils/SerializationUtil.java
@@ -18,159 +18,40 @@
 
 package org.apache.amoro.utils;
 
-import static 
org.apache.amoro.shade.guava32.com.google.common.base.Preconditions.checkNotNull;
+import org.apache.amoro.serialization.JavaSerializer;
+import org.apache.amoro.serialization.KryoSerializer;
+import org.apache.amoro.serialization.ResourceSerde;
+import org.apache.amoro.serialization.SimpleSerializer;
 
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.Serializer;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import org.apache.avro.util.Utf8;
-import org.objenesis.strategy.StdInstantiatorStrategy;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
 import java.nio.ByteBuffer;
 
 @SuppressWarnings({"unchecked", "rawtypes"})
 public class SerializationUtil {
 
-  private static final ThreadLocal<KryoSerializerInstance> KRYO_SERIALIZER =
-      ThreadLocal.withInitial(KryoSerializerInstance::new);
+  private static final KryoSerializer kryoSerialize = new KryoSerializer();
+  private static final SimpleSerializer simpleSerialize = new 
SimpleSerializer();
 
   public static ByteBuffer simpleSerialize(Object obj) {
-    try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
-      try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
-        oos.writeObject(obj);
-        oos.flush();
-        return ByteBuffer.wrap(bos.toByteArray());
-      }
-    } catch (IOException e) {
-      throw new IllegalArgumentException("serialization error of " + obj, e);
-    }
+    return ByteBuffer.wrap(simpleSerialize.serializeResource(obj));
   }
 
-  public static <T> T simpleDeserialize(byte[] bytes) {
-    if (bytes == null) {
-      return null;
-    }
-    try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes)) {
-      try (ObjectInputStream ois = new ObjectInputStream(bis)) {
-        return (T) ois.readObject();
-      }
-    } catch (IOException | ClassNotFoundException e) {
-      throw new IllegalArgumentException("deserialization error ", e);
-    }
+  public static <R> R simpleDeserialize(byte[] bytes) {
+    return (R) simpleSerialize.deserializeResource(bytes);
   }
 
   public static byte[] kryoSerialize(final Object obj) {
-    return KRYO_SERIALIZER.get().serialize(obj);
+    return kryoSerialize.serializeResource(obj);
   }
 
-  @SuppressWarnings("unchecked")
-  public static <T> T kryoDeserialize(final byte[] objectData) {
-    if (objectData == null) {
-      throw new NullPointerException("The byte[] must not be null");
-    }
-    return (T) KRYO_SERIALIZER.get().deserialize(objectData);
-  }
-
-  public static <T> SimpleSerializer<T> createJavaSimpleSerializer() {
+  public static <R> ResourceSerde<R> createJavaSimpleSerializer() {
     return JavaSerializer.INSTANT;
   }
 
-  private static class KryoSerializerInstance implements Serializable {
-    public static final int KRYO_SERIALIZER_INITIAL_BUFFER_SIZE = 1048576;
-    private final Kryo kryo;
-    private final ByteArrayOutputStream outputStream;
-
-    KryoSerializerInstance() {
-      KryoInstantiator kryoInstantiator = new KryoInstantiator();
-      kryo = kryoInstantiator.newKryo();
-      outputStream = new 
ByteArrayOutputStream(KRYO_SERIALIZER_INITIAL_BUFFER_SIZE);
-      kryo.setRegistrationRequired(false);
-    }
-
-    byte[] serialize(Object obj) {
-      kryo.reset();
-      outputStream.reset();
-      Output output = new Output(outputStream);
-      this.kryo.writeClassAndObject(output, obj);
-      output.close();
-      return outputStream.toByteArray();
-    }
-
-    Object deserialize(byte[] objectData) {
-      return this.kryo.readClassAndObject(new Input(objectData));
-    }
-  }
-
-  private static class KryoInstantiator implements Serializable {
-
-    public Kryo newKryo() {
-      Kryo kryo = new Kryo();
-
-      // This instance of Kryo should not require prior registration of classes
-      kryo.setRegistrationRequired(false);
-      Kryo.DefaultInstantiatorStrategy instantiatorStrategy =
-          new Kryo.DefaultInstantiatorStrategy();
-      instantiatorStrategy.setFallbackInstantiatorStrategy(new 
StdInstantiatorStrategy());
-      kryo.setInstantiatorStrategy(instantiatorStrategy);
-      // Handle cases where we may have an odd classloader setup like with lib 
jars
-      // for hadoop
-      kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
-
-      // Register serializers
-      kryo.register(Utf8.class, new AvroUtf8Serializer());
-
-      return kryo;
-    }
-  }
-
-  private static class AvroUtf8Serializer extends Serializer<Utf8> {
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public void write(Kryo kryo, Output output, Utf8 utf8String) {
-      Serializer<byte[]> bytesSerializer = 
kryo.getDefaultSerializer(byte[].class);
-      bytesSerializer.write(kryo, output, utf8String.getBytes());
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public Utf8 read(Kryo kryo, Input input, Class<Utf8> type) {
-      Serializer<byte[]> bytesSerializer = 
kryo.getDefaultSerializer(byte[].class);
-      byte[] bytes = bytesSerializer.read(kryo, input, byte[].class);
-      return new Utf8(bytes);
-    }
-  }
-
-  public interface SimpleSerializer<T> {
-
-    byte[] serialize(T t);
-
-    T deserialize(byte[] bytes);
-  }
-
-  public static class JavaSerializer<T extends Serializable> implements 
SimpleSerializer<T> {
-
-    public static final JavaSerializer INSTANT = new JavaSerializer<>();
-
-    @Override
-    public byte[] serialize(T t) {
-      checkNotNull(t);
-      return SerializationUtil.kryoSerialize(t);
-    }
-
-    @Override
-    public T deserialize(byte[] bytes) {
-      if (bytes == null) {
-        return null;
-      }
-      return SerializationUtil.kryoDeserialize(bytes);
+  @SuppressWarnings("unchecked")
+  public static <R> R kryoDeserialize(final byte[] objectData) {
+    if (objectData == null) {
+      throw new NullPointerException("The byte[] must not be null");
     }
+    return (R) kryoSerialize.deserializeResource(objectData);
   }
 }
diff --git 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/map/SimpleSpillableMap.java
 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/map/SimpleSpillableMap.java
index 7670730fa..977df3ca6 100644
--- 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/map/SimpleSpillableMap.java
+++ 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/map/SimpleSpillableMap.java
@@ -18,8 +18,9 @@
 
 package org.apache.amoro.utils.map;
 
+import org.apache.amoro.serialization.JavaSerializer;
+import org.apache.amoro.serialization.ResourceSerde;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
-import org.apache.amoro.utils.SerializationUtil;
 
 import javax.annotation.Nullable;
 
@@ -42,9 +43,9 @@ public class SimpleSpillableMap<K, T> implements SimpleMap<K, 
T> {
   private long estimatedPayloadSize = 0;
   private int putCount = 0;
 
-  private final SerializationUtil.SimpleSerializer<K> keySerializer;
+  private final ResourceSerde<K> keySerializer;
 
-  private final SerializationUtil.SimpleSerializer<T> valueSerializer;
+  private final ResourceSerde<T> valueSerializer;
 
   protected SimpleSpillableMap(
       Long maxInMemorySizeInBytes,
@@ -54,8 +55,8 @@ public class SimpleSpillableMap<K, T> implements SimpleMap<K, 
T> {
     this(
         maxInMemorySizeInBytes,
         backendBaseDir,
-        SerializationUtil.JavaSerializer.INSTANT,
-        SerializationUtil.JavaSerializer.INSTANT,
+        JavaSerializer.INSTANT,
+        JavaSerializer.INSTANT,
         keySizeEstimator,
         valueSizeEstimator);
   }
@@ -63,8 +64,8 @@ public class SimpleSpillableMap<K, T> implements SimpleMap<K, 
T> {
   protected SimpleSpillableMap(
       Long maxInMemorySizeInBytes,
       @Nullable String backendBaseDir,
-      SerializationUtil.SimpleSerializer<K> keySerializer,
-      SerializationUtil.SimpleSerializer<T> valueSerializer,
+      ResourceSerde<K> keySerializer,
+      ResourceSerde<T> valueSerializer,
       SizeEstimator<K> keySizeEstimator,
       SizeEstimator<T> valueSizeEstimator) {
     this.memoryMap = Maps.newHashMap();
@@ -153,13 +154,13 @@ public class SimpleSpillableMap<K, T> implements 
SimpleMap<K, T> {
 
     private final String columnFamily = UUID.randomUUID().toString();
 
-    private final SerializationUtil.SimpleSerializer<K> keySerializer;
+    private final ResourceSerde<K> keySerializer;
 
-    private final SerializationUtil.SimpleSerializer<T> valueSerializer;
+    private final ResourceSerde<T> valueSerializer;
 
     public SimpleSpilledMap(
-        SerializationUtil.SimpleSerializer<K> keySerializer,
-        SerializationUtil.SimpleSerializer<T> valueSerializer,
+        ResourceSerde<K> keySerializer,
+        ResourceSerde<T> valueSerializer,
         @Nullable String backendBaseDir) {
       rocksDB = RocksDBBackend.getOrCreateInstance(backendBaseDir);
       rocksDB.addColumnFamily(columnFamily);
@@ -168,19 +169,23 @@ public class SimpleSpillableMap<K, T> implements 
SimpleMap<K, T> {
     }
 
     public boolean containsKey(K key) {
-      return rocksDB.get(columnFamily, keySerializer.serialize(key)) != null;
+      return rocksDB.get(columnFamily, keySerializer.serializeResource(key)) 
!= null;
     }
 
     public T get(K key) {
-      return valueSerializer.deserialize(rocksDB.get(columnFamily, 
keySerializer.serialize(key)));
+      return valueSerializer.deserializeResource(
+          rocksDB.get(columnFamily, keySerializer.serializeResource(key)));
     }
 
     public void put(K key, T value) {
-      rocksDB.put(columnFamily, keySerializer.serialize(key), 
valueSerializer.serialize(value));
+      rocksDB.put(
+          columnFamily,
+          keySerializer.serializeResource(key),
+          valueSerializer.serializeResource(value));
     }
 
     public void delete(K key) {
-      rocksDB.delete(columnFamily, keySerializer.serialize(key));
+      rocksDB.delete(columnFamily, keySerializer.serializeResource(key));
     }
 
     public void close() {
diff --git 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/map/StructLikeWrapperSerializer.java
 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/map/StructLikeWrapperSerializer.java
index 4fa1a6347..d05d0ecd4 100644
--- 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/map/StructLikeWrapperSerializer.java
+++ 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/map/StructLikeWrapperSerializer.java
@@ -20,13 +20,13 @@ package org.apache.amoro.utils.map;
 
 import static 
org.apache.amoro.shade.guava32.com.google.common.base.Preconditions.checkNotNull;
 
+import org.apache.amoro.serialization.ResourceSerde;
 import org.apache.amoro.utils.SerializationUtil;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.StructLikeWrapper;
 
-public class StructLikeWrapperSerializer
-    implements SerializationUtil.SimpleSerializer<StructLikeWrapper> {
+public class StructLikeWrapperSerializer implements 
ResourceSerde<StructLikeWrapper> {
 
   protected final StructLikeWrapper structLikeWrapper;
 
@@ -39,14 +39,14 @@ public class StructLikeWrapperSerializer
   }
 
   @Override
-  public byte[] serialize(StructLikeWrapper structLikeWrapper) {
+  public byte[] serializeResource(StructLikeWrapper structLikeWrapper) {
     checkNotNull(structLikeWrapper);
     StructLike copy = StructLikeCopy.copy(structLikeWrapper.get());
     return SerializationUtil.kryoSerialize(copy);
   }
 
   @Override
-  public StructLikeWrapper deserialize(byte[] bytes) {
+  public StructLikeWrapper deserializeResource(byte[] bytes) {
     if (bytes == null) {
       return null;
     }

Reply via email to