This is an automated email from the ASF dual-hosted git repository.

chaokunyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fury.git


The following commit(s) were added to refs/heads/main by this push:
     new 64c995db feat(java): fast object copy framework in fury java (#1701)
64c995db is described below

commit 64c995db370a190c0743d041be8f1e6871b59f65
Author: zhaommmmomo <[email protected]>
AuthorDate: Fri Jul 19 00:56:09 2024 +0800

    feat(java): fast object copy framework in fury java (#1701)
    
    ## What does this PR do?
    
    Object deep copy framework in fury java. This PR is only for non-jit
    serializer
    
    - For immutable object such as `String`、`java.time`、`int`、`byte`...
    return itself.
    - For mutable object, create new object and set all attributes.
    - For `pojo` / `bean` object, Use `ObjectSerializer.copy(obj)` to create
    a new object
    - For `Arrays`、`Collection`、`Map` object, create new object, traverse
    the elements, call `fory.copy(obj)` to generate new elements and set
    them to the new object. Some serializers have special `copy` methods.
    
    
    
    ## Related issues
    
    [[Java] fast object copy framework in fury java
    #1679](https://github.com/apache/fury/issues/1679)
    
    
    
    ## Does this PR introduce any user-facing change?
    
    - [x] Does this PR introduce any public API change?
    - [ ] Does this PR introduce any binary protocol compatibility change?
    
    
    
    1. Provide `fury.copy(obj)` interface, which can deep copy the object
    without serialize / deserialize. For example:
    
    ```java
    User user = new User();
    user.setName("a");
    
    // If object contain circular references, please enable copy ref tracking 
by FuryBuilder#withCopyRefTracking(true)
    Fury fury = 
Fury.builder().withLanguage(Language.JAVA).withCopyRefTracking(true).build();
    User newUser = fury.copy(user);
    ```
    
    
    
    2. Provide `FuryCopyable<T>` interface, which can customize the copy
    method of object. For example:
    
    ```java
    public class User implements Serializable, FuryCopyable<User> {
    
      @Override
      public User copy(Fury fury) {
        // do something
        System.out.println("object custom copy method");
        return newUser;
      }
    }
    ```
    
    
    
    
    
    ## Benchmark
    > Device
    
    windows 10、12 cores、24G
    
    > JDK
    
    java version "1.8.0_202"
    Java(TM) SE Runtime Environment (build 1.8.0_202-b08)
    Java HotSpot(TM) 64-Bit Server VM (build 25.202-b08, mixed mode)
    
    java version "17.0.8" 2023-07-18 LTS
    Java(TM) SE Runtime Environment (build 17.0.8+9-LTS-211)
    Java HotSpot(TM) 64-Bit Server VM (build 17.0.8+9-LTS-211, mixed mode,
    sharing)
    
    > Test data
    
    see [benchmark
    
code](https://github.com/zhaommmmomo/fury/tree/copy_benchmark/java/fury-benchmark/src/main/java/org/apache/fury/benchmark)
    ```java
    int size = 128;
    int[] intArr = new int[size];
    List<Object> list = new ArrayList<>(size);
    Map<Object, Object> map = new ConcurrentHashMap<>();
    BeanA beanA = BeanA.createBeanA(size); // org.apache.fury.test.bean.BeanA
    
    for (int i = 0; i < size; i++) {
      intArr[i] = i;
      list.add(i);
      map.put(i, UUID.randomUUID().toString());
    }
    ```
    
    > Test
    
    **JMH**:
    benchmarkMode({Mode.Throughput})、fork(1)、threads(1)、warmup(iterations =
    3, time = 1)、measurement(iterations = 5, time =
    5)、outputTimeUnit(TimeUnit.SECONDS)
    *JDK8*
    
    
    *JDK17*
    
    
    **JMH**:
    benchmarkMode({Mode.AverageTime})、fork(1)、threads(1)、warmup(iterations =
    3, time = 1)、measurement(iterations = 5, time =
    5)、outputTimeUnit(TimeUnit.MILLISECONDS)
    **Each benchmark method will be looped 10,000 times**
    *JDK8*
    
    
    *JDK17*
---
 .../src/main/java/org/apache/fury/BaseFury.java    |   3 +
 .../src/main/java/org/apache/fury/Fury.java        | 148 +++++++++
 ...atibleSerializerBase.java => FuryCopyable.java} |  21 +-
 .../main/java/org/apache/fury/ThreadLocalFury.java |   5 +
 .../java/org/apache/fury/builder/Generated.java    |   3 +-
 .../main/java/org/apache/fury/config/Config.java   |   8 +
 .../java/org/apache/fury/config/FuryBuilder.java   |   7 +
 .../java/org/apache/fury/pool/ThreadPoolFury.java  |   5 +
 .../org/apache/fury/resolver/ClassResolver.java    |   8 +-
 .../fury/serializer/AbstractObjectSerializer.java  | 146 +++++++++
 .../apache/fury/serializer/ArraySerializers.java   |  58 ++++
 .../apache/fury/serializer/CodegenSerializer.java  |   2 +-
 .../fury/serializer/CompatibleSerializer.java      |  29 +-
 .../fury/serializer/CompatibleSerializerBase.java  |   7 +-
 .../org/apache/fury/serializer/EnumSerializer.java |   2 +-
 ...Serializer.java => FuryCopyableSerializer.java} |  75 ++---
 ...erializerBase.java => ImmutableSerializer.java} |  22 +-
 .../apache/fury/serializer/LocaleSerializer.java   |   2 +-
 .../fury/serializer/MetaSharedSerializer.java      |  31 +-
 .../apache/fury/serializer/ObjectSerializer.java   |  27 +-
 .../fury/serializer/OptionalSerializers.java       |  14 +-
 .../fury/serializer/PrimitiveSerializers.java      |  23 +-
 .../org/apache/fury/serializer/Serializer.java     |  43 ++-
 .../org/apache/fury/serializer/Serializers.java    |  54 +++-
 .../apache/fury/serializer/StringSerializer.java   |   2 +-
 .../apache/fury/serializer/TimeSerializers.java    |   2 +-
 .../org/apache/fury/serializer/URLSerializer.java  |   2 +-
 .../collection/AbstractCollectionSerializer.java   |  39 ++-
 .../collection/AbstractMapSerializer.java          |  52 ++-
 .../collection/CollectionSerializer.java           |  28 ++
 .../collection/CollectionSerializers.java          |  96 +++++-
 .../collection/GuavaCollectionSerializers.java     |  21 ++
 .../collection/ImmutableCollectionSerializers.java |  15 +
 .../fury/serializer/collection/MapSerializer.java  |  26 ++
 .../fury/serializer/collection/MapSerializers.java |  86 ++++-
 .../collection/SynchronizedSerializers.java        |   6 +
 .../collection/UnmodifiableSerializers.java        |   6 +
 .../serializer/scala/SingletonMapSerializer.java   |   5 +
 .../test/java/org/apache/fury/FuryCopyTest.java    | 347 +++++++++++++++++++++
 .../fury/serializer/scala/MapSerializer.scala      |   4 +
 40 files changed, 1296 insertions(+), 184 deletions(-)

diff --git a/java/fury-core/src/main/java/org/apache/fury/BaseFury.java 
b/java/fury-core/src/main/java/org/apache/fury/BaseFury.java
index 1ee1aec8..96a04be0 100644
--- a/java/fury-core/src/main/java/org/apache/fury/BaseFury.java
+++ b/java/fury-core/src/main/java/org/apache/fury/BaseFury.java
@@ -185,4 +185,7 @@ public interface BaseFury {
   /** This method is deprecated, please use {@link #deserialize} instead. */
   @Deprecated
   Object deserializeJavaObjectAndClass(FuryReadableChannel channel);
+
+  /** Deep copy the <code>obj</code>. */
+  <T> T copy(T obj);
 }
diff --git a/java/fury-core/src/main/java/org/apache/fury/Fury.java 
b/java/fury-core/src/main/java/org/apache/fury/Fury.java
index ccef2aaa..4cb45b21 100644
--- a/java/fury-core/src/main/java/org/apache/fury/Fury.java
+++ b/java/fury-core/src/main/java/org/apache/fury/Fury.java
@@ -24,12 +24,15 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteOrder;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.apache.fury.builder.JITContext;
+import org.apache.fury.collection.IdentityMap;
 import org.apache.fury.config.CompatibleMode;
 import org.apache.fury.config.Config;
 import org.apache.fury.config.FuryBuilder;
@@ -57,6 +60,8 @@ import 
org.apache.fury.serializer.PrimitiveSerializers.LongSerializer;
 import org.apache.fury.serializer.Serializer;
 import org.apache.fury.serializer.SerializerFactory;
 import org.apache.fury.serializer.StringSerializer;
+import 
org.apache.fury.serializer.collection.CollectionSerializers.ArrayListSerializer;
+import org.apache.fury.serializer.collection.MapSerializers.HashMapSerializer;
 import org.apache.fury.type.Generics;
 import org.apache.fury.type.Type;
 import org.apache.fury.util.ExceptionUtils;
@@ -106,6 +111,8 @@ public final class Fury implements BaseFury {
   private MemoryBuffer buffer;
   private final List<Object> nativeObjects;
   private final StringSerializer stringSerializer;
+  private final ArrayListSerializer arrayListSerializer;
+  private final HashMapSerializer hashMapSerializer;
   private final Language language;
   private final boolean compressInt;
   private final LongEncoding longEncoding;
@@ -115,6 +122,9 @@ public final class Fury implements BaseFury {
   private Iterator<MemoryBuffer> outOfBandBuffers;
   private boolean peerOutOfBandEnabled;
   private int depth;
+  private int copyDepth;
+  private final boolean copyRefTracking;
+  private final IdentityMap<Object, Object> originToCopyMap;
 
   public Fury(FuryBuilder builder, ClassLoader classLoader) {
     // Avoid set classLoader in `FuryBuilder`, which won't be clear when
@@ -122,6 +132,7 @@ public final class Fury implements BaseFury {
     config = new Config(builder);
     this.language = config.getLanguage();
     this.refTracking = config.trackingRef();
+    this.copyRefTracking = config.copyTrackingRef();
     compressInt = config.compressInt();
     longEncoding = config.longEncoding();
     if (refTracking) {
@@ -138,6 +149,9 @@ public final class Fury implements BaseFury {
     nativeObjects = new ArrayList<>();
     generics = new Generics(this);
     stringSerializer = new StringSerializer(this);
+    arrayListSerializer = new ArrayListSerializer(this);
+    hashMapSerializer = new HashMapSerializer(this);
+    originToCopyMap = new IdentityMap<>();
     LOG.info("Created new fury {}", this);
   }
 
@@ -288,6 +302,19 @@ public final class Fury implements BaseFury {
     throw e;
   }
 
+  private StackOverflowError processCopyStackOverflowError(StackOverflowError 
e) {
+    if (!copyRefTracking) {
+      String msg =
+          "Object may contain circular references, please enable ref tracking "
+              + "by `FuryBuilder#withCopyRefTracking(true)`";
+      StackOverflowError t1 = 
ExceptionUtils.trySetStackOverflowErrorMessage(e, msg);
+      if (t1 != null) {
+        return t1;
+      }
+    }
+    throw e;
+  }
+
   public MemoryBuffer getBuffer() {
     MemoryBuffer buf = buffer;
     if (buf == null) {
@@ -1220,6 +1247,113 @@ public final class Fury implements BaseFury {
     return deserializeJavaObjectAndClass(buf);
   }
 
+  @Override
+  public <T> T copy(T obj) {
+    try {
+      return copyObject(obj);
+    } catch (StackOverflowError e) {
+      throw processCopyStackOverflowError(e);
+    } finally {
+      if (copyRefTracking) {
+        resetCopy();
+      }
+    }
+  }
+
+  /**
+   * Copy object. This method provides a fast copy of common types.
+   *
+   * @param obj object to copy
+   * @return copied object
+   */
+  public <T> T copyObject(T obj) {
+    if (obj == null) {
+      return null;
+    }
+    Object copy;
+    ClassInfo classInfo = classResolver.getOrUpdateClassInfo(obj.getClass());
+    switch (classInfo.getClassId()) {
+      case ClassResolver.BOOLEAN_CLASS_ID:
+      case ClassResolver.BYTE_CLASS_ID:
+      case ClassResolver.CHAR_CLASS_ID:
+      case ClassResolver.SHORT_CLASS_ID:
+      case ClassResolver.INTEGER_CLASS_ID:
+      case ClassResolver.FLOAT_CLASS_ID:
+      case ClassResolver.LONG_CLASS_ID:
+      case ClassResolver.DOUBLE_CLASS_ID:
+      case ClassResolver.PRIMITIVE_BOOLEAN_CLASS_ID:
+      case ClassResolver.PRIMITIVE_BYTE_CLASS_ID:
+      case ClassResolver.PRIMITIVE_CHAR_CLASS_ID:
+      case ClassResolver.PRIMITIVE_SHORT_CLASS_ID:
+      case ClassResolver.PRIMITIVE_INT_CLASS_ID:
+      case ClassResolver.PRIMITIVE_FLOAT_CLASS_ID:
+      case ClassResolver.PRIMITIVE_LONG_CLASS_ID:
+      case ClassResolver.PRIMITIVE_DOUBLE_CLASS_ID:
+      case ClassResolver.STRING_CLASS_ID:
+        return obj;
+      case ClassResolver.PRIMITIVE_BOOLEAN_ARRAY_CLASS_ID:
+        boolean[] boolArr = (boolean[]) obj;
+        return (T) Arrays.copyOf(boolArr, boolArr.length);
+      case ClassResolver.PRIMITIVE_BYTE_ARRAY_CLASS_ID:
+        byte[] byteArr = (byte[]) obj;
+        return (T) Arrays.copyOf(byteArr, byteArr.length);
+      case ClassResolver.PRIMITIVE_CHAR_ARRAY_CLASS_ID:
+        char[] charArr = (char[]) obj;
+        return (T) Arrays.copyOf(charArr, charArr.length);
+      case ClassResolver.PRIMITIVE_SHORT_ARRAY_CLASS_ID:
+        short[] shortArr = (short[]) obj;
+        return (T) Arrays.copyOf(shortArr, shortArr.length);
+      case ClassResolver.PRIMITIVE_INT_ARRAY_CLASS_ID:
+        int[] intArr = (int[]) obj;
+        return (T) Arrays.copyOf(intArr, intArr.length);
+      case ClassResolver.PRIMITIVE_FLOAT_ARRAY_CLASS_ID:
+        float[] floatArr = (float[]) obj;
+        return (T) Arrays.copyOf(floatArr, floatArr.length);
+      case ClassResolver.PRIMITIVE_LONG_ARRAY_CLASS_ID:
+        long[] longArr = (long[]) obj;
+        return (T) Arrays.copyOf(longArr, longArr.length);
+      case ClassResolver.PRIMITIVE_DOUBLE_ARRAY_CLASS_ID:
+        double[] doubleArr = (double[]) obj;
+        return (T) Arrays.copyOf(doubleArr, doubleArr.length);
+      case ClassResolver.STRING_ARRAY_CLASS_ID:
+        String[] stringArr = (String[]) obj;
+        return (T) Arrays.copyOf(stringArr, stringArr.length);
+      case ClassResolver.ARRAYLIST_CLASS_ID:
+        copyDepth++;
+        copy = arrayListSerializer.copy((ArrayList) obj);
+        break;
+      case ClassResolver.HASHMAP_CLASS_ID:
+        copyDepth++;
+        copy = hashMapSerializer.copy((HashMap) obj);
+        break;
+        // todo: add fastpath for other types.
+      default:
+        copyDepth++;
+        copy = classInfo.getSerializer().copy(obj);
+    }
+    copyDepth--;
+    return (T) copy;
+  }
+
+  /**
+   * Track ref for copy.
+   *
+   * <p>Call this method immediately after composited object such as object
+   * array/map/collection/bean is created so that circular reference can be 
copy correctly.
+   *
+   * @param o1 object before copying
+   * @param o2 the copied object
+   */
+  public <T> void reference(T o1, T o2) {
+    if (o1 != null) {
+      originToCopyMap.put(o1, o2);
+    }
+  }
+
+  public Object getCopyObject(Object originObj) {
+    return originToCopyMap.get(originObj);
+  }
+
   private void serializeToStream(OutputStream outputStream, 
Consumer<MemoryBuffer> function) {
     MemoryBuffer buf = getBuffer();
     if (outputStream.getClass() == ByteArrayOutputStream.class) {
@@ -1257,6 +1391,7 @@ public final class Fury implements BaseFury {
     peerOutOfBandEnabled = false;
     bufferCallback = null;
     depth = 0;
+    resetCopy();
   }
 
   public void resetWrite() {
@@ -1279,6 +1414,11 @@ public final class Fury implements BaseFury {
     depth = 0;
   }
 
+  public void resetCopy() {
+    originToCopyMap.clear();
+    copyDepth = 0;
+  }
+
   private void throwDepthSerializationException() {
     String method = "Fury#" + (language != Language.JAVA ? "x" : "") + 
"writeXXX";
     throw new IllegalStateException(
@@ -1339,6 +1479,10 @@ public final class Fury implements BaseFury {
     this.depth += diff;
   }
 
+  public void incCopyDepth(int diff) {
+    this.copyDepth += diff;
+  }
+
   // Invoked by jit
   public StringSerializer getStringSerializer() {
     return stringSerializer;
@@ -1356,6 +1500,10 @@ public final class Fury implements BaseFury {
     return refTracking;
   }
 
+  public boolean copyTrackingRef() {
+    return copyRefTracking;
+  }
+
   public boolean isStringRefIgnored() {
     return config.isStringRefIgnored();
   }
diff --git 
a/java/fury-core/src/main/java/org/apache/fury/serializer/CompatibleSerializerBase.java
 b/java/fury-core/src/main/java/org/apache/fury/FuryCopyable.java
similarity index 59%
copy from 
java/fury-core/src/main/java/org/apache/fury/serializer/CompatibleSerializerBase.java
copy to java/fury-core/src/main/java/org/apache/fury/FuryCopyable.java
index b8159e81..eea28fc6 100644
--- 
a/java/fury-core/src/main/java/org/apache/fury/serializer/CompatibleSerializerBase.java
+++ b/java/fury-core/src/main/java/org/apache/fury/FuryCopyable.java
@@ -17,22 +17,13 @@
  * under the License.
  */
 
-package org.apache.fury.serializer;
-
-import org.apache.fury.Fury;
-import org.apache.fury.memory.MemoryBuffer;
+package org.apache.fury;
 
 /**
- * Base class for compatible serializer. Both JIT mode serializer and 
interpreter-mode serializer
- * will extend this class.
+ * Fury copy interface. Customize the copy method of the class
+ *
+ * @param <T> custom copy interface object
  */
-public abstract class CompatibleSerializerBase<T> extends Serializer<T> {
-  public CompatibleSerializerBase(Fury fury, Class<T> type) {
-    super(fury, type);
-  }
-
-  public T readAndSetFields(MemoryBuffer buffer, T obj) {
-    // java record object doesn't support update state.
-    throw new UnsupportedOperationException();
-  }
+public interface FuryCopyable<T> {
+  T copy(Fury fury);
 }
diff --git a/java/fury-core/src/main/java/org/apache/fury/ThreadLocalFury.java 
b/java/fury-core/src/main/java/org/apache/fury/ThreadLocalFury.java
index dd980278..e4fae2ae 100644
--- a/java/fury-core/src/main/java/org/apache/fury/ThreadLocalFury.java
+++ b/java/fury-core/src/main/java/org/apache/fury/ThreadLocalFury.java
@@ -245,6 +245,11 @@ public class ThreadLocalFury extends 
AbstractThreadSafeFury {
     return 
bindingThreadLocal.get().get().deserializeJavaObjectAndClass(channel);
   }
 
+  @Override
+  public <T> T copy(T obj) {
+    return bindingThreadLocal.get().get().copy(obj);
+  }
+
   @Override
   public void setClassLoader(ClassLoader classLoader) {
     setClassLoader(classLoader, StagingType.STRONG_STAGING);
diff --git 
a/java/fury-core/src/main/java/org/apache/fury/builder/Generated.java 
b/java/fury-core/src/main/java/org/apache/fury/builder/Generated.java
index fde60cdb..1139b3c3 100644
--- a/java/fury-core/src/main/java/org/apache/fury/builder/Generated.java
+++ b/java/fury-core/src/main/java/org/apache/fury/builder/Generated.java
@@ -27,6 +27,7 @@ import org.apache.fury.Fury;
 import org.apache.fury.memory.MemoryBuffer;
 import org.apache.fury.meta.ClassDef;
 import org.apache.fury.reflect.ReflectionUtils;
+import org.apache.fury.serializer.AbstractObjectSerializer;
 import org.apache.fury.serializer.CompatibleSerializerBase;
 import org.apache.fury.serializer.Serializer;
 import org.apache.fury.util.Preconditions;
@@ -39,7 +40,7 @@ import org.apache.fury.util.Preconditions;
 public interface Generated {
 
   /** Base class for all generated serializers. */
-  abstract class GeneratedSerializer extends Serializer implements Generated {
+  abstract class GeneratedSerializer extends AbstractObjectSerializer 
implements Generated {
     public GeneratedSerializer(Fury fury, Class<?> cls) {
       super(fury, cls);
     }
diff --git a/java/fury-core/src/main/java/org/apache/fury/config/Config.java 
b/java/fury-core/src/main/java/org/apache/fury/config/Config.java
index 4e64e31f..e783044f 100644
--- a/java/fury-core/src/main/java/org/apache/fury/config/Config.java
+++ b/java/fury-core/src/main/java/org/apache/fury/config/Config.java
@@ -38,6 +38,7 @@ public class Config implements Serializable {
   private final boolean basicTypesRefIgnored;
   private final boolean stringRefIgnored;
   private final boolean timeRefIgnored;
+  private final boolean copyTrackingRef;
   private final boolean codeGenEnabled;
   private final boolean checkClassVersion;
   private final CompatibleMode compatibleMode;
@@ -65,6 +66,7 @@ public class Config implements Serializable {
     basicTypesRefIgnored = !trackingRef || builder.basicTypesRefIgnored;
     stringRefIgnored = !trackingRef || builder.stringRefIgnored;
     timeRefIgnored = !trackingRef || builder.timeRefIgnored;
+    copyTrackingRef = builder.copyTrackingRef;
     compressString = builder.compressString;
     compressInt = builder.compressInt;
     longEncoding = builder.longEncoding;
@@ -99,6 +101,10 @@ public class Config implements Serializable {
     return trackingRef;
   }
 
+  public boolean copyTrackingRef() {
+    return copyTrackingRef;
+  }
+
   public boolean isBasicTypesRefIgnored() {
     return basicTypesRefIgnored;
   }
@@ -247,6 +253,7 @@ public class Config implements Serializable {
         && basicTypesRefIgnored == config.basicTypesRefIgnored
         && stringRefIgnored == config.stringRefIgnored
         && timeRefIgnored == config.timeRefIgnored
+        && copyTrackingRef == config.copyTrackingRef
         && codeGenEnabled == config.codeGenEnabled
         && checkClassVersion == config.checkClassVersion
         && checkJdkClassSerializable == config.checkJdkClassSerializable
@@ -276,6 +283,7 @@ public class Config implements Serializable {
         basicTypesRefIgnored,
         stringRefIgnored,
         timeRefIgnored,
+        copyTrackingRef,
         codeGenEnabled,
         checkClassVersion,
         compatibleMode,
diff --git 
a/java/fury-core/src/main/java/org/apache/fury/config/FuryBuilder.java 
b/java/fury-core/src/main/java/org/apache/fury/config/FuryBuilder.java
index d11b36ba..2080dca6 100644
--- a/java/fury-core/src/main/java/org/apache/fury/config/FuryBuilder.java
+++ b/java/fury-core/src/main/java/org/apache/fury/config/FuryBuilder.java
@@ -59,6 +59,7 @@ public final class FuryBuilder {
   boolean checkClassVersion = false;
   Language language = Language.JAVA;
   boolean trackingRef = false;
+  boolean copyTrackingRef = false;
   boolean basicTypesRefIgnored = true;
   boolean stringRefIgnored = true;
   boolean timeRefIgnored = true;
@@ -98,6 +99,12 @@ public final class FuryBuilder {
     return this;
   }
 
+  /** Whether track {@link Fury#copy(Object)} circular references. */
+  public FuryBuilder withCopyRefTracking(boolean copyTrackingRef) {
+    this.copyTrackingRef = copyTrackingRef;
+    return this;
+  }
+
   /** Whether ignore basic types shared reference. */
   public FuryBuilder ignoreBasicTypesRef(boolean ignoreBasicTypesRef) {
     this.basicTypesRefIgnored = ignoreBasicTypesRef;
diff --git 
a/java/fury-core/src/main/java/org/apache/fury/pool/ThreadPoolFury.java 
b/java/fury-core/src/main/java/org/apache/fury/pool/ThreadPoolFury.java
index da163ccf..f2ae405d 100644
--- a/java/fury-core/src/main/java/org/apache/fury/pool/ThreadPoolFury.java
+++ b/java/fury-core/src/main/java/org/apache/fury/pool/ThreadPoolFury.java
@@ -266,6 +266,11 @@ public class ThreadPoolFury extends AbstractThreadSafeFury 
{
     return execute(fury -> fury.deserializeJavaObjectAndClass(channel));
   }
 
+  @Override
+  public <T> T copy(T obj) {
+    return execute(fury -> fury.copy(obj));
+  }
+
   @Override
   public void setClassLoader(ClassLoader classLoader) {
     setClassLoader(classLoader, LoaderBinding.StagingType.SOFT_STAGING);
diff --git 
a/java/fury-core/src/main/java/org/apache/fury/resolver/ClassResolver.java 
b/java/fury-core/src/main/java/org/apache/fury/resolver/ClassResolver.java
index 1e31db07..93a3701e 100644
--- a/java/fury-core/src/main/java/org/apache/fury/resolver/ClassResolver.java
+++ b/java/fury-core/src/main/java/org/apache/fury/resolver/ClassResolver.java
@@ -76,6 +76,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import org.apache.fury.Fury;
+import org.apache.fury.FuryCopyable;
 import org.apache.fury.annotation.CodegenInvoke;
 import org.apache.fury.annotation.Internal;
 import org.apache.fury.builder.CodecUtils;
@@ -110,6 +111,7 @@ import 
org.apache.fury.serializer.CodegenSerializer.LazyInitBeanSerializer;
 import org.apache.fury.serializer.CompatibleSerializer;
 import org.apache.fury.serializer.EnumSerializer;
 import org.apache.fury.serializer.ExternalizableSerializer;
+import org.apache.fury.serializer.FuryCopyableSerializer;
 import org.apache.fury.serializer.JavaSerializer;
 import org.apache.fury.serializer.JdkProxySerializer;
 import org.apache.fury.serializer.LambdaSerializer;
@@ -1198,7 +1200,11 @@ public class ClassResolver {
     }
 
     Class<? extends Serializer> serializerClass = getSerializerClass(cls);
-    return Serializers.newSerializer(fury, cls, serializerClass);
+    Serializer serializer = Serializers.newSerializer(fury, cls, 
serializerClass);
+    if (FuryCopyable.class.isAssignableFrom(cls)) {
+      serializer = new FuryCopyableSerializer<>(fury, cls, serializer);
+    }
+    return serializer;
   }
 
   private String generateSecurityMsg(Class<?> cls) {
diff --git 
a/java/fury-core/src/main/java/org/apache/fury/serializer/AbstractObjectSerializer.java
 
b/java/fury-core/src/main/java/org/apache/fury/serializer/AbstractObjectSerializer.java
new file mode 100644
index 00000000..c609ce30
--- /dev/null
+++ 
b/java/fury-core/src/main/java/org/apache/fury/serializer/AbstractObjectSerializer.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.fury.serializer;
+
+import java.lang.invoke.MethodHandle;
+import java.util.List;
+import org.apache.fury.Fury;
+import org.apache.fury.memory.Platform;
+import org.apache.fury.reflect.FieldAccessor;
+import org.apache.fury.reflect.ReflectionUtils;
+import org.apache.fury.resolver.ClassResolver;
+import org.apache.fury.resolver.FieldResolver.FieldInfo;
+import org.apache.fury.resolver.RefResolver;
+import org.apache.fury.util.record.RecordUtils;
+
+public abstract class AbstractObjectSerializer<T> extends Serializer<T> {
+
+  protected final RefResolver refResolver;
+  protected final ClassResolver classResolver;
+  protected final boolean isRecord;
+  protected final MethodHandle constructor;
+
+  public AbstractObjectSerializer(Fury fury, Class<T> type) {
+    super(fury, type);
+    this.refResolver = fury.getRefResolver();
+    this.classResolver = fury.getClassResolver();
+    this.isRecord = RecordUtils.isRecord(type);
+    if (isRecord) {
+      this.constructor = RecordUtils.getRecordConstructor(type).f1;
+    } else {
+      this.constructor = ReflectionUtils.getCtrHandle(type, false);
+    }
+  }
+
+  public AbstractObjectSerializer(Fury fury, Class<T> type, MethodHandle 
constructor) {
+    super(fury, type);
+    this.refResolver = fury.getRefResolver();
+    this.classResolver = fury.getClassResolver();
+    this.isRecord = RecordUtils.isRecord(type);
+    this.constructor = constructor;
+  }
+
+  @Override
+  public T copy(T originObj) {
+    if (immutable) {
+      return originObj;
+    }
+    if (isRecord) {
+      Object[] fieldValues = copyFields(originObj);
+      try {
+        return (T) constructor.invokeWithArguments(fieldValues);
+      } catch (Throwable e) {
+        Platform.throwException(e);
+      }
+      return originObj;
+    }
+    T newObj = newBean();
+    if (needToCopyRef) {
+      T copyObject = (T) fury.getCopyObject(originObj);
+      if (copyObject != null) {
+        return copyObject;
+      }
+      fury.reference(originObj, newObj);
+    }
+    copyFields(originObj, newObj);
+    return newObj;
+  }
+
+  private Object[] copyFields(T originObj) {
+    return classResolver.getFieldResolver(type).getAllFieldsList().stream()
+        .map(
+            fieldInfo -> {
+              FieldAccessor fieldAccessor = fieldInfo.getFieldAccessor();
+              if (classResolver.isPrimitive(fieldInfo.getEmbeddedClassId())) {
+                return fieldAccessor.get(originObj);
+              }
+              return fury.copyObject(fieldAccessor.get(originObj));
+            })
+        .toArray();
+  }
+
+  private void copyFields(T originObj, T newObj) {
+    List<FieldInfo> fieldsList = 
classResolver.getFieldResolver(type).getAllFieldsList();
+    for (FieldInfo info : fieldsList) {
+      FieldAccessor fieldAccessor = info.getFieldAccessor();
+      long offset = fieldAccessor.getFieldOffset();
+      switch (info.getEmbeddedClassId()) {
+        case ClassResolver.PRIMITIVE_BYTE_CLASS_ID:
+          Platform.putByte(newObj, offset, Platform.getByte(originObj, 
offset));
+          break;
+        case ClassResolver.PRIMITIVE_CHAR_CLASS_ID:
+          Platform.putChar(newObj, offset, Platform.getChar(originObj, 
offset));
+          break;
+        case ClassResolver.PRIMITIVE_SHORT_CLASS_ID:
+          Platform.putShort(newObj, offset, Platform.getShort(originObj, 
offset));
+          break;
+        case ClassResolver.PRIMITIVE_INT_CLASS_ID:
+          Platform.putInt(newObj, offset, Platform.getInt(originObj, offset));
+          break;
+        case ClassResolver.PRIMITIVE_LONG_CLASS_ID:
+          Platform.putLong(newObj, offset, Platform.getLong(originObj, 
offset));
+          break;
+        case ClassResolver.PRIMITIVE_FLOAT_CLASS_ID:
+          Platform.putFloat(newObj, offset, Platform.getFloat(originObj, 
offset));
+          break;
+        case ClassResolver.PRIMITIVE_DOUBLE_CLASS_ID:
+          Platform.putDouble(newObj, offset, Platform.getDouble(originObj, 
offset));
+          break;
+        case ClassResolver.PRIMITIVE_BOOLEAN_CLASS_ID:
+          Platform.putBoolean(newObj, offset, Platform.getBoolean(originObj, 
offset));
+          break;
+        default:
+          Platform.putObject(
+              newObj, offset, fury.copyObject(Platform.getObject(originObj, 
offset)));
+      }
+    }
+  }
+
+  protected T newBean() {
+    if (constructor != null) {
+      try {
+        return (T) constructor.invoke();
+      } catch (Throwable e) {
+        Platform.throwException(e);
+      }
+    }
+    return Platform.newInstance(type);
+  }
+}
diff --git 
a/java/fury-core/src/main/java/org/apache/fury/serializer/ArraySerializers.java 
b/java/fury-core/src/main/java/org/apache/fury/serializer/ArraySerializers.java
index 06f3f88b..edae8c16 100644
--- 
a/java/fury-core/src/main/java/org/apache/fury/serializer/ArraySerializers.java
+++ 
b/java/fury-core/src/main/java/org/apache/fury/serializer/ArraySerializers.java
@@ -20,6 +20,7 @@
 package org.apache.fury.serializer;
 
 import java.lang.reflect.Array;
+import java.util.Arrays;
 import java.util.IdentityHashMap;
 import org.apache.fury.Fury;
 import org.apache.fury.config.CompatibleMode;
@@ -108,6 +109,16 @@ public class ArraySerializers {
       }
     }
 
+    @Override
+    public T[] copy(T[] originArray) {
+      int length = originArray.length;
+      Object[] newArray = newArray(length);
+      for (int i = 0; i < length; i++) {
+        newArray[i] = fury.copyObject(originArray[i]);
+      }
+      return (T[]) newArray;
+    }
+
     @Override
     public void xwrite(MemoryBuffer buffer, T[] arr) {
       int len = arr.length;
@@ -262,6 +273,11 @@ public class ArraySerializers {
       }
     }
 
+    @Override
+    public boolean[] copy(boolean[] originArray) {
+      return Arrays.copyOf(originArray, originArray.length);
+    }
+
     @Override
     public boolean[] read(MemoryBuffer buffer) {
       if (fury.isPeerOutOfBandEnabled()) {
@@ -298,6 +314,11 @@ public class ArraySerializers {
       }
     }
 
+    @Override
+    public byte[] copy(byte[] originArray) {
+      return Arrays.copyOf(originArray, originArray.length);
+    }
+
     @Override
     public byte[] read(MemoryBuffer buffer) {
       if (fury.isPeerOutOfBandEnabled()) {
@@ -332,6 +353,11 @@ public class ArraySerializers {
       }
     }
 
+    @Override
+    public char[] copy(char[] originArray) {
+      return Arrays.copyOf(originArray, originArray.length);
+    }
+
     @Override
     public char[] read(MemoryBuffer buffer) {
       if (fury.isPeerOutOfBandEnabled()) {
@@ -383,6 +409,11 @@ public class ArraySerializers {
       }
     }
 
+    @Override
+    public short[] copy(short[] originArray) {
+      return Arrays.copyOf(originArray, originArray.length);
+    }
+
     @Override
     public short[] read(MemoryBuffer buffer) {
       if (fury.isPeerOutOfBandEnabled()) {
@@ -419,6 +450,11 @@ public class ArraySerializers {
       }
     }
 
+    @Override
+    public int[] copy(int[] originArray) {
+      return Arrays.copyOf(originArray, originArray.length);
+    }
+
     @Override
     public int[] read(MemoryBuffer buffer) {
       if (fury.isPeerOutOfBandEnabled()) {
@@ -455,6 +491,11 @@ public class ArraySerializers {
       }
     }
 
+    @Override
+    public long[] copy(long[] originArray) {
+      return Arrays.copyOf(originArray, originArray.length);
+    }
+
     @Override
     public long[] read(MemoryBuffer buffer) {
       if (fury.isPeerOutOfBandEnabled()) {
@@ -491,6 +532,11 @@ public class ArraySerializers {
       }
     }
 
+    @Override
+    public float[] copy(float[] originArray) {
+      return Arrays.copyOf(originArray, originArray.length);
+    }
+
     @Override
     public float[] read(MemoryBuffer buffer) {
       if (fury.isPeerOutOfBandEnabled()) {
@@ -527,6 +573,11 @@ public class ArraySerializers {
       }
     }
 
+    @Override
+    public double[] copy(double[] originArray) {
+      return Arrays.copyOf(originArray, originArray.length);
+    }
+
     @Override
     public double[] read(MemoryBuffer buffer) {
       if (fury.isPeerOutOfBandEnabled()) {
@@ -593,6 +644,13 @@ public class ArraySerializers {
       }
     }
 
+    @Override
+    public String[] copy(String[] originArray) {
+      String[] newArray = new String[originArray.length];
+      System.arraycopy(originArray, 0, newArray, 0, originArray.length);
+      return newArray;
+    }
+
     @Override
     public String[] read(MemoryBuffer buffer) {
       int numElements = buffer.readVarUint32Small7();
diff --git 
a/java/fury-core/src/main/java/org/apache/fury/serializer/CodegenSerializer.java
 
b/java/fury-core/src/main/java/org/apache/fury/serializer/CodegenSerializer.java
index c5711a85..4017171e 100644
--- 
a/java/fury-core/src/main/java/org/apache/fury/serializer/CodegenSerializer.java
+++ 
b/java/fury-core/src/main/java/org/apache/fury/serializer/CodegenSerializer.java
@@ -70,7 +70,7 @@ public final class CodegenSerializer {
    * potential recursive bean serializer creation when there is a circular 
reference in class
    * children fields.
    */
-  public static final class LazyInitBeanSerializer<T> extends Serializer<T> {
+  public static final class LazyInitBeanSerializer<T> extends 
AbstractObjectSerializer<T> {
     private Serializer<T> serializer;
     private Serializer<T> interpreterSerializer;
 
diff --git 
a/java/fury-core/src/main/java/org/apache/fury/serializer/CompatibleSerializer.java
 
b/java/fury-core/src/main/java/org/apache/fury/serializer/CompatibleSerializer.java
index f9cf8a8a..d018e5dd 100644
--- 
a/java/fury-core/src/main/java/org/apache/fury/serializer/CompatibleSerializer.java
+++ 
b/java/fury-core/src/main/java/org/apache/fury/serializer/CompatibleSerializer.java
@@ -19,7 +19,6 @@
 
 package org.apache.fury.serializer;
 
-import java.lang.invoke.MethodHandle;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
@@ -29,11 +28,9 @@ import org.apache.fury.Fury;
 import org.apache.fury.memory.MemoryBuffer;
 import org.apache.fury.memory.Platform;
 import org.apache.fury.reflect.FieldAccessor;
-import org.apache.fury.reflect.ReflectionUtils;
 import org.apache.fury.resolver.ClassInfo;
 import org.apache.fury.resolver.ClassResolver;
 import org.apache.fury.resolver.FieldResolver;
-import org.apache.fury.resolver.RefResolver;
 import org.apache.fury.serializer.collection.AbstractCollectionSerializer;
 import org.apache.fury.serializer.collection.AbstractMapSerializer;
 import org.apache.fury.util.Preconditions;
@@ -51,43 +48,30 @@ import org.apache.fury.util.record.RecordUtils;
 @SuppressWarnings({"unchecked", "rawtypes"})
 public final class CompatibleSerializer<T> extends CompatibleSerializerBase<T> 
{
   private static final int INDEX_FOR_SKIP_FILL_VALUES = -1;
-  private final RefResolver refResolver;
-  private final ClassResolver classResolver;
   private final FieldResolver fieldResolver;
-  private final boolean isRecord;
-  private final MethodHandle constructor;
   private final RecordInfo recordInfo;
 
   public CompatibleSerializer(Fury fury, Class<T> cls) {
     super(fury, cls);
-    this.refResolver = fury.getRefResolver();
-    this.classResolver = fury.getClassResolver();
     // Use `setSerializerIfAbsent` to avoid overwriting existing serializer 
for class when used
     // as data serializer.
     classResolver.setSerializerIfAbsent(cls, this);
     fieldResolver = classResolver.getFieldResolver(cls);
-    isRecord = RecordUtils.isRecord(type);
     if (isRecord) {
-      constructor = RecordUtils.getRecordConstructor(type).f1;
       List<String> fieldNames =
           fieldResolver.getAllFieldsList().stream()
               .map(FieldResolver.FieldInfo::getName)
               .collect(Collectors.toList());
       recordInfo = new RecordInfo(cls, fieldNames);
     } else {
-      this.constructor = ReflectionUtils.getCtrHandle(type, false);
       recordInfo = null;
     }
   }
 
   public CompatibleSerializer(Fury fury, Class<T> cls, FieldResolver 
fieldResolver) {
-    super(fury, cls);
-    this.refResolver = fury.getRefResolver();
-    this.classResolver = fury.getClassResolver();
-    isRecord = RecordUtils.isRecord(type);
+    super(fury, cls, null);
     Preconditions.checkArgument(!isRecord, cls);
     recordInfo = null;
-    this.constructor = null;
     this.fieldResolver = fieldResolver;
   }
 
@@ -626,15 +610,4 @@ public final class CompatibleSerializer<T> extends 
CompatibleSerializerBase<T> {
         }
     }
   }
-
-  private Object newBean() {
-    if (constructor != null) {
-      try {
-        return constructor.invoke();
-      } catch (Throwable e) {
-        Platform.throwException(e);
-      }
-    }
-    return Platform.newInstance(type);
-  }
 }
diff --git 
a/java/fury-core/src/main/java/org/apache/fury/serializer/CompatibleSerializerBase.java
 
b/java/fury-core/src/main/java/org/apache/fury/serializer/CompatibleSerializerBase.java
index b8159e81..452fb520 100644
--- 
a/java/fury-core/src/main/java/org/apache/fury/serializer/CompatibleSerializerBase.java
+++ 
b/java/fury-core/src/main/java/org/apache/fury/serializer/CompatibleSerializerBase.java
@@ -19,6 +19,7 @@
 
 package org.apache.fury.serializer;
 
+import java.lang.invoke.MethodHandle;
 import org.apache.fury.Fury;
 import org.apache.fury.memory.MemoryBuffer;
 
@@ -26,11 +27,15 @@ import org.apache.fury.memory.MemoryBuffer;
  * Base class for compatible serializer. Both JIT mode serializer and 
interpreter-mode serializer
  * will extend this class.
  */
-public abstract class CompatibleSerializerBase<T> extends Serializer<T> {
+public abstract class CompatibleSerializerBase<T> extends 
AbstractObjectSerializer<T> {
   public CompatibleSerializerBase(Fury fury, Class<T> type) {
     super(fury, type);
   }
 
+  public CompatibleSerializerBase(Fury fury, Class<T> type, MethodHandle 
constructor) {
+    super(fury, type, constructor);
+  }
+
   public T readAndSetFields(MemoryBuffer buffer, T obj) {
     // java record object doesn't support update state.
     throw new UnsupportedOperationException();
diff --git 
a/java/fury-core/src/main/java/org/apache/fury/serializer/EnumSerializer.java 
b/java/fury-core/src/main/java/org/apache/fury/serializer/EnumSerializer.java
index f03db52f..1b23f8da 100644
--- 
a/java/fury-core/src/main/java/org/apache/fury/serializer/EnumSerializer.java
+++ 
b/java/fury-core/src/main/java/org/apache/fury/serializer/EnumSerializer.java
@@ -25,7 +25,7 @@ import org.apache.fury.memory.MemoryBuffer;
 import org.apache.fury.util.Preconditions;
 
 @SuppressWarnings("rawtypes")
-public final class EnumSerializer extends Serializer<Enum> {
+public final class EnumSerializer extends ImmutableSerializer<Enum> {
   private final Enum[] enumConstants;
 
   public EnumSerializer(Fury fury, Class<Enum> cls) {
diff --git 
a/java/fury-core/src/main/java/org/apache/fury/serializer/Serializer.java 
b/java/fury-core/src/main/java/org/apache/fury/serializer/FuryCopyableSerializer.java
similarity index 53%
copy from 
java/fury-core/src/main/java/org/apache/fury/serializer/Serializer.java
copy to 
java/fury-core/src/main/java/org/apache/fury/serializer/FuryCopyableSerializer.java
index c8afaaef..a17c35be 100644
--- a/java/fury-core/src/main/java/org/apache/fury/serializer/Serializer.java
+++ 
b/java/fury-core/src/main/java/org/apache/fury/serializer/FuryCopyableSerializer.java
@@ -19,32 +19,33 @@
 
 package org.apache.fury.serializer;
 
-import javax.annotation.concurrent.NotThreadSafe;
 import org.apache.fury.Fury;
-import org.apache.fury.config.Language;
+import org.apache.fury.FuryCopyable;
 import org.apache.fury.memory.MemoryBuffer;
-import org.apache.fury.type.TypeUtils;
 
-/**
- * Serialize/deserializer objects into binary. Note that this class is 
designed as an abstract class
- * instead of interface to reduce virtual method call cost of {@link 
#needToWriteRef}/{@link
- * #getXtypeId}.
- *
- * @param <T> type of objects being serializing/deserializing
- */
-@NotThreadSafe
-public abstract class Serializer<T> {
-  protected final Fury fury;
-  protected final Class<T> type;
-  protected final boolean isJava;
-  protected final boolean needToWriteRef;
+/** Fury custom copy serializer. see {@link FuryCopyable} */
+public class FuryCopyableSerializer<T> extends Serializer<T> {
+
+  private final Serializer<T> serializer;
+
+  public FuryCopyableSerializer(Fury fury, Class<T> type, Serializer<T> 
serializer) {
+    super(fury, type);
+    this.serializer = serializer;
+  }
 
+  @Override
   public void write(MemoryBuffer buffer, T value) {
-    throw new UnsupportedOperationException();
+    serializer.write(buffer, value);
+  }
+
+  @Override
+  public T copy(T obj) {
+    return ((FuryCopyable<T>) obj).copy(fury);
   }
 
+  @Override
   public T read(MemoryBuffer buffer) {
-    throw new UnsupportedOperationException();
+    return serializer.read(buffer);
   }
 
   /**
@@ -55,46 +56,24 @@ public abstract class Serializer<T> {
    * cross-language serialization and native serialization data is not the 
same with cross-language
    * serialization.
    */
+  @Override
   public short getXtypeId() {
-    return Fury.NOT_SUPPORT_CROSS_LANGUAGE;
+    return serializer.getXtypeId();
   }
 
   /** Returns a type tag used for setup type mapping between languages. */
+  @Override
   public String getCrossLanguageTypeTag() {
-    throw new UnsupportedOperationException();
+    return serializer.getCrossLanguageTypeTag();
   }
 
+  @Override
   public void xwrite(MemoryBuffer buffer, T value) {
-    throw new UnsupportedOperationException();
+    serializer.xwrite(buffer, value);
   }
 
+  @Override
   public T xread(MemoryBuffer buffer) {
-    throw new UnsupportedOperationException();
-  }
-
-  public Serializer(Fury fury, Class<T> type) {
-    this.fury = fury;
-    this.type = type;
-    this.isJava = fury.getLanguage() == Language.JAVA;
-    if (fury.trackingRef()) {
-      needToWriteRef = !TypeUtils.isBoxed(TypeUtils.wrap(type)) || 
!fury.isBasicTypesRefIgnored();
-    } else {
-      needToWriteRef = false;
-    }
-  }
-
-  public Serializer(Fury fury, Class<T> type, boolean needToWriteRef) {
-    this.fury = fury;
-    this.type = type;
-    this.isJava = fury.getLanguage() == Language.JAVA;
-    this.needToWriteRef = needToWriteRef;
-  }
-
-  public final boolean needToWriteRef() {
-    return needToWriteRef;
-  }
-
-  public Class<T> getType() {
-    return type;
+    return serializer.xread(buffer);
   }
 }
diff --git 
a/java/fury-core/src/main/java/org/apache/fury/serializer/CompatibleSerializerBase.java
 
b/java/fury-core/src/main/java/org/apache/fury/serializer/ImmutableSerializer.java
similarity index 62%
copy from 
java/fury-core/src/main/java/org/apache/fury/serializer/CompatibleSerializerBase.java
copy to 
java/fury-core/src/main/java/org/apache/fury/serializer/ImmutableSerializer.java
index b8159e81..c8cec37c 100644
--- 
a/java/fury-core/src/main/java/org/apache/fury/serializer/CompatibleSerializerBase.java
+++ 
b/java/fury-core/src/main/java/org/apache/fury/serializer/ImmutableSerializer.java
@@ -20,19 +20,23 @@
 package org.apache.fury.serializer;
 
 import org.apache.fury.Fury;
-import org.apache.fury.memory.MemoryBuffer;
 
 /**
- * Base class for compatible serializer. Both JIT mode serializer and 
interpreter-mode serializer
- * will extend this class.
+ * Serializer for immutable objects.
+ *
+ * @param <T> type of objects being serializing/deserializing
  */
-public abstract class CompatibleSerializerBase<T> extends Serializer<T> {
-  public CompatibleSerializerBase(Fury fury, Class<T> type) {
-    super(fury, type);
+public abstract class ImmutableSerializer<T> extends Serializer<T> {
+
+  public ImmutableSerializer(Fury fury, Class<T> type) {
+    super(fury, type, true);
+  }
+
+  public ImmutableSerializer(Fury fury, Class<T> type, boolean needToWriteRef) 
{
+    super(fury, type, needToWriteRef, true);
   }
 
-  public T readAndSetFields(MemoryBuffer buffer, T obj) {
-    // java record object doesn't support update state.
-    throw new UnsupportedOperationException();
+  public ImmutableSerializer(Fury fury, Class<T> type, boolean needToWriteRef, 
boolean immutable) {
+    super(fury, type, needToWriteRef, true);
   }
 }
diff --git 
a/java/fury-core/src/main/java/org/apache/fury/serializer/LocaleSerializer.java 
b/java/fury-core/src/main/java/org/apache/fury/serializer/LocaleSerializer.java
index de7c84fe..fdd87f70 100644
--- 
a/java/fury-core/src/main/java/org/apache/fury/serializer/LocaleSerializer.java
+++ 
b/java/fury-core/src/main/java/org/apache/fury/serializer/LocaleSerializer.java
@@ -27,7 +27,7 @@ import org.apache.fury.collection.Tuple3;
 import org.apache.fury.memory.MemoryBuffer;
 
 /** Local serializer for {@link Locale}. */
-public final class LocaleSerializer extends Serializer<Locale> {
+public final class LocaleSerializer extends ImmutableSerializer<Locale> {
   // Using `new HashMap<>` to ensure thread safety by java constructor 
semantics.
   private static final Map<Tuple3<String, String, String>, Locale> 
LOCALE_CACHE =
       new HashMap<>(createCacheMap());
diff --git 
a/java/fury-core/src/main/java/org/apache/fury/serializer/MetaSharedSerializer.java
 
b/java/fury-core/src/main/java/org/apache/fury/serializer/MetaSharedSerializer.java
index 3fbd3312..77647d8e 100644
--- 
a/java/fury-core/src/main/java/org/apache/fury/serializer/MetaSharedSerializer.java
+++ 
b/java/fury-core/src/main/java/org/apache/fury/serializer/MetaSharedSerializer.java
@@ -19,7 +19,6 @@
 
 package org.apache.fury.serializer;
 
-import java.lang.invoke.MethodHandle;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
@@ -34,7 +33,6 @@ import org.apache.fury.memory.MemoryBuffer;
 import org.apache.fury.memory.Platform;
 import org.apache.fury.meta.ClassDef;
 import org.apache.fury.reflect.FieldAccessor;
-import org.apache.fury.reflect.ReflectionUtils;
 import org.apache.fury.resolver.ClassInfoHolder;
 import org.apache.fury.resolver.ClassResolver;
 import org.apache.fury.resolver.RefResolver;
@@ -62,7 +60,7 @@ import org.apache.fury.util.record.RecordUtils;
  * @see ObjectSerializer
  */
 @SuppressWarnings({"unchecked"})
-public class MetaSharedSerializer<T> extends Serializer<T> {
+public class MetaSharedSerializer<T> extends AbstractObjectSerializer<T> {
   private final ObjectSerializer.FinalTypeField[] finalFields;
 
   /**
@@ -74,8 +72,6 @@ public class MetaSharedSerializer<T> extends Serializer<T> {
 
   private final ObjectSerializer.GenericTypeField[] otherFields;
   private final ObjectSerializer.GenericTypeField[] containerFields;
-  private final boolean isRecord;
-  private final MethodHandle constructor;
   private final RecordInfo recordInfo;
   private Serializer<T> serializer;
   private final ClassInfoHolder classInfoHolder;
@@ -87,21 +83,15 @@ public class MetaSharedSerializer<T> extends Serializer<T> {
         "Class version check should be disabled when compatible mode is 
enabled.");
     Preconditions.checkArgument(
         fury.getConfig().isMetaShareEnabled(), "Meta share must be enabled.");
-    Collection<Descriptor> descriptors = 
consolidateFields(fury.getClassResolver(), type, classDef);
+    Collection<Descriptor> descriptors = consolidateFields(this.classResolver, 
type, classDef);
     DescriptorGrouper descriptorGrouper =
         DescriptorGrouper.createDescriptorGrouper(
-            fury.getClassResolver()::isMonomorphic,
+            this.classResolver::isMonomorphic,
             descriptors,
             false,
             fury.compressInt(),
             fury.getConfig().compressLong());
     // d.getField() may be null if not exists in this class when meta share 
enabled.
-    isRecord = RecordUtils.isRecord(type);
-    if (isRecord) {
-      constructor = RecordUtils.getRecordConstructor(type).f1;
-    } else {
-      this.constructor = ReflectionUtils.getCtrHandle(type, false);
-    }
     Tuple3<
             Tuple2<ObjectSerializer.FinalTypeField[], boolean[]>,
             ObjectSerializer.GenericTypeField[],
@@ -111,7 +101,7 @@ public class MetaSharedSerializer<T> extends Serializer<T> {
     isFinal = infos.f0.f1;
     otherFields = infos.f1;
     containerFields = infos.f2;
-    classInfoHolder = fury.getClassResolver().nilClassInfoHolder();
+    classInfoHolder = this.classResolver.nilClassInfoHolder();
     if (isRecord) {
       List<String> fieldNames =
           descriptorGrouper.getSortedDescriptors().stream()
@@ -127,8 +117,7 @@ public class MetaSharedSerializer<T> extends Serializer<T> {
   public void write(MemoryBuffer buffer, T value) {
     if (serializer == null) {
       serializer =
-          fury.getClassResolver()
-              .createSerializerSafe(type, () -> new ObjectSerializer<>(fury, 
type));
+          this.classResolver.createSerializerSafe(type, () -> new 
ObjectSerializer<>(fury, type));
     }
     serializer.write(buffer, value);
   }
@@ -148,10 +137,10 @@ public class MetaSharedSerializer<T> extends 
Serializer<T> {
         Platform.throwException(e);
       }
     }
-    T obj = ObjectSerializer.newBean(constructor, type);
+    T obj = newBean();
     Fury fury = this.fury;
-    RefResolver refResolver = fury.getRefResolver();
-    ClassResolver classResolver = fury.getClassResolver();
+    RefResolver refResolver = this.refResolver;
+    ClassResolver classResolver = this.classResolver;
     refResolver.reference(obj);
     // read order: primitive,boxed,final,other,collection,map
     ObjectSerializer.FinalTypeField[] finalFields = this.finalFields;
@@ -205,8 +194,8 @@ public class MetaSharedSerializer<T> extends Serializer<T> {
   private void readFields(MemoryBuffer buffer, Object[] fields) {
     int counter = 0;
     Fury fury = this.fury;
-    RefResolver refResolver = fury.getRefResolver();
-    ClassResolver classResolver = fury.getClassResolver();
+    RefResolver refResolver = this.refResolver;
+    ClassResolver classResolver = this.classResolver;
     // read order: primitive,boxed,final,other,collection,map
     ObjectSerializer.FinalTypeField[] finalFields = this.finalFields;
     for (int i = 0; i < finalFields.length; i++) {
diff --git 
a/java/fury-core/src/main/java/org/apache/fury/serializer/ObjectSerializer.java 
b/java/fury-core/src/main/java/org/apache/fury/serializer/ObjectSerializer.java
index 64618570..9edd5fb8 100644
--- 
a/java/fury-core/src/main/java/org/apache/fury/serializer/ObjectSerializer.java
+++ 
b/java/fury-core/src/main/java/org/apache/fury/serializer/ObjectSerializer.java
@@ -22,7 +22,6 @@ package org.apache.fury.serializer;
 import static org.apache.fury.type.DescriptorGrouper.createDescriptorGrouper;
 import static org.apache.fury.type.TypeUtils.getRawType;
 
-import java.lang.invoke.MethodHandle;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -37,7 +36,6 @@ import org.apache.fury.memory.MemoryBuffer;
 import org.apache.fury.memory.Platform;
 import org.apache.fury.meta.ClassDef;
 import org.apache.fury.reflect.FieldAccessor;
-import org.apache.fury.reflect.ReflectionUtils;
 import org.apache.fury.reflect.TypeRef;
 import org.apache.fury.resolver.ClassInfo;
 import org.apache.fury.resolver.ClassInfoHolder;
@@ -66,10 +64,7 @@ import org.apache.fury.util.record.RecordUtils;
  */
 // TODO(chaokunyang) support generics optimization for {@code SomeClass<T>}
 @SuppressWarnings({"unchecked"})
-public final class ObjectSerializer<T> extends Serializer<T> {
-  private final RefResolver refResolver;
-  private final ClassResolver classResolver;
-  private final boolean isRecord;
+public final class ObjectSerializer<T> extends AbstractObjectSerializer<T> {
   private final RecordInfo recordInfo;
   private final FinalTypeField[] finalFields;
 
@@ -82,7 +77,6 @@ public final class ObjectSerializer<T> extends Serializer<T> {
 
   private final GenericTypeField[] otherFields;
   private final GenericTypeField[] containerFields;
-  private final MethodHandle constructor;
   private final int classVersionHash;
 
   public ObjectSerializer(Fury fury, Class<T> cls) {
@@ -91,8 +85,6 @@ public final class ObjectSerializer<T> extends Serializer<T> {
 
   public ObjectSerializer(Fury fury, Class<T> cls, boolean resolveParent) {
     super(fury, cls);
-    this.refResolver = fury.getRefResolver();
-    this.classResolver = fury.getClassResolver();
     // avoid recursive building serializers.
     // Use `setSerializerIfAbsent` to avoid overwriting existing serializer 
for class when used
     // as data serializer.
@@ -112,16 +104,14 @@ public final class ObjectSerializer<T> extends 
Serializer<T> {
             false,
             fury.compressInt(),
             fury.compressLong());
-    isRecord = RecordUtils.isRecord(cls);
+
     if (isRecord) {
-      constructor = RecordUtils.getRecordConstructor(cls).f1;
       List<String> fieldNames =
           descriptorGrouper.getSortedDescriptors().stream()
               .map(Descriptor::getName)
               .collect(Collectors.toList());
       recordInfo = new RecordInfo(cls, fieldNames);
     } else {
-      this.constructor = ReflectionUtils.getCtrHandle(cls, false);
       recordInfo = null;
     }
     if (fury.checkClassVersion()) {
@@ -323,7 +313,7 @@ public final class ObjectSerializer<T> extends 
Serializer<T> {
         Platform.throwException(e);
       }
     }
-    T obj = newBean(constructor, type);
+    T obj = newBean();
     refResolver.reference(obj);
     return readAndSetFields(buffer, obj);
   }
@@ -866,17 +856,6 @@ public final class ObjectSerializer<T> extends 
Serializer<T> {
     }
   }
 
-  static <T> T newBean(MethodHandle constructor, Class<T> type) {
-    if (constructor != null) {
-      try {
-        return (T) constructor.invoke();
-      } catch (Throwable e) {
-        Platform.throwException(e);
-      }
-    }
-    return Platform.newInstance(type);
-  }
-
   static class InternalFieldInfo {
     protected final short classId;
     protected final String qualifiedFieldName;
diff --git 
a/java/fury-core/src/main/java/org/apache/fury/serializer/OptionalSerializers.java
 
b/java/fury-core/src/main/java/org/apache/fury/serializer/OptionalSerializers.java
index 640d1857..1fa48db8 100644
--- 
a/java/fury-core/src/main/java/org/apache/fury/serializer/OptionalSerializers.java
+++ 
b/java/fury-core/src/main/java/org/apache/fury/serializer/OptionalSerializers.java
@@ -43,13 +43,21 @@ public final class OptionalSerializers {
       fury.writeRef(buffer, nullable);
     }
 
+    @Override
+    public Optional copy(Optional originOptional) {
+      if (originOptional.isPresent()) {
+        return Optional.ofNullable(fury.copyObject(originOptional.get()));
+      }
+      return originOptional;
+    }
+
     @Override
     public Optional read(MemoryBuffer buffer) {
       return Optional.ofNullable(fury.readRef(buffer));
     }
   }
 
-  public static final class OptionalIntSerializer extends 
Serializer<OptionalInt> {
+  public static final class OptionalIntSerializer extends 
ImmutableSerializer<OptionalInt> {
     public OptionalIntSerializer(Fury fury) {
       super(fury, OptionalInt.class);
     }
@@ -73,7 +81,7 @@ public final class OptionalSerializers {
     }
   }
 
-  public static final class OptionalLongSerializer extends 
Serializer<OptionalLong> {
+  public static final class OptionalLongSerializer extends 
ImmutableSerializer<OptionalLong> {
     public OptionalLongSerializer(Fury fury) {
       super(fury, OptionalLong.class);
     }
@@ -97,7 +105,7 @@ public final class OptionalSerializers {
     }
   }
 
-  public static final class OptionalDoubleSerializer extends 
Serializer<OptionalDouble> {
+  public static final class OptionalDoubleSerializer extends 
ImmutableSerializer<OptionalDouble> {
     public OptionalDoubleSerializer(Fury fury) {
       super(fury, OptionalDouble.class);
     }
diff --git 
a/java/fury-core/src/main/java/org/apache/fury/serializer/PrimitiveSerializers.java
 
b/java/fury-core/src/main/java/org/apache/fury/serializer/PrimitiveSerializers.java
index e27cda8d..1b386b87 100644
--- 
a/java/fury-core/src/main/java/org/apache/fury/serializer/PrimitiveSerializers.java
+++ 
b/java/fury-core/src/main/java/org/apache/fury/serializer/PrimitiveSerializers.java
@@ -40,7 +40,8 @@ public class PrimitiveSerializers {
           fury,
           (Class) cls,
           Type.BOOL.getId(),
-          !(cls.isPrimitive() || fury.isBasicTypesRefIgnored()));
+          !(cls.isPrimitive() || fury.isBasicTypesRefIgnored()),
+          true);
     }
 
     @Override
@@ -61,7 +62,8 @@ public class PrimitiveSerializers {
           fury,
           (Class) cls,
           Type.INT8.getId(),
-          !(cls.isPrimitive() || fury.isBasicTypesRefIgnored()));
+          !(cls.isPrimitive() || fury.isBasicTypesRefIgnored()),
+          true);
     }
 
     @Override
@@ -121,7 +123,7 @@ public class PrimitiveSerializers {
     }
   }
 
-  public static final class CharSerializer extends Serializer<Character> {
+  public static final class CharSerializer extends 
ImmutableSerializer<Character> {
     public CharSerializer(Fury fury, Class<?> cls) {
       super(fury, (Class) cls, !(cls.isPrimitive() || 
fury.isBasicTypesRefIgnored()));
     }
@@ -144,7 +146,8 @@ public class PrimitiveSerializers {
           fury,
           (Class) cls,
           Type.INT16.getId(),
-          !(cls.isPrimitive() || fury.isBasicTypesRefIgnored()));
+          !(cls.isPrimitive() || fury.isBasicTypesRefIgnored()),
+          true);
     }
 
     @Override
@@ -167,7 +170,8 @@ public class PrimitiveSerializers {
           fury,
           (Class) cls,
           Type.INT32.getId(),
-          !(cls.isPrimitive() || fury.isBasicTypesRefIgnored()));
+          !(cls.isPrimitive() || fury.isBasicTypesRefIgnored()),
+          true);
       compressNumber = fury.compressInt();
     }
 
@@ -210,7 +214,8 @@ public class PrimitiveSerializers {
           fury,
           (Class) cls,
           Type.INT64.getId(),
-          !(cls.isPrimitive() || fury.isBasicTypesRefIgnored()));
+          !(cls.isPrimitive() || fury.isBasicTypesRefIgnored()),
+          true);
       longEncoding = fury.longEncoding();
     }
 
@@ -294,7 +299,8 @@ public class PrimitiveSerializers {
           fury,
           (Class) cls,
           Type.FLOAT.getId(),
-          !(cls.isPrimitive() || fury.isBasicTypesRefIgnored()));
+          !(cls.isPrimitive() || fury.isBasicTypesRefIgnored()),
+          true);
     }
 
     @Override
@@ -315,7 +321,8 @@ public class PrimitiveSerializers {
           fury,
           (Class) cls,
           Type.DOUBLE.getId(),
-          !(cls.isPrimitive() || fury.isBasicTypesRefIgnored()));
+          !(cls.isPrimitive() || fury.isBasicTypesRefIgnored()),
+          true);
     }
 
     @Override
diff --git 
a/java/fury-core/src/main/java/org/apache/fury/serializer/Serializer.java 
b/java/fury-core/src/main/java/org/apache/fury/serializer/Serializer.java
index c8afaaef..81990892 100644
--- a/java/fury-core/src/main/java/org/apache/fury/serializer/Serializer.java
+++ b/java/fury-core/src/main/java/org/apache/fury/serializer/Serializer.java
@@ -39,10 +39,26 @@ public abstract class Serializer<T> {
   protected final boolean isJava;
   protected final boolean needToWriteRef;
 
+  /**
+   * Whether to enable circular reference of copy. Only for mutable objects, 
immutable objects just
+   * return itself.
+   */
+  protected final boolean needToCopyRef;
+
+  protected final boolean immutable;
+
   public void write(MemoryBuffer buffer, T value) {
     throw new UnsupportedOperationException();
   }
 
+  public T copy(T value) {
+    if (isImmutable()) {
+      return value;
+    }
+    throw new UnsupportedOperationException(
+        String.format("Copy for %s is not supported", value.getClass()));
+  }
+
   public T read(MemoryBuffer buffer) {
     throw new UnsupportedOperationException();
   }
@@ -81,20 +97,45 @@ public abstract class Serializer<T> {
     } else {
       needToWriteRef = false;
     }
+    this.needToCopyRef = fury.copyTrackingRef();
+    this.immutable = false;
   }
 
-  public Serializer(Fury fury, Class<T> type, boolean needToWriteRef) {
+  public Serializer(Fury fury, Class<T> type, boolean immutable) {
+    this.fury = fury;
+    this.type = type;
+    this.isJava = fury.getLanguage() == Language.JAVA;
+    if (fury.trackingRef()) {
+      needToWriteRef = !TypeUtils.isBoxed(TypeUtils.wrap(type)) || 
!fury.isBasicTypesRefIgnored();
+    } else {
+      needToWriteRef = false;
+    }
+    this.needToCopyRef = fury.copyTrackingRef() && !immutable;
+    this.immutable = immutable;
+  }
+
+  public Serializer(Fury fury, Class<T> type, boolean needToWriteRef, boolean 
immutable) {
     this.fury = fury;
     this.type = type;
     this.isJava = fury.getLanguage() == Language.JAVA;
     this.needToWriteRef = needToWriteRef;
+    this.needToCopyRef = fury.copyTrackingRef() && !immutable;
+    this.immutable = immutable;
   }
 
   public final boolean needToWriteRef() {
     return needToWriteRef;
   }
 
+  public final boolean needToCopyRef() {
+    return needToCopyRef;
+  }
+
   public Class<T> getType() {
     return type;
   }
+
+  public boolean isImmutable() {
+    return immutable;
+  }
 }
diff --git 
a/java/fury-core/src/main/java/org/apache/fury/serializer/Serializers.java 
b/java/fury-core/src/main/java/org/apache/fury/serializer/Serializers.java
index 63d0099c..36870aa6 100644
--- a/java/fury-core/src/main/java/org/apache/fury/serializer/Serializers.java
+++ b/java/fury-core/src/main/java/org/apache/fury/serializer/Serializers.java
@@ -191,6 +191,12 @@ public class Serializers {
       this.typeId = typeId;
     }
 
+    public CrossLanguageCompatibleSerializer(
+        Fury fury, Class<T> cls, short typeId, boolean needToWriteRef, boolean 
immutable) {
+      super(fury, cls, needToWriteRef, immutable);
+      this.typeId = typeId;
+    }
+
     @Override
     public short getXtypeId() {
       return typeId;
@@ -274,6 +280,11 @@ public class Serializers {
       super(fury, StringBuilder.class);
     }
 
+    @Override
+    public StringBuilder copy(StringBuilder origin) {
+      return new StringBuilder(origin);
+    }
+
     @Override
     public StringBuilder read(MemoryBuffer buffer) {
       return new StringBuilder(stringSerializer.readJavaString(buffer));
@@ -292,6 +303,11 @@ public class Serializers {
       super(fury, StringBuffer.class);
     }
 
+    @Override
+    public StringBuffer copy(StringBuffer origin) {
+      return new StringBuffer(origin);
+    }
+
     @Override
     public StringBuffer read(MemoryBuffer buffer) {
       return new StringBuffer(stringSerializer.readJavaString(buffer));
@@ -303,7 +319,7 @@ public class Serializers {
     }
   }
 
-  public static final class BigDecimalSerializer extends 
Serializer<BigDecimal> {
+  public static final class BigDecimalSerializer extends 
ImmutableSerializer<BigDecimal> {
     public BigDecimalSerializer(Fury fury) {
       super(fury, BigDecimal.class);
     }
@@ -328,7 +344,7 @@ public class Serializers {
     }
   }
 
-  public static final class BigIntegerSerializer extends 
Serializer<BigInteger> {
+  public static final class BigIntegerSerializer extends 
ImmutableSerializer<BigInteger> {
     public BigIntegerSerializer(Fury fury) {
       super(fury, BigInteger.class);
     }
@@ -359,6 +375,11 @@ public class Serializers {
       buffer.writeBoolean(value.get());
     }
 
+    @Override
+    public AtomicBoolean copy(AtomicBoolean origin) {
+      return new AtomicBoolean(origin.get());
+    }
+
     @Override
     public AtomicBoolean read(MemoryBuffer buffer) {
       return new AtomicBoolean(buffer.readBoolean());
@@ -376,6 +397,11 @@ public class Serializers {
       buffer.writeInt32(value.get());
     }
 
+    @Override
+    public AtomicInteger copy(AtomicInteger origin) {
+      return new AtomicInteger(origin.get());
+    }
+
     @Override
     public AtomicInteger read(MemoryBuffer buffer) {
       return new AtomicInteger(buffer.readInt32());
@@ -393,6 +419,11 @@ public class Serializers {
       buffer.writeInt64(value.get());
     }
 
+    @Override
+    public AtomicLong copy(AtomicLong origin) {
+      return new AtomicLong(origin.get());
+    }
+
     @Override
     public AtomicLong read(MemoryBuffer buffer) {
       return new AtomicLong(buffer.readInt64());
@@ -410,13 +441,18 @@ public class Serializers {
       fury.writeRef(buffer, value.get());
     }
 
+    @Override
+    public AtomicReference copy(AtomicReference origin) {
+      return new AtomicReference(fury.copyObject(origin.get()));
+    }
+
     @Override
     public AtomicReference read(MemoryBuffer buffer) {
       return new AtomicReference(fury.readRef(buffer));
     }
   }
 
-  public static final class CurrencySerializer extends Serializer<Currency> {
+  public static final class CurrencySerializer extends 
ImmutableSerializer<Currency> {
     public CurrencySerializer(Fury fury) {
       super(fury, Currency.class);
     }
@@ -434,7 +470,7 @@ public class Serializers {
   }
 
   /** Serializer for {@link Charset}. */
-  public static final class CharsetSerializer<T extends Charset> extends 
Serializer<T> {
+  public static final class CharsetSerializer<T extends Charset> extends 
ImmutableSerializer<T> {
     public CharsetSerializer(Fury fury, Class<T> type) {
       super(fury, type);
     }
@@ -448,7 +484,7 @@ public class Serializers {
     }
   }
 
-  public static final class URISerializer extends Serializer<java.net.URI> {
+  public static final class URISerializer extends 
ImmutableSerializer<java.net.URI> {
 
     public URISerializer(Fury fury) {
       super(fury, URI.class);
@@ -465,7 +501,7 @@ public class Serializers {
     }
   }
 
-  public static final class RegexSerializer extends Serializer<Pattern> {
+  public static final class RegexSerializer extends 
ImmutableSerializer<Pattern> {
     public RegexSerializer(Fury fury) {
       super(fury, Pattern.class);
     }
@@ -484,7 +520,7 @@ public class Serializers {
     }
   }
 
-  public static final class UUIDSerializer extends Serializer<UUID> {
+  public static final class UUIDSerializer extends ImmutableSerializer<UUID> {
 
     public UUIDSerializer(Fury fury) {
       super(fury, UUID.class);
@@ -502,7 +538,7 @@ public class Serializers {
     }
   }
 
-  public static final class ClassSerializer extends Serializer<Class> {
+  public static final class ClassSerializer extends ImmutableSerializer<Class> 
{
     public ClassSerializer(Fury fury) {
       super(fury, Class.class);
     }
@@ -525,7 +561,7 @@ public class Serializers {
    * serializable or class registration checks.
    */
   // Use a separate serializer to avoid codegen for emtpy object.
-  public static final class EmptyObjectSerializer extends Serializer<Object> {
+  public static final class EmptyObjectSerializer extends 
ImmutableSerializer<Object> {
 
     public EmptyObjectSerializer(Fury fury) {
       super(fury, Object.class);
diff --git 
a/java/fury-core/src/main/java/org/apache/fury/serializer/StringSerializer.java 
b/java/fury-core/src/main/java/org/apache/fury/serializer/StringSerializer.java
index 0c77bca4..b0b67abc 100644
--- 
a/java/fury-core/src/main/java/org/apache/fury/serializer/StringSerializer.java
+++ 
b/java/fury-core/src/main/java/org/apache/fury/serializer/StringSerializer.java
@@ -54,7 +54,7 @@ import org.apache.fury.util.unsafe._JDKAccess;
  * manually.
  */
 @SuppressWarnings("unchecked")
-public final class StringSerializer extends Serializer<String> {
+public final class StringSerializer extends ImmutableSerializer<String> {
   private static final boolean STRING_VALUE_FIELD_IS_CHARS;
   private static final boolean STRING_VALUE_FIELD_IS_BYTES;
 
diff --git 
a/java/fury-core/src/main/java/org/apache/fury/serializer/TimeSerializers.java 
b/java/fury-core/src/main/java/org/apache/fury/serializer/TimeSerializers.java
index e4565296..cdd4129e 100644
--- 
a/java/fury-core/src/main/java/org/apache/fury/serializer/TimeSerializers.java
+++ 
b/java/fury-core/src/main/java/org/apache/fury/serializer/TimeSerializers.java
@@ -46,7 +46,7 @@ import org.apache.fury.util.DateTimeUtils;
 
 /** Serializers for all time related types. */
 public class TimeSerializers {
-  public abstract static class TimeSerializer<T> extends Serializer<T> {
+  public abstract static class TimeSerializer<T> extends 
ImmutableSerializer<T> {
 
     public TimeSerializer(Fury fury, Class<T> type) {
       super(fury, type, !fury.getConfig().isTimeRefIgnored());
diff --git 
a/java/fury-core/src/main/java/org/apache/fury/serializer/URLSerializer.java 
b/java/fury-core/src/main/java/org/apache/fury/serializer/URLSerializer.java
index 051f312c..df269558 100644
--- a/java/fury-core/src/main/java/org/apache/fury/serializer/URLSerializer.java
+++ b/java/fury-core/src/main/java/org/apache/fury/serializer/URLSerializer.java
@@ -27,7 +27,7 @@ import org.apache.fury.memory.Platform;
 
 /** Serializer for {@link URL}. */
 // TODO(chaokunyang) ensure security to avoid dnslog detection.
-public final class URLSerializer extends Serializer<URL> {
+public final class URLSerializer extends ImmutableSerializer<URL> {
 
   public URLSerializer(Fury fury, Class<URL> type) {
     super(fury, type);
diff --git 
a/java/fury-core/src/main/java/org/apache/fury/serializer/collection/AbstractCollectionSerializer.java
 
b/java/fury-core/src/main/java/org/apache/fury/serializer/collection/AbstractCollectionSerializer.java
index 15669489..8ade17b4 100644
--- 
a/java/fury-core/src/main/java/org/apache/fury/serializer/collection/AbstractCollectionSerializer.java
+++ 
b/java/fury-core/src/main/java/org/apache/fury/serializer/collection/AbstractCollectionSerializer.java
@@ -65,6 +65,13 @@ public abstract class AbstractCollectionSerializer<T> 
extends Serializer<T> {
     elementClassInfoHolder = fury.getClassResolver().nilClassInfoHolder();
   }
 
+  public AbstractCollectionSerializer(
+      Fury fury, Class<T> cls, boolean supportCodegenHook, boolean immutable) {
+    super(fury, cls, immutable);
+    this.supportCodegenHook = supportCodegenHook;
+    elementClassInfoHolder = fury.getClassResolver().nilClassInfoHolder();
+  }
+
   private GenericType getElementGenericType(Fury fury) {
     GenericType genericType = fury.getGenerics().nextGenericType();
     GenericType elemGenericType = null;
@@ -507,6 +514,34 @@ public abstract class AbstractCollectionSerializer<T> 
extends Serializer<T> {
     }
   }
 
+  public Collection newCollection(Collection collection) {
+    numElements = collection.size();
+    return newCollection();
+  }
+
+  /**
+   * Collection must have default constructor to be invoked by fury, otherwise 
created object can't
+   * be used to adding elements. For example:
+   *
+   * <pre>{@code new ArrayList<Integer> {add(1);}}</pre>
+   *
+   * <p>without default constructor, created list will have elementData as 
null, adding elements
+   * will raise NPE.
+   *
+   * @return empty collection instance
+   */
+  public Collection newCollection() {
+    if (constructor == null) {
+      constructor = ReflectionUtils.getCtrHandle(type, true);
+    }
+    try {
+      return (Collection) constructor.invoke();
+    } catch (Throwable e) {
+      // reduce code size of critical path.
+      throw buildException(e);
+    }
+  }
+
   private RuntimeException buildException(Throwable e) {
     return new IllegalArgumentException(
         "Please provide public no arguments constructor for class " + type, e);
@@ -514,8 +549,8 @@ public abstract class AbstractCollectionSerializer<T> 
extends Serializer<T> {
 
   /**
    * Get and reset numElements of deserializing collection. Should be called 
after {@link
-   * #newCollection}. Nested read may overwrite this element, reset is 
necessary to avoid use wrong
-   * value by mistake.
+   * #newCollection(MemoryBuffer buffer)}. Nested read may overwrite this 
element, reset is
+   * necessary to avoid use wrong value by mistake.
    */
   public int getAndClearNumElements() {
     int size = numElements;
diff --git 
a/java/fury-core/src/main/java/org/apache/fury/serializer/collection/AbstractMapSerializer.java
 
b/java/fury-core/src/main/java/org/apache/fury/serializer/collection/AbstractMapSerializer.java
index 9e979442..a488402c 100644
--- 
a/java/fury-core/src/main/java/org/apache/fury/serializer/collection/AbstractMapSerializer.java
+++ 
b/java/fury-core/src/main/java/org/apache/fury/serializer/collection/AbstractMapSerializer.java
@@ -77,6 +77,17 @@ public abstract class AbstractMapSerializer<T> extends 
Serializer<T> {
     partialGenericKVTypeMap = new IdentityMap<>();
   }
 
+  public AbstractMapSerializer(
+      Fury fury, Class<T> cls, boolean supportCodegenHook, boolean immutable) {
+    super(fury, cls, immutable);
+    this.supportCodegenHook = supportCodegenHook;
+    keyClassInfoWriteCache = fury.getClassResolver().nilClassInfoHolder();
+    keyClassInfoReadCache = fury.getClassResolver().nilClassInfoHolder();
+    valueClassInfoWriteCache = fury.getClassResolver().nilClassInfoHolder();
+    valueClassInfoReadCache = fury.getClassResolver().nilClassInfoHolder();
+    partialGenericKVTypeMap = new IdentityMap<>();
+  }
+
   /**
    * Set key serializer for next serialization, the <code>serializer</code> 
will be cleared when
    * next serialization finished.
@@ -405,6 +416,12 @@ public abstract class AbstractMapSerializer<T> extends 
Serializer<T> {
     return onMapRead(map);
   }
 
+  protected <K, V> void copyEntry(Map<K, V> originMap, Map<K, V> newMap) {
+    for (Map.Entry<K, V> entry : originMap.entrySet()) {
+      newMap.put(fury.copyObject(entry.getKey()), 
fury.copyObject(entry.getValue()));
+    }
+  }
+
   @SuppressWarnings("unchecked")
   protected final void readElements(MemoryBuffer buffer, int size, Map map) {
     Serializer keySerializer = this.keySerializer;
@@ -718,9 +735,38 @@ public abstract class AbstractMapSerializer<T> extends 
Serializer<T> {
     }
   }
 
+  public Map newMap(Map map) {
+    numElements = map.size();
+    return newMap();
+  }
+
+  /**
+   * Map must have default constructor to be invoked by fury, otherwise 
created object can't be used
+   * to adding elements. For example:
+   *
+   * <pre>{@code new ArrayList<Integer> {add(1);}}</pre>
+   *
+   * <p>without default constructor, created list will have elementData as 
null, adding elements
+   * will raise NPE.
+   *
+   * @return empty map instance
+   */
+  public Map newMap() {
+    if (constructor == null) {
+      constructor = ReflectionUtils.getCtrHandle(type, true);
+    }
+    try {
+      return (Map) constructor.invoke();
+    } catch (Throwable e) {
+      throw new IllegalArgumentException(
+          "Please provide public no arguments constructor for class " + type, 
e);
+    }
+  }
+
   /**
-   * Get and reset numElements of deserializing collection. Should be called 
after {@link #newMap}.
-   * Nested read may overwrite this element, reset is necessary to avoid use 
wrong value by mistake.
+   * Get and reset numElements of deserializing collection. Should be called 
after {@link
+   * #newMap(MemoryBuffer buffer)}. Nested read may overwrite this element, 
reset is necessary to
+   * avoid use wrong value by mistake.
    */
   public int getAndClearNumElements() {
     int size = numElements;
@@ -732,6 +778,8 @@ public abstract class AbstractMapSerializer<T> extends 
Serializer<T> {
     this.numElements = numElements;
   }
 
+  public abstract T onMapCopy(Map map);
+
   public abstract T onMapRead(Map map);
 
   private Object readJavaRefOptimized(
diff --git 
a/java/fury-core/src/main/java/org/apache/fury/serializer/collection/CollectionSerializer.java
 
b/java/fury-core/src/main/java/org/apache/fury/serializer/collection/CollectionSerializer.java
index 511a5d91..0b575e51 100644
--- 
a/java/fury-core/src/main/java/org/apache/fury/serializer/collection/CollectionSerializer.java
+++ 
b/java/fury-core/src/main/java/org/apache/fury/serializer/collection/CollectionSerializer.java
@@ -34,6 +34,11 @@ public class CollectionSerializer<T extends Collection> 
extends AbstractCollecti
     super(fury, type, supportCodegenHook);
   }
 
+  public CollectionSerializer(
+      Fury fury, Class<T> type, boolean supportCodegenHook, boolean immutable) 
{
+    super(fury, type, supportCodegenHook, immutable);
+  }
+
   @Override
   public Collection onCollectionWrite(MemoryBuffer buffer, T value) {
     buffer.writeVarUint32Small7(value.size());
@@ -45,6 +50,29 @@ public class CollectionSerializer<T extends Collection> 
extends AbstractCollecti
     return (T) collection;
   }
 
+  @Override
+  public T copy(T originCollection) {
+    if (isImmutable()) {
+      return originCollection;
+    }
+    Collection newCollection = newCollection(originCollection);
+    if (needToCopyRef) {
+      Collection copyObject = (Collection) 
fury.getCopyObject(originCollection);
+      if (copyObject != null) {
+        return (T) copyObject;
+      }
+      fury.reference(originCollection, newCollection);
+    }
+    copyElements(originCollection, newCollection);
+    return (T) newCollection;
+  }
+
+  public void copyElements(T originCollection, Collection newCollection) {
+    for (Object element : originCollection) {
+      newCollection.add(fury.copyObject(element));
+    }
+  }
+
   @Override
   public T read(MemoryBuffer buffer) {
     Collection collection = newCollection(buffer);
diff --git 
a/java/fury-core/src/main/java/org/apache/fury/serializer/collection/CollectionSerializers.java
 
b/java/fury-core/src/main/java/org/apache/fury/serializer/collection/CollectionSerializers.java
index 3332b0c0..8616a39e 100644
--- 
a/java/fury-core/src/main/java/org/apache/fury/serializer/collection/CollectionSerializers.java
+++ 
b/java/fury-core/src/main/java/org/apache/fury/serializer/collection/CollectionSerializers.java
@@ -35,6 +35,7 @@ import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.SortedSet;
@@ -101,6 +102,28 @@ public class CollectionSerializers {
       super(fury, cls, false);
     }
 
+    @Override
+    public List<?> copy(List<?> originCollection) {
+      Object[] elements = new Object[originCollection.size()];
+      List<?> newCollection = Arrays.asList(elements);
+      if (needToCopyRef) {
+        List<?> copyObject = (List<?>) fury.getCopyObject(originCollection);
+        if (copyObject != null) {
+          return copyObject;
+        }
+        fury.reference(originCollection, newCollection);
+      }
+      copyElements(originCollection, elements);
+      return newCollection;
+    }
+
+    private void copyElements(List<?> originCollection, Object[] elements) {
+      int size = originCollection.size();
+      for (int i = 0; i < size; i++) {
+        elements[i] = fury.copyObject(originCollection.get(i));
+      }
+    }
+
     @Override
     public short getXtypeId() {
       return (short) -Type.LIST.getId();
@@ -221,6 +244,22 @@ public class CollectionSerializers {
       fury.getRefResolver().reference(collection);
       return collection;
     }
+
+    @Override
+    public Collection newCollection(Collection originCollection) {
+      Collection collection;
+      Comparator comparator = fury.copyObject(((SortedSet) 
originCollection).comparator());
+      if (Objects.equals(type, TreeSet.class)) {
+        collection = new TreeSet(comparator);
+      } else {
+        try {
+          collection = (T) constructor.invoke(comparator);
+        } catch (Throwable e) {
+          throw new RuntimeException(e);
+        }
+      }
+      return collection;
+    }
   }
 
   // ------------------------------ collections serializers 
------------------------------ //
@@ -231,7 +270,7 @@ public class CollectionSerializers {
   public static final class EmptyListSerializer extends 
CollectionSerializer<List<?>> {
 
     public EmptyListSerializer(Fury fury, Class<List<?>> cls) {
-      super(fury, cls, false);
+      super(fury, cls, false, true);
     }
 
     @Override
@@ -284,7 +323,7 @@ public class CollectionSerializers {
   public static final class EmptySetSerializer extends 
CollectionSerializer<Set<?>> {
 
     public EmptySetSerializer(Fury fury, Class<Set<?>> cls) {
-      super(fury, cls, false);
+      super(fury, cls, false, true);
     }
 
     @Override
@@ -316,7 +355,7 @@ public class CollectionSerializers {
   public static final class EmptySortedSetSerializer extends 
CollectionSerializer<SortedSet<?>> {
 
     public EmptySortedSetSerializer(Fury fury, Class<SortedSet<?>> cls) {
-      super(fury, cls, false);
+      super(fury, cls, false, true);
     }
 
     @Override
@@ -335,6 +374,11 @@ public class CollectionSerializers {
       super(fury, cls, false);
     }
 
+    @Override
+    public List<?> copy(List<?> originCollection) {
+      return 
Collections.singletonList(fury.copyObject(originCollection.get(0)));
+    }
+
     @Override
     public void write(MemoryBuffer buffer, List<?> value) {
       fury.writeRef(buffer, value.get(0));
@@ -369,6 +413,11 @@ public class CollectionSerializers {
       super(fury, cls, false);
     }
 
+    @Override
+    public Set<?> copy(Set<?> originCollection) {
+      return 
Collections.singleton(fury.copyObject(originCollection.iterator().next()));
+    }
+
     @Override
     public void write(MemoryBuffer buffer, Set<?> value) {
       fury.writeRef(buffer, value.iterator().next());
@@ -445,6 +494,16 @@ public class CollectionSerializers {
       return set;
     }
 
+    @Override
+    public Collection newCollection(Collection originCollection) {
+      Map<?, Boolean> map =
+          (Map<?, Boolean>) Platform.getObject(originCollection, 
MAP_FIELD_OFFSET);
+      AbstractMapSerializer mapSerializer =
+          (AbstractMapSerializer) 
fury.getClassResolver().getSerializer(map.getClass());
+      Map newMap = mapSerializer.newMap();
+      return Collections.newSetFromMap(newMap);
+    }
+
     @Override
     public Collection onCollectionWrite(MemoryBuffer buffer, Set<?> value) {
       final Map<?, Boolean> map = (Map<?, Boolean>) Platform.getObject(value, 
MAP_FIELD_OFFSET);
@@ -471,6 +530,11 @@ public class CollectionSerializers {
       fury.getRefResolver().reference(keySetView);
       return keySetView;
     }
+
+    @Override
+    public Collection newCollection(Collection collection) {
+      return ConcurrentHashMap.newKeySet(collection.size());
+    }
   }
 
   public static final class VectorSerializer extends 
CollectionSerializer<Vector> {
@@ -543,6 +607,11 @@ public class CollectionSerializers {
       }
       return object;
     }
+
+    @Override
+    public EnumSet copy(EnumSet originCollection) {
+      return EnumSet.copyOf(originCollection);
+    }
   }
 
   public static class BitSetSerializer extends Serializer<BitSet> {
@@ -557,6 +626,11 @@ public class CollectionSerializers {
           values, Platform.LONG_ARRAY_OFFSET, 
Math.multiplyExact(values.length, 8));
     }
 
+    @Override
+    public BitSet copy(BitSet originCollection) {
+      return BitSet.valueOf(originCollection.toLongArray());
+    }
+
     @Override
     public BitSet read(MemoryBuffer buffer) {
       long[] values = buffer.readLongs(buffer.readVarUint32Small7());
@@ -575,6 +649,12 @@ public class CollectionSerializers {
       return value;
     }
 
+    @Override
+    public Collection newCollection(Collection collection) {
+      return new PriorityQueue(
+          collection.size(), fury.copyObject(((PriorityQueue) 
collection).comparator()));
+    }
+
     @Override
     public PriorityQueue newCollection(MemoryBuffer buffer) {
       int numElements = buffer.readVarUint32Small7();
@@ -625,6 +705,11 @@ public class CollectionSerializers {
       dataSerializer.write(buffer, value);
     }
 
+    @Override
+    public T copy(T originCollection) {
+      return dataSerializer.copy(originCollection);
+    }
+
     @Override
     public T read(MemoryBuffer buffer) {
       return dataSerializer.read(buffer);
@@ -667,6 +752,11 @@ public class CollectionSerializers {
     public void write(MemoryBuffer buffer, T value) {
       serializer.write(buffer, value);
     }
+
+    @Override
+    public T copy(T value) {
+      return (T) serializer.copy(value);
+    }
   }
 
   // TODO add 
JDK11:JdkImmutableListSerializer,JdkImmutableMapSerializer,JdkImmutableSetSerializer
diff --git 
a/java/fury-core/src/main/java/org/apache/fury/serializer/collection/GuavaCollectionSerializers.java
 
b/java/fury-core/src/main/java/org/apache/fury/serializer/collection/GuavaCollectionSerializers.java
index 44aa025d..59a9e832 100644
--- 
a/java/fury-core/src/main/java/org/apache/fury/serializer/collection/GuavaCollectionSerializers.java
+++ 
b/java/fury-core/src/main/java/org/apache/fury/serializer/collection/GuavaCollectionSerializers.java
@@ -221,6 +221,16 @@ public class GuavaCollectionSerializers {
       return new MapContainer(numElements);
     }
 
+    @Override
+    public Map newMap(Map map) {
+      return new MapContainer(map.size());
+    }
+
+    @Override
+    public T onMapCopy(Map map) {
+      return onMapRead(map);
+    }
+
     @Override
     public T onMapRead(Map map) {
       MapContainer container = (MapContainer) map;
@@ -341,6 +351,17 @@ public class GuavaCollectionSerializers {
       return new SortedMapContainer<>(comparator, numElements);
     }
 
+    @Override
+    public Map newMap(Map map) {
+      Comparator comparator = fury.copyObject(((ImmutableSortedMap) 
map).comparator());
+      return new SortedMapContainer<>(comparator, map.size());
+    }
+
+    @Override
+    public T onMapCopy(Map map) {
+      return onMapRead(map);
+    }
+
     @Override
     public T onMapRead(Map map) {
       SortedMapContainer mapContainer = (SortedMapContainer) map;
diff --git 
a/java/fury-core/src/main/java/org/apache/fury/serializer/collection/ImmutableCollectionSerializers.java
 
b/java/fury-core/src/main/java/org/apache/fury/serializer/collection/ImmutableCollectionSerializers.java
index 3667b8cc..4a69a25c 100644
--- 
a/java/fury-core/src/main/java/org/apache/fury/serializer/collection/ImmutableCollectionSerializers.java
+++ 
b/java/fury-core/src/main/java/org/apache/fury/serializer/collection/ImmutableCollectionSerializers.java
@@ -182,6 +182,21 @@ public class ImmutableCollectionSerializers {
       }
     }
 
+    @Override
+    public Map newMap(Map map) {
+      int numElements = map.size();
+      if (Platform.JAVA_VERSION > 8) {
+        return new JDKImmutableMapContainer(numElements);
+      } else {
+        return new HashMap(numElements);
+      }
+    }
+
+    @Override
+    public Map onMapCopy(Map map) {
+      return onMapRead(map);
+    }
+
     @Override
     public Map onMapRead(Map map) {
       if (Platform.JAVA_VERSION > 8) {
diff --git 
a/java/fury-core/src/main/java/org/apache/fury/serializer/collection/MapSerializer.java
 
b/java/fury-core/src/main/java/org/apache/fury/serializer/collection/MapSerializer.java
index 0543d5f4..20af82ab 100644
--- 
a/java/fury-core/src/main/java/org/apache/fury/serializer/collection/MapSerializer.java
+++ 
b/java/fury-core/src/main/java/org/apache/fury/serializer/collection/MapSerializer.java
@@ -34,12 +34,38 @@ public class MapSerializer<T extends Map> extends 
AbstractMapSerializer<T> {
     super(fury, cls, supportCodegenHook);
   }
 
+  public MapSerializer(Fury fury, Class<T> cls, boolean supportCodegenHook, 
boolean immutable) {
+    super(fury, cls, supportCodegenHook, immutable);
+  }
+
   @Override
   public Map onMapWrite(MemoryBuffer buffer, T value) {
     buffer.writeVarUint32Small7(value.size());
     return value;
   }
 
+  @Override
+  public T copy(T originMap) {
+    if (isImmutable()) {
+      return originMap;
+    }
+    Map newMap = newMap(originMap);
+    if (needToCopyRef) {
+      Map copyMap = (Map) fury.getCopyObject(originMap);
+      if (copyMap != null) {
+        return (T) copyMap;
+      }
+      fury.reference(originMap, newMap);
+    }
+    copyEntry(originMap, newMap);
+    return onMapCopy(newMap);
+  }
+
+  @Override
+  public T onMapCopy(Map map) {
+    return (T) map;
+  }
+
   @Override
   public T read(MemoryBuffer buffer) {
     Map map = newMap(buffer);
diff --git 
a/java/fury-core/src/main/java/org/apache/fury/serializer/collection/MapSerializers.java
 
b/java/fury-core/src/main/java/org/apache/fury/serializer/collection/MapSerializers.java
index 94c142b3..bf4f3af6 100644
--- 
a/java/fury-core/src/main/java/org/apache/fury/serializer/collection/MapSerializers.java
+++ 
b/java/fury-core/src/main/java/org/apache/fury/serializer/collection/MapSerializers.java
@@ -26,6 +26,7 @@ import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
@@ -69,6 +70,11 @@ public class MapSerializers {
       fury.getRefResolver().reference(hashMap);
       return hashMap;
     }
+
+    @Override
+    public Map newMap(Map map) {
+      return new HashMap(map.size());
+    }
   }
 
   public static final class LinkedHashMapSerializer extends 
MapSerializer<LinkedHashMap> {
@@ -89,6 +95,11 @@ public class MapSerializers {
       fury.getRefResolver().reference(hashMap);
       return hashMap;
     }
+
+    @Override
+    public Map newMap(Map map) {
+      return new LinkedHashMap(map.size());
+    }
   }
 
   public static final class LazyMapSerializer extends MapSerializer<LazyMap> {
@@ -109,6 +120,11 @@ public class MapSerializers {
       fury.getRefResolver().reference(map);
       return map;
     }
+
+    @Override
+    public Map newMap(Map map) {
+      return new LazyMap(map.size());
+    }
   }
 
   public static class SortedMapSerializer<T extends SortedMap> extends 
MapSerializer<T> {
@@ -145,12 +161,28 @@ public class MapSerializers {
       fury.getRefResolver().reference(map);
       return map;
     }
+
+    @Override
+    public Map newMap(Map originMap) {
+      Comparator comparator = fury.copyObject(((SortedMap) 
originMap).comparator());
+      Map map;
+      if (type == TreeMap.class) {
+        map = new TreeMap(comparator);
+      } else {
+        try {
+          map = (Map) constructor.invoke(comparator);
+        } catch (Throwable e) {
+          throw new RuntimeException(e);
+        }
+      }
+      return map;
+    }
   }
 
   public static final class EmptyMapSerializer extends MapSerializer<Map<?, 
?>> {
 
     public EmptyMapSerializer(Fury fury, Class<Map<?, ?>> cls) {
-      super(fury, cls, false);
+      super(fury, cls, false, true);
     }
 
     @Override
@@ -181,7 +213,7 @@ public class MapSerializers {
 
   public static final class EmptySortedMapSerializer extends 
MapSerializer<SortedMap<?, ?>> {
     public EmptySortedMapSerializer(Fury fury, Class<SortedMap<?, ?>> cls) {
-      super(fury, cls, false);
+      super(fury, cls, false, true);
     }
 
     @Override
@@ -199,6 +231,13 @@ public class MapSerializers {
       super(fury, cls, false);
     }
 
+    @Override
+    public Map<?, ?> copy(Map<?, ?> originMap) {
+      Entry<?, ?> entry = originMap.entrySet().iterator().next();
+      return Collections.singletonMap(
+          fury.copyObject(entry.getKey()), fury.copyObject(entry.getValue()));
+    }
+
     @Override
     public void write(MemoryBuffer buffer, Map<?, ?> value) {
       Map.Entry entry = value.entrySet().iterator().next();
@@ -250,6 +289,11 @@ public class MapSerializers {
       return map;
     }
 
+    @Override
+    public Map newMap(Map map) {
+      return new ConcurrentHashMap(map.size());
+    }
+
     @Override
     public short getXtypeId() {
       return Fury.NOT_SUPPORT_CROSS_LANGUAGE;
@@ -273,6 +317,12 @@ public class MapSerializers {
       return map;
     }
 
+    @Override
+    public Map newMap(Map originMap) {
+      Comparator comparator = fury.copyObject(((ConcurrentSkipListMap) 
originMap).comparator());
+      return new ConcurrentSkipListMap(comparator);
+    }
+
     @Override
     public short getXtypeId() {
       return Fury.NOT_SUPPORT_CROSS_LANGUAGE;
@@ -311,6 +361,11 @@ public class MapSerializers {
       Class<?> keyType = 
fury.getClassResolver().readClassInfo(buffer).getCls();
       return new EnumMap(keyType);
     }
+
+    @Override
+    public EnumMap copy(EnumMap originMap) {
+      return new EnumMap(originMap);
+    }
   }
 
   public static class StringKeyMapSerializer<T> extends 
MapSerializer<Map<String, T>> {
@@ -340,6 +395,13 @@ public class MapSerializers {
       }
       return (Map<String, T>) map;
     }
+
+    @Override
+    protected <K, V> void copyEntry(Map<K, V> originMap, Map<K, V> newMap) {
+      for (Entry<K, V> entry : originMap.entrySet()) {
+        newMap.put(entry.getKey(), fury.copyObject(entry.getValue()));
+      }
+    }
   }
 
   /**
@@ -370,6 +432,11 @@ public class MapSerializers {
       throw new IllegalStateException();
     }
 
+    @Override
+    public T onMapCopy(Map map) {
+      throw new IllegalStateException();
+    }
+
     @Override
     public T onMapRead(Map map) {
       throw new IllegalStateException();
@@ -380,6 +447,11 @@ public class MapSerializers {
       dataSerializer.write(buffer, value);
     }
 
+    @Override
+    public T copy(T value) {
+      return dataSerializer.copy(value);
+    }
+
     @Override
     public T read(MemoryBuffer buffer) {
       return dataSerializer.read(buffer);
@@ -406,6 +478,11 @@ public class MapSerializers {
       throw new IllegalStateException();
     }
 
+    @Override
+    public T onMapCopy(Map map) {
+      throw new IllegalStateException();
+    }
+
     @Override
     public T onMapRead(Map map) {
       throw new IllegalStateException();
@@ -421,6 +498,11 @@ public class MapSerializers {
     public void write(MemoryBuffer buffer, T value) {
       serializer.write(buffer, value);
     }
+
+    @Override
+    public T copy(T value) {
+      return (T) serializer.copy(value);
+    }
   }
 
   // TODO(chaokunyang) support ConcurrentSkipListMap.SubMap mo efficiently.
diff --git 
a/java/fury-core/src/main/java/org/apache/fury/serializer/collection/SynchronizedSerializers.java
 
b/java/fury-core/src/main/java/org/apache/fury/serializer/collection/SynchronizedSerializers.java
index 82d5bc4e..6d79b5f0 100644
--- 
a/java/fury-core/src/main/java/org/apache/fury/serializer/collection/SynchronizedSerializers.java
+++ 
b/java/fury-core/src/main/java/org/apache/fury/serializer/collection/SynchronizedSerializers.java
@@ -121,6 +121,12 @@ public class SynchronizedSerializers {
       }
     }
 
+    @Override
+    public Map copy(Map originMap) {
+      final Object unwrappedMap = Platform.getObject(originMap, offset);
+      return (Map) factory.apply(fury.copyObject(unwrappedMap));
+    }
+
     @Override
     public Map read(MemoryBuffer buffer) {
       final Object sourceCollection = fury.readRef(buffer);
diff --git 
a/java/fury-core/src/main/java/org/apache/fury/serializer/collection/UnmodifiableSerializers.java
 
b/java/fury-core/src/main/java/org/apache/fury/serializer/collection/UnmodifiableSerializers.java
index 3f54860a..d6d0c52c 100644
--- 
a/java/fury-core/src/main/java/org/apache/fury/serializer/collection/UnmodifiableSerializers.java
+++ 
b/java/fury-core/src/main/java/org/apache/fury/serializer/collection/UnmodifiableSerializers.java
@@ -118,6 +118,12 @@ public class UnmodifiableSerializers {
       fury.writeRef(buffer, fieldValue);
     }
 
+    @Override
+    public Map copy(Map originMap) {
+      final Object unwrappedMap = Platform.getObject(originMap, offset);
+      return (Map) factory.apply(fury.copyObject(unwrappedMap));
+    }
+
     @Override
     public Map read(MemoryBuffer buffer) {
       final Object sourceCollection = fury.readRef(buffer);
diff --git 
a/java/fury-core/src/main/java/org/apache/fury/serializer/scala/SingletonMapSerializer.java
 
b/java/fury-core/src/main/java/org/apache/fury/serializer/scala/SingletonMapSerializer.java
index 50be016f..4ad3794c 100644
--- 
a/java/fury-core/src/main/java/org/apache/fury/serializer/scala/SingletonMapSerializer.java
+++ 
b/java/fury-core/src/main/java/org/apache/fury/serializer/scala/SingletonMapSerializer.java
@@ -67,6 +67,11 @@ public class SingletonMapSerializer extends 
AbstractMapSerializer {
     return Platform.getObject(base, offset);
   }
 
+  @Override
+  public Object onMapCopy(Map map) {
+    throw new IllegalStateException("unreachable");
+  }
+
   @Override
   public Object onMapRead(Map map) {
     throw new IllegalStateException("unreachable");
diff --git a/java/fury-core/src/test/java/org/apache/fury/FuryCopyTest.java 
b/java/fury-core/src/test/java/org/apache/fury/FuryCopyTest.java
new file mode 100644
index 00000000..b86fe07a
--- /dev/null
+++ b/java/fury-core/src/test/java/org/apache/fury/FuryCopyTest.java
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.fury;
+
+import com.google.common.collect.ImmutableList;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.net.URI;
+import java.nio.charset.Charset;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.MonthDay;
+import java.time.OffsetDateTime;
+import java.time.OffsetTime;
+import java.time.Period;
+import java.time.Year;
+import java.time.YearMonth;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Currency;
+import java.util.EnumMap;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.OptionalDouble;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import java.util.PriorityQueue;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.Vector;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
+import org.apache.fury.collection.LazyMap;
+import org.apache.fury.serializer.EnumSerializerTest;
+import org.apache.fury.serializer.EnumSerializerTest.EnumFoo;
+import 
org.apache.fury.serializer.collection.ChildContainerSerializersTest.ChildArrayDeque;
+import org.apache.fury.test.bean.BeanA;
+import org.apache.fury.test.bean.BeanB;
+import org.apache.fury.test.bean.Cyclic;
+import org.apache.fury.util.DateTimeUtils;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class FuryCopyTest extends FuryTestBase {
+
+  private final Fury fury = 
builder().withCopyRefTracking(true).withCodegen(false).build();
+
+  @Test
+  public void immutableObjectCopyTest() {
+    primitiveCopyTest();
+    assertSame("12ca213@!.3");
+    assertSame(EnumSerializerTest.EnumFoo.A);
+    assertSame(EnumSerializerTest.EnumFoo.B);
+    assertSame(BigInteger.valueOf(100));
+    assertSame(BigDecimal.valueOf(100, 2));
+    timeCopyTest();
+    assertSame(Charset.defaultCharset());
+    assertSame(Object.class);
+    assertSame(Currency.getInstance(Locale.CHINA));
+    assertSame(new Object());
+    assertSame(Locale.US);
+    assertSame(OptionalInt.of(Integer.MAX_VALUE));
+    assertSame(OptionalDouble.of(Double.MAX_VALUE));
+    assertSame(OptionalLong.of(Long.MAX_VALUE));
+    assertSame(Pattern.compile(""));
+    assertSame(URI.create("test"));
+    assertSame(new UUID(System.currentTimeMillis(), 
System.currentTimeMillis()));
+    // URL is not handle with URLSerializer. use replaceResolveSerializer
+    // assertSame(new URL("https://www.baidu.com";));
+
+    assertSame(Collections.EMPTY_LIST);
+    assertSame(Collections.EMPTY_MAP);
+    assertSame(Collections.EMPTY_SET);
+    assertSame(Collections.emptySortedSet());
+    assertSame(Collections.emptySortedMap());
+  }
+
+  @Test
+  public void mutableObjectCopyTest() {
+    collectionCopyTest();
+    mapCopyTest();
+    objectCopyTest();
+  }
+
+  @Test
+  public void threadLocalCopyTest() {
+    BeanA beanA = BeanA.createBeanA(2);
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    AtomicReference<Throwable> ex = new AtomicReference<>();
+    ThreadLocalFury threadLocalFury =
+        
builder().withCodegen(false).withCopyRefTracking(true).buildThreadLocalFury();
+    threadLocalFury.register(BeanA.class);
+    assetEqualsButNotSame(threadLocalFury.copy(beanA));
+    executor.execute(
+        () -> {
+          try {
+            assetEqualsButNotSame(threadLocalFury.copy(beanA));
+          } catch (Throwable t) {
+            ex.set(t);
+          }
+        });
+    Assert.assertNull(ex.get());
+  }
+
+  @Test
+  public void threadpoolCopyTest() throws InterruptedException {
+    BeanA beanA = BeanA.createBeanA(2);
+    AtomicBoolean flag = new AtomicBoolean(false);
+    ThreadSafeFury threadSafeFury =
+        builder()
+            .withCopyRefTracking(true)
+            .withCodegen(false)
+            .withAsyncCompilation(true)
+            .buildThreadSafeFuryPool(5, 10);
+    for (int i = 0; i < 2000; i++) {
+      new Thread(
+              () -> {
+                for (int j = 0; j < 10; j++) {
+                  try {
+                    
threadSafeFury.setClassLoader(beanA.getClass().getClassLoader());
+                    Assert.assertEquals(beanA, threadSafeFury.copy(beanA));
+                  } catch (Exception e) {
+                    e.printStackTrace();
+                    flag.set(true);
+                  }
+                }
+              })
+          .start();
+    }
+    TimeUnit.SECONDS.sleep(5);
+    Assert.assertFalse(flag.get());
+  }
+
+  private void objectCopyTest() {
+    for (int i = 1; i <= 10; i++) {
+      BeanA beanA = BeanA.createBeanA(i);
+      BeanB beanB = BeanB.createBeanB(i);
+      assetEqualsButNotSame(beanA);
+      assetEqualsButNotSame(beanB);
+    }
+
+    Cyclic cyclic = Cyclic.create(true);
+    assetEqualsButNotSame(cyclic);
+  }
+
+  private void mapCopyTest() {
+    Map<String, Integer> map = new HashMap<>();
+    map.put("a", 1);
+    map.put("b", 2);
+    TreeMap<String, Integer> treeMap = new 
TreeMap<>(Comparator.naturalOrder());
+    treeMap.putAll(map);
+    EnumMap<EnumFoo, Object> enumMap = new EnumMap<>(EnumFoo.class);
+    enumMap.put(EnumFoo.A, 1);
+    LazyMap<String, Integer> lazyMap = new LazyMap<>();
+    lazyMap.put("a", 1);
+
+    assetEqualsButNotSame(new HashMap<>(map));
+    assetEqualsButNotSame(new LinkedHashMap<>(map));
+    assetEqualsButNotSame(treeMap);
+    assetEqualsButNotSame(Collections.singletonMap("a", 1));
+    assetEqualsButNotSame(new ConcurrentHashMap<>(map));
+    assetEqualsButNotSame(new ConcurrentSkipListMap<>(map));
+    assetEqualsButNotSame(enumMap);
+    assetEqualsButNotSame(lazyMap);
+  }
+
+  private void collectionCopyTest() {
+    List<String> testData = Arrays.asList("1", "2", "3");
+    TreeSet<String> treeSet = new TreeSet<>(Comparator.naturalOrder());
+    treeSet.addAll(testData);
+    assetEqualsButNotSame(new ArrayList<>(testData));
+    assetEqualsButNotSame(testData);
+    assetEqualsButNotSame(new LinkedList<>(testData));
+    assetEqualsButNotSame(new HashSet<>(testData));
+    assetEqualsButNotSame(new LinkedHashSet<>(testData));
+    assetEqualsButNotSame(treeSet);
+    assetEqualsButNotSame(new CopyOnWriteArrayList<>(testData));
+    assetEqualsButNotSame(new ConcurrentSkipListSet<>(testData));
+    assetEqualsButNotSame(Collections.newSetFromMap(new HashMap<>()));
+    assetEqualsButNotSame(ConcurrentHashMap.newKeySet(10));
+    assetEqualsButNotSame(new Vector<>(testData));
+    assetEqualsButNotSame(EnumSet.of(EnumSerializerTest.EnumFoo.A, 
EnumSerializerTest.EnumFoo.B));
+    assetEqualsButNotSame(BitSet.valueOf(new byte[] {1, 2, 3}));
+    assetEqualsButNotSame(Collections.singleton(1));
+    assetEqualsButNotSame(Collections.singletonList(1));
+
+    ImmutableList<String> data = ImmutableList.copyOf(testData);
+    ChildArrayDeque<String> list = new ChildArrayDeque<>();
+    list.addAll(data);
+    Assert.assertEquals(data, ImmutableList.copyOf(fury.copy(list)));
+    Assert.assertEquals(data, ImmutableList.copyOf(new PriorityQueue<>(data)));
+  }
+
+  private void primitiveCopyTest() {
+    assertSame(Boolean.TRUE);
+    assertSame(Byte.MAX_VALUE);
+    assertSame(Short.MAX_VALUE);
+    assertSame(Character.MAX_VALUE);
+    assertSame(Integer.MAX_VALUE);
+    assertSame(Long.MAX_VALUE);
+    assertSame(Float.MAX_VALUE);
+    assertSame(Double.MAX_VALUE);
+  }
+
+  private void timeCopyTest() {
+    assertSame(new java.util.Date());
+    assertSame(new java.sql.Date(System.currentTimeMillis()));
+    assertSame(new Time(System.currentTimeMillis()));
+    assertSame(new Timestamp(System.currentTimeMillis()));
+    assertSame(LocalDate.now());
+    assertSame(LocalTime.now());
+    assertSame(LocalDateTime.now());
+    assertSame(DateTimeUtils.truncateInstantToMicros(Instant.now()));
+    assertSame(Duration.ofDays(10));
+    assertSame(ZoneOffset.MAX);
+    assertSame(ZonedDateTime.now());
+    assertSame(Year.now());
+    assertSame(YearMonth.now());
+    assertSame(MonthDay.now());
+    assertSame(Period.ofDays(10));
+    assertSame(OffsetTime.now());
+    assertSame(OffsetDateTime.now());
+  }
+
+  private void assertSame(Object obj) {
+    Assert.assertSame(obj, fury.copy(obj));
+  }
+
+  private void assetEqualsButNotSame(Object obj) {
+    Object newObj = fury.copy(obj);
+    Assert.assertEquals(obj, newObj);
+    Assert.assertNotSame(obj, newObj);
+  }
+
+  public static class A {
+    private String name;
+    private B b;
+    private List<Integer> list = new ArrayList<>();
+
+    public String getName() {
+      return name;
+    }
+
+    public void setName(String name) {
+      this.name = name;
+    }
+
+    public B getB() {
+      return b;
+    }
+
+    public void setB(B b) {
+      this.b = b;
+    }
+
+    public List<Integer> getList() {
+      return list;
+    }
+
+    public void setList(List<Integer> list) {
+      this.list = list;
+    }
+
+    @Override
+    public String toString() {
+      return "A{" + "name='" + name + '\'' + ", list=" + list + '}';
+    }
+  }
+
+  public static class B {
+    private String name;
+    private A a;
+    private List<Integer> list = new ArrayList<>();
+
+    public String getName() {
+      return name;
+    }
+
+    public void setName(String name) {
+      this.name = name;
+    }
+
+    public A getA() {
+      return a;
+    }
+
+    public void setA(A a) {
+      this.a = a;
+    }
+
+    public List<Integer> getList() {
+      return list;
+    }
+
+    public void setList(List<Integer> list) {
+      this.list = list;
+    }
+
+    @Override
+    public String toString() {
+      return "B{" + "name='" + name + '\'' + ", list=" + list + '}';
+    }
+  }
+}
diff --git 
a/scala/src/main/scala/org/apache/fury/serializer/scala/MapSerializer.scala 
b/scala/src/main/scala/org/apache/fury/serializer/scala/MapSerializer.scala
index df37b90d..4f681193 100644
--- a/scala/src/main/scala/org/apache/fury/serializer/scala/MapSerializer.scala
+++ b/scala/src/main/scala/org/apache/fury/serializer/scala/MapSerializer.scala
@@ -64,6 +64,10 @@ abstract class AbstractScalaMapSerializer[K, V, T](fury: 
Fury, cls: Class[T])
     new MapBuilder[K, V, T](builder)
   }
 
+  override def onMapCopy(map: util.Map[_, _]): T = {
+    map.asInstanceOf[MapBuilder[K, V, T]].builder.result()
+  }
+
   override def onMapRead(map: util.Map[_, _]): T = {
     map.asInstanceOf[MapBuilder[K, V, T]].builder.result()
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to