This is an automated email from the ASF dual-hosted git repository.

chaokunyang pushed a commit to branch releases-0.10
in repository https://gitbox.apache.org/repos/asf/fury.git

commit 2dc711008d855ccf923a85cd9bdd84d91c592174
Author: Shawn Yang <[email protected]>
AuthorDate: Sun Jan 26 18:39:48 2025 +0800

    feat(java): new implementation and protocol refine for chunk based map 
serialization (#2025)
    
    This pr provides a new implementation for chunk based map serialization.
    
    Closes #925
    
    <!--
    If any user-facing interface changes, please [open an
    issue](https://github.com/apache/fury/issues/new/choose) describing the
    need to do so and update the document if necessary.
    -->
    
    - [ ] Does this PR introduce any public API change?
    - [ ] Does this PR introduce any binary protocol compatibility change?
    
    Deserialization is much faster than the no-chunk version; serialization is
    faster for larger maps.
    
    Using the benchmark code in
    https://github.com/apache/fury/pull/1722#issuecomment-2342512316:
    
    This PR is faster, by up to **3x**:
    ```java
    Benchmark                                  (size)  (tracking)  Mode  Cnt    
 Score       Error  Units
    HnBenchmark.testGeneralChunkWriteWithNull      64        true  avgt    3   
965.521 ±  1830.936  ns/op
    HnBenchmark.testGeneralChunkWriteWithNull      64       false  avgt    3  
1060.411 ±  3424.719  ns/op
    HnBenchmark.testGeneralChunkWriteWithNull     128        true  avgt    3  
2404.445 ±  8687.122  ns/op
    HnBenchmark.testGeneralChunkWriteWithNull     128       false  avgt    3  
1814.507 ±  1722.751  ns/op
    HnBenchmark.testGeneralChunkWriteWithNull     256        true  avgt    3  
3944.632 ±  2203.076  ns/op
    HnBenchmark.testGeneralChunkWriteWithNull     256       false  avgt    3  
3288.805 ±   867.047  ns/op
    HnBenchmark.testGeneralWriteWithNull           64        true  avgt    3  
1962.688 ±  2828.210  ns/op
    HnBenchmark.testGeneralWriteWithNull           64       false  avgt    3  
1490.634 ±   962.836  ns/op
    HnBenchmark.testGeneralWriteWithNull          128        true  avgt    3  
3659.806 ±  7227.436  ns/op
    HnBenchmark.testGeneralWriteWithNull          128       false  avgt    3  
4084.654 ±  7374.774  ns/op
    HnBenchmark.testGeneralWriteWithNull          256        true  avgt    3  
9596.658 ± 20767.262  ns/op
    HnBenchmark.testGeneralWriteWithNull          256       false  avgt    3  
6679.325 ±  5472.179  ns/op
    ```
    
    With StringMap and IntMap benchmark:
    ```java
    
    Benchmark                                   (enableChunkEncoding)  
(mapSize)   Mode  Cnt        Score          Error  Units
    MapSerializationSuite.deserializeIntMap                     false          
5  thrpt    3  3804604.842 ± 15328547.705  ops/s
    MapSerializationSuite.deserializeIntMap                     false         
20  thrpt    3  1254687.969 ±   388949.724  ops/s
    MapSerializationSuite.deserializeIntMap                     false         
50  thrpt    3   495176.849 ±   335702.097  ops/s
    MapSerializationSuite.deserializeIntMap                     false        
100  thrpt    3   258875.012 ±    32886.176  ops/s
    MapSerializationSuite.deserializeIntMap                     false        
200  thrpt    3   134137.015 ±   114908.454  ops/s
    MapSerializationSuite.deserializeIntMap                      true          
5  thrpt    3  5997383.562 ±  4598913.048  ops/s
    MapSerializationSuite.deserializeIntMap                      true         
20  thrpt    3  1797855.524 ±  3853406.173  ops/s
    MapSerializationSuite.deserializeIntMap                      true         
50  thrpt    3   582412.110 ±  1047668.070  ops/s
    MapSerializationSuite.deserializeIntMap                      true        
100  thrpt    3   389066.866 ±   151297.708  ops/s
    MapSerializationSuite.deserializeIntMap                      true        
200  thrpt    3   188316.860 ±    35331.909  ops/s
    MapSerializationSuite.deserializeStringMap                  false          
5  thrpt    3  2898963.533 ±  1930240.310  ops/s
    MapSerializationSuite.deserializeStringMap                  false         
20  thrpt    3   872196.086 ±   871637.268  ops/s
    MapSerializationSuite.deserializeStringMap                  false         
50  thrpt    3   308761.737 ±    58099.196  ops/s
    MapSerializationSuite.deserializeStringMap                  false        
100  thrpt    3   157261.914 ±   397356.241  ops/s
    MapSerializationSuite.deserializeStringMap                  false        
200  thrpt    3    86576.549 ±   102489.156  ops/s
    MapSerializationSuite.deserializeStringMap                   true          
5  thrpt    3  3701089.567 ±  1529899.331  ops/s
    MapSerializationSuite.deserializeStringMap                   true         
20  thrpt    3  1048550.399 ±   130102.760  ops/s
    MapSerializationSuite.deserializeStringMap                   true         
50  thrpt    3   407559.246 ±    38205.273  ops/s
    MapSerializationSuite.deserializeStringMap                   true        
100  thrpt    3   172109.437 ±   397927.346  ops/s
    MapSerializationSuite.deserializeStringMap                   true        
200  thrpt    3    92525.977 ±   379321.772  ops/s
    MapSerializationSuite.serializeIntMap                       false          
5  thrpt    3  7958692.983 ±  1934287.574  ops/s
    MapSerializationSuite.serializeIntMap                       false         
20  thrpt    3  2425269.897 ±  3763706.776  ops/s
    MapSerializationSuite.serializeIntMap                       false         
50  thrpt    3  1079804.122 ±   215967.411  ops/s
    MapSerializationSuite.serializeIntMap                       false        
100  thrpt    3   369848.671 ±   433172.821  ops/s
    MapSerializationSuite.serializeIntMap                       false        
200  thrpt    3   192858.945 ±    71543.709  ops/s
    MapSerializationSuite.serializeIntMap                        true          
5  thrpt    3  7239453.648 ±  3855324.170  ops/s
    MapSerializationSuite.serializeIntMap                        true         
20  thrpt    3  2137006.685 ±  3823762.656  ops/s
    MapSerializationSuite.serializeIntMap                        true         
50  thrpt    3   811639.511 ±  2407986.801  ops/s
    MapSerializationSuite.serializeIntMap                        true        
100  thrpt    3   412728.569 ±   149199.142  ops/s
    MapSerializationSuite.serializeIntMap                        true        
200  thrpt    3   236602.475 ±   253662.098  ops/s
    MapSerializationSuite.serializeStringMap                    false          
5  thrpt    3  5821603.026 ±  1397740.496  ops/s
    MapSerializationSuite.serializeStringMap                    false         
20  thrpt    3  1712819.341 ±   321017.433  ops/s
    MapSerializationSuite.serializeStringMap                    false         
50  thrpt    3   615260.241 ±   806075.165  ops/s
    MapSerializationSuite.serializeStringMap                    false        
100  thrpt    3   265117.558 ±   146904.745  ops/s
    MapSerializationSuite.serializeStringMap                    false        
200  thrpt    3   128618.697 ±    94723.953  ops/s
    MapSerializationSuite.serializeStringMap                     true          
5  thrpt    3  4503474.325 ± 11254674.336  ops/s
    MapSerializationSuite.serializeStringMap                     true         
20  thrpt    3  1732501.942 ±   373691.778  ops/s
    MapSerializationSuite.serializeStringMap                     true         
50  thrpt    3   596678.154 ±   173893.988  ops/s
    MapSerializationSuite.serializeStringMap                     true        
100  thrpt    3   336814.584 ±   134582.563  ops/s
    MapSerializationSuite.serializeStringMap                     true        
200  thrpt    3   143124.619 ±   200889.695  ops/s
    ```
    
    
![image](https://github.com/user-attachments/assets/a76301ea-0230-495e-afe9-9d1acea6263d)
    
    
![image](https://github.com/user-attachments/assets/2e17d0d0-6851-467d-822b-5a9024252bc4)
    
    
![image](https://github.com/user-attachments/assets/b60cdf7f-0bca-4654-a46c-c13d9443661c)
    
    
![image](https://github.com/user-attachments/assets/24637c8c-adb4-4de2-a349-df7356b7fdba)
---
 docs/specification/java_serialization_spec.md      |   14 +-
 docs/specification/xlang_serialization_spec.md     |   14 +-
 .../fury/benchmark/MapSerializationSuite.java      |  101 +
 .../java/org/apache/fury/memory/MemoryBuffer.java  |   11 +
 .../collection/AbstractMapSerializer.java          | 2063 +++++---------------
 .../fury/serializer/collection/MapFlags.java       |   34 +-
 .../serializer/collection/MapSerializersTest.java  |   75 +-
 7 files changed, 716 insertions(+), 1596 deletions(-)

diff --git a/docs/specification/java_serialization_spec.md 
b/docs/specification/java_serialization_spec.md
index 2d482565..958baffa 100644
--- a/docs/specification/java_serialization_spec.md
+++ b/docs/specification/java_serialization_spec.md
@@ -433,7 +433,7 @@ Fury will serialize map chunk by chunk, every chunk has 127 
pairs at most.
 ```
 |    1 byte      |     1 byte     | variable bytes  |
 +----------------+----------------+-----------------+
-| chunk size: N  |    KV header   |   N*2 objects   |
+|    KV header   | chunk size: N  |   N*2 objects   |
 ```
 
 KV header:
@@ -441,13 +441,13 @@ KV header:
 - If track key ref, use the first bit `0b1` of the header to flag it.
 - If the key has null, use the second bit `0b10` of the header to flag it. If 
ref tracking is enabled for this
   key type, this flag is invalid.
-- If the key types of map are different, use the 3rd bit `0b100` of the header 
to flag it.
-- If the actual key type of map is not the declared key type, use the 4rd bit 
`0b1000` of the header to flag it.
-- If track value ref, use the 5th bit `0b10000` of the header to flag it.
-- If the value has null, use the 6th bit `0b100000` of the header to flag it. 
If ref tracking is enabled for this
+- If the actual key type of map is not the declared key type, use the 3rd bit 
`0b100` of the header to flag it.
+- If track value ref, use the 4th bit `0b1000` of the header to flag it.
+- If the value has null, use the 5th bit `0b10000` of the header to flag it. 
If ref tracking is enabled for this
   value type, this flag is invalid.
-- If the value types of map are different, use the 7rd bit `0b1000000` header 
to flag it.
-- If the value type of map is not the declared value type, use the 8rd bit 
`0b10000000` of the header to flag it.
+- If the value type of map is not the declared value type, use the 6th bit 
`0b100000` of the header to flag it.
+- If key or value is null, that key and value will be written as a separate 
chunk, and chunk size writing will be
+  skipped too.
 
 If streaming write is enabled, which means Fury can't update written `chunk 
size`. In such cases, map key-value data
 format will be:
diff --git a/docs/specification/xlang_serialization_spec.md 
b/docs/specification/xlang_serialization_spec.md
index 7fd991ce..bf2c4fcc 100644
--- a/docs/specification/xlang_serialization_spec.md
+++ b/docs/specification/xlang_serialization_spec.md
@@ -567,7 +567,7 @@ Fury will serialize the map chunk by chunk, every chunk has 
255 pairs at most.
 ```
 |    1 byte      |     1 byte     | variable bytes  |
 +----------------+----------------+-----------------+
-| chunk size: N  |    KV header   |   N*2 objects   |
+|    KV header   | chunk size: N  |   N*2 objects   |
 ```
 
 KV header:
@@ -575,13 +575,13 @@ KV header:
 - If track key ref, use the first bit `0b1` of the header to flag it.
 - If the key has null, use the second bit `0b10` of the header to flag it. If 
ref tracking is enabled for this
   key type, this flag is invalid.
-- If the key types of map are different, use the 3rd bit `0b100` of the header 
to flag it.
-- If the actual key type of the map is not the declared key type, use the 4rd 
bit `0b1000` of the header to flag it.
-- If track value ref, use the 5th bit `0b10000` of the header to flag it.
-- If the value has null, use the 6th bit `0b100000` of the header to flag it. 
If ref tracking is enabled for this
+- If the actual key type of map is not the declared key type, use the 3rd bit 
`0b100` of the header to flag it.
+- If track value ref, use the 4th bit `0b1000` of the header to flag it.
+- If the value has null, use the 5th bit `0b10000` of the header to flag it. 
If ref tracking is enabled for this
   value type, this flag is invalid.
-- If the value types of the map are different, use the 7rd bit `0b1000000` 
header to flag it.
-- If the value type of map is not the declared value type, use the 8rd bit 
`0b10000000` of the header to flag it.
+- If the value type of map is not the declared value type, use the 6th bit 
`0b100000` of the header to flag it.
+- If key or value is null, that key and value will be written as a separate 
chunk, and chunk size writing will be
+  skipped too.
 
 If streaming write is enabled, which means Fury can't update written `chunk 
size`. In such cases, map key-value data
 format will be:
diff --git 
a/java/benchmark/src/main/java/org/apache/fury/benchmark/MapSerializationSuite.java
 
b/java/benchmark/src/main/java/org/apache/fury/benchmark/MapSerializationSuite.java
new file mode 100644
index 00000000..fb949022
--- /dev/null
+++ 
b/java/benchmark/src/main/java/org/apache/fury/benchmark/MapSerializationSuite.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.fury.benchmark;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.fury.Fury;
+import org.apache.fury.serializer.Serializer;
+import org.apache.fury.serializer.collection.AbstractMapSerializer;
+import org.openjdk.jmh.Main;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.CompilerControl;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+
+@BenchmarkMode(Mode.Throughput)
+@CompilerControl(value = CompilerControl.Mode.INLINE)
+public class MapSerializationSuite {
+  public static void main(String[] args) throws IOException {
+    if (args.length == 0) {
+      String commandLine =
+          "org.apache.fury.*MapSerializationSuite.* -f 1 -wi 3 -i 3 -t 1 -w 2s 
-r 2s -rf csv";
+      System.out.println(commandLine);
+      args = commandLine.split(" ");
+    }
+    Main.main(args);
+  }
+
+  @State(Scope.Thread)
+  public static class MapState {
+    @Param({"5", "20", "50", "100", "200"})
+    public int mapSize;
+
+    @Param({"false", "true"})
+    public boolean enableChunkEncoding;
+
+    private Map<String, String> stringMap;
+    private Map<Integer, Integer> integerMap;
+    private byte[] stringMapBytes;
+    private byte[] integerMapBytes;
+    private Fury fury;
+
+    @Setup(Level.Trial)
+    public void setup() {
+      fury = Fury.builder().build();
+      Serializer<HashMap> serializer = fury.getSerializer(HashMap.class);
+      ((AbstractMapSerializer) 
serializer).setUseChunkSerialize(enableChunkEncoding);
+      stringMap = new HashMap<>(mapSize);
+      integerMap = new HashMap<>(mapSize);
+      for (int i = 0; i < mapSize; i++) {
+        stringMap.put("k" + i, "v" + i);
+        integerMap.put(i, i * 2);
+      }
+      stringMapBytes = fury.serialize(stringMap);
+      integerMapBytes = fury.serialize(integerMap);
+    }
+  }
+
+  @Benchmark
+  public Object serializeStringMap(MapState state) {
+    return state.fury.serialize(state.stringMap);
+  }
+
+  @Benchmark
+  public Object serializeIntMap(MapState state) {
+    return state.fury.serialize(state.integerMap);
+  }
+
+  @Benchmark
+  public Object deserializeStringMap(MapState state) {
+    return state.fury.deserialize(state.stringMapBytes);
+  }
+
+  @Benchmark
+  public Object deserializeIntMap(MapState state) {
+    return state.fury.deserialize(state.integerMapBytes);
+  }
+}
diff --git 
a/java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java 
b/java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java
index 9bfa7efe..8e8b41ee 100644
--- a/java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java
+++ b/java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java
@@ -1318,6 +1318,17 @@ public final class MemoryBuffer {
     return UNSAFE.getByte(heapMemory, address + readerIdx) != 0;
   }
 
+  public int readUnsignedByte() {
+    int readerIdx = readerIndex;
+    if (readerIdx > size - 1) {
+      streamReader.fillBuffer(1);
+    }
+    readerIndex = readerIdx + 1;
+    int v = UNSAFE.getByte(heapMemory, address + readerIdx);
+    v &= 0b11111111;
+    return v;
+  }
+
   public byte readByte() {
     int readerIdx = readerIndex;
     if (readerIdx > size - 1) {
diff --git 
a/java/fury-core/src/main/java/org/apache/fury/serializer/collection/AbstractMapSerializer.java
 
b/java/fury-core/src/main/java/org/apache/fury/serializer/collection/AbstractMapSerializer.java
index 51489e19..ebad0f9f 100644
--- 
a/java/fury-core/src/main/java/org/apache/fury/serializer/collection/AbstractMapSerializer.java
+++ 
b/java/fury-core/src/main/java/org/apache/fury/serializer/collection/AbstractMapSerializer.java
@@ -19,10 +19,22 @@
 
 package org.apache.fury.serializer.collection;
 
+import static org.apache.fury.serializer.collection.MapFlags.KEY_DECL_TYPE;
+import static org.apache.fury.serializer.collection.MapFlags.KEY_HAS_NULL;
+import static org.apache.fury.serializer.collection.MapFlags.KV_NULL;
+import static 
org.apache.fury.serializer.collection.MapFlags.NULL_KEY_VALUE_DECL_TYPE;
+import static 
org.apache.fury.serializer.collection.MapFlags.NULL_KEY_VALUE_DECL_TYPE_TRACKING_REF;
+import static 
org.apache.fury.serializer.collection.MapFlags.NULL_VALUE_KEY_DECL_TYPE;
+import static 
org.apache.fury.serializer.collection.MapFlags.NULL_VALUE_KEY_DECL_TYPE_TRACKING_REF;
+import static org.apache.fury.serializer.collection.MapFlags.TRACKING_KEY_REF;
+import static 
org.apache.fury.serializer.collection.MapFlags.TRACKING_VALUE_REF;
+import static org.apache.fury.serializer.collection.MapFlags.VALUE_DECL_TYPE;
+import static org.apache.fury.serializer.collection.MapFlags.VALUE_HAS_NULL;
 import static org.apache.fury.type.TypeUtils.MAP_TYPE;
 
 import com.google.common.collect.ImmutableMap.Builder;
 import java.lang.invoke.MethodHandle;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
 import org.apache.fury.Fury;
@@ -39,13 +51,12 @@ import org.apache.fury.serializer.Serializer;
 import org.apache.fury.type.GenericType;
 import org.apache.fury.type.Generics;
 import org.apache.fury.type.TypeUtils;
-import org.apache.fury.util.Preconditions;
 
 /** Serializer for all map-like objects. */
 @SuppressWarnings({"unchecked", "rawtypes"})
 public abstract class AbstractMapSerializer<T> extends Serializer<T> {
-  private static final int MAX_CHUNK_SIZE = 127;
-  private static final byte MARK_HAS_WRITE_CLASS_INFO = -1;
+  private static final int MAX_CHUNK_SIZE = 255;
+
   protected MethodHandle constructor;
   protected final boolean supportCodegenHook;
   protected boolean useChunkSerialize;
@@ -58,7 +69,7 @@ public abstract class AbstractMapSerializer<T> extends 
Serializer<T> {
   // support map subclass whose key or value generics only are available,
   // or one of types is already instantiated in subclass, ex: `Subclass<T> 
implements Map<String,
   // T>`
-  private final IdentityMap<GenericType, Tuple2<GenericType, GenericType>> 
partialGenericKVTypeMap;
+  private final IdentityMap<GenericType, GenericType> partialGenericKVTypeMap;
   private final GenericType objType = 
fury.getClassResolver().buildGenericType(Object.class);
   // For subclass whose kv type are instantiated already, such as
   // `Subclass implements Map<String, Long>`. If declared `Map` doesn't specify
@@ -69,6 +80,7 @@ public abstract class AbstractMapSerializer<T> extends 
Serializer<T> {
   // interpreter and jit mode although it seems unnecessary.
   // With kv header in future, we can write this kv classes only once, the 
cost won't be too much.
   private int numElements;
+  private final ClassResolver classResolver;
 
   public AbstractMapSerializer(Fury fury, Class<T> cls) {
     this(fury, cls, !ReflectionUtils.isDynamicGeneratedCLass(cls));
@@ -76,6 +88,7 @@ public abstract class AbstractMapSerializer<T> extends 
Serializer<T> {
 
   public AbstractMapSerializer(Fury fury, Class<T> cls, boolean 
supportCodegenHook) {
     super(fury, cls);
+    this.classResolver = fury.getClassResolver();
     this.supportCodegenHook = supportCodegenHook;
     keyClassInfoWriteCache = fury.getClassResolver().nilClassInfoHolder();
     keyClassInfoReadCache = fury.getClassResolver().nilClassInfoHolder();
@@ -87,6 +100,7 @@ public abstract class AbstractMapSerializer<T> extends 
Serializer<T> {
   public AbstractMapSerializer(
       Fury fury, Class<T> cls, boolean supportCodegenHook, boolean immutable) {
     super(fury, cls, immutable);
+    this.classResolver = fury.getClassResolver();
     this.supportCodegenHook = supportCodegenHook;
     keyClassInfoWriteCache = fury.getClassResolver().nilClassInfoHolder();
     keyClassInfoReadCache = fury.getClassResolver().nilClassInfoHolder();
@@ -162,7 +176,7 @@ public abstract class AbstractMapSerializer<T> extends 
Serializer<T> {
     }
   }
 
-  protected final void chunkWriteElements(Fury fury, MemoryBuffer buffer, Map 
map) {
+  protected final void chunkWriteElements(Fury fury, MemoryBuffer buffer, 
Map<Object, Object> map) {
     Serializer keySerializer = this.keySerializer;
     Serializer valueSerializer = this.valueSerializer;
     // clear the elemSerializer to avoid conflict if the nested
@@ -170,365 +184,282 @@ public abstract class AbstractMapSerializer<T> extends 
Serializer<T> {
     // TODO use generics for compatible serializer.
     this.keySerializer = null;
     this.valueSerializer = null;
-    if (keySerializer != null && valueSerializer != null) {
-      javaChunkWriteWithKVSerializers(buffer, map, keySerializer, 
valueSerializer);
-    } else if (keySerializer != null) {
-      javaChunkWriteWithKeySerializers(map, buffer, keySerializer);
-    } else if (valueSerializer != null) {
-      javaChunkWriteWithValueSerializers(map, buffer, valueSerializer);
-    } else {
-      genericJavaChunkWrite(fury, buffer, map);
+    if (map.isEmpty()) {
+      return;
     }
-  }
-
-  private void javaChunkWriteWithKeySerializers(
-      Map map, MemoryBuffer buffer, Serializer keySerializer) {
-    boolean prevKeyIsNull = false;
-    int header = 0;
-    int chunkSize = 0;
-    int startOffset = -1;
-    boolean valueIsDifferentType = false;
-    Class valueClass = null;
-    boolean reset = false;
-    for (Object object : map.entrySet()) {
-      Map.Entry entry = (Map.Entry) object;
-      Object key = entry.getKey();
-      final Object value = entry.getValue();
-      if (key == null) {
-        prevKeyIsNull = true;
-      }
-      if (!valueIsDifferentType) {
-        if (value != null) {
-          if (valueClass == null) {
-            valueClass = value.getClass();
-          }
-          valueIsDifferentType = valueClass != value.getClass();
-          if (valueIsDifferentType) {
-            reset = true;
+    ClassResolver classResolver = fury.getClassResolver();
+    Iterator<Entry<Object, Object>> iterator = map.entrySet().iterator();
+    Entry<Object, Object> entry = iterator.next();
+    while (entry != null) {
+      entry = writeJavaNullChunk(buffer, entry, iterator, keySerializer, 
valueSerializer);
+      if (entry != null) {
+        if (keySerializer != null || valueSerializer != null) {
+          entry =
+              writeJavaChunk(
+                  classResolver, buffer, entry, iterator, keySerializer, 
valueSerializer);
+        } else {
+          Generics generics = fury.getGenerics();
+          GenericType genericType = generics.nextGenericType();
+          if (genericType == null) {
+            entry = writeJavaChunk(classResolver, buffer, entry, iterator, 
null, null);
+          } else {
+            entry =
+                writeJavaChunkGeneric(
+                    classResolver, generics, genericType, buffer, entry, 
iterator);
           }
         }
       }
-      if (needReset(key, chunkSize, prevKeyIsNull, value, header, reset)) {
-        writeHeader(buffer, chunkSize, header, startOffset);
-        prevKeyIsNull = false;
-        header = 0;
-        chunkSize = 0;
-        startOffset = -1;
-        valueClass = value == null ? null : value.getClass();
-        reset = false;
-      }
-      startOffset = preserveByte(buffer, startOffset);
-      boolean trackingKeyRef = keySerializer.needToWriteRef();
-      boolean trackingValueRef = fury.trackingRef();
-      header =
-          updateKVHeader(
-              key, trackingKeyRef, value, trackingValueRef, header, false, 
valueIsDifferentType);
-      writeFinalKey(key, buffer, keySerializer, trackingKeyRef);
-      writeCommonValue(
-          header,
-          trackingValueRef,
-          valueIsDifferentType,
-          startOffset,
-          value,
-          buffer,
-          fury.getClassResolver(),
-          fury.getRefResolver());
-      chunkSize++;
     }
-    writeHeader(buffer, chunkSize, header, startOffset);
-  }
-
-  /**
-   * user preserve 2 bytes to mark whether class info have been written avoid 
to use a variable to
-   * mark these 2 bytes will be overwritten when we finish the chunk.
-   *
-   * @param buffer buffer to write.
-   * @param offset offset to mark.
-   */
-  private void markHasWriteClassInfo(MemoryBuffer buffer, int offset) {
-    int writeIndex = buffer.writerIndex();
-    buffer.writerIndex(offset);
-    buffer.writeByte(MARK_HAS_WRITE_CLASS_INFO);
-    buffer.writerIndex(writeIndex);
   }
 
-  private void writeCommonKey(
-      boolean trackingKeyRef,
-      boolean keyIsDifferentType,
-      int startOffset,
-      Object key,
+  public Entry writeJavaNullChunk(
       MemoryBuffer buffer,
-      ClassResolver classResolver,
-      RefResolver refResolver) {
-    if (!trackingKeyRef) {
-      if (key == null) {
-        buffer.writeByte(Fury.NULL_FLAG);
-      } else {
-        if (!keyIsDifferentType) {
-          Serializer keyWriteSerializer =
-              getKeyWriteSerializer(startOffset, key, buffer, classResolver);
-          keyWriteSerializer.write(buffer, key);
-        } else {
-          fury.writeNonRef(
-              buffer, key, classResolver.getClassInfo(key.getClass(), 
keyClassInfoWriteCache));
+      Entry entry,
+      Iterator<Entry<Object, Object>> iterator,
+      Serializer keySerializer,
+      Serializer valueSerializer) {
+    while (true) {
+      Object key = entry.getKey();
+      Object value = entry.getValue();
+      if (key != null) {
+        if (value != null) {
+          return entry;
         }
+        writeNullValueChunk(buffer, keySerializer, key);
+      } else {
+        writeNullKeyChunk(buffer, valueSerializer, value);
       }
-    } else {
-      if (key == null) {
-        buffer.writeByte(Fury.NULL_FLAG);
+      if (iterator.hasNext()) {
+        entry = iterator.next();
       } else {
-        if (!keyIsDifferentType) {
-          Serializer keyWriteSerializer =
-              getKeyWriteSerializer(startOffset, key, buffer, classResolver);
-          writeNoNullRef(keyWriteSerializer, key, buffer, refResolver);
-        } else {
-          if (!refResolver.writeNullFlag(buffer, key)) {
-            fury.writeRef(
-                buffer, key, classResolver.getClassInfo(key.getClass(), 
keyClassInfoWriteCache));
-          }
-        }
+        return null;
       }
     }
   }
 
-  private Serializer getKeyWriteSerializer(
-      int startOffset, Object key, MemoryBuffer buffer, ClassResolver 
classResolver) {
-    ClassInfo classInfo = classResolver.getClassInfo(key.getClass(), 
keyClassInfoWriteCache);
-    if (buffer.getByte(startOffset) != MARK_HAS_WRITE_CLASS_INFO) {
-      classResolver.writeClass(buffer, classInfo);
-      markHasWriteClassInfo(buffer, startOffset);
+  private void writeNullValueChunk(MemoryBuffer buffer, Serializer 
keySerializer, Object key) {
+    // noinspection Duplicates
+    if (keySerializer != null) {
+      if (keySerializer.needToWriteRef()) {
+        buffer.writeByte(NULL_VALUE_KEY_DECL_TYPE_TRACKING_REF);
+        fury.writeRef(buffer, key, keySerializer);
+      } else {
+        buffer.writeByte(NULL_VALUE_KEY_DECL_TYPE);
+        keySerializer.write(buffer, key);
+      }
+    } else {
+      buffer.writeByte(VALUE_HAS_NULL | TRACKING_KEY_REF);
+      fury.writeRef(buffer, key, keyClassInfoWriteCache);
     }
-    return classInfo.getSerializer();
   }
 
-  private void writeCommonValue(
-      int header,
-      boolean trackingValueRef,
-      boolean valueIsDifferentType,
-      int startOffset,
-      Object value,
-      MemoryBuffer buffer,
-      ClassResolver classResolver,
-      RefResolver refResolver) {
-    if (!trackingValueRef) {
-      if (value == null) {
-        buffer.writeByte(Fury.NULL_FLAG);
-      } else {
-        if (!valueIsDifferentType) {
-          if (valueHasNull(header)) {
-            buffer.writeByte(Fury.NOT_NULL_VALUE_FLAG);
-          }
-          Serializer valueWriteSerializer =
-              getValueWriteSerializer(startOffset, value, buffer, 
classResolver);
-          valueWriteSerializer.write(buffer, value);
+  /**
+   * Write a chunk of size 1 whose key is null. Since a map can have at most 
one null key,
+   * this method is not on the critical path; it is kept as a separate method 
so the caller stays
+   * eligible for JIT inlining.
+   */
+  private void writeNullKeyChunk(MemoryBuffer buffer, Serializer 
valueSerializer, Object value) {
+    if (value != null) {
+      // noinspection Duplicates
+      if (valueSerializer != null) {
+        if (valueSerializer.needToWriteRef()) {
+          buffer.writeByte(NULL_KEY_VALUE_DECL_TYPE_TRACKING_REF);
+          fury.writeRef(buffer, value, valueSerializer);
         } else {
-          fury.writeNullable(
-              buffer,
-              value,
-              classResolver.getClassInfo(value.getClass(), 
valueClassInfoWriteCache));
+          buffer.writeByte(NULL_KEY_VALUE_DECL_TYPE);
+          valueSerializer.write(buffer, value);
         }
-      }
-    } else {
-      if (value == null) {
-        buffer.writeByte(Fury.NULL_FLAG);
       } else {
-        if (!valueIsDifferentType) {
-          Serializer valueWriteSerializer =
-              getValueWriteSerializer(startOffset, value, buffer, 
classResolver);
-          if (!valueHasNull(header)) {
-            writeNoNullRef(valueWriteSerializer, value, buffer, refResolver);
-          } else {
-            fury.writeRef(buffer, value, valueWriteSerializer);
-          }
-        } else {
-          if (!refResolver.writeNullFlag(buffer, value)) {
-            fury.writeRef(
-                buffer,
-                value,
-                classResolver.getClassInfo(value.getClass(), 
valueClassInfoWriteCache));
-          }
-        }
+        buffer.writeByte(KEY_HAS_NULL | TRACKING_VALUE_REF);
+        fury.writeRef(buffer, value, valueClassInfoWriteCache);
       }
+    } else {
+      buffer.writeByte(KV_NULL);
     }
   }
 
-  private Serializer getValueWriteSerializer(
-      int startOffset, Object value, MemoryBuffer buffer, ClassResolver 
classResolver) {
-    ClassInfo classInfo = classResolver.getClassInfo(value.getClass(), 
valueClassInfoWriteCache);
-    if (buffer.getByte(startOffset + 1) != MARK_HAS_WRITE_CLASS_INFO) {
-      classResolver.writeClass(buffer, classInfo);
-      markHasWriteClassInfo(buffer, startOffset + 1);
+  // Writes one same-type chunk; keep this method's bytecode under 325 bytes so the JIT inlines it.
+  private Entry writeJavaChunk(
+      ClassResolver classResolver,
+      MemoryBuffer buffer,
+      Entry<Object, Object> entry,
+      Iterator<Entry<Object, Object>> iterator,
+      Serializer keySerializer,
+      Serializer valueSerializer) {
+    Object key = entry.getKey();
+    Object value = entry.getValue();
+    Class keyType = key.getClass();
+    Class valueType = value.getClass();
+    // place holder for chunk header and size.
+    buffer.writeInt16((short) -1);
+    int chunkSizeOffset = buffer.writerIndex() - 1;
+    int chunkHeader = 0;
+    if (keySerializer != null) {
+      chunkHeader |= KEY_DECL_TYPE; // declared key type: no key class info is written
+    } else {
+      keySerializer = writeKeyClassInfo(classResolver, keyType, buffer);
     }
-    return classInfo.getSerializer();
-  }
-
-  private void javaChunkWriteWithValueSerializers(
-      Map map, MemoryBuffer buffer, Serializer valueSerializer) {
-    boolean prevKeyIsNull = false;
-    int header = 0;
+    if (valueSerializer != null) {
+      chunkHeader |= VALUE_DECL_TYPE;
+    } else {
+      valueSerializer = writeValueClassInfo(classResolver, valueType, buffer);
+    }
+    // noinspection Duplicates
+    boolean keyWriteRef = keySerializer.needToWriteRef();
+    boolean valueWriteRef = valueSerializer.needToWriteRef();
+    if (keyWriteRef) {
+      chunkHeader |= TRACKING_KEY_REF;
+    }
+    if (valueWriteRef) {
+      chunkHeader |= TRACKING_VALUE_REF;
+    }
+    buffer.putByte(chunkSizeOffset - 1, (byte) chunkHeader);
+    RefResolver refResolver = fury.getRefResolver();
+    // Use int to make chunk size representable for 0~255 instead of 0~127.
     int chunkSize = 0;
-    int startOffset = -1;
-    boolean keyIsDifferentType = false;
-    Class keyClass = null;
-    boolean reset = false;
-    for (Object object : map.entrySet()) {
-      Map.Entry entry = (Map.Entry) object;
-      Object key = entry.getKey();
-      final Object value = entry.getValue();
-      if (key == null) {
-        prevKeyIsNull = true;
+    while (true) {
+      if (key == null
+          || value == null
+          || (key.getClass() != keyType)
+          || (value.getClass() != valueType)) {
+        break;
+      }
+      if (!keyWriteRef || !refResolver.writeRefOrNull(buffer, key)) {
+        keySerializer.write(buffer, key);
       }
-      if (!keyIsDifferentType) {
-        if (key != null) {
-          if (keyClass == null) {
-            keyClass = key.getClass();
-          }
-          keyIsDifferentType = keyClass != key.getClass();
-          if (keyIsDifferentType) {
-            reset = true;
-          }
-        }
+      if (!valueWriteRef || !refResolver.writeRefOrNull(buffer, value)) {
+        valueSerializer.write(buffer, value);
       }
-      if (needReset(key, chunkSize, prevKeyIsNull, value, header, reset)) {
-        writeHeader(buffer, chunkSize, header, startOffset);
-        prevKeyIsNull = false;
-        header = 0;
-        chunkSize = 0;
-        startOffset = -1;
-        keyClass = key == null ? null : key.getClass();
+      // noinspection Duplicates
+      if (++chunkSize == MAX_CHUNK_SIZE) {
+        break;
+      }
+      if (iterator.hasNext()) {
+        entry = iterator.next();
+        key = entry.getKey();
+        value = entry.getValue();
+      } else {
+        entry = null;
+        break;
       }
-      startOffset = preserveByte(buffer, startOffset);
-      boolean trackingKeyRef = fury.trackingRef();
-      boolean trackingValueRef = valueSerializer.needToWriteRef();
-      header =
-          updateKVHeader(
-              key, trackingKeyRef, value, trackingValueRef, header, 
keyIsDifferentType, false);
-      writeCommonKey(
-          trackingKeyRef,
-          keyIsDifferentType,
-          startOffset,
-          key,
-          buffer,
-          fury.getClassResolver(),
-          fury.getRefResolver());
-      writeFinalValue(value, buffer, valueSerializer, trackingValueRef, 
header);
-      chunkSize++;
-    }
-    writeHeader(buffer, chunkSize, header, startOffset);
-  }
-
-  private int preserveByte(MemoryBuffer buffer, int startOffset) {
-    if (startOffset == -1) {
-      int writerIndex = buffer.writerIndex();
-      // preserve two byte for header and chunk size
-      buffer.writerIndex(writerIndex + 2);
-      return writerIndex;
     }
-    return startOffset;
+    buffer.putByte(chunkSizeOffset, (byte) chunkSize);
+    return entry;
   }
 
-  private void javaChunkWriteWithKVSerializers(
-      MemoryBuffer buffer, Map map, Serializer keySerializer, Serializer 
valueSerializer) {
-    boolean prevKeyIsNull = false;
-    int header = 0;
-    int chunkSize = 0;
-    int startOffset = -1;
-    for (Object object : map.entrySet()) {
-      Map.Entry entry = (Map.Entry) object;
-      Object key = entry.getKey();
-      Object value = entry.getValue();
-      if (key == null) {
-        prevKeyIsNull = true;
-      }
-      if (needReset(key, chunkSize, prevKeyIsNull, value, header, false)) {
-        // update header at the beginning of the chunk when we reset chunk
-        writeHeader(buffer, chunkSize, header, startOffset);
-        header = 0;
-        chunkSize = 0;
-        startOffset = -1;
-        prevKeyIsNull = false;
-      }
-      startOffset = preserveByte(buffer, startOffset);
-      boolean trackingKeyRef = keySerializer.needToWriteRef();
-      boolean trackingValueRef = valueSerializer.needToWriteRef();
-      header = updateKVHeader(key, trackingKeyRef, value, trackingValueRef, 
header, false, false);
-      writeFinalKey(key, buffer, keySerializer, trackingKeyRef);
-      writeFinalValue(value, buffer, valueSerializer, trackingValueRef, 
header);
-      chunkSize++;
-    }
-    // update header at the beginning of the chunk when we finish the iteration
-    writeHeader(buffer, chunkSize, header, startOffset);
+  private Serializer writeKeyClassInfo(
+      ClassResolver classResolver, Class keyType, MemoryBuffer buffer) {
+    ClassInfo classInfo = classResolver.getClassInfo(keyType, 
keyClassInfoWriteCache);
+    classResolver.writeClass(buffer, classInfo);
+    return classInfo.getSerializer();
   }
 
-  private void writeFinalKey(
-      Object key, MemoryBuffer buffer, Serializer keySerializer, boolean 
trackingKeyRef) {
-    if (!trackingKeyRef) {
-      // map key has one null at most, use one chunk to write
-      if (key == null) {
-        buffer.writeByte(Fury.NULL_FLAG);
-      } else {
-        keySerializer.write(buffer, key);
-      }
-    } else {
-      RefResolver refResolver = fury.getRefResolver();
-      if (!refResolver.writeRefOrNull(buffer, key)) {
-        keySerializer.write(buffer, key);
-      }
-    }
+  private Serializer writeValueClassInfo(
+      ClassResolver classResolver, Class valueType, MemoryBuffer buffer) {
+    ClassInfo classInfo = classResolver.getClassInfo(valueType, 
valueClassInfoWriteCache);
+    classResolver.writeClass(buffer, classInfo);
+    return classInfo.getSerializer();
   }
 
-  private void writeFinalValue(
-      Object value,
+  private Entry writeJavaChunkGeneric(
+      ClassResolver classResolver,
+      Generics generics,
+      GenericType genericType,
       MemoryBuffer buffer,
-      Serializer valueSerializer,
-      boolean trackingValueRef,
-      int header) {
-    if (!trackingValueRef) {
-      if (value == null) {
-        buffer.writeByte(Fury.NULL_FLAG);
-      } else {
-        if (valueHasNull(header)) {
-          buffer.writeByte(Fury.NOT_NULL_VALUE_FLAG);
-          valueSerializer.write(buffer, value);
-        } else {
-          valueSerializer.write(buffer, value);
-        }
-      }
+      Entry<Object, Object> entry,
+      Iterator<Entry<Object, Object>> iterator) {
+    // type parameters count for `Map field` will be 0;
+    // type parameters count for `SubMap<V> field` which SubMap is
+    // `SubMap<V> implements Map<String, V>` will be 1;
+    if (genericType.getTypeParametersCount() < 2) {
+      genericType = getKVGenericType(genericType);
+    }
+    GenericType keyGenericType = genericType.getTypeParameter0();
+    GenericType valueGenericType = genericType.getTypeParameter1();
+    if (keyGenericType == objType && valueGenericType == objType) {
+      return writeJavaChunk(classResolver, buffer, entry, iterator, null, 
null);
+    }
+    // Pushing the key/value generics once outside the loop (tracked by stack depth) doesn't work:
+    // pushing two generic types changes the top of the generics stack, which is indexed by depth,
+    // and updating the stack top and depth for every entry would have some cost too.
+    // The commented-out code below sketches that depth-based approach.
+    // Note that pushing two generic types changes the generics stack top (the depth index), so the
+    // stack top would have to be updated whenever it is used for serializing a key or a value.
+    // int depth = fury.getDepth();
+    // // depth + 1 to leave a slot for value generics, otherwise value 
generics will
+    // // be overwritten by nested key generics.
+    // fury.setDepth(depth + 1);
+    // generics.pushGenericType(keyGenericType);
+    // fury.setDepth(depth);
+    // generics.pushGenericType(valueGenericType);
+    boolean keyGenericTypeFinal = keyGenericType.isMonomorphic();
+    boolean valueGenericTypeFinal = valueGenericType.isMonomorphic();
+    Object key = entry.getKey();
+    Object value = entry.getValue();
+    Class keyType = key.getClass();
+    Class valueType = value.getClass();
+    Serializer keySerializer, valueSerializer;
+    // place holder for chunk header and size.
+    buffer.writeInt16((short) -1);
+    int chunkSizeOffset = buffer.writerIndex() - 1;
+    int chunkHeader = 0;
+    // noinspection Duplicates
+    if (keyGenericTypeFinal) {
+      chunkHeader |= KEY_DECL_TYPE;
+      keySerializer = keyGenericType.getSerializer(classResolver);
     } else {
-      RefResolver refResolver = fury.getRefResolver();
-      if (!refResolver.writeRefOrNull(buffer, value)) {
-        valueSerializer.write(buffer, value);
-      }
-    }
-  }
-
-  private int updateKVHeader(
-      Object key,
-      boolean trackingKeyRef,
-      Object value,
-      boolean trackingValueRef,
-      int header,
-      boolean keyIsDifferentType,
-      boolean valueIsDifferentType) {
-    if (trackingKeyRef) {
-      header |= MapFlags.TRACKING_KEY_REF;
-    }
-    if (key == null) {
-      header |= MapFlags.KEY_HAS_NULL;
+      keySerializer = writeKeyClassInfo(classResolver, keyType, buffer);
     }
-    if (trackingValueRef) {
-      header |= MapFlags.TRACKING_VALUE_REF;
+    if (valueGenericTypeFinal) {
+      chunkHeader |= VALUE_DECL_TYPE;
+      valueSerializer = valueGenericType.getSerializer(classResolver);
+    } else {
+      valueSerializer = writeValueClassInfo(classResolver, valueType, buffer);
     }
-    if (value == null) {
-      header |= MapFlags.VALUE_HAS_NULL;
+    boolean keyWriteRef = keySerializer.needToWriteRef();
+    if (keyWriteRef) {
+      chunkHeader |= TRACKING_KEY_REF;
     }
-    if (keyIsDifferentType) {
-      header |= MapFlags.KEY_NOT_SAME_TYPE;
+    boolean valueWriteRef = valueSerializer.needToWriteRef();
+    if (valueWriteRef) {
+      chunkHeader |= TRACKING_VALUE_REF;
     }
-    if (valueIsDifferentType) {
-      header |= MapFlags.VALUE_NOT_SAME_TYPE;
+    buffer.putByte(chunkSizeOffset - 1, (byte) chunkHeader);
+    RefResolver refResolver = fury.getRefResolver();
+    // Use int to make chunk size representable for 0~255 instead of 0~127.
+    int chunkSize = 0;
+    while (true) {
+      if (key == null
+          || value == null
+          || (key.getClass() != keyType)
+          || (value.getClass() != valueType)) {
+        break;
+      }
+      generics.pushGenericType(keyGenericType);
+      if (!keyWriteRef || !refResolver.writeRefOrNull(buffer, key)) {
+        keySerializer.write(buffer, key);
+      }
+      generics.popGenericType();
+      generics.pushGenericType(valueGenericType);
+      if (!valueWriteRef || !refResolver.writeRefOrNull(buffer, value)) {
+        valueSerializer.write(buffer, value);
+      }
+      generics.popGenericType();
+      // noinspection Duplicates
+      if (++chunkSize == MAX_CHUNK_SIZE) {
+        break;
+      }
+      if (iterator.hasNext()) {
+        entry = iterator.next();
+        key = entry.getKey();
+        value = entry.getValue();
+      } else {
+        entry = null;
+        break;
+      }
     }
-    return header;
+    buffer.putByte(chunkSizeOffset, (byte) chunkSize);
+    return entry;
   }
 
   private void javaWriteWithKVSerializers(
@@ -552,19 +483,18 @@ public abstract class AbstractMapSerializer<T> extends 
Serializer<T> {
     if (genericType == null) {
       generalJavaWrite(fury, buffer, map);
     } else {
-      GenericType keyGenericType = genericType.getTypeParameter0();
-      GenericType valueGenericType = genericType.getTypeParameter1();
+
       // type parameters count for `Map field` will be 0;
       // type parameters count for `SubMap<V> field` which SubMap is
       // `SubMap<V> implements Map<String, V>` will be 1;
       if (genericType.getTypeParametersCount() < 2) {
-        Tuple2<GenericType, GenericType> kvGenericType = 
getKVGenericType(genericType);
-        if (keyGenericType == objType && valueGenericType == objType) {
-          generalJavaWrite(fury, buffer, map);
-          return;
-        }
-        keyGenericType = kvGenericType.f0;
-        valueGenericType = kvGenericType.f1;
+        genericType = getKVGenericType(genericType);
+      }
+      GenericType keyGenericType = genericType.getTypeParameter0();
+      GenericType valueGenericType = genericType.getTypeParameter1();
+      if (keyGenericType == objType && valueGenericType == objType) {
+        generalJavaWrite(fury, buffer, map);
+        return;
       }
       // Can't avoid push generics repeatedly in loop by stack depth, because 
push two
       // generic type changed generics stack top, which is depth index, update 
stack top
@@ -593,54 +523,6 @@ public abstract class AbstractMapSerializer<T> extends 
Serializer<T> {
     }
   }
 
-  private void genericJavaChunkWrite(Fury fury, MemoryBuffer buffer, Map map) {
-    Generics generics = fury.getGenerics();
-    GenericType genericType = generics.nextGenericType();
-    if (genericType == null) {
-      generalJavaChunkWrite(fury, buffer, map);
-    } else {
-      GenericType keyGenericType = genericType.getTypeParameter0();
-      GenericType valueGenericType = genericType.getTypeParameter1();
-      // type parameters count for `Map field` will be 0;
-      // type parameters count for `SubMap<V> field` which SubMap is
-      // `SubMap<V> implements Map<String, V>` will be 1;
-      if (genericType.getTypeParametersCount() < 2) {
-        Tuple2<GenericType, GenericType> kvGenericType = 
getKVGenericType(genericType);
-        if (keyGenericType == objType && valueGenericType == objType) {
-          generalJavaChunkWrite(fury, buffer, map);
-          return;
-        }
-        keyGenericType = kvGenericType.f0;
-        valueGenericType = kvGenericType.f1;
-      }
-      // Can't avoid push generics repeatedly in loop by stack depth, because 
push two
-      // generic type changed generics stack top, which is depth index, update 
stack top
-      // and depth will have some cost too.
-      // Stack depth to avoid push generics repeatedly in loop.
-      // Note push two generic type changed generics stack top, which is depth 
index,
-      // stack top should be updated when using for serialization k/v.
-      // int depth = fury.getDepth();
-      // // depth + 1 to leave a slot for value generics, otherwise value 
generics will
-      // // be overwritten by nested key generics.
-      // fury.setDepth(depth + 1);
-      // generics.pushGenericType(keyGenericType);
-      // fury.setDepth(depth);
-      // generics.pushGenericType(valueGenericType);
-      boolean keyGenericTypeFinal = keyGenericType.isMonomorphic();
-      boolean valueGenericTypeFinal = valueGenericType.isMonomorphic();
-      if (keyGenericTypeFinal && valueGenericTypeFinal) {
-        javaKVTypesFinalChunkWrite(fury, buffer, map, keyGenericType, 
valueGenericType, generics);
-      } else if (keyGenericTypeFinal) {
-        javaKeyTypeFinalChunkWrite(fury, buffer, map, keyGenericType, 
valueGenericType, generics);
-      } else if (valueGenericTypeFinal) {
-        javaValueTypeFinalChunkWrite(fury, buffer, map, keyGenericType, 
valueGenericType, generics);
-      } else {
-        javaKVTypesNonFinalChunkWrite(
-            fury, buffer, map, keyGenericType, valueGenericType, generics);
-      }
-    }
-  }
-
   private void javaKVTypesFinalWrite(
       Fury fury,
       MemoryBuffer buffer,
@@ -661,51 +543,6 @@ public abstract class AbstractMapSerializer<T> extends 
Serializer<T> {
     }
   }
 
-  /**
-   * kv final write do not need to predict , since key and value is almost 
same type unless null.
-   */
-  private void javaKVTypesFinalChunkWrite(
-      Fury fury,
-      MemoryBuffer buffer,
-      Map map,
-      GenericType keyGenericType,
-      GenericType valueGenericType,
-      Generics generics) {
-    boolean prevKeyIsNull = false;
-    int header = 0;
-    int chunkSize = 0;
-    int startOffset = -1;
-    Serializer keySerializer = 
keyGenericType.getSerializer(fury.getClassResolver());
-    Serializer valueSerializer = 
valueGenericType.getSerializer(fury.getClassResolver());
-    for (Object object : map.entrySet()) {
-      Map.Entry entry = (Map.Entry) object;
-      Object key = entry.getKey();
-      Object value = entry.getValue();
-      if (key == null) {
-        prevKeyIsNull = true;
-      }
-      if (needReset(key, chunkSize, prevKeyIsNull, value, header, false)) {
-        writeHeader(buffer, chunkSize, header, startOffset);
-        header = 0;
-        chunkSize = 0;
-        startOffset = -1;
-        prevKeyIsNull = false;
-      }
-      startOffset = preserveByte(buffer, startOffset);
-      boolean trackingKeyRef = keySerializer.needToWriteRef();
-      boolean trackingValueRef = valueSerializer.needToWriteRef();
-      header = updateKVHeader(key, trackingKeyRef, value, trackingValueRef, 
header, false, false);
-      generics.pushGenericType(keyGenericType);
-      writeFinalKey(key, buffer, keySerializer, trackingKeyRef);
-      generics.popGenericType();
-      generics.pushGenericType(valueGenericType);
-      writeFinalValue(value, buffer, valueSerializer, trackingValueRef, 
header);
-      generics.popGenericType();
-      chunkSize++;
-    }
-    writeHeader(buffer, chunkSize, header, startOffset);
-  }
-
   private void javaKeyTypeFinalWrite(
       Fury fury,
       MemoryBuffer buffer,
@@ -735,330 +572,57 @@ public abstract class AbstractMapSerializer<T> extends 
Serializer<T> {
     }
   }
 
-  private void javaKeyTypeFinalChunkWrite(
+  private void javaValueTypeFinalWrite(
       Fury fury,
       MemoryBuffer buffer,
       Map map,
       GenericType keyGenericType,
       GenericType valueGenericType,
       Generics generics) {
-    Serializer keySerializer = 
keyGenericType.getSerializer(fury.getClassResolver());
-    boolean trackingValueRef = 
fury.getClassResolver().needToWriteRef(valueGenericType.getCls());
-    boolean prevKeyIsNull = false;
-    int header = 0;
-    int chunkSize = 0;
-    int startOffset = -1;
-    boolean valueIsDifferentType = false;
-    Class valueClass = null;
-    boolean reset = false;
+    ClassResolver classResolver = fury.getClassResolver();
+    RefResolver refResolver = fury.getRefResolver();
+    boolean trackingKeyRef = 
fury.getClassResolver().needToWriteRef(keyGenericType.getCls());
+    Serializer valueSerializer = 
valueGenericType.getSerializer(fury.getClassResolver());
     for (Object object : map.entrySet()) {
       Map.Entry entry = (Map.Entry) object;
-      Object key = entry.getKey();
-      Object value = entry.getValue();
-      if (key == null) {
-        prevKeyIsNull = true;
-      }
-      if (!valueIsDifferentType) {
-        if (value != null) {
-          if (valueClass == null) {
-            valueClass = value.getClass();
-          }
-          valueIsDifferentType = valueClass != value.getClass();
-        }
-        if (valueIsDifferentType) {
-          reset = true;
-        }
-      }
-      if (needReset(key, chunkSize, prevKeyIsNull, value, header, reset)) {
-        writeHeader(buffer, chunkSize, header, startOffset);
-        prevKeyIsNull = false;
-        header = 0;
-        chunkSize = 0;
-        startOffset = -1;
-        valueClass = value == null ? null : value.getClass();
-        reset = false;
-      }
-      startOffset = preserveByte(buffer, startOffset);
       generics.pushGenericType(keyGenericType);
-      boolean trackingKeyRef = keySerializer.needToWriteRef();
-      header =
-          updateKVHeader(
-              key, trackingKeyRef, value, trackingValueRef, header, false, 
valueIsDifferentType);
-      writeFinalKey(key, buffer, keySerializer, trackingKeyRef);
+      writeJavaRefOptimized(
+          fury,
+          classResolver,
+          refResolver,
+          trackingKeyRef,
+          buffer,
+          entry.getKey(),
+          keyClassInfoWriteCache);
       generics.popGenericType();
       generics.pushGenericType(valueGenericType);
-      writeCommonValue(
-          header,
-          trackingValueRef,
-          valueIsDifferentType,
-          startOffset,
-          value,
-          buffer,
-          fury.getClassResolver(),
-          fury.getRefResolver());
+      fury.writeRef(buffer, entry.getValue(), valueSerializer);
       generics.popGenericType();
-      chunkSize++;
     }
-    writeHeader(buffer, chunkSize, header, startOffset);
   }
 
-  private void javaValueTypeFinalChunkWrite(
+  private void javaKVTypesNonFinalWrite(
       Fury fury,
       MemoryBuffer buffer,
       Map map,
       GenericType keyGenericType,
       GenericType valueGenericType,
       Generics generics) {
-    int header = 0;
-    int chunkSize = 0;
-    boolean prevKeyIsNull = false;
-    boolean keyIsDifferentType = false;
-    int startOffset = -1;
-    Class keyClass = null;
-    boolean reset = false;
-    Serializer valueSerializer = 
valueGenericType.getSerializer(fury.getClassResolver());
+    ClassResolver classResolver = fury.getClassResolver();
+    RefResolver refResolver = fury.getRefResolver();
     boolean trackingKeyRef = 
fury.getClassResolver().needToWriteRef(keyGenericType.getCls());
-    boolean trackingValueRef = valueSerializer.needToWriteRef();
+    boolean trackingValueRef = 
fury.getClassResolver().needToWriteRef(valueGenericType.getCls());
     for (Object object : map.entrySet()) {
       Map.Entry entry = (Map.Entry) object;
-      Object key = entry.getKey();
-      Object value = entry.getValue();
-      if (key == null) {
-        prevKeyIsNull = true;
-      }
-      if (!keyIsDifferentType) {
-        if (key != null) {
-          if (keyClass == null) {
-            keyClass = key.getClass();
-          }
-          keyIsDifferentType = keyClass != key.getClass();
-          if (keyIsDifferentType) {
-            reset = true;
-          }
-        }
-      }
-      if (needReset(key, chunkSize, prevKeyIsNull, value, header, reset)) {
-        writeHeader(buffer, chunkSize, header, startOffset);
-        header = 0;
-        chunkSize = 0;
-        prevKeyIsNull = false;
-        startOffset = -1;
-        keyClass = key == null ? null : key.getClass();
-        reset = false;
-      }
-      header =
-          updateKVHeader(
-              key, trackingKeyRef, value, trackingValueRef, header, false, 
keyIsDifferentType);
-      startOffset = preserveByte(buffer, startOffset);
       generics.pushGenericType(keyGenericType);
-      writeCommonKey(
+      writeJavaRefOptimized(
+          fury,
+          classResolver,
+          refResolver,
           trackingKeyRef,
-          keyIsDifferentType,
-          startOffset,
-          key,
           buffer,
-          fury.getClassResolver(),
-          fury.getRefResolver());
-      generics.popGenericType();
-      generics.pushGenericType(valueGenericType);
-      writeFinalValue(value, buffer, valueSerializer, trackingValueRef, 
header);
-      generics.popGenericType();
-      chunkSize++;
-    }
-    writeHeader(buffer, chunkSize, header, startOffset);
-  }
-
-  private void javaKVTypesNonFinalChunkWrite(
-      Fury fury,
-      MemoryBuffer buffer,
-      Map map,
-      GenericType keyGenericType,
-      GenericType valueGenericType,
-      Generics generics) {
-    ClassResolver classResolver = fury.getClassResolver();
-    RefResolver refResolver = fury.getRefResolver();
-    int header = 0;
-    int startOffset = -1;
-    int chunkSize = 0;
-    Class<?> keyClass = null;
-    Class<?> valueClass = null;
-    boolean keyIsDifferentType = false;
-    boolean valueIsDifferentType = false;
-    boolean prevKeyIsNull = false;
-    boolean markChunkWriteFinish = false;
-    boolean reset = false;
-    boolean needMarkFinish = false;
-    boolean trackingKeyRef = 
fury.getClassResolver().needToWriteRef(keyGenericType.getCls());
-    boolean trackingValueRef = 
fury.getClassResolver().needToWriteRef(valueGenericType.getCls());
-    for (Object object : map.entrySet()) {
-      Map.Entry entry = (Map.Entry) object;
-      Object key = entry.getKey();
-      Object value = entry.getValue();
-      if (!markChunkWriteFinish) {
-        if (key == null) {
-          prevKeyIsNull = true;
-        }
-        if (!keyIsDifferentType) {
-          if (key != null) {
-            if (keyClass == null) {
-              keyClass = key.getClass();
-            }
-            keyIsDifferentType = keyClass != key.getClass();
-          }
-          if (keyIsDifferentType) {
-            reset = true;
-          }
-        }
-        if (!valueIsDifferentType) {
-          if (value != null) {
-            if (valueClass == null) {
-              valueClass = value.getClass();
-            }
-            valueIsDifferentType = valueClass != value.getClass();
-          }
-          if (valueIsDifferentType) {
-            reset = true;
-          }
-        }
-        if (keyIsDifferentType && valueIsDifferentType) {
-          needMarkFinish = true;
-        }
-        if (needMarkFinish) {
-          writeHeader(buffer, chunkSize, header, startOffset);
-          // set chunk size = 0
-          buffer.writeByte(0);
-          markChunkWriteFinish = true;
-        } else {
-          if (needReset(key, chunkSize, prevKeyIsNull, value, header, reset)) {
-            writeHeader(buffer, chunkSize, header, startOffset);
-            header = 0;
-            chunkSize = 0;
-            prevKeyIsNull = false;
-            keyClass = key == null ? null : key.getClass();
-            valueClass = value == null ? null : value.getClass();
-            reset = false;
-            startOffset = -1;
-          }
-        }
-      }
-      if (markChunkWriteFinish) {
-        generics.pushGenericType(keyGenericType);
-        writeJavaRefOptimized(
-            fury, classResolver, refResolver, trackingKeyRef, buffer, key, 
keyClassInfoWriteCache);
-        generics.popGenericType();
-        generics.pushGenericType(valueGenericType);
-        writeJavaRefOptimized(
-            fury,
-            classResolver,
-            refResolver,
-            trackingValueRef,
-            buffer,
-            value,
-            keyClassInfoWriteCache);
-        generics.popGenericType();
-      } else {
-        startOffset = preserveByte(buffer, startOffset);
-        header =
-            updateKVHeader(
-                key,
-                trackingKeyRef,
-                value,
-                trackingValueRef,
-                header,
-                keyIsDifferentType,
-                valueIsDifferentType);
-        generics.pushGenericType(keyGenericType);
-        writeCommonKey(
-            trackingKeyRef,
-            keyIsDifferentType,
-            startOffset,
-            key,
-            buffer,
-            fury.getClassResolver(),
-            fury.getRefResolver());
-        generics.popGenericType();
-        generics.pushGenericType(valueGenericType);
-        writeCommonValue(
-            header,
-            trackingValueRef,
-            valueIsDifferentType,
-            startOffset,
-            value,
-            buffer,
-            fury.getClassResolver(),
-            fury.getRefResolver());
-        generics.popGenericType();
-        chunkSize++;
-      }
-    }
-    writeHeader(buffer, chunkSize, header, startOffset);
-  }
-
-  private boolean needReset(
-      Object key,
-      int chunkSize,
-      boolean prevKeyIsNull,
-      Object value,
-      int header,
-      boolean needReset) {
-    return (key == null && chunkSize > 0)
-        || (prevKeyIsNull && key != null)
-        || (value == null && chunkSize > 0 && !valueHasNull(header))
-        || (chunkSize >= MAX_CHUNK_SIZE)
-        || needReset;
-  }
-
-  private void javaValueTypeFinalWrite(
-      Fury fury,
-      MemoryBuffer buffer,
-      Map map,
-      GenericType keyGenericType,
-      GenericType valueGenericType,
-      Generics generics) {
-    ClassResolver classResolver = fury.getClassResolver();
-    RefResolver refResolver = fury.getRefResolver();
-    boolean trackingKeyRef = 
fury.getClassResolver().needToWriteRef(keyGenericType.getCls());
-    Serializer valueSerializer = 
valueGenericType.getSerializer(fury.getClassResolver());
-    for (Object object : map.entrySet()) {
-      Map.Entry entry = (Map.Entry) object;
-      generics.pushGenericType(keyGenericType);
-      writeJavaRefOptimized(
-          fury,
-          classResolver,
-          refResolver,
-          trackingKeyRef,
-          buffer,
-          entry.getKey(),
-          keyClassInfoWriteCache);
-      generics.popGenericType();
-      generics.pushGenericType(valueGenericType);
-      fury.writeRef(buffer, entry.getValue(), valueSerializer);
-      generics.popGenericType();
-    }
-  }
-
-  private void javaKVTypesNonFinalWrite(
-      Fury fury,
-      MemoryBuffer buffer,
-      Map map,
-      GenericType keyGenericType,
-      GenericType valueGenericType,
-      Generics generics) {
-    ClassResolver classResolver = fury.getClassResolver();
-    RefResolver refResolver = fury.getRefResolver();
-    boolean trackingKeyRef = 
fury.getClassResolver().needToWriteRef(keyGenericType.getCls());
-    boolean trackingValueRef = 
fury.getClassResolver().needToWriteRef(valueGenericType.getCls());
-    for (Object object : map.entrySet()) {
-      Map.Entry entry = (Map.Entry) object;
-      generics.pushGenericType(keyGenericType);
-      writeJavaRefOptimized(
-          fury,
-          classResolver,
-          refResolver,
-          trackingKeyRef,
-          buffer,
-          entry.getKey(),
-          keyClassInfoWriteCache);
+          entry.getKey(),
+          keyClassInfoWriteCache);
       generics.popGenericType();
       generics.pushGenericType(valueGenericType);
       writeJavaRefOptimized(
@@ -1085,130 +649,6 @@ public abstract class AbstractMapSerializer<T> extends 
Serializer<T> {
     }
   }
 
-  protected void generalJavaChunkWrite(Fury fury, MemoryBuffer buffer, Map 
map) {
-    int header = 0;
-    int startOffset = -1;
-    int chunkSize = 0;
-    Class<?> keyClass = null;
-    Class<?> valueClass = null;
-    boolean keyIsDifferentType = false;
-    boolean valueIsDifferentType = false;
-    boolean prevKeyIsNull = false;
-    boolean markChunkWriteFinish = false;
-    boolean reset = false;
-    boolean needMarkFinish = false;
-    ClassResolver classResolver = fury.getClassResolver();
-    RefResolver refResolver = fury.getRefResolver();
-    for (Object object : map.entrySet()) {
-      Map.Entry entry = (Map.Entry) object;
-      Object key = entry.getKey();
-      Object value = entry.getValue();
-      if (!markChunkWriteFinish) {
-        if (key == null) {
-          prevKeyIsNull = true;
-        }
-        if (!keyIsDifferentType) {
-          if (key != null) {
-            if (keyClass == null) {
-              keyClass = key.getClass();
-            }
-            keyIsDifferentType = keyClass != key.getClass();
-          }
-          if (keyIsDifferentType) {
-            reset = true;
-          }
-        }
-        if (!valueIsDifferentType) {
-          if (value != null) {
-            if (valueClass == null) {
-              valueClass = value.getClass();
-            }
-            valueIsDifferentType = valueClass != value.getClass();
-          }
-          if (valueIsDifferentType) {
-            reset = true;
-          }
-        }
-        if (valueIsDifferentType && keyIsDifferentType) {
-          needMarkFinish = true;
-        }
-        if (needMarkFinish) {
-          writeHeader(buffer, chunkSize, header, startOffset);
-          // set chunk size = 0
-          buffer.writeByte(0);
-          markChunkWriteFinish = true;
-        } else {
-          if (needReset(key, chunkSize, prevKeyIsNull, value, header, reset)) {
-            writeHeader(buffer, chunkSize, header, startOffset);
-            header = 0;
-            chunkSize = 0;
-            startOffset = -1;
-            prevKeyIsNull = false;
-            keyClass = key == null ? null : key.getClass();
-            valueClass = value == null ? null : value.getClass();
-            reset = false;
-          }
-        }
-      }
-      if (!markChunkWriteFinish) {
-        startOffset = preserveByte(buffer, startOffset);
-        boolean trackingRef = fury.trackingRef();
-        header =
-            updateKVHeader(
-                key,
-                trackingRef,
-                value,
-                trackingRef,
-                header,
-                keyIsDifferentType,
-                valueIsDifferentType);
-        writeCommonKey(
-            trackingRef, keyIsDifferentType, startOffset, key, buffer, 
classResolver, refResolver);
-        writeCommonValue(
-            header,
-            trackingRef,
-            valueIsDifferentType,
-            startOffset,
-            value,
-            buffer,
-            classResolver,
-            refResolver);
-        chunkSize++;
-      } else {
-        writeJavaRefOptimized(
-            fury, classResolver, refResolver, buffer, entry.getKey(), 
keyClassInfoWriteCache);
-        writeJavaRefOptimized(
-            fury, classResolver, refResolver, buffer, entry.getValue(), 
valueClassInfoWriteCache);
-      }
-    }
-    writeHeader(buffer, chunkSize, header, startOffset);
-  }
-
-  private void writeNoNullRef(
-      Serializer serializer, Object o, MemoryBuffer buffer, RefResolver 
refResolver) {
-    if (serializer.needToWriteRef()) {
-      if (!refResolver.writeRefOrNull(buffer, o)) {
-        serializer.write(buffer, o);
-      }
-    } else {
-      serializer.write(buffer, o);
-    }
-  }
-
-  private boolean valueHasNull(int header) {
-    return (header & MapFlags.VALUE_HAS_NULL) == MapFlags.VALUE_HAS_NULL;
-  }
-
-  public void writeHeader(MemoryBuffer memoryBuffer, int chunkSize, int 
header, int startOffset) {
-    if (chunkSize > 0) {
-      int currentWriteIndex = memoryBuffer.writerIndex();
-      memoryBuffer.writerIndex(startOffset);
-      memoryBuffer.writeByte(chunkSize);
-      memoryBuffer.writeByte(header);
-      memoryBuffer.writerIndex(currentWriteIndex);
-    }
-  }
-
   public static void xwriteElements(Fury fury, MemoryBuffer buffer, Map value) 
{
     Generics generics = fury.getGenerics();
     GenericType genericType = generics.nextGenericType();
@@ -1228,55 +668,61 @@ public abstract class AbstractMapSerializer<T> extends 
Serializer<T> {
       if (!keyGenericType.hasGenericParameters() && 
!valueGenericType.hasGenericParameters()) {
         for (Object object : value.entrySet()) {
           Map.Entry entry = (Map.Entry) object;
-          fury.xwriteRefByNullableSerializer(buffer, entry.getKey(), 
keySerializer);
-          fury.xwriteRefByNullableSerializer(buffer, entry.getValue(), 
valueSerializer);
+          xwriteRefByNullableSerializer(fury, buffer, entry.getKey(), 
keySerializer);
+          xwriteRefByNullableSerializer(fury, buffer, entry.getValue(), 
valueSerializer);
         }
       } else if (valueGenericType.hasGenericParameters()) {
         for (Object object : value.entrySet()) {
           Map.Entry entry = (Map.Entry) object;
-          fury.xwriteRefByNullableSerializer(buffer, entry.getKey(), 
keySerializer);
+          xwriteRefByNullableSerializer(fury, buffer, entry.getKey(), 
keySerializer);
           generics.pushGenericType(valueGenericType);
-          fury.xwriteRefByNullableSerializer(buffer, entry.getValue(), 
valueSerializer);
+          xwriteRefByNullableSerializer(fury, buffer, entry.getValue(), 
valueSerializer);
           generics.popGenericType();
         }
       } else if (keyGenericType.hasGenericParameters()) {
         for (Object object : value.entrySet()) {
           Map.Entry entry = (Map.Entry) object;
           generics.pushGenericType(keyGenericType);
-          fury.xwriteRefByNullableSerializer(buffer, entry.getKey(), 
keySerializer);
+          xwriteRefByNullableSerializer(fury, buffer, entry.getKey(), 
keySerializer);
           generics.popGenericType();
-          fury.xwriteRefByNullableSerializer(buffer, entry.getValue(), 
valueSerializer);
+          xwriteRefByNullableSerializer(fury, buffer, entry.getValue(), 
valueSerializer);
         }
       } else {
         for (Object object : value.entrySet()) {
           Map.Entry entry = (Map.Entry) object;
           generics.pushGenericType(keyGenericType);
-          fury.xwriteRefByNullableSerializer(buffer, entry.getKey(), 
keySerializer);
+          xwriteRefByNullableSerializer(fury, buffer, entry.getKey(), 
keySerializer);
           generics.pushGenericType(valueGenericType);
-          fury.xwriteRefByNullableSerializer(buffer, entry.getValue(), 
valueSerializer);
+          xwriteRefByNullableSerializer(fury, buffer, entry.getValue(), 
valueSerializer);
         }
       }
       generics.popGenericType();
     }
   }
 
-  private Tuple2<GenericType, GenericType> getKVGenericType(GenericType 
genericType) {
-    Tuple2<GenericType, GenericType> genericTypes = 
partialGenericKVTypeMap.get(genericType);
-    if (genericTypes == null) {
+  /**
+   * Writes {@code obj} by delegating to {@code fury.xwriteRef}, passing {@code serializer}
+   * along only when one was supplied.
+   *
+   * @param fury the instance whose {@code xwriteRef} overloads perform the write
+   * @param buffer destination buffer handed to {@code xwriteRef}
+   * @param obj object to write
+   * @param serializer serializer to forward, or {@code null} to use the overload without one
+   */
+  public static <T> void xwriteRefByNullableSerializer(
+      Fury fury, MemoryBuffer buffer, T obj, Serializer<T> serializer) {
+    if (serializer != null) {
+      fury.xwriteRef(buffer, obj, serializer);
+      return;
+    }
+    fury.xwriteRef(buffer, obj);
+  }
+
+  /**
+   * Returns a map-shaped {@link GenericType} (key and value type parameters) for
+   * {@code genericType}, caching the result in {@code partialGenericKVTypeMap}.
+   *
+   * <p>When the type is not a subtype of {@code MAP_TYPE}, the result is built from
+   * {@code TypeUtils.mapOf(Object.class, Object.class)}; otherwise the key/value types come from
+   * {@code TypeUtils.getMapKeyValueType}.
+   *
+   * @param genericType the generic type to resolve; also used as the cache key
+   * @return the cached or newly built map generic type
+   */
+  private GenericType getKVGenericType(GenericType genericType) {
+    GenericType mapGenericType = partialGenericKVTypeMap.get(genericType);
+    if (mapGenericType == null) {
       TypeRef<?> typeRef = genericType.getTypeRef();
       if (!MAP_TYPE.isSupertypeOf(typeRef)) {
-        Tuple2<GenericType, GenericType> typeTuple = Tuple2.of(objType, 
objType);
-        partialGenericKVTypeMap.put(genericType, typeTuple);
-        return typeTuple;
+        // Not a map type: fall back to Object for both key and value.
+        mapGenericType = GenericType.build(TypeUtils.mapOf(Object.class, 
Object.class));
+        partialGenericKVTypeMap.put(genericType, mapGenericType);
+        return mapGenericType;
       }
       Tuple2<TypeRef<?>, TypeRef<?>> mapKeyValueType = 
TypeUtils.getMapKeyValueType(typeRef);
-      genericTypes =
-          Tuple2.of(
-              
fury.getClassResolver().buildGenericType(mapKeyValueType.f0.getType()),
-              
fury.getClassResolver().buildGenericType(mapKeyValueType.f1.getType()));
-      partialGenericKVTypeMap.put(genericType, genericTypes);
+      mapGenericType = GenericType.build(TypeUtils.mapOf(mapKeyValueType.f0, 
mapKeyValueType.f1));
+      partialGenericKVTypeMap.put(genericType, mapGenericType);
     }
-    return genericTypes;
+    return mapGenericType;
   }
 
   @Override
@@ -1354,7 +800,6 @@ public abstract class AbstractMapSerializer<T> extends 
Serializer<T> {
     }
   }
 
-  @SuppressWarnings("unchecked")
   protected final void chunkReadElements(MemoryBuffer buffer, int size, Map 
map) {
     Serializer keySerializer = this.keySerializer;
     Serializer valueSerializer = this.valueSerializer;
@@ -1363,15 +808,178 @@ public abstract class AbstractMapSerializer<T> extends 
Serializer<T> {
     // TODO use generics for compatible serializer.
     this.keySerializer = null;
     this.valueSerializer = null;
-    if (keySerializer != null && valueSerializer != null) {
-      javaChunkReadWithKVSerializers(buffer, map, size, keySerializer, 
valueSerializer);
-    } else if (keySerializer != null) {
-      javaChunkReadWithKeySerializer(buffer, map, size, keySerializer);
-    } else if (valueSerializer != null) {
-      javaChunkReadWithValueSerializer(buffer, map, size, valueSerializer);
+    if (size == 0) {
+      return;
+    }
+
+    int chunkHeader = buffer.readUnsignedByte();
+    while (size > 0) {
+      long sizeAndHeader =
+          readJavaNullChunk(buffer, map, chunkHeader, size, keySerializer, 
valueSerializer);
+      chunkHeader = (int) (sizeAndHeader & 0b11111111);
+      size = (int) (sizeAndHeader >>> 8);
+      if (keySerializer != null || valueSerializer != null) {
+        sizeAndHeader =
+            readJavaChunk(fury, buffer, map, size, chunkHeader, keySerializer, 
valueSerializer);
+      } else {
+        Generics generics = fury.getGenerics();
+        GenericType genericType = generics.nextGenericType();
+        if (genericType == null) {
+          sizeAndHeader = readJavaChunk(fury, buffer, map, size, chunkHeader, 
null, null);
+        } else {
+          sizeAndHeader =
+              readJavaChunkGeneric(fury, generics, genericType, buffer, map, 
size, chunkHeader);
+        }
+      }
+      chunkHeader = (int) (sizeAndHeader & 0xff);
+      size = (int) (sizeAndHeader >>> 8);
+    }
+  }
+
+  /**
+   * Consumes consecutive single-entry chunks whose header has {@code KEY_HAS_NULL} and/or
+   * {@code VALUE_HAS_NULL} set, putting each entry into {@code map}.
+   *
+   * <p>For a header with only {@code VALUE_HAS_NULL}, the key is read (with the declared
+   * {@code keySerializer} when {@code KEY_DECL_TYPE} is set, otherwise through
+   * {@code keyClassInfoReadCache}) and stored with a {@code null} value. For a header with
+   * {@code KEY_HAS_NULL}, the entry is read by {@link #readNullKeyChunk}.
+   *
+   * @param buffer buffer to read entries and following chunk headers from
+   * @param map map receiving the decoded entries
+   * @param chunkHeader header of the first chunk to inspect, already read by the caller
+   * @param size number of map entries that remain to be read
+   * @param keySerializer serializer used for keys when {@code KEY_DECL_TYPE} is set
+   * @param valueSerializer serializer forwarded to {@link #readNullKeyChunk}
+   * @return {@code (size << 8) | chunkHeader} for the first chunk with neither null flag set,
+   *     where {@code size} is the number of entries still to read; {@code 0} once every entry
+   *     has been consumed. NOTE(review): a caller receiving {@code 0} must not read another
+   *     chunk from the buffer.
+   */
+  public long readJavaNullChunk(
+      MemoryBuffer buffer,
+      Map map,
+      int chunkHeader,
+      long size,
+      Serializer keySerializer,
+      Serializer valueSerializer) {
+    while (true) {
+      boolean keyHasNull = (chunkHeader & KEY_HAS_NULL) != 0;
+      boolean valueHasNull = (chunkHeader & VALUE_HAS_NULL) != 0;
+      if (!keyHasNull) {
+        if (!valueHasNull) {
+          // Regular chunk: hand the untouched header and remaining size back to the caller.
+          return (size << 8) | chunkHeader;
+        } else {
+          boolean trackKeyRef = (chunkHeader & TRACKING_KEY_REF) != 0;
+          Object key;
+          if ((chunkHeader & KEY_DECL_TYPE) != 0) {
+            if (trackKeyRef) {
+              key = fury.readRef(buffer, keySerializer);
+            } else {
+              key = keySerializer.read(buffer);
+            }
+          } else {
+            key = fury.readRef(buffer, keyClassInfoReadCache);
+          }
+          map.put(key, null);
+        }
+      } else {
+        readNullKeyChunk(buffer, map, chunkHeader, valueSerializer, valueHasNull);
+      }
+      // One entry was just consumed. Decrement BEFORE testing: the previous post-decrement
+      // (`size-- == 0`) compared the stale value, so after the last entry it still read one
+      // more byte from the buffer and treated it as a chunk header. `<= 0` also terminates
+      // if the method is ever entered with size 0.
+      if (--size <= 0) {
+        return 0;
+      } else {
+        chunkHeader = buffer.readUnsignedByte();
+      }
+    }
+  }
+
+  /**
+   * Reads a chunk of size 1 whose key is null and puts the entry into {@code map}. Since a map
+   * can contain at most one null key, this method is not on the critical path; it is kept as a
+   * separate method so that the caller stays eligible for JIT inlining.
+   *
+   * @param buffer buffer to read the value from
+   * @param map map receiving the entry under the {@code null} key
+   * @param chunkHeader chunk header; its {@code TRACKING_VALUE_REF} and {@code VALUE_DECL_TYPE}
+   *     bits select how the value is read
+   * @param valueSerializer serializer used when {@code VALUE_DECL_TYPE} is set
+   * @param valueHasNull when true nothing is read and {@code (null, null)} is stored
+   */
+  private void readNullKeyChunk(
+      MemoryBuffer buffer,
+      Map map,
+      int chunkHeader,
+      Serializer valueSerializer,
+      boolean valueHasNull) {
+    if (!valueHasNull) {
+      Object value;
+      boolean trackValueRef = (chunkHeader & TRACKING_VALUE_REF) != 0;
+      if ((chunkHeader & VALUE_DECL_TYPE) != 0) {
+        // Value has the declared type: use the supplied serializer directly.
+        if (trackValueRef) {
+          value = fury.readRef(buffer, valueSerializer);
+        } else {
+          value = valueSerializer.read(buffer);
+        }
+      } else {
+        // Otherwise resolve the value through the class-info read cache.
+        value = fury.readRef(buffer, valueClassInfoReadCache);
+      }
+      map.put(null, value);
     } else {
-      genericJavaChunkRead(fury, buffer, map, size);
+      map.put(null, null);
+    }
+  }
+
+  /**
+   * Reads one chunk of entries into {@code map}, using a single key serializer and a single
+   * value serializer for every entry of the chunk.
+   *
+   * <p>The chunk size is read from {@code buffer} as an unsigned byte. When the header does not
+   * carry {@code KEY_DECL_TYPE} / {@code VALUE_DECL_TYPE}, the corresponding serializer is
+   * replaced by the one obtained from class info read from the buffer.
+   *
+   * @param fury instance used for reference-tracked reads
+   * @param buffer buffer positioned just after the chunk header
+   * @param map map receiving the decoded entries
+   * @param size number of map entries that remain to be read, including this chunk
+   * @param chunkHeader header flags of this chunk
+   * @param keySerializer serializer used for keys when {@code KEY_DECL_TYPE} is set
+   * @param valueSerializer serializer used for values when {@code VALUE_DECL_TYPE} is set
+   * @return {@code (remaining << 8) | nextChunkHeader} when entries remain after this chunk,
+   *     otherwise {@code 0}
+   */
+  private long readJavaChunk(
+      Fury fury,
+      MemoryBuffer buffer,
+      Map map,
+      long size,
+      int chunkHeader,
+      Serializer keySerializer,
+      Serializer valueSerializer) {
+    // noinspection Duplicates
+    boolean trackKeyRef = (chunkHeader & TRACKING_KEY_REF) != 0;
+    boolean trackValueRef = (chunkHeader & TRACKING_VALUE_REF) != 0;
+    boolean keyIsDeclaredType = (chunkHeader & KEY_DECL_TYPE) != 0;
+    boolean valueIsDeclaredType = (chunkHeader & VALUE_DECL_TYPE) != 0;
+    int chunkSize = buffer.readUnsignedByte();
+    if (!keyIsDeclaredType) {
+      keySerializer = classResolver.readClassInfo(buffer, 
keyClassInfoReadCache).getSerializer();
+    }
+    if (!valueIsDeclaredType) {
+      valueSerializer =
+          classResolver.readClassInfo(buffer, 
valueClassInfoReadCache).getSerializer();
+    }
+    for (int i = 0; i < chunkSize; i++) {
+      Object key = trackKeyRef ? fury.readRef(buffer, keySerializer) : 
keySerializer.read(buffer);
+      Object value =
+          trackValueRef ? fury.readRef(buffer, valueSerializer) : 
valueSerializer.read(buffer);
+      map.put(key, value);
+      size--;
     }
+    // Only read the next chunk header when entries remain.
+    return size > 0 ? (size << 8) | buffer.readUnsignedByte() : 0;
+  }
+
+  /**
+   * Reads one chunk of entries into {@code map}, taking the declared key/value serializers from
+   * the type parameters of {@code genericType} and pushing the matching generic type around
+   * each key and value read.
+   *
+   * @param fury instance used for reference-tracked reads
+   * @param generics generics stack; the key/value generic type is pushed before and popped
+   *     after each key/value read
+   * @param genericType generic type of the map; resolved through {@code getKVGenericType} when
+   *     it has fewer than two type parameters
+   * @param buffer buffer positioned just after the chunk header
+   * @param map map receiving the decoded entries
+   * @param size number of map entries that remain to be read, including this chunk
+   * @param chunkHeader header flags of this chunk
+   * @return {@code (remaining << 8) | nextChunkHeader} when entries remain after this chunk,
+   *     otherwise {@code 0}
+   */
+  private long readJavaChunkGeneric(
+      Fury fury,
+      Generics generics,
+      GenericType genericType,
+      MemoryBuffer buffer,
+      Map map,
+      long size,
+      int chunkHeader) {
+    // type parameters count for `Map field` will be 0;
+    // type parameters count for `SubMap<V> field` which SubMap is
+    // `SubMap<V> implements Map<String, V>` will be 1;
+    if (genericType.getTypeParametersCount() < 2) {
+      genericType = getKVGenericType(genericType);
+    }
+    GenericType keyGenericType = genericType.getTypeParameter0();
+    GenericType valueGenericType = genericType.getTypeParameter1();
+    // noinspection Duplicates
+    boolean trackKeyRef = (chunkHeader & TRACKING_KEY_REF) != 0;
+    boolean trackValueRef = (chunkHeader & TRACKING_VALUE_REF) != 0;
+    boolean keyIsDeclaredType = (chunkHeader & KEY_DECL_TYPE) != 0;
+    boolean valueIsDeclaredType = (chunkHeader & VALUE_DECL_TYPE) != 0;
+    int chunkSize = buffer.readUnsignedByte();
+    Serializer keySerializer, valueSerializer;
+    // Declared-type flag set: use the serializer of the generic type parameter;
+    // otherwise read class info from the buffer to find the serializer.
+    if (!keyIsDeclaredType) {
+      keySerializer = classResolver.readClassInfo(buffer, 
keyClassInfoReadCache).getSerializer();
+    } else {
+      keySerializer = keyGenericType.getSerializer(classResolver);
+    }
+    if (!valueIsDeclaredType) {
+      valueSerializer =
+          classResolver.readClassInfo(buffer, 
valueClassInfoReadCache).getSerializer();
+    } else {
+      valueSerializer = valueGenericType.getSerializer(classResolver);
+    }
+    for (int i = 0; i < chunkSize; i++) {
+      generics.pushGenericType(keyGenericType);
+      Object key = trackKeyRef ? fury.readRef(buffer, keySerializer) : 
keySerializer.read(buffer);
+      generics.popGenericType();
+      generics.pushGenericType(valueGenericType);
+      Object value =
+          trackValueRef ? fury.readRef(buffer, valueSerializer) : 
valueSerializer.read(buffer);
+      generics.popGenericType();
+      map.put(key, value);
+      size--;
+    }
+    // Only read the next chunk header when entries remain.
+    return size > 0 ? (size << 8) | buffer.readUnsignedByte() : 0;
   }
 
   @SuppressWarnings("unchecked")
@@ -1411,16 +1019,14 @@ public abstract class AbstractMapSerializer<T> extends 
Serializer<T> {
     if (genericType == null) {
       generalJavaRead(fury, buffer, map, size);
     } else {
+      if (genericType.getTypeParametersCount() < 2) {
+        genericType = getKVGenericType(genericType);
+      }
       GenericType keyGenericType = genericType.getTypeParameter0();
       GenericType valueGenericType = genericType.getTypeParameter1();
-      if (genericType.getTypeParametersCount() < 2) {
-        Tuple2<GenericType, GenericType> kvGenericType = 
getKVGenericType(genericType);
-        if (keyGenericType == objType && valueGenericType == objType) {
-          generalJavaRead(fury, buffer, map, size);
-          return;
-        }
-        keyGenericType = kvGenericType.f0;
-        valueGenericType = kvGenericType.f1;
+      if (keyGenericType == objType && valueGenericType == objType) {
+        generalJavaRead(fury, buffer, map, size);
+        return;
       }
       boolean keyGenericTypeFinal = keyGenericType.isMonomorphic();
       boolean valueGenericTypeFinal = valueGenericType.isMonomorphic();
@@ -1438,253 +1044,6 @@ public abstract class AbstractMapSerializer<T> extends 
Serializer<T> {
     }
   }
 
-  private void javaChunkReadWithKeySerializer(
-      MemoryBuffer buffer, Map map, int size, Serializer keySerializer) {
-    final ClassResolver classResolver = fury.getClassResolver();
-    while (size > 0) {
-      byte chunkSize = buffer.readByte();
-      byte header = buffer.readByte();
-      Preconditions.checkArgument(
-          chunkSize >= 0,
-          "chunkSize < 0, which means serialization protocol is not same with 
deserialization protocol");
-      Serializer valueReadSerializer = null;
-      for (byte i = 0; i < chunkSize; i++) {
-        Object key;
-        Object value;
-        key = readFinalKey(buffer, header, keySerializer);
-        if (!fury.trackingRef()) {
-          if (!valueIsDifferentType(header)) {
-            if (valueHasNull(header)) {
-              byte flag = buffer.readByte();
-              if (flag == Fury.NOT_NULL_VALUE_FLAG) {
-                if (valueReadSerializer == null) {
-                  valueReadSerializer =
-                      classResolver.readClassInfo(buffer, 
valueClassInfoReadCache).getSerializer();
-                }
-                value = valueReadSerializer.read(buffer);
-              } else {
-                value = null;
-              }
-            } else {
-              if (valueReadSerializer == null) {
-                valueReadSerializer =
-                    classResolver.readClassInfo(buffer, 
valueClassInfoReadCache).getSerializer();
-              }
-              value = valueReadSerializer.read(buffer);
-            }
-          } else {
-            value = fury.readNullable(buffer, valueClassInfoReadCache);
-          }
-
-        } else {
-          if (!valueIsDifferentType(header)) {
-            if (valueHasNull(header)) {
-              byte flag = buffer.readByte();
-              if (flag == Fury.NOT_NULL_VALUE_FLAG) {
-                if (valueReadSerializer == null) {
-                  valueReadSerializer =
-                      classResolver.readClassInfo(buffer, 
valueClassInfoReadCache).getSerializer();
-                }
-                value = fury.readRef(buffer, valueReadSerializer);
-              } else {
-                value = null;
-              }
-            } else {
-              if (valueReadSerializer == null) {
-                valueReadSerializer =
-                    classResolver.readClassInfo(buffer, 
valueClassInfoReadCache).getSerializer();
-              }
-              value = readNoNullRef(valueReadSerializer, buffer);
-            }
-          } else {
-            value = fury.readRef(buffer, valueClassInfoReadCache);
-          }
-        }
-        map.put(key, value);
-        size--;
-      }
-    }
-  }
-
-  private void javaChunkReadWithValueSerializer(
-      MemoryBuffer buffer, Map map, int size, Serializer valueSerializer) {
-    while (size > 0) {
-      byte chunkSize = buffer.readByte();
-      byte header = buffer.readByte();
-      Preconditions.checkArgument(
-          chunkSize >= 0,
-          "chunkSize < 0, which means serialization protocol is not same with 
deserialization protocol");
-      Serializer keyReadSerializer = null;
-      for (byte i = 0; i < chunkSize; i++) {
-        Object key;
-        Object value;
-        if (!fury.trackingRef()) {
-          if (keyHasNull(header)) {
-            byte nullFlag = buffer.readByte();
-            Preconditions.checkArgument(nullFlag == Fury.NULL_FLAG, 
"unexpected error");
-            key = null;
-          } else {
-            if (!keyIsDifferentType(header)) {
-              if (keyReadSerializer == null) {
-                keyReadSerializer =
-                    fury.getClassResolver()
-                        .readClassInfo(buffer, keyClassInfoReadCache)
-                        .getSerializer();
-              }
-              key = keyReadSerializer.read(buffer);
-            } else {
-              key = fury.readNonRef(buffer, keyClassInfoReadCache);
-            }
-          }
-        } else {
-          if (keyHasNull(header)) {
-            byte nullFlag = buffer.readByte();
-            Preconditions.checkArgument(nullFlag == Fury.NULL_FLAG, 
"unexpected error");
-            key = null;
-          } else {
-            if (!keyIsDifferentType(header)) {
-              if (keyReadSerializer == null) {
-                keyReadSerializer =
-                    fury.getClassResolver()
-                        .readClassInfo(buffer, keyClassInfoReadCache)
-                        .getSerializer();
-              }
-              key = readNoNullRef(keyReadSerializer, buffer);
-            } else {
-              key = fury.readRef(buffer, keyClassInfoReadCache);
-            }
-          }
-        }
-        value = readFinalValue(buffer, header, valueSerializer);
-        map.put(key, value);
-        size--;
-      }
-    }
-  }
-
-  private void javaChunkReadWithKVSerializers(
-      MemoryBuffer buffer,
-      Map map,
-      int size,
-      Serializer keySerializer,
-      Serializer valueSerializer) {
-    while (size > 0) {
-      byte chunkSize = buffer.readByte();
-      byte header = buffer.readByte();
-      Preconditions.checkArgument(
-          chunkSize >= 0,
-          "chunkSize < 0, which means serialization protocol is not same with 
deserialization protocol");
-      for (byte i = 0; i < chunkSize; i++) {
-        Object key;
-        Object value;
-        key = readFinalKey(buffer, header, keySerializer);
-        value = readFinalValue(buffer, header, valueSerializer);
-        map.put(key, value);
-        size--;
-      }
-    }
-  }
-
-  public Object readFinalKey(MemoryBuffer buffer, int header, Serializer 
keySerializer) {
-    boolean trackingKeyRef = keySerializer.needToWriteRef();
-    if (!trackingKeyRef) {
-      if (keyHasNull(header)) {
-        byte nullFlag = buffer.readByte();
-        Preconditions.checkArgument(nullFlag == Fury.NULL_FLAG, "unexpected 
NULL_FLAG");
-        return null;
-      } else {
-        return keySerializer.read(buffer);
-      }
-    } else {
-      return fury.readRef(buffer, keySerializer);
-    }
-  }
-
-  public Object readFinalValue(MemoryBuffer buffer, int header, Serializer 
valueSerializer) {
-    boolean trackingValueRef = valueSerializer.needToWriteRef();
-    if (!trackingValueRef) {
-      if (valueHasNull(header)) {
-        byte flag = buffer.readByte();
-        if (flag == Fury.NOT_NULL_VALUE_FLAG) {
-          return valueSerializer.read(buffer);
-        } else {
-          return null;
-        }
-      } else {
-        return valueSerializer.read(buffer);
-      }
-    } else {
-      return fury.readRef(buffer, valueSerializer);
-    }
-  }
-
-  private void genericJavaChunkRead(Fury fury, MemoryBuffer buffer, Map map, 
int size) {
-    Generics generics = fury.getGenerics();
-    GenericType genericType = generics.nextGenericType();
-    if (genericType == null) {
-      generalJavaChunkRead(fury, buffer, map, size);
-    } else {
-      GenericType keyGenericType = genericType.getTypeParameter0();
-      GenericType valueGenericType = genericType.getTypeParameter1();
-      if (genericType.getTypeParametersCount() < 2) {
-        Tuple2<GenericType, GenericType> kvGenericType = 
getKVGenericType(genericType);
-        if (keyGenericType == objType && valueGenericType == objType) {
-          generalJavaChunkRead(fury, buffer, map, size);
-          return;
-        }
-        keyGenericType = kvGenericType.f0;
-        valueGenericType = kvGenericType.f1;
-      }
-      boolean keyGenericTypeFinal = keyGenericType.isMonomorphic();
-      boolean valueGenericTypeFinal = valueGenericType.isMonomorphic();
-      if (keyGenericTypeFinal && valueGenericTypeFinal) {
-        javaKVTypesFinalChunkRead(
-            fury, buffer, map, keyGenericType, valueGenericType, generics, 
size);
-      } else if (keyGenericTypeFinal) {
-        javaKeyTypeFinalChunkRead(
-            fury, buffer, map, keyGenericType, valueGenericType, generics, 
size);
-      } else if (valueGenericTypeFinal) {
-        javaValueTypeFinalChunkRead(
-            fury, buffer, map, keyGenericType, valueGenericType, generics, 
size);
-      } else {
-        javaKVTypesNonFinalChunkRead(
-            fury, buffer, map, keyGenericType, valueGenericType, generics, 
size);
-      }
-      generics.popGenericType();
-    }
-  }
-
-  private void javaKVTypesFinalChunkRead(
-      Fury fury,
-      MemoryBuffer buffer,
-      Map map,
-      GenericType keyGenericType,
-      GenericType valueGenericType,
-      Generics generics,
-      int size) {
-    Serializer keySerializer = 
keyGenericType.getSerializer(fury.getClassResolver());
-    Serializer valueSerializer = 
valueGenericType.getSerializer(fury.getClassResolver());
-    while (size > 0) {
-      byte chunkSize = buffer.readByte();
-      byte header = buffer.readByte();
-      Preconditions.checkArgument(
-          chunkSize >= 0,
-          "chunkSize < 0, which means serialization protocol is not same with 
deserialization protocol");
-      for (byte i = 0; i < chunkSize; i++) {
-        Object key;
-        Object value;
-        generics.pushGenericType(keyGenericType);
-        key = readFinalKey(buffer, header, keySerializer);
-        generics.popGenericType();
-        generics.pushGenericType(valueGenericType);
-        value = readFinalValue(buffer, header, valueSerializer);
-        generics.popGenericType();
-        map.put(key, value);
-        size--;
-      }
-    }
-  }
-
   private void javaKVTypesFinalRead(
       Fury fury,
       MemoryBuffer buffer,
@@ -1706,86 +1065,6 @@ public abstract class AbstractMapSerializer<T> extends 
Serializer<T> {
     }
   }
 
-  private void javaKeyTypeFinalChunkRead(
-      Fury fury,
-      MemoryBuffer buffer,
-      Map map,
-      GenericType keyGenericType,
-      GenericType valueGenericType,
-      Generics generics,
-      int size) {
-    ClassResolver classResolver = fury.getClassResolver();
-    boolean trackingValueRef = 
classResolver.needToWriteRef(valueGenericType.getCls());
-    Serializer keySerializer = 
keyGenericType.getSerializer(fury.getClassResolver());
-    while (size > 0) {
-      byte chunkSize = buffer.readByte();
-      Preconditions.checkArgument(
-          chunkSize >= 0,
-          "chunkSize < 0, which means serialization protocol is not same with 
deserialization protocol");
-      byte header = buffer.readByte();
-      Serializer valueReadSerializer = null;
-      while (chunkSize > 0) {
-        generics.pushGenericType(keyGenericType);
-        Object key = readFinalKey(buffer, header, keySerializer);
-        generics.popGenericType();
-        generics.pushGenericType(valueGenericType);
-        Object value;
-        if (!trackingValueRef) {
-          if (!valueIsDifferentType(header)) {
-            if (valueHasNull(header)) {
-              byte flag = buffer.readByte();
-              if (flag == Fury.NOT_NULL_VALUE_FLAG) {
-                if (valueReadSerializer == null) {
-                  valueReadSerializer =
-                      classResolver.readClassInfo(buffer, 
valueClassInfoReadCache).getSerializer();
-                }
-                value = valueReadSerializer.read(buffer);
-              } else {
-                value = null;
-              }
-            } else {
-              if (valueReadSerializer == null) {
-                valueReadSerializer =
-                    classResolver.readClassInfo(buffer, 
valueClassInfoReadCache).getSerializer();
-              }
-              value = valueReadSerializer.read(buffer);
-            }
-          } else {
-            value = fury.readNullable(buffer, valueClassInfoReadCache);
-          }
-
-        } else {
-          if (!valueIsDifferentType(header)) {
-            if (valueHasNull(header)) {
-              byte flag = buffer.readByte();
-              if (flag == Fury.NOT_NULL_VALUE_FLAG) {
-                if (valueReadSerializer == null) {
-                  valueReadSerializer =
-                      classResolver.readClassInfo(buffer, 
valueClassInfoReadCache).getSerializer();
-                }
-                value = fury.readRef(buffer, valueReadSerializer);
-              } else {
-                value = null;
-              }
-            } else {
-              if (valueReadSerializer == null) {
-                valueReadSerializer =
-                    classResolver.readClassInfo(buffer, 
valueClassInfoReadCache).getSerializer();
-              }
-              value = readNoNullRef(valueReadSerializer, buffer);
-            }
-          } else {
-            value = fury.readRef(buffer, valueClassInfoReadCache);
-          }
-        }
-        generics.popGenericType();
-        chunkSize--;
-        size--;
-        map.put(key, value);
-      }
-    }
-  }
-
   private void javaKeyTypeFinalRead(
       Fury fury,
       MemoryBuffer buffer,
@@ -1810,74 +1089,6 @@ public abstract class AbstractMapSerializer<T> extends 
Serializer<T> {
     }
   }
 
-  private void javaValueTypeFinalChunkRead(
-      Fury fury,
-      MemoryBuffer buffer,
-      Map map,
-      GenericType keyGenericType,
-      GenericType valueGenericType,
-      Generics generics,
-      int size) {
-    boolean trackingKeyRef = 
fury.getClassResolver().needToWriteRef(keyGenericType.getCls());
-    Serializer valueSerializer = 
valueGenericType.getSerializer(fury.getClassResolver());
-    while (size > 0) {
-      byte chunkSize = buffer.readByte();
-      Preconditions.checkArgument(
-          chunkSize >= 0,
-          "chunkSize < 0, which means serialization protocol is not same with 
deserialization protocol");
-      byte header = buffer.readByte();
-      Serializer keyReadSerializer = null;
-      while (chunkSize > 0) {
-        generics.pushGenericType(keyGenericType);
-        Object key;
-        if (!trackingKeyRef) {
-          if (keyHasNull(header)) {
-            byte nullFlag = buffer.readByte();
-            Preconditions.checkArgument(nullFlag == Fury.NULL_FLAG, 
"unexpected error");
-            key = null;
-          } else {
-            if (!keyIsDifferentType(header)) {
-              if (keyReadSerializer == null) {
-                keyReadSerializer =
-                    fury.getClassResolver()
-                        .readClassInfo(buffer, keyClassInfoReadCache)
-                        .getSerializer();
-              }
-              key = keyReadSerializer.read(buffer);
-            } else {
-              key = fury.readNonRef(buffer, keyClassInfoReadCache);
-            }
-          }
-        } else {
-          if (keyHasNull(header)) {
-            byte nullFlag = buffer.readByte();
-            Preconditions.checkArgument(nullFlag == Fury.NULL_FLAG, 
"unexpected error");
-            key = null;
-          } else {
-            if (!keyIsDifferentType(header)) {
-              if (keyReadSerializer == null) {
-                keyReadSerializer =
-                    fury.getClassResolver()
-                        .readClassInfo(buffer, keyClassInfoReadCache)
-                        .getSerializer();
-              }
-              key = readNoNullRef(keyReadSerializer, buffer);
-            } else {
-              key = fury.readRef(buffer, keyClassInfoReadCache);
-            }
-          }
-        }
-        generics.popGenericType();
-        generics.pushGenericType(valueGenericType);
-        Object value = readFinalValue(buffer, header, valueSerializer);
-        generics.popGenericType();
-        chunkSize--;
-        size--;
-        map.put(key, value);
-      }
-    }
-  }
-
   private void javaValueTypeFinalRead(
       Fury fury,
       MemoryBuffer buffer,
@@ -1901,142 +1112,6 @@ public abstract class AbstractMapSerializer<T> extends Serializer<T> {
     }
   }
 
-  private void javaKVTypesNonFinalChunkRead(
-      Fury fury,
-      MemoryBuffer buffer,
-      Map map,
-      GenericType keyGenericType,
-      GenericType valueGenericType,
-      Generics generics,
-      int size) {
-    ClassResolver classResolver = fury.getClassResolver();
-    RefResolver refResolver = fury.getRefResolver();
-    boolean trackingKeyRef = 
classResolver.needToWriteRef(keyGenericType.getCls());
-    boolean trackingValueRef = 
classResolver.needToWriteRef(valueGenericType.getCls());
-    while (size > 0) {
-      byte chunkSize = buffer.readByte();
-      Preconditions.checkArgument(
-          chunkSize >= 0,
-          "chunkSize < 0, which means serialization protocol is not same with 
deserialization protocol");
-      if (chunkSize == 0) {
-        while (size > 0) {
-          generics.pushGenericType(keyGenericType);
-          Object key =
-              readJavaRefOptimized(
-                  fury, refResolver, trackingKeyRef, buffer, 
keyClassInfoReadCache);
-          generics.popGenericType();
-          generics.pushGenericType(valueGenericType);
-          Object value =
-              readJavaRefOptimized(
-                  fury, refResolver, trackingValueRef, buffer, 
valueClassInfoReadCache);
-          generics.popGenericType();
-          map.put(key, value);
-          size--;
-        }
-      } else {
-        byte header = buffer.readByte();
-        Serializer keyReadSerializer = null;
-        Serializer valueReadSerializer = null;
-        while (chunkSize > 0) {
-          generics.pushGenericType(keyGenericType);
-          Object key;
-          if (!trackingKeyRef) {
-            if (keyHasNull(header)) {
-              byte nullFlag = buffer.readByte();
-              Preconditions.checkArgument(nullFlag == Fury.NULL_FLAG, 
"unexpected error");
-              key = null;
-            } else {
-              if (!keyIsDifferentType(header)) {
-                if (keyReadSerializer == null) {
-                  keyReadSerializer =
-                      classResolver.readClassInfo(buffer, 
keyClassInfoReadCache).getSerializer();
-                }
-                key = keyReadSerializer.read(buffer);
-              } else {
-                key = fury.readNonRef(buffer, keyClassInfoReadCache);
-              }
-            }
-          } else {
-            if (keyHasNull(header)) {
-              byte nullFlag = buffer.readByte();
-              Preconditions.checkArgument(nullFlag == Fury.NULL_FLAG, 
"unexpected error");
-              key = null;
-            } else {
-              if (!keyIsDifferentType(header)) {
-                if (keyReadSerializer == null) {
-                  keyReadSerializer =
-                      classResolver.readClassInfo(buffer, 
keyClassInfoReadCache).getSerializer();
-                }
-                key = readNoNullRef(keyReadSerializer, buffer);
-              } else {
-                key = fury.readRef(buffer, keyClassInfoReadCache);
-              }
-            }
-          }
-          generics.popGenericType();
-          generics.pushGenericType(valueGenericType);
-          Object value;
-          if (!trackingValueRef) {
-            if (!valueIsDifferentType(header)) {
-              if (valueHasNull(header)) {
-                byte flag = buffer.readByte();
-                if (flag == Fury.NOT_NULL_VALUE_FLAG) {
-                  if (valueReadSerializer == null) {
-                    valueReadSerializer =
-                        classResolver
-                            .readClassInfo(buffer, valueClassInfoReadCache)
-                            .getSerializer();
-                  }
-                  value = valueReadSerializer.read(buffer);
-                } else {
-                  value = null;
-                }
-              } else {
-                if (valueReadSerializer == null) {
-                  valueReadSerializer =
-                      classResolver.readClassInfo(buffer, 
valueClassInfoReadCache).getSerializer();
-                }
-                value = valueReadSerializer.read(buffer);
-              }
-            } else {
-              value = fury.readNullable(buffer, valueClassInfoReadCache);
-            }
-
-          } else {
-            if (!valueIsDifferentType(header)) {
-              if (valueHasNull(header)) {
-                byte flag = buffer.readByte();
-                if (flag == Fury.NOT_NULL_VALUE_FLAG) {
-                  if (valueReadSerializer == null) {
-                    valueReadSerializer =
-                        classResolver
-                            .readClassInfo(buffer, valueClassInfoReadCache)
-                            .getSerializer();
-                  }
-                  value = fury.readRef(buffer, valueReadSerializer);
-                } else {
-                  value = null;
-                }
-              } else {
-                if (valueReadSerializer == null) {
-                  valueReadSerializer =
-                      classResolver.readClassInfo(buffer, 
valueClassInfoReadCache).getSerializer();
-                }
-                value = readNoNullRef(valueReadSerializer, buffer);
-              }
-            } else {
-              value = fury.readRef(buffer, valueClassInfoReadCache);
-            }
-          }
-          generics.popGenericType();
-          chunkSize--;
-          size--;
-          map.put(key, value);
-        }
-      }
-    }
-  }
-
   private void javaKVTypesNonFinalRead(
       Fury fury,
       MemoryBuffer buffer,
@@ -2063,149 +1138,6 @@ public abstract class AbstractMapSerializer<T> extends Serializer<T> {
     }
   }
 
-  private void generalJavaChunkRead(Fury fury, MemoryBuffer buffer, Map map, 
int size) {
-    ClassResolver classResolver = fury.getClassResolver();
-    boolean trackingRef = fury.trackingRef();
-    while (size > 0) {
-      byte chunkSize = buffer.readByte();
-      Preconditions.checkArgument(
-          chunkSize >= 0,
-          "chunkSize < 0, which means serialization protocol is not same with 
deserialization protocol");
-      if (chunkSize == 0) {
-        while (size > 0) {
-          Object key = fury.readRef(buffer, keyClassInfoReadCache);
-          Object value = fury.readRef(buffer, keyClassInfoReadCache);
-          map.put(key, value);
-          size--;
-        }
-      } else {
-        byte header = buffer.readByte();
-        Serializer keyReadSerializer = null;
-        Serializer valueReadSerializer = null;
-        while (chunkSize > 0) {
-          Object key;
-          if (!trackingRef) {
-            if (keyHasNull(header)) {
-              byte nullFlag = buffer.readByte();
-              Preconditions.checkArgument(nullFlag == Fury.NULL_FLAG, 
"unexpected error");
-              key = null;
-            } else {
-              if (!keyIsDifferentType(header)) {
-                if (keyReadSerializer == null) {
-                  keyReadSerializer =
-                      classResolver.readClassInfo(buffer, 
keyClassInfoReadCache).getSerializer();
-                }
-                key = keyReadSerializer.read(buffer);
-              } else {
-                key = fury.readNonRef(buffer, keyClassInfoReadCache);
-              }
-            }
-          } else {
-            if (keyHasNull(header)) {
-              byte nullFlag = buffer.readByte();
-              Preconditions.checkArgument(nullFlag == Fury.NULL_FLAG, 
"unexpected error");
-              key = null;
-            } else {
-              if (!keyIsDifferentType(header)) {
-                if (keyReadSerializer == null) {
-                  keyReadSerializer =
-                      classResolver.readClassInfo(buffer, 
keyClassInfoReadCache).getSerializer();
-                }
-                key = readNoNullRef(keyReadSerializer, buffer);
-              } else {
-                key = fury.readRef(buffer, keyClassInfoReadCache);
-              }
-            }
-          }
-          Object value;
-          if (!trackingRef) {
-            if (!valueIsDifferentType(header)) {
-              if (valueHasNull(header)) {
-                byte flag = buffer.readByte();
-                if (flag == Fury.NOT_NULL_VALUE_FLAG) {
-                  if (valueReadSerializer == null) {
-                    valueReadSerializer =
-                        classResolver
-                            .readClassInfo(buffer, valueClassInfoReadCache)
-                            .getSerializer();
-                  }
-                  value = valueReadSerializer.read(buffer);
-                } else {
-                  value = null;
-                }
-              } else {
-                if (valueReadSerializer == null) {
-                  valueReadSerializer =
-                      classResolver.readClassInfo(buffer, 
valueClassInfoReadCache).getSerializer();
-                }
-                value = valueReadSerializer.read(buffer);
-              }
-            } else {
-              value = fury.readNullable(buffer, valueClassInfoReadCache);
-            }
-
-          } else {
-            if (!valueIsDifferentType(header)) {
-              if (valueHasNull(header)) {
-                byte flag = buffer.readByte();
-                if (flag == Fury.NOT_NULL_VALUE_FLAG) {
-                  if (valueReadSerializer == null) {
-                    valueReadSerializer =
-                        classResolver
-                            .readClassInfo(buffer, valueClassInfoReadCache)
-                            .getSerializer();
-                  }
-                  value = fury.readRef(buffer, valueReadSerializer);
-                } else {
-                  value = null;
-                }
-              } else {
-                if (valueReadSerializer == null) {
-                  valueReadSerializer =
-                      classResolver.readClassInfo(buffer, 
valueClassInfoReadCache).getSerializer();
-                }
-                value = readNoNullRef(valueReadSerializer, buffer);
-              }
-            } else {
-              value = fury.readRef(buffer, valueClassInfoReadCache);
-            }
-          }
-          chunkSize--;
-          size--;
-          map.put(key, value);
-        }
-      }
-    }
-  }
-
-  private boolean keyHasNull(int header) {
-    return (header & MapFlags.KEY_HAS_NULL) == MapFlags.KEY_HAS_NULL;
-  }
-
-  private boolean keyIsDifferentType(int header) {
-    return (header & MapFlags.KEY_NOT_SAME_TYPE) == MapFlags.KEY_NOT_SAME_TYPE;
-  }
-
-  private boolean valueIsDifferentType(int header) {
-    return (header & MapFlags.VALUE_NOT_SAME_TYPE) == 
MapFlags.VALUE_NOT_SAME_TYPE;
-  }
-
-  private Object readNoNullRef(Serializer serializer, MemoryBuffer 
memoryBuffer) {
-    if (serializer.needToWriteRef()) {
-      final RefResolver refResolver = fury.getRefResolver();
-      int nextReadRefId = refResolver.tryPreserveRefId(memoryBuffer);
-      if (nextReadRefId >= Fury.NOT_NULL_VALUE_FLAG) {
-        Object obj = serializer.read(memoryBuffer);
-        refResolver.setReadObject(nextReadRefId, obj);
-        return obj;
-      } else {
-        return refResolver.getReadObject();
-      }
-    } else {
-      return serializer.read(memoryBuffer);
-    }
-  }
-
   private void generalJavaRead(Fury fury, MemoryBuffer buffer, Map map, int 
size) {
     for (int i = 0; i < size; i++) {
       Object key = fury.readRef(buffer, keyClassInfoReadCache);
@@ -2232,33 +1164,33 @@ public abstract class AbstractMapSerializer<T> extends Serializer<T> {
       Serializer valueSerializer = 
valueGenericType.getSerializer(fury.getClassResolver());
       if (!keyGenericType.hasGenericParameters() && 
!valueGenericType.hasGenericParameters()) {
         for (int i = 0; i < size; i++) {
-          Object key = fury.xreadRefByNullableSerializer(buffer, 
keySerializer);
-          Object value = fury.xreadRefByNullableSerializer(buffer, 
valueSerializer);
+          Object key = xreadRefByNullableSerializer(fury, buffer, 
keySerializer);
+          Object value = xreadRefByNullableSerializer(fury, buffer, 
valueSerializer);
           map.put(key, value);
         }
       } else if (valueGenericType.hasGenericParameters()) {
         for (int i = 0; i < size; i++) {
-          Object key = fury.xreadRefByNullableSerializer(buffer, 
keySerializer);
+          Object key = xreadRefByNullableSerializer(fury, buffer, 
keySerializer);
           generics.pushGenericType(valueGenericType);
-          Object value = fury.xreadRefByNullableSerializer(buffer, 
valueSerializer);
+          Object value = xreadRefByNullableSerializer(fury, buffer, 
valueSerializer);
           generics.popGenericType();
           map.put(key, value);
         }
       } else if (keyGenericType.hasGenericParameters()) {
         for (int i = 0; i < size; i++) {
           generics.pushGenericType(keyGenericType);
-          Object key = fury.xreadRefByNullableSerializer(buffer, 
keySerializer);
+          Object key = xreadRefByNullableSerializer(fury, buffer, 
keySerializer);
           generics.popGenericType();
-          Object value = fury.xreadRefByNullableSerializer(buffer, 
valueSerializer);
+          Object value = xreadRefByNullableSerializer(fury, buffer, 
valueSerializer);
           map.put(key, value);
         }
       } else {
         for (int i = 0; i < size; i++) {
           // FIXME(chaokunyang) nested generics may be get by mistake.
           generics.pushGenericType(keyGenericType);
-          Object key = fury.xreadRefByNullableSerializer(buffer, 
keySerializer);
+          Object key = xreadRefByNullableSerializer(fury, buffer, 
keySerializer);
           generics.pushGenericType(valueGenericType);
-          Object value = fury.xreadRefByNullableSerializer(buffer, 
valueSerializer);
+          Object value = xreadRefByNullableSerializer(fury, buffer, 
valueSerializer);
           map.put(key, value);
         }
       }
@@ -2266,6 +1198,15 @@ public abstract class AbstractMapSerializer<T> extends Serializer<T> {
     }
   }
 
+  public static Object xreadRefByNullableSerializer(
+      Fury fury, MemoryBuffer buffer, Serializer<?> serializer) {
+    if (serializer == null) {
+      return fury.xreadRef(buffer);
+    } else {
+      return fury.xreadRef(buffer, serializer);
+    }
+  }
+
   /**
    * Hook for java serialization codegen, read/write key/value by entrySet.
    *
diff --git a/java/fury-core/src/main/java/org/apache/fury/serializer/collection/MapFlags.java b/java/fury-core/src/main/java/org/apache/fury/serializer/collection/MapFlags.java
index 52ca2d21..d1e8da4c 100644
--- a/java/fury-core/src/main/java/org/apache/fury/serializer/collection/MapFlags.java
+++ b/java/fury-core/src/main/java/org/apache/fury/serializer/collection/MapFlags.java
@@ -20,20 +20,14 @@
 package org.apache.fury.serializer.collection;
 
 public class MapFlags {
-
   /** Whether track key ref. */
-  public static int TRACKING_KEY_REF = 0b1;
+  public static int TRACKING_KEY_REF = 0b1;
 
   /** Whether key has null. */
   public static int KEY_HAS_NULL = 0b10;
 
-  //    /**
-  //     * Whether key is not declare type.
-  //     */
-  //    public static int KEY_NOT_DECL_TYPE = 0b100;
-
-  /** Whether keys type are different. */
-  public static int KEY_NOT_SAME_TYPE = 0b100;
+  /** Whether key type is the declared type. */
+  public static int KEY_DECL_TYPE = 0b100;
 
   /** Whether track value ref. */
   public static int TRACKING_VALUE_REF = 0b1000;
@@ -42,8 +36,24 @@ public class MapFlags {
   public static int VALUE_HAS_NULL = 0b10000;
 
   /** Whether value is not declare type. */
-  //    public static int VALUE_NOT_DECL_TYPE = 0b1000000;
+  public static int VALUE_DECL_TYPE = 0b100000;
+
+  // When key or value is null that entry will be serialized as a new chunk with size 1.
+  // In such cases, chunk size will be skipped writing.
+  /** Both key and value are null. */
+  public static int KV_NULL = KEY_HAS_NULL | VALUE_HAS_NULL;
+
+  /** Key is null, value type is declared type, and ref tracking for value is disabled. */
+  public static int NULL_KEY_VALUE_DECL_TYPE = KEY_HAS_NULL | VALUE_DECL_TYPE;
+
+  /** Key is null, value type is declared type, and ref tracking for value is enabled. */
+  public static int NULL_KEY_VALUE_DECL_TYPE_TRACKING_REF =
+      KEY_HAS_NULL | VALUE_DECL_TYPE | TRACKING_VALUE_REF;
+
+  /** Value is null, key type is declared type, and ref tracking for key is disabled. */
+  public static int NULL_VALUE_KEY_DECL_TYPE = VALUE_HAS_NULL | KEY_DECL_TYPE;
 
-  /** Whether values type are different. */
-  public static int VALUE_NOT_SAME_TYPE = 0b100000;
+  /** Value is null, key type is declared type, and ref tracking for key is enabled. */
+  public static int NULL_VALUE_KEY_DECL_TYPE_TRACKING_REF =
+      VALUE_HAS_NULL | KEY_DECL_TYPE | TRACKING_KEY_REF;
 }
diff --git a/java/fury-core/src/test/java/org/apache/fury/serializer/collection/MapSerializersTest.java b/java/fury-core/src/test/java/org/apache/fury/serializer/collection/MapSerializersTest.java
index 240fc8a3..c1af2826 100644
--- a/java/fury-core/src/test/java/org/apache/fury/serializer/collection/MapSerializersTest.java
+++ b/java/fury-core/src/test/java/org/apache/fury/serializer/collection/MapSerializersTest.java
@@ -326,9 +326,20 @@ public class MapSerializersTest extends FuryTestBase {
     copyCheck(fury, obj);
   }
 
+  public static MapFields createBigMapFieldsObject() {
+    Map<String, Integer> map = new HashMap<>();
+    for (int i = 0; i < 1000; i++) {
+      map.put("k" + i, i);
+    }
+    return createMapFieldsObject(map);
+  }
+
   public static MapFields createMapFieldsObject() {
+    return createMapFieldsObject(ImmutableMap.of("k1", 1, "k2", 2));
+  }
+
+  public static MapFields createMapFieldsObject(Map<String, Integer> map) {
     MapFields obj = new MapFields();
-    Map<String, Integer> map = ImmutableMap.of("k1", 1, "k2", 2);
     obj.map = map;
     obj.map2 = new HashMap<>(map);
     obj.map3 = new HashMap<>(map);
@@ -608,16 +619,63 @@ public class MapSerializersTest extends FuryTestBase {
   }
 
   @Test(dataProvider = "referenceTrackingConfig")
-  public void testObjectKeyValueChunkSerializer(boolean 
referenceTrackingConfig) {
+  public void testObjectKeyValueChunk(boolean referenceTrackingConfig) {
     Fury fury = 
Fury.builder().withRefTracking(referenceTrackingConfig).build();
     final Map<Object, Object> differentKeyAndValueTypeMap = 
createDifferentKeyAndValueTypeMap();
     final Serializer<? extends Map> serializer =
         fury.getSerializer(differentKeyAndValueTypeMap.getClass());
     MapSerializers.HashMapSerializer mapSerializer = 
(MapSerializers.HashMapSerializer) serializer;
     mapSerializer.setUseChunkSerialize(true);
-    final byte[] serialize = fury.serialize(differentKeyAndValueTypeMap);
-    final Object deserialize = fury.deserialize(serialize);
-    assertEquals(deserialize, differentKeyAndValueTypeMap);
+    serDeCheck(fury, differentKeyAndValueTypeMap);
+  }
+
+  @Test(dataProvider = "referenceTrackingConfig")
+  public void testObjectKeyValueBigChunk(boolean referenceTrackingConfig) {
+    Fury fury = 
Fury.builder().withRefTracking(referenceTrackingConfig).build();
+    final Map<Object, Object> differentKeyAndValueTypeMap = 
createDifferentKeyAndValueTypeMap();
+    for (int i = 0; i < 3000; i++) {
+      differentKeyAndValueTypeMap.put("k" + i, i);
+    }
+    final Serializer<? extends Map> serializer =
+        fury.getSerializer(differentKeyAndValueTypeMap.getClass());
+    MapSerializers.HashMapSerializer mapSerializer = 
(MapSerializers.HashMapSerializer) serializer;
+    mapSerializer.setUseChunkSerialize(true);
+    serDeCheck(fury, differentKeyAndValueTypeMap);
+  }
+
+  @Test
+  public void testMapChunkRefTracking() {
+    Fury fury =
+        Fury.builder()
+            .withRefTracking(true)
+            .withCodegen(false)
+            .requireClassRegistration(false)
+            .build();
+    Map<String, Integer> map = new HashMap<>();
+    for (int i = 0; i < 1; i++) {
+      map.put("k" + i, i);
+    }
+    Object v = ofArrayList(map, ofHashMap("k1", map, "k2", new HashMap<>(map), 
"k3", map));
+    serDeCheck(fury, v);
+  }
+
+  @Test
+  public void testMapChunkRefTrackingGenerics() {
+    Fury fury =
+        Fury.builder()
+            .withRefTracking(true)
+            .withCodegen(false)
+            .requireClassRegistration(false)
+            .build();
+
+    MapFields obj = new MapFields();
+    Map<String, Integer> map = new HashMap<>();
+    for (int i = 0; i < 1; i++) {
+      map.put("k" + i, i);
+    }
+    obj.map = map;
+    obj.mapKeyFinal = ofHashMap("k1", map);
+    serDeCheck(fury, obj);
   }
 
   @Test(dataProvider = "referenceTrackingConfig")
@@ -625,9 +683,10 @@ public class MapSerializersTest extends FuryTestBase {
     Fury fury =
         Fury.builder()
             .withRefTracking(referenceTrackingConfig)
+            .withCodegen(false)
             .requireClassRegistration(false)
             .build();
-    final MapFields mapFieldsObject = createMapFieldsObject();
+    final MapFields mapFieldsObject = createBigMapFieldsObject();
     // hashmap
     final Serializer<HashMap> serializer = fury.getSerializer(HashMap.class);
     MapSerializers.HashMapSerializer mapSerializer = 
(MapSerializers.HashMapSerializer) serializer;
@@ -663,9 +722,7 @@ public class MapSerializersTest extends FuryTestBase {
         (MapSerializers.EnumMapSerializer) serializer5;
     enumMapSerializer.setUseChunkSerialize(true);
 
-    final byte[] serialize = fury.serialize(mapFieldsObject);
-    final Object deserialize = fury.deserialize(serialize);
-    assertEquals(deserialize, mapFieldsObject);
+    serDeCheck(fury, mapFieldsObject);
   }
 
   private static Map<Object, Object> createDifferentKeyAndValueTypeMap() {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to