This is an automated email from the ASF dual-hosted git repository.
xuba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new 7ff304cf8 [AMORO-3356] Include ResourceSerde interface and impl with
Simple/Kryo/Java Serialize (#3571)
7ff304cf8 is described below
commit 7ff304cf8cda3ec95954ec9ea97328e5ce541e0f
Author: ConradJam <[email protected]>
AuthorDate: Thu May 29 22:18:05 2025 +0800
[AMORO-3356] Include ResourceSerde interface and impl with Simple/Kryo/Java
Serialize (#3571)
* [AMORO-3356] Include ResourceSerde interface and impl with
SimpleSerialize and KryoSerialize
* fix up
* fix up
---------
Co-authored-by: Xu Bai <[email protected]>
---
.../apache/amoro/serialization/JavaSerializer.java | 52 +++++++
.../KryoSerializer.java} | 91 ++-----------
.../apache/amoro/serialization/ResourceSerde.java | 42 ++++++
.../amoro/serialization/SimpleSerializer.java | 56 ++++++++
.../org/apache/amoro/utils/SerializationUtil.java | 151 +++------------------
.../apache/amoro/utils/map/SimpleSpillableMap.java | 35 +++--
.../utils/map/StructLikeWrapperSerializer.java | 8 +-
7 files changed, 205 insertions(+), 230 deletions(-)
diff --git
a/amoro-common/src/main/java/org/apache/amoro/serialization/JavaSerializer.java
b/amoro-common/src/main/java/org/apache/amoro/serialization/JavaSerializer.java
new file mode 100644
index 000000000..7496ab4fb
--- /dev/null
+++
b/amoro-common/src/main/java/org/apache/amoro/serialization/JavaSerializer.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.serialization;
+
+import static
org.apache.amoro.shade.guava32.com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.Serializable;
+
+/**
+ * JavaSerializer Use KryoSerialize to serialize/deserialize java objects
+ *
+ * @param <R>
+ */
+@SuppressWarnings("rawtypes")
+public class JavaSerializer<R extends Serializable> implements
ResourceSerde<R> {
+
+ private static final KryoSerializer kryoSerialize = new KryoSerializer<>();
+
+ public static final JavaSerializer INSTANT = new JavaSerializer<>();
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public byte[] serializeResource(R resource) {
+ checkNotNull(resource);
+ return kryoSerialize.serializeResource(resource);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public R deserializeResource(byte[] input) {
+ if (input == null) {
+ return null;
+ }
+ return (R) kryoSerialize.deserializeResource(input);
+ }
+}
diff --git
a/amoro-common/src/main/java/org/apache/amoro/utils/SerializationUtil.java
b/amoro-common/src/main/java/org/apache/amoro/serialization/KryoSerializer.java
similarity index 59%
copy from
amoro-common/src/main/java/org/apache/amoro/utils/SerializationUtil.java
copy to
amoro-common/src/main/java/org/apache/amoro/serialization/KryoSerializer.java
index 930e25acc..cc80db303 100644
--- a/amoro-common/src/main/java/org/apache/amoro/utils/SerializationUtil.java
+++
b/amoro-common/src/main/java/org/apache/amoro/serialization/KryoSerializer.java
@@ -16,9 +16,7 @@
* limitations under the License.
*/
-package org.apache.amoro.utils;
-
-import static
org.apache.amoro.shade.guava32.com.google.common.base.Preconditions.checkNotNull;
+package org.apache.amoro.serialization;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
@@ -27,59 +25,26 @@ import com.esotericsoftware.kryo.io.Output;
import org.apache.avro.util.Utf8;
import org.objenesis.strategy.StdInstantiatorStrategy;
-import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
import java.io.Serializable;
-import java.nio.ByteBuffer;
-@SuppressWarnings({"unchecked", "rawtypes"})
-public class SerializationUtil {
+public class KryoSerializer<R> implements ResourceSerde<R> {
private static final ThreadLocal<KryoSerializerInstance> KRYO_SERIALIZER =
ThreadLocal.withInitial(KryoSerializerInstance::new);
- public static ByteBuffer simpleSerialize(Object obj) {
- try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
- try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
- oos.writeObject(obj);
- oos.flush();
- return ByteBuffer.wrap(bos.toByteArray());
- }
- } catch (IOException e) {
- throw new IllegalArgumentException("serialization error of " + obj, e);
- }
- }
-
- public static <T> T simpleDeserialize(byte[] bytes) {
- if (bytes == null) {
- return null;
- }
- try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes)) {
- try (ObjectInputStream ois = new ObjectInputStream(bis)) {
- return (T) ois.readObject();
- }
- } catch (IOException | ClassNotFoundException e) {
- throw new IllegalArgumentException("deserialization error ", e);
- }
- }
-
- public static byte[] kryoSerialize(final Object obj) {
- return KRYO_SERIALIZER.get().serialize(obj);
+ @Override
+ public byte[] serializeResource(R resource) {
+ return KRYO_SERIALIZER.get().serialize(resource);
}
+ @Override
@SuppressWarnings("unchecked")
- public static <T> T kryoDeserialize(final byte[] objectData) {
- if (objectData == null) {
- throw new NullPointerException("The byte[] must not be null");
+ public R deserializeResource(byte[] input) {
+ if (input == null) {
+ throw new NullPointerException("The bytes[] Input must not be null");
}
- return (T) KRYO_SERIALIZER.get().deserialize(objectData);
- }
-
- public static <T> SimpleSerializer<T> createJavaSimpleSerializer() {
- return JavaSerializer.INSTANT;
+ return (R) KRYO_SERIALIZER.get().deserialize(input);
}
private static class KryoSerializerInstance implements Serializable {
@@ -88,8 +53,8 @@ public class SerializationUtil {
private final ByteArrayOutputStream outputStream;
KryoSerializerInstance() {
- KryoInstantiator kryoInstantiator = new KryoInstantiator();
- kryo = kryoInstantiator.newKryo();
+ KryoInstantiation kryoInstantiation = new KryoInstantiation();
+ kryo = kryoInstantiation.newKryo();
outputStream = new
ByteArrayOutputStream(KRYO_SERIALIZER_INITIAL_BUFFER_SIZE);
kryo.setRegistrationRequired(false);
}
@@ -108,7 +73,7 @@ public class SerializationUtil {
}
}
- private static class KryoInstantiator implements Serializable {
+ private static class KryoInstantiation implements Serializable {
public Kryo newKryo() {
Kryo kryo = new Kryo();
@@ -132,45 +97,19 @@ public class SerializationUtil {
private static class AvroUtf8Serializer extends Serializer<Utf8> {
- @SuppressWarnings("unchecked")
@Override
+ @SuppressWarnings("unchecked")
public void write(Kryo kryo, Output output, Utf8 utf8String) {
Serializer<byte[]> bytesSerializer =
kryo.getDefaultSerializer(byte[].class);
bytesSerializer.write(kryo, output, utf8String.getBytes());
}
- @SuppressWarnings("unchecked")
@Override
+ @SuppressWarnings("unchecked")
public Utf8 read(Kryo kryo, Input input, Class<Utf8> type) {
Serializer<byte[]> bytesSerializer =
kryo.getDefaultSerializer(byte[].class);
byte[] bytes = bytesSerializer.read(kryo, input, byte[].class);
return new Utf8(bytes);
}
}
-
- public interface SimpleSerializer<T> {
-
- byte[] serialize(T t);
-
- T deserialize(byte[] bytes);
- }
-
- public static class JavaSerializer<T extends Serializable> implements
SimpleSerializer<T> {
-
- public static final JavaSerializer INSTANT = new JavaSerializer<>();
-
- @Override
- public byte[] serialize(T t) {
- checkNotNull(t);
- return SerializationUtil.kryoSerialize(t);
- }
-
- @Override
- public T deserialize(byte[] bytes) {
- if (bytes == null) {
- return null;
- }
- return SerializationUtil.kryoDeserialize(bytes);
- }
- }
}
diff --git
a/amoro-common/src/main/java/org/apache/amoro/serialization/ResourceSerde.java
b/amoro-common/src/main/java/org/apache/amoro/serialization/ResourceSerde.java
new file mode 100644
index 000000000..7c780675b
--- /dev/null
+++
b/amoro-common/src/main/java/org/apache/amoro/serialization/ResourceSerde.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.serialization;
+
+/**
+ * ResourceSerde interface
+ *
+ * @param <R> resource serde with obj
+ */
+public interface ResourceSerde<R> {
+
+ /**
+ * serialize resource
+ *
+ * @param resource input object
+ */
+ byte[] serializeResource(R resource);
+
+ /**
+ * deserialize resource
+ *
+ * @param input bytes
+ * @return output deserialize obj
+ */
+ R deserializeResource(byte[] input);
+}
diff --git
a/amoro-common/src/main/java/org/apache/amoro/serialization/SimpleSerializer.java
b/amoro-common/src/main/java/org/apache/amoro/serialization/SimpleSerializer.java
new file mode 100644
index 000000000..4ebfc56a2
--- /dev/null
+++
b/amoro-common/src/main/java/org/apache/amoro/serialization/SimpleSerializer.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.serialization;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+public class SimpleSerializer<R> implements ResourceSerde<R> {
+
+ @Override
+ public byte[] serializeResource(R resource) {
+ try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
+ try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
+ oos.writeObject(resource);
+ oos.flush();
+ return bos.toByteArray();
+ }
+ } catch (IOException e) {
+ throw new IllegalArgumentException("serialization error of " + resource,
e);
+ }
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public R deserializeResource(byte[] input) {
+ if (input == null) {
+ return null;
+ }
+ try (ByteArrayInputStream bis = new ByteArrayInputStream(input)) {
+ try (ObjectInputStream ois = new ObjectInputStream(bis)) {
+ return (R) ois.readObject();
+ }
+ } catch (IOException | ClassNotFoundException e) {
+ throw new IllegalArgumentException("deserialization error ", e);
+ }
+ }
+}
diff --git
a/amoro-common/src/main/java/org/apache/amoro/utils/SerializationUtil.java
b/amoro-common/src/main/java/org/apache/amoro/utils/SerializationUtil.java
index 930e25acc..5da2ca4bb 100644
--- a/amoro-common/src/main/java/org/apache/amoro/utils/SerializationUtil.java
+++ b/amoro-common/src/main/java/org/apache/amoro/utils/SerializationUtil.java
@@ -18,159 +18,40 @@
package org.apache.amoro.utils;
-import static
org.apache.amoro.shade.guava32.com.google.common.base.Preconditions.checkNotNull;
+import org.apache.amoro.serialization.JavaSerializer;
+import org.apache.amoro.serialization.KryoSerializer;
+import org.apache.amoro.serialization.ResourceSerde;
+import org.apache.amoro.serialization.SimpleSerializer;
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.Serializer;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import org.apache.avro.util.Utf8;
-import org.objenesis.strategy.StdInstantiatorStrategy;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
import java.nio.ByteBuffer;
@SuppressWarnings({"unchecked", "rawtypes"})
public class SerializationUtil {
- private static final ThreadLocal<KryoSerializerInstance> KRYO_SERIALIZER =
- ThreadLocal.withInitial(KryoSerializerInstance::new);
+ private static final KryoSerializer kryoSerialize = new KryoSerializer();
+ private static final SimpleSerializer simpleSerialize = new
SimpleSerializer();
public static ByteBuffer simpleSerialize(Object obj) {
- try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
- try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
- oos.writeObject(obj);
- oos.flush();
- return ByteBuffer.wrap(bos.toByteArray());
- }
- } catch (IOException e) {
- throw new IllegalArgumentException("serialization error of " + obj, e);
- }
+ return ByteBuffer.wrap(simpleSerialize.serializeResource(obj));
}
- public static <T> T simpleDeserialize(byte[] bytes) {
- if (bytes == null) {
- return null;
- }
- try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes)) {
- try (ObjectInputStream ois = new ObjectInputStream(bis)) {
- return (T) ois.readObject();
- }
- } catch (IOException | ClassNotFoundException e) {
- throw new IllegalArgumentException("deserialization error ", e);
- }
+ public static <R> R simpleDeserialize(byte[] bytes) {
+ return (R) simpleSerialize.deserializeResource(bytes);
}
public static byte[] kryoSerialize(final Object obj) {
- return KRYO_SERIALIZER.get().serialize(obj);
+ return kryoSerialize.serializeResource(obj);
}
- @SuppressWarnings("unchecked")
- public static <T> T kryoDeserialize(final byte[] objectData) {
- if (objectData == null) {
- throw new NullPointerException("The byte[] must not be null");
- }
- return (T) KRYO_SERIALIZER.get().deserialize(objectData);
- }
-
- public static <T> SimpleSerializer<T> createJavaSimpleSerializer() {
+ public static <R> ResourceSerde<R> createJavaSimpleSerializer() {
return JavaSerializer.INSTANT;
}
- private static class KryoSerializerInstance implements Serializable {
- public static final int KRYO_SERIALIZER_INITIAL_BUFFER_SIZE = 1048576;
- private final Kryo kryo;
- private final ByteArrayOutputStream outputStream;
-
- KryoSerializerInstance() {
- KryoInstantiator kryoInstantiator = new KryoInstantiator();
- kryo = kryoInstantiator.newKryo();
- outputStream = new
ByteArrayOutputStream(KRYO_SERIALIZER_INITIAL_BUFFER_SIZE);
- kryo.setRegistrationRequired(false);
- }
-
- byte[] serialize(Object obj) {
- kryo.reset();
- outputStream.reset();
- Output output = new Output(outputStream);
- this.kryo.writeClassAndObject(output, obj);
- output.close();
- return outputStream.toByteArray();
- }
-
- Object deserialize(byte[] objectData) {
- return this.kryo.readClassAndObject(new Input(objectData));
- }
- }
-
- private static class KryoInstantiator implements Serializable {
-
- public Kryo newKryo() {
- Kryo kryo = new Kryo();
-
- // This instance of Kryo should not require prior registration of classes
- kryo.setRegistrationRequired(false);
- Kryo.DefaultInstantiatorStrategy instantiatorStrategy =
- new Kryo.DefaultInstantiatorStrategy();
- instantiatorStrategy.setFallbackInstantiatorStrategy(new
StdInstantiatorStrategy());
- kryo.setInstantiatorStrategy(instantiatorStrategy);
- // Handle cases where we may have an odd classloader setup like with lib
jars
- // for hadoop
- kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
-
- // Register serializers
- kryo.register(Utf8.class, new AvroUtf8Serializer());
-
- return kryo;
- }
- }
-
- private static class AvroUtf8Serializer extends Serializer<Utf8> {
-
- @SuppressWarnings("unchecked")
- @Override
- public void write(Kryo kryo, Output output, Utf8 utf8String) {
- Serializer<byte[]> bytesSerializer =
kryo.getDefaultSerializer(byte[].class);
- bytesSerializer.write(kryo, output, utf8String.getBytes());
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public Utf8 read(Kryo kryo, Input input, Class<Utf8> type) {
- Serializer<byte[]> bytesSerializer =
kryo.getDefaultSerializer(byte[].class);
- byte[] bytes = bytesSerializer.read(kryo, input, byte[].class);
- return new Utf8(bytes);
- }
- }
-
- public interface SimpleSerializer<T> {
-
- byte[] serialize(T t);
-
- T deserialize(byte[] bytes);
- }
-
- public static class JavaSerializer<T extends Serializable> implements
SimpleSerializer<T> {
-
- public static final JavaSerializer INSTANT = new JavaSerializer<>();
-
- @Override
- public byte[] serialize(T t) {
- checkNotNull(t);
- return SerializationUtil.kryoSerialize(t);
- }
-
- @Override
- public T deserialize(byte[] bytes) {
- if (bytes == null) {
- return null;
- }
- return SerializationUtil.kryoDeserialize(bytes);
+ @SuppressWarnings("unchecked")
+ public static <R> R kryoDeserialize(final byte[] objectData) {
+ if (objectData == null) {
+ throw new NullPointerException("The byte[] must not be null");
}
+ return (R) kryoSerialize.deserializeResource(objectData);
}
}
diff --git
a/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/map/SimpleSpillableMap.java
b/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/map/SimpleSpillableMap.java
index 7670730fa..977df3ca6 100644
---
a/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/map/SimpleSpillableMap.java
+++
b/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/map/SimpleSpillableMap.java
@@ -18,8 +18,9 @@
package org.apache.amoro.utils.map;
+import org.apache.amoro.serialization.JavaSerializer;
+import org.apache.amoro.serialization.ResourceSerde;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
-import org.apache.amoro.utils.SerializationUtil;
import javax.annotation.Nullable;
@@ -42,9 +43,9 @@ public class SimpleSpillableMap<K, T> implements SimpleMap<K,
T> {
private long estimatedPayloadSize = 0;
private int putCount = 0;
- private final SerializationUtil.SimpleSerializer<K> keySerializer;
+ private final ResourceSerde<K> keySerializer;
- private final SerializationUtil.SimpleSerializer<T> valueSerializer;
+ private final ResourceSerde<T> valueSerializer;
protected SimpleSpillableMap(
Long maxInMemorySizeInBytes,
@@ -54,8 +55,8 @@ public class SimpleSpillableMap<K, T> implements SimpleMap<K,
T> {
this(
maxInMemorySizeInBytes,
backendBaseDir,
- SerializationUtil.JavaSerializer.INSTANT,
- SerializationUtil.JavaSerializer.INSTANT,
+ JavaSerializer.INSTANT,
+ JavaSerializer.INSTANT,
keySizeEstimator,
valueSizeEstimator);
}
@@ -63,8 +64,8 @@ public class SimpleSpillableMap<K, T> implements SimpleMap<K,
T> {
protected SimpleSpillableMap(
Long maxInMemorySizeInBytes,
@Nullable String backendBaseDir,
- SerializationUtil.SimpleSerializer<K> keySerializer,
- SerializationUtil.SimpleSerializer<T> valueSerializer,
+ ResourceSerde<K> keySerializer,
+ ResourceSerde<T> valueSerializer,
SizeEstimator<K> keySizeEstimator,
SizeEstimator<T> valueSizeEstimator) {
this.memoryMap = Maps.newHashMap();
@@ -153,13 +154,13 @@ public class SimpleSpillableMap<K, T> implements
SimpleMap<K, T> {
private final String columnFamily = UUID.randomUUID().toString();
- private final SerializationUtil.SimpleSerializer<K> keySerializer;
+ private final ResourceSerde<K> keySerializer;
- private final SerializationUtil.SimpleSerializer<T> valueSerializer;
+ private final ResourceSerde<T> valueSerializer;
public SimpleSpilledMap(
- SerializationUtil.SimpleSerializer<K> keySerializer,
- SerializationUtil.SimpleSerializer<T> valueSerializer,
+ ResourceSerde<K> keySerializer,
+ ResourceSerde<T> valueSerializer,
@Nullable String backendBaseDir) {
rocksDB = RocksDBBackend.getOrCreateInstance(backendBaseDir);
rocksDB.addColumnFamily(columnFamily);
@@ -168,19 +169,23 @@ public class SimpleSpillableMap<K, T> implements
SimpleMap<K, T> {
}
public boolean containsKey(K key) {
- return rocksDB.get(columnFamily, keySerializer.serialize(key)) != null;
+ return rocksDB.get(columnFamily, keySerializer.serializeResource(key))
!= null;
}
public T get(K key) {
- return valueSerializer.deserialize(rocksDB.get(columnFamily,
keySerializer.serialize(key)));
+ return valueSerializer.deserializeResource(
+ rocksDB.get(columnFamily, keySerializer.serializeResource(key)));
}
public void put(K key, T value) {
- rocksDB.put(columnFamily, keySerializer.serialize(key),
valueSerializer.serialize(value));
+ rocksDB.put(
+ columnFamily,
+ keySerializer.serializeResource(key),
+ valueSerializer.serializeResource(value));
}
public void delete(K key) {
- rocksDB.delete(columnFamily, keySerializer.serialize(key));
+ rocksDB.delete(columnFamily, keySerializer.serializeResource(key));
}
public void close() {
diff --git
a/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/map/StructLikeWrapperSerializer.java
b/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/map/StructLikeWrapperSerializer.java
index 4fa1a6347..d05d0ecd4 100644
---
a/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/map/StructLikeWrapperSerializer.java
+++
b/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/map/StructLikeWrapperSerializer.java
@@ -20,13 +20,13 @@ package org.apache.amoro.utils.map;
import static
org.apache.amoro.shade.guava32.com.google.common.base.Preconditions.checkNotNull;
+import org.apache.amoro.serialization.ResourceSerde;
import org.apache.amoro.utils.SerializationUtil;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructLikeWrapper;
-public class StructLikeWrapperSerializer
- implements SerializationUtil.SimpleSerializer<StructLikeWrapper> {
+public class StructLikeWrapperSerializer implements
ResourceSerde<StructLikeWrapper> {
protected final StructLikeWrapper structLikeWrapper;
@@ -39,14 +39,14 @@ public class StructLikeWrapperSerializer
}
@Override
- public byte[] serialize(StructLikeWrapper structLikeWrapper) {
+ public byte[] serializeResource(StructLikeWrapper structLikeWrapper) {
checkNotNull(structLikeWrapper);
StructLike copy = StructLikeCopy.copy(structLikeWrapper.get());
return SerializationUtil.kryoSerialize(copy);
}
@Override
- public StructLikeWrapper deserialize(byte[] bytes) {
+ public StructLikeWrapper deserializeResource(byte[] bytes) {
if (bytes == null) {
return null;
}