This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a86dc90a [#1745] feat(remote merge): Introduce a common serializer. 
(#1916)
9a86dc90a is described below

commit 9a86dc90aba94db0649a32d4b361fe1c906ec195
Author: zhengchenyu <[email protected]>
AuthorDate: Wed Jul 17 19:06:44 2024 +0800

    [#1745] feat(remote merge): Introduce a common serializer. (#1916)
    
    ### What changes were proposed in this pull request?
    
    
    ### Why are the changes needed?
    
    Merging needs to be performed on the ShuffleServer side, but 
serialization/deserialization may differ between versions of the compute 
framework. To avoid having the server handle multiple versions of 
serialization/deserialization, a unified serializer that is independent of the 
computing framework needs to be extracted.
    For now, only one serializer is supported: WritableSerializer, which 
handles the org.apache.hadoop.io.Writable interface used by the MR and Tez 
frameworks.
    
    > Note: KryoSerializer will be introduced when spark is supported.
    
    Fix: #1745
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    unit test and test in cluster.
---
 .../apache/uniffle/common/config/RssBaseConf.java  |   8 +
 .../common/serializer/DeserializationStream.java   |  31 +++
 .../common/serializer/PartialInputStream.java      |  35 +++
 .../common/serializer/PartialInputStreamImpl.java  | 146 +++++++++++
 .../serializer/SeekableInMemoryByteChannel.java    | 165 ++++++++++++
 .../common/serializer/SerializationStream.java     |  31 +++
 .../uniffle/common/serializer/Serializer.java      |  25 ++
 .../common/serializer/SerializerFactory.java       |  70 ++++++
 .../common/serializer/SerializerInstance.java      |  36 +++
 .../writable/ComparativeOutputBuffer.java          |  42 ++++
 .../writable/RawWritableDeserializationStream.java |  79 ++++++
 .../writable/RawWritableSerializationStream.java   |  85 +++++++
 .../writable/WritableDeserializationStream.java    |  87 +++++++
 .../writable/WritableSerializationStream.java      |  88 +++++++
 .../serializer/writable/WritableSerializer.java    |  43 ++++
 .../writable/WritableSerializerInstance.java       |  67 +++++
 .../common/serializer/PartialInputStreamTest.java  | 199 +++++++++++++++
 .../common/serializer/SerializerFactoryTest.java   |  50 ++++
 .../uniffle/common/serializer/SerializerUtils.java | 128 ++++++++++
 .../common/serializer/WritableSerializerTest.java  | 277 +++++++++++++++++++++
 20 files changed, 1692 insertions(+)

diff --git 
a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java 
b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
index a0e6989ef..f6b99269c 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import org.apache.uniffle.common.ClientType;
 import org.apache.uniffle.common.StorageType;
 import org.apache.uniffle.common.rpc.ServerType;
+import org.apache.uniffle.common.serializer.writable.WritableSerializer;
 import org.apache.uniffle.common.util.RssUtils;
 
 public class RssBaseConf extends RssConf {
@@ -278,6 +279,13 @@ public class RssBaseConf extends RssConf {
           .defaultValue(16)
           .withDescription("start server service max retry");
 
+  /* Serialization */
+  public static final ConfigOption<String> RSS_IO_SERIALIZATIONS =
+      ConfigOptions.key("rss.io.serializations")
+          .stringType()
+          .defaultValue(WritableSerializer.class.getName())
+          .withDescription("Serializations are used for creative Serializers 
and Deserializers");
+
   public boolean loadConfFromFile(String fileName, List<ConfigOption<Object>> 
configOptions) {
     Map<String, String> properties = RssUtils.getPropertiesFromFile(fileName);
     if (properties == null) {
diff --git 
a/common/src/main/java/org/apache/uniffle/common/serializer/DeserializationStream.java
 
b/common/src/main/java/org/apache/uniffle/common/serializer/DeserializationStream.java
new file mode 100644
index 000000000..cdef70ba9
--- /dev/null
+++ 
b/common/src/main/java/org/apache/uniffle/common/serializer/DeserializationStream.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.serializer;
+
+import java.io.IOException;
+
/**
 * Iterates over key/value records decoded from an input source.
 *
 * <p>Call {@link #nextRecord()} to advance; while it returns {@code true}, {@link
 * #getCurrentKey()} and {@link #getCurrentValue()} expose the record that was just read.
 *
 * @param <K> the key type
 * @param <V> the value type
 */
public abstract class DeserializationStream<K, V> {

  /**
   * Advances to the next record.
   *
   * @return {@code true} if a record was read, {@code false} once no more records are available
   * @throws IOException if the underlying input cannot be read
   */
  public abstract boolean nextRecord() throws IOException;

  /**
   * Returns the key of the record most recently read by {@link #nextRecord()}.
   *
   * @throws IOException if the key cannot be produced
   */
  public abstract K getCurrentKey() throws IOException;

  /**
   * Returns the value of the record most recently read by {@link #nextRecord()}.
   *
   * @throws IOException if the value cannot be produced
   */
  public abstract V getCurrentValue() throws IOException;

  /**
   * Releases the underlying input.
   *
   * @throws IOException if closing the input fails
   */
  public abstract void close() throws IOException;
}
diff --git 
a/common/src/main/java/org/apache/uniffle/common/serializer/PartialInputStream.java
 
b/common/src/main/java/org/apache/uniffle/common/serializer/PartialInputStream.java
new file mode 100644
index 000000000..331980216
--- /dev/null
+++ 
b/common/src/main/java/org/apache/uniffle/common/serializer/PartialInputStream.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.serializer;
+
+import java.io.IOException;
+import java.io.InputStream;
+
/**
 * An {@link InputStream} that exposes only a sub-range of a source stream, delimited by {@link
 * #getStart()} and {@link #getEnd()}.
 */
public abstract class PartialInputStream extends InputStream {

  /** Returns the offset in the source stream at which the exposed range begins. */
  public abstract long getStart();

  /** Returns the offset in the source stream at which the exposed range ends. */
  public abstract long getEnd();

  /**
   * Returns how many bytes can still be read from the exposed range.
   *
   * <p>Redeclared abstract so every subclass must supply its own implementation instead of
   * inheriting {@link InputStream#available()}.
   *
   * @throws IOException if the remaining byte count cannot be determined
   */
  @Override
  public abstract int available() throws IOException;
}
diff --git 
a/common/src/main/java/org/apache/uniffle/common/serializer/PartialInputStreamImpl.java
 
b/common/src/main/java/org/apache/uniffle/common/serializer/PartialInputStreamImpl.java
new file mode 100644
index 000000000..6bf8910fe
--- /dev/null
+++ 
b/common/src/main/java/org/apache/uniffle/common/serializer/PartialInputStreamImpl.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.serializer;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.SeekableByteChannel;
+
+/*
+ * PartialInputStream is a configurable partial input stream, which
+ * only allows reading from start to end of the source input stream.
+ * */
+public class PartialInputStreamImpl extends PartialInputStream {
+
+  private final SeekableByteChannel ch; // the source input channel
+  private final long start; // the start of source input stream
+  private final long end; // the end of source input stream
+  private long pos; // the read offset
+
+  private ByteBuffer bb = null;
+  private byte[] bs = null;
+  private byte[] b1;
+  private Closeable closeable;
+
+  public PartialInputStreamImpl(SeekableByteChannel ch, long start, long end, 
Closeable closeable)
+      throws IOException {
+    if (start < 0) {
+      throw new IOException("Negative position for channel!");
+    }
+    this.ch = ch;
+    this.start = start;
+    this.end = end;
+    this.closeable = closeable;
+    this.pos = start;
+    ch.position(start);
+  }
+
+  private int read(ByteBuffer bb) throws IOException {
+    return ch.read(bb);
+  }
+
+  @Override
+  public synchronized int read() throws IOException {
+    if (b1 == null) {
+      b1 = new byte[1];
+    }
+    int n = read(b1);
+    if (n == 1) {
+      return b1[0] & 0xff;
+    }
+    return -1;
+  }
+
+  @Override
+  public synchronized int read(byte[] bs, int off, int len) throws IOException 
{
+    if ((off < 0)
+        || (off > bs.length)
+        || (len < 0)
+        || ((off + len) > bs.length)
+        || ((off + len) < 0)) {
+      throw new IndexOutOfBoundsException();
+    } else if (len == 0) {
+      return 0;
+    }
+    ByteBuffer bb = (this.bs == bs) ? this.bb : ByteBuffer.wrap(bs);
+    bb.limit(Math.min(off + len, bb.capacity()));
+    bb.position(off);
+    this.bb = bb;
+    this.bs = bs;
+    int ret = read(bb);
+    if (ret >= 0) {
+      pos += ret;
+    }
+    return ret;
+  }
+
+  @Override
+  public int available() throws IOException {
+    return (int) (end - pos);
+  }
+
+  @Override
+  public long getStart() {
+    return start;
+  }
+
+  @Override
+  public long getEnd() {
+    return end;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (closeable != null) {
+      closeable.close();
+    }
+  }
+
+  private static PartialInputStreamImpl newInputStream(
+      SeekableByteChannel ch, long start, long end, Closeable closeable) 
throws IOException {
+    if (ch == null) {
+      throw new NullPointerException("channel is null!");
+    }
+    return new PartialInputStreamImpl(ch, start, end, closeable);
+  }
+
+  public static PartialInputStreamImpl newInputStream(File file, long start, 
long end)
+      throws IOException {
+    FileInputStream input = new FileInputStream(file);
+    FileChannel fc = input.getChannel();
+    long size = fc.size();
+    return newInputStream(
+        fc,
+        start,
+        Math.min(end, size),
+        () -> {
+          input.close();
+        });
+  }
+
+  public static PartialInputStreamImpl newInputStream(byte[] bytes, long 
start, long end)
+      throws IOException {
+    SeekableInMemoryByteChannel ch = new SeekableInMemoryByteChannel(bytes);
+    int size = bytes.length;
+    return newInputStream(ch, start, Math.min(end, size), () -> ch.close());
+  }
+}
diff --git 
a/common/src/main/java/org/apache/uniffle/common/serializer/SeekableInMemoryByteChannel.java
 
b/common/src/main/java/org/apache/uniffle/common/serializer/SeekableInMemoryByteChannel.java
new file mode 100644
index 000000000..cddd2d61a
--- /dev/null
+++ 
b/common/src/main/java/org/apache/uniffle/common/serializer/SeekableInMemoryByteChannel.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.serializer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SeekableByteChannel;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicBoolean;
+
/**
 * A {@link SeekableByteChannel} backed by a byte array held in memory.
 *
 * <p>Reads are served from the first {@link #size()} bytes of the array; writes past the current
 * size grow the array. Positions and sizes are limited to {@link Integer#MAX_VALUE}. Apart from
 * the closed flag, which is an {@link AtomicBoolean}, no state is synchronized.
 */
public class SeekableInMemoryByteChannel implements SeekableByteChannel {

  // Below this length the array grows by doubling; above it, doubling could overflow int, so the
  // array is grown to exactly the requested length instead.
  private static final int NAIVE_RESIZE_LIMIT = Integer.MAX_VALUE >> 1;

  private byte[] data; // backing array; its length may exceed size
  private final AtomicBoolean closed = new AtomicBoolean();
  private int position; // current offset; may exceed size after position(long)
  private int size; // number of valid bytes at the start of data

  /**
   * Constructor taking a byte array.
   *
   * <p>This constructor is intended to be used with pre-allocated buffer or when reading from a
   * given byte array. The array is used directly (not copied), and its full length is the
   * initial size.
   *
   * @param data input data or pre-allocated array.
   */
  public SeekableInMemoryByteChannel(byte[] data) {
    this.data = data;
    size = data.length;
  }

  @Override
  public long position() {
    return position;
  }

  /**
   * Sets the position; it may be set beyond the current size.
   *
   * @throws IllegalArgumentException if {@code newPosition} is outside {@code [0,
   *     Integer.MAX_VALUE]}
   * @throws java.nio.channels.ClosedChannelException if the channel is closed
   */
  @Override
  public SeekableByteChannel position(long newPosition) throws IOException {
    ensureOpen();
    if (newPosition < 0L || newPosition > Integer.MAX_VALUE) {
      throw new IllegalArgumentException("Position has to be in range 0.. " + Integer.MAX_VALUE);
    }
    position = (int) newPosition;
    return this;
  }

  @Override
  public long size() {
    return size;
  }

  /**
   * Truncates the channel to {@code newSize} bytes if it is currently larger. As required by
   * {@link SeekableByteChannel#truncate(long)}, a position beyond {@code newSize} is moved back to
   * {@code newSize}.
   *
   * @throws IllegalArgumentException if {@code newSize} is negative
   */
  @Override
  public SeekableByteChannel truncate(long newSize) {
    if (newSize < 0L) {
      throw new IllegalArgumentException("Size has to be non-negative, got " + newSize);
    }
    if (size > newSize) {
      size = (int) newSize;
    }
    if (position > newSize) {
      position = (int) newSize;
    }
    return this;
  }

  /**
   * Copies bytes from the current position into {@code buf}.
   *
   * @return the number of bytes copied, or {@code -1} if the position is at or beyond the size
   */
  @Override
  public int read(ByteBuffer buf) throws IOException {
    ensureOpen();
    repositionIfNecessary();
    int wanted = buf.remaining();
    int possible = size - position;
    if (possible <= 0) {
      return -1;
    }
    if (wanted > possible) {
      wanted = possible;
    }
    buf.put(data, position, wanted);
    position += wanted;
    return wanted;
  }

  @Override
  public void close() {
    closed.set(true);
  }

  @Override
  public boolean isOpen() {
    return !closed.get();
  }

  /**
   * Writes the remaining bytes of {@code b} at the current position, growing the backing array
   * when needed. If the end position would overflow {@code int}, only the bytes that fit below
   * {@link Integer#MAX_VALUE} are written.
   *
   * @return the number of bytes written
   */
  @Override
  public int write(ByteBuffer b) throws IOException {
    ensureOpen();
    int wanted = b.remaining();
    int possibleWithoutResize = size - position;
    if (wanted > possibleWithoutResize) {
      int newSize = position + wanted;
      if (newSize < 0) { // overflow
        resize(Integer.MAX_VALUE);
        wanted = Integer.MAX_VALUE - position;
      } else {
        resize(newSize);
      }
    }
    b.get(data, position, wanted);
    position += wanted;
    if (size < position) {
      size = position;
    }
    return wanted;
  }

  /**
   * Obtains the array backing this channel.
   *
   * <p>NOTE: The returned buffer is not aligned with containing data, use {@link #size()} to
   * obtain the size of data stored in the buffer.
   *
   * @return internal byte array.
   */
  public byte[] array() {
    return data;
  }

  /** Ensures the backing array is at least {@code newLength} bytes long. */
  private void resize(int newLength) {
    if (newLength <= data.length) {
      // Capacity left over (e.g. after truncate) already suffices; avoid copying or shrinking.
      return;
    }
    int len = data.length;
    if (len <= 0) {
      len = 1;
    }
    if (newLength < NAIVE_RESIZE_LIMIT) {
      while (len < newLength) {
        len <<= 1;
      }
    } else { // avoid overflow
      len = newLength;
    }
    data = Arrays.copyOf(data, len);
  }

  private void ensureOpen() throws ClosedChannelException {
    if (!isOpen()) {
      throw new ClosedChannelException();
    }
  }

  private void repositionIfNecessary() {
    if (position > size) {
      position = size;
    }
  }
}
diff --git 
a/common/src/main/java/org/apache/uniffle/common/serializer/SerializationStream.java
 
b/common/src/main/java/org/apache/uniffle/common/serializer/SerializationStream.java
new file mode 100644
index 000000000..421b53031
--- /dev/null
+++ 
b/common/src/main/java/org/apache/uniffle/common/serializer/SerializationStream.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.serializer;
+
+import java.io.IOException;
+
/** Writes key/value records to an underlying output in a serializer-specific format. */
public abstract class SerializationStream {

  /** Returns the total number of bytes written by this stream so far. */
  public abstract long getTotalBytesWritten();

  /**
   * Serializes one record made of {@code key} and {@code value}.
   *
   * @throws IOException if writing to the underlying output fails
   */
  public abstract void writeRecord(Object key, Object value) throws IOException;

  /**
   * Flushes buffered output to the underlying output.
   *
   * @throws IOException if flushing fails
   */
  public abstract void flush() throws IOException;

  /**
   * Finishes the stream and closes the underlying output.
   *
   * @throws IOException if closing fails
   */
  public abstract void close() throws IOException;
}
diff --git 
a/common/src/main/java/org/apache/uniffle/common/serializer/Serializer.java 
b/common/src/main/java/org/apache/uniffle/common/serializer/Serializer.java
new file mode 100644
index 000000000..0572d4146
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/serializer/Serializer.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.serializer;
+
+public abstract class Serializer {
+
+  public abstract SerializerInstance newInstance();
+
+  public abstract boolean accept(Class<?> c);
+}
diff --git 
a/common/src/main/java/org/apache/uniffle/common/serializer/SerializerFactory.java
 
b/common/src/main/java/org/apache/uniffle/common/serializer/SerializerFactory.java
new file mode 100644
index 000000000..d8c67cec8
--- /dev/null
+++ 
b/common/src/main/java/org/apache/uniffle/common/serializer/SerializerFactory.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.serializer;
+
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.ClassUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.exception.RssException;
+
+import static 
org.apache.uniffle.common.config.RssBaseConf.RSS_IO_SERIALIZATIONS;
+
+public class SerializerFactory {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(SerializerFactory.class);
+
+  private List<Serializer> serializers = new ArrayList<Serializer>();
+
+  RssConf conf;
+
+  public SerializerFactory(RssConf conf) {
+    this.conf = conf;
+    for (String serializerName : 
StringUtils.split(conf.get(RSS_IO_SERIALIZATIONS), ",")) {
+      add(conf, serializerName);
+    }
+  }
+
+  private void add(RssConf conf, String serializerName) {
+    try {
+      Class<? extends Serializer> sClass =
+          (Class<? extends Serializer>) ClassUtils.getClass(serializerName);
+      Constructor<? extends Serializer> constructor = 
sClass.getConstructor(RssConf.class);
+      Serializer serializer = constructor.newInstance(conf);
+      serializers.add(serializer);
+    } catch (Exception e) {
+      LOG.warn("Construct Serialization fail, caused by ", e);
+      throw new RssException(e);
+    }
+  }
+
+  public Serializer getSerializer(Class c) {
+    for (Serializer serializer : serializers) {
+      if (serializer.accept(c)) {
+        return serializer;
+      }
+    }
+    return null;
+  }
+}
diff --git 
a/common/src/main/java/org/apache/uniffle/common/serializer/SerializerInstance.java
 
b/common/src/main/java/org/apache/uniffle/common/serializer/SerializerInstance.java
new file mode 100644
index 000000000..e079fb9a9
--- /dev/null
+++ 
b/common/src/main/java/org/apache/uniffle/common/serializer/SerializerInstance.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.serializer;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.hadoop.io.DataInputBuffer;
+
+public abstract class SerializerInstance {
+
+  public abstract <T> void serialize(T t, DataOutputStream out) throws 
IOException;
+
+  public abstract <T> T deserialize(DataInputBuffer buffer, Class vClass) 
throws IOException;
+
+  public abstract <K, V> SerializationStream serializeStream(OutputStream 
output, boolean raw);
+
+  public abstract <K, V> DeserializationStream deserializeStream(
+      PartialInputStream input, Class<K> keyClass, Class<V> valueClass, 
boolean raw);
+}
diff --git 
a/common/src/main/java/org/apache/uniffle/common/serializer/writable/ComparativeOutputBuffer.java
 
b/common/src/main/java/org/apache/uniffle/common/serializer/writable/ComparativeOutputBuffer.java
new file mode 100644
index 000000000..ef6043c3e
--- /dev/null
+++ 
b/common/src/main/java/org/apache/uniffle/common/serializer/writable/ComparativeOutputBuffer.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.serializer.writable;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.io.DataOutputBuffer;
+
+public class ComparativeOutputBuffer extends DataOutputBuffer {
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    ComparativeOutputBuffer that = (ComparativeOutputBuffer) obj;
+    return Arrays.equals(this.getData(), that.getData());
+  }
+
+  @Override
+  public int hashCode() {
+    return Arrays.hashCode(this.getData());
+  }
+}
diff --git 
a/common/src/main/java/org/apache/uniffle/common/serializer/writable/RawWritableDeserializationStream.java
 
b/common/src/main/java/org/apache/uniffle/common/serializer/writable/RawWritableDeserializationStream.java
new file mode 100644
index 000000000..54e4b01bf
--- /dev/null
+++ 
b/common/src/main/java/org/apache/uniffle/common/serializer/writable/RawWritableDeserializationStream.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.serializer.writable;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+import org.apache.uniffle.common.serializer.DeserializationStream;
+import org.apache.uniffle.common.serializer.PartialInputStream;
+
+public class RawWritableDeserializationStream<K extends Writable, V extends 
Writable>
+    extends DeserializationStream<ComparativeOutputBuffer, 
ComparativeOutputBuffer> {
+
+  public static final int EOF_MARKER = -1; // End of File Marker
+
+  private PartialInputStream inputStream;
+  private DataInputStream dataIn;
+  private ComparativeOutputBuffer currentKeyBuffer;
+  private ComparativeOutputBuffer currentValueBuffer;
+
+  public RawWritableDeserializationStream(
+      WritableSerializerInstance instance, PartialInputStream inputStream) {
+    this.inputStream = inputStream;
+    this.dataIn = new DataInputStream(inputStream);
+  }
+
+  @Override
+  public boolean nextRecord() throws IOException {
+    if (inputStream.available() <= 0) {
+      return false;
+    }
+    int currentKeyLength = WritableUtils.readVInt(dataIn);
+    int currentValueLength = WritableUtils.readVInt(dataIn);
+    if (currentKeyLength == EOF_MARKER && currentValueLength == EOF_MARKER) {
+      return false;
+    }
+    currentKeyBuffer = new ComparativeOutputBuffer();
+    currentValueBuffer = new ComparativeOutputBuffer();
+    currentKeyBuffer.write(dataIn, currentKeyLength);
+    currentValueBuffer.write(dataIn, currentValueLength);
+    return true;
+  }
+
+  @Override
+  public ComparativeOutputBuffer getCurrentKey() {
+    return currentKeyBuffer;
+  }
+
+  @Override
+  public ComparativeOutputBuffer getCurrentValue() {
+    return currentValueBuffer;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (dataIn != null) {
+      dataIn.close();
+      dataIn = null;
+    }
+  }
+}
diff --git 
a/common/src/main/java/org/apache/uniffle/common/serializer/writable/RawWritableSerializationStream.java
 
b/common/src/main/java/org/apache/uniffle/common/serializer/writable/RawWritableSerializationStream.java
new file mode 100644
index 000000000..e5fa2f1ef
--- /dev/null
+++ 
b/common/src/main/java/org/apache/uniffle/common/serializer/writable/RawWritableSerializationStream.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.serializer.writable;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.WritableUtils;
+
+import org.apache.uniffle.common.serializer.SerializationStream;
+
+public class RawWritableSerializationStream<K, V> extends SerializationStream {
+
+  public static final int EOF_MARKER = -1; // End of File Marker
+
+  private DataOutputStream dataOut;
+  // DataOutputStream::size return int, can not support big file which is 
larger than
+  // Integer.MAX_VALUE.
+  // Here introduce totalBytesWritten to record the written bytes.
+  private long totalBytesWritten = 0;
+
+  public RawWritableSerializationStream(WritableSerializerInstance instance, 
OutputStream out) {
+    if (out instanceof DataOutputStream) {
+      dataOut = (DataOutputStream) out;
+    } else {
+      dataOut = new DataOutputStream(out);
+    }
+  }
+
+  @Override
+  public void writeRecord(Object key, Object value) throws IOException {
+    DataOutputBuffer keyBuffer = (DataOutputBuffer) key;
+    DataOutputBuffer valueBuffer = (DataOutputBuffer) value;
+    int keyLength = keyBuffer.getLength();
+    int valueLength = valueBuffer.getLength();
+    // write size and buffer to output
+    WritableUtils.writeVInt(dataOut, keyLength);
+    WritableUtils.writeVInt(dataOut, valueLength);
+    keyBuffer.writeTo(dataOut);
+    valueBuffer.writeTo(dataOut);
+    totalBytesWritten +=
+        WritableUtils.getVIntSize(keyLength)
+            + WritableUtils.getVIntSize(valueLength)
+            + keyBuffer.getLength()
+            + valueBuffer.getLength();
+  }
+
+  @Override
+  public void flush() throws IOException {
+    dataOut.flush();
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (dataOut != null) {
+      WritableUtils.writeVInt(dataOut, EOF_MARKER);
+      WritableUtils.writeVInt(dataOut, EOF_MARKER);
+      totalBytesWritten += 2 * WritableUtils.getVIntSize(EOF_MARKER);
+      dataOut.close();
+      dataOut = null;
+    }
+  }
+
+  @Override
+  public long getTotalBytesWritten() {
+    return totalBytesWritten;
+  }
+}
diff --git 
a/common/src/main/java/org/apache/uniffle/common/serializer/writable/WritableDeserializationStream.java
 
b/common/src/main/java/org/apache/uniffle/common/serializer/writable/WritableDeserializationStream.java
new file mode 100644
index 000000000..8b016fa27
--- /dev/null
+++ 
b/common/src/main/java/org/apache/uniffle/common/serializer/writable/WritableDeserializationStream.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.serializer.writable;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import org.apache.uniffle.common.serializer.DeserializationStream;
+import org.apache.uniffle.common.serializer.PartialInputStream;
+
+/**
+ * Reads records in the layout {@link WritableSerializationStream} writes: a vint key length, a vint
+ * value length, then the key and the value as {@link Writable}s. Iteration stops when the input
+ * has no bytes available or when both lengths equal {@link #EOF_MARKER}.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class WritableDeserializationStream<K extends Writable, V extends Writable>
+    extends DeserializationStream<K, V> {
+
+  public static final int EOF_MARKER = -1; // End of File Marker
+
+  private PartialInputStream inputStream;
+  private DataInputStream dataIn;
+  private Class<K> keyClass;
+  private Class<V> valueClass;
+  private K currentKey;
+  private V currentValue;
+
+  /**
+   * @param instance the owning serializer instance (not used by this stream)
+   * @param inputStream source of serialized records
+   * @param keyClass class instantiated reflectively for every key read
+   * @param valueClass class instantiated reflectively for every value read
+   */
+  public WritableDeserializationStream(
+      WritableSerializerInstance instance,
+      PartialInputStream inputStream,
+      Class<K> keyClass,
+      Class<V> valueClass) {
+    this.inputStream = inputStream;
+    this.dataIn = new DataInputStream(inputStream);
+    this.keyClass = keyClass;
+    this.valueClass = valueClass;
+  }
+
+  /**
+   * Advances to the next record.
+   *
+   * @return {@code true} if a record was read into the current key/value, {@code false} when the
+   *     input is exhausted or the end marker was read
+   */
+  @Override
+  public boolean nextRecord() throws IOException {
+    if (inputStream.available() > 0) {
+      // The lengths only serve to detect the end marker; readFields consumes its own bytes.
+      final int keyLength = WritableUtils.readVInt(dataIn);
+      final int valueLength = WritableUtils.readVInt(dataIn);
+      final boolean reachedEnd = keyLength == EOF_MARKER && valueLength == EOF_MARKER;
+      if (!reachedEnd) {
+        // Fresh instances each time, so previously returned objects are never overwritten.
+        currentKey = ReflectionUtils.newInstance(keyClass, null);
+        currentKey.readFields(dataIn);
+        currentValue = ReflectionUtils.newInstance(valueClass, null);
+        currentValue.readFields(dataIn);
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /** Returns the key read by the last successful {@link #nextRecord()}. */
+  @Override
+  public K getCurrentKey() {
+    return this.currentKey;
+  }
+
+  /** Returns the value read by the last successful {@link #nextRecord()}. */
+  @Override
+  public V getCurrentValue() {
+    return this.currentValue;
+  }
+
+  /** Closes the underlying input; later calls do nothing. */
+  @Override
+  public void close() throws IOException {
+    if (dataIn == null) {
+      return;
+    }
+    dataIn.close();
+    dataIn = null;
+  }
+}
diff --git 
a/common/src/main/java/org/apache/uniffle/common/serializer/writable/WritableSerializationStream.java
 
b/common/src/main/java/org/apache/uniffle/common/serializer/writable/WritableSerializationStream.java
new file mode 100644
index 000000000..07ed07a14
--- /dev/null
+++ 
b/common/src/main/java/org/apache/uniffle/common/serializer/writable/WritableSerializationStream.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.serializer.writable;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+import org.apache.uniffle.common.serializer.SerializationStream;
+
+/**
+ * Serializes {@link Writable} key/value records. Each record is written as the vint key length,
+ * the vint value length, then the key bytes followed by the value bytes. {@link #close()} appends
+ * two {@code EOF_MARKER} vints to terminate the stream.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class WritableSerializationStream<K extends Writable, V extends Writable>
+    extends SerializationStream {
+
+  public static final int EOF_MARKER = -1; // End of File Marker
+
+  private DataOutputStream dataOut;
+  // DataOutputStream#size() returns an int, so it cannot track output larger than
+  // Integer.MAX_VALUE bytes. totalBytesWritten keeps a long count of every byte written instead.
+  private long totalBytesWritten = 0;
+  // Scratch buffer holding the serialized key immediately followed by the serialized value.
+  DataOutputBuffer buffer = new DataOutputBuffer();
+  // Scratch buffer holding the two vint length headers of the current record.
+  DataOutputBuffer sizebuffer = new DataOutputBuffer();
+
+  /**
+   * @param instance the owning serializer instance (not used by this stream)
+   * @param out destination; used directly when it is already a {@link DataOutputStream}
+   */
+  public WritableSerializationStream(WritableSerializerInstance instance, OutputStream out) {
+    if (out instanceof DataOutputStream) {
+      dataOut = (DataOutputStream) out;
+    } else {
+      dataOut = new DataOutputStream(out);
+    }
+  }
+
+  /** Serializes {@code key} and {@code value}, both of which must be {@link Writable}, as one record. */
+  @Override
+  public void writeRecord(Object key, Object value) throws IOException {
+    // Serialize key and value into one buffer; the key length is the buffer length after the key.
+    buffer.reset();
+    ((Writable) key).write(buffer);
+    int keyLength = buffer.getLength();
+    ((Writable) value).write(buffer);
+    int valueLength = buffer.getLength() - keyLength;
+
+    // Write the length header, then the payload, to the output.
+    sizebuffer.reset();
+    WritableUtils.writeVInt(sizebuffer, keyLength);
+    WritableUtils.writeVInt(sizebuffer, valueLength);
+    sizebuffer.writeTo(dataOut);
+    buffer.writeTo(dataOut);
+    totalBytesWritten += sizebuffer.getLength() + buffer.getLength();
+  }
+
+  /** Flushes any bytes buffered by the underlying {@link DataOutputStream}. */
+  @Override
+  public void flush() throws IOException {
+    dataOut.flush();
+  }
+
+  /**
+   * Appends the end-of-stream marker and closes the underlying stream. Calls after the first do
+   * nothing.
+   *
+   * <p>The marker bytes are added to {@link #getTotalBytesWritten()}, as
+   * {@code RawWritableSerializationStream#close()} does. Without this, the reported size would be
+   * smaller than the bytes actually written.
+   */
+  @Override
+  public void close() throws IOException {
+    if (dataOut != null) {
+      WritableUtils.writeVInt(dataOut, EOF_MARKER);
+      WritableUtils.writeVInt(dataOut, EOF_MARKER);
+      totalBytesWritten += 2 * WritableUtils.getVIntSize(EOF_MARKER);
+      dataOut.close();
+      dataOut = null;
+    }
+  }
+
+  /** Returns the total bytes written, including the end marker once {@link #close()} has run. */
+  @Override
+  public long getTotalBytesWritten() {
+    return totalBytesWritten;
+  }
+}
diff --git 
a/common/src/main/java/org/apache/uniffle/common/serializer/writable/WritableSerializer.java
 
b/common/src/main/java/org/apache/uniffle/common/serializer/writable/WritableSerializer.java
new file mode 100644
index 000000000..2f195a20c
--- /dev/null
+++ 
b/common/src/main/java/org/apache/uniffle/common/serializer/writable/WritableSerializer.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.serializer.writable;
+
+import org.apache.hadoop.io.Writable;
+
+import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.serializer.Serializer;
+import org.apache.uniffle.common.serializer.SerializerInstance;
+
+/**
+ * {@link Serializer} for Hadoop {@link Writable} types. Each {@link #newInstance()} call returns a
+ * new {@link WritableSerializerInstance} built from the configuration supplied at construction.
+ */
+public class WritableSerializer extends Serializer {
+
+  private final RssConf rssConf;
+
+  public WritableSerializer(RssConf rssConf) {
+    this.rssConf = rssConf;
+  }
+
+  @Override
+  public SerializerInstance newInstance() {
+    final RssConf conf = this.rssConf;
+    return new WritableSerializerInstance(conf);
+  }
+
+  /** Returns {@code true} when {@code c} is {@link Writable} or a subtype of it. */
+  @Override
+  public boolean accept(Class<?> c) {
+    final Class<Writable> writableType = Writable.class;
+    return writableType.isAssignableFrom(c);
+  }
+}
diff --git 
a/common/src/main/java/org/apache/uniffle/common/serializer/writable/WritableSerializerInstance.java
 
b/common/src/main/java/org/apache/uniffle/common/serializer/writable/WritableSerializerInstance.java
new file mode 100644
index 000000000..610a3463f
--- /dev/null
+++ 
b/common/src/main/java/org/apache/uniffle/common/serializer/writable/WritableSerializerInstance.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.serializer.writable;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.serializer.DeserializationStream;
+import org.apache.uniffle.common.serializer.PartialInputStream;
+import org.apache.uniffle.common.serializer.SerializationStream;
+import org.apache.uniffle.common.serializer.SerializerInstance;
+
+/**
+ * Serializer instance for Hadoop {@link Writable}s. Streams come in two forms: "raw", where records
+ * are already-serialized buffers, and typed, where records are {@link Writable} objects.
+ */
+public class WritableSerializerInstance extends SerializerInstance {
+
+  /** The configuration argument is currently not used. */
+  public WritableSerializerInstance(RssConf rssConf) {}
+
+  /** Writes {@code t}, which must be a {@link Writable}, to {@code out}. */
+  public <T> void serialize(T t, DataOutputStream out) throws IOException {
+    final Writable writable = (Writable) t;
+    writable.write(out);
+  }
+
+  /** Creates a {@code vClass} instance reflectively and populates it from {@code buffer}. */
+  @Override
+  @SuppressWarnings("unchecked")
+  public <T> T deserialize(DataInputBuffer buffer, Class vClass) throws IOException {
+    final Writable result = (Writable) ReflectionUtils.newInstance(vClass, null);
+    result.readFields(buffer);
+    return (T) result;
+  }
+
+  @Override
+  public <K, V> SerializationStream serializeStream(OutputStream output, boolean raw) {
+    return raw
+        ? new RawWritableSerializationStream(this, output)
+        : new WritableSerializationStream(this, output);
+  }
+
+  @Override
+  public <K, V> DeserializationStream deserializeStream(
+      PartialInputStream input, Class<K> keyClass, Class<V> valueClass, boolean raw) {
+    if (!raw) {
+      return new WritableDeserializationStream(this, input, keyClass, valueClass);
+    }
+    return new RawWritableDeserializationStream(this, input);
+  }
+}
diff --git 
a/common/src/test/java/org/apache/uniffle/common/serializer/PartialInputStreamTest.java
 
b/common/src/test/java/org/apache/uniffle/common/serializer/PartialInputStreamTest.java
new file mode 100644
index 000000000..16387828e
--- /dev/null
+++ 
b/common/src/test/java/org/apache/uniffle/common/serializer/PartialInputStreamTest.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.serializer;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Random;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+/**
+ * Tests {@link PartialInputStreamImpl} over in-memory bytes and over a file. Each case reads the
+ * window {@code [start, end)} one byte at a time and in randomly sized chunks. Byte {@code i} of
+ * the test data is {@code i & 0x7F}, so every byte read can be checked against its absolute offset.
+ */
+public class PartialInputStreamTest {
+
+  private static final int BYTES_LEN = 10240;
+  private static byte[] testBuffer = new byte[BYTES_LEN];
+  private static final int LOOP = 10;
+  @TempDir private static File tempDir;
+  private static File tempFile;
+
+  @BeforeAll
+  public static void initData() throws IOException {
+    for (int i = 0; i < BYTES_LEN; i++) {
+      testBuffer[i] = (byte) (i & 0x7F);
+    }
+    tempFile = new File(tempDir, "data");
+    // try-with-resources releases the file handle even if the write fails.
+    try (FileOutputStream output = new FileOutputStream(tempFile)) {
+      output.write(testBuffer);
+    }
+  }
+
+  @Test
+  public void testReadMemoryInputStream() throws IOException {
+    // 1 test whole buffer
+    testRandomReadMemory(testBuffer, 0, testBuffer.length);
+
+    // 2 test from start to random end
+    Random random = new Random();
+    for (int i = 0; i < LOOP; i++) {
+      testRandomReadMemory(testBuffer, 0, random.nextInt(testBuffer.length - 1));
+    }
+
+    // 3 test from random start to end
+    for (int i = 0; i < LOOP; i++) {
+      testRandomReadMemory(testBuffer, random.nextInt(testBuffer.length - 1), testBuffer.length);
+    }
+
+    // 4 test from random start to random end
+    for (int i = 0; i < LOOP; i++) {
+      int r1 = random.nextInt(testBuffer.length - 2) + 1;
+      int r2 = random.nextInt(testBuffer.length - 2) + 1;
+      testRandomReadMemory(testBuffer, Math.min(r1, r2), Math.max(r1, r2));
+    }
+
+    // 5 Test when bytes is from start to start
+    testRandomReadMemory(testBuffer, 0, 0);
+
+    // 6 Test when bytes is from end to end
+    testRandomReadMemory(testBuffer, testBuffer.length, testBuffer.length);
+
+    // 7 Test when bytes is from random to this random
+    for (int i = 0; i < LOOP; i++) {
+      int r = random.nextInt(testBuffer.length - 2) + 1;
+      testRandomReadMemory(testBuffer, r, r);
+    }
+  }
+
+  @Test
+  public void testReadNullBytes() throws IOException {
+    // An empty backing array yields an empty stream even when the requested end lies beyond it.
+    try (PartialInputStreamImpl input =
+        PartialInputStreamImpl.newInputStream(new byte[0], 0, BYTES_LEN)) {
+      assertEquals(0, input.available());
+      assertEquals(-1, input.read());
+    }
+  }
+
+  @Test
+  public void testReadFileInputStream() throws IOException {
+    // 1 test whole file
+    testRandomReadFile(tempFile, 0, testBuffer.length);
+
+    // 2 test from start to random end
+    Random random = new Random();
+    for (int i = 0; i < LOOP; i++) {
+      testRandomReadFile(tempFile, 0, random.nextInt(testBuffer.length - 1));
+    }
+
+    // 3 test from random start to end
+    for (int i = 0; i < LOOP; i++) {
+      testRandomReadFile(tempFile, random.nextInt(testBuffer.length - 1), testBuffer.length);
+    }
+
+    // 4 test from random start to random end
+    for (int i = 0; i < LOOP; i++) {
+      int r1 = random.nextInt(testBuffer.length - 2) + 1;
+      int r2 = random.nextInt(testBuffer.length - 2) + 1;
+      testRandomReadFile(tempFile, Math.min(r1, r2), Math.max(r1, r2));
+    }
+
+    // 5 Test when bytes is from start to start
+    testRandomReadFile(tempFile, 0, 0);
+
+    // 6 Test when bytes is from end to end
+    testRandomReadFile(tempFile, testBuffer.length, testBuffer.length);
+
+    // 7 Test when bytes is from random to this random
+    for (int i = 0; i < LOOP; i++) {
+      int r = random.nextInt(testBuffer.length - 2) + 1;
+      testRandomReadFile(tempFile, r, r);
+    }
+  }
+
+  /** Reads {@code [start, end)} of {@code bytes} with both read strategies, on fresh streams. */
+  private void testRandomReadMemory(byte[] bytes, long start, long end) throws IOException {
+    try (PartialInputStreamImpl input =
+        PartialInputStreamImpl.newInputStream(bytes, start, end)) {
+      testRandomReadOneBytePerTime(input, start, end);
+    }
+    try (PartialInputStreamImpl input =
+        PartialInputStreamImpl.newInputStream(bytes, start, end)) {
+      testRandomReadMultiBytesPerTime(input, start, end);
+    }
+  }
+
+  /** Reads {@code [start, end)} of {@code file} with both read strategies, on fresh streams. */
+  private void testRandomReadFile(File file, long start, long end) throws IOException {
+    try (PartialInputStreamImpl input = PartialInputStreamImpl.newInputStream(file, start, end)) {
+      testRandomReadOneBytePerTime(input, start, end);
+    }
+    try (PartialInputStreamImpl input = PartialInputStreamImpl.newInputStream(file, start, end)) {
+      testRandomReadMultiBytesPerTime(input, start, end);
+    }
+  }
+
+  /** Drains {@code input} with single-byte reads, checking each byte and the final offset. */
+  private void testRandomReadOneBytePerTime(PartialInputStreamImpl input, long start, long end)
+      throws IOException {
+    long index = start;
+    while (input.available() > 0) {
+      int b = input.read();
+      assertEquals(index & 0x7F, b);
+      index++;
+    }
+    if (start == end) {
+      assertEquals(0, input.available());
+    }
+    assertEquals(end, index);
+    if (end == BYTES_LEN) {
+      assertEquals(-1, input.read());
+    }
+  }
+
+  /** Drains {@code input} with randomly sized bulk reads, checking each byte and the final offset. */
+  private void testRandomReadMultiBytesPerTime(PartialInputStreamImpl input, long start, long end)
+      throws IOException {
+    long index = start;
+    Random random = new Random();
+    while (input.available() > 0) {
+      // May be 0; read(buf, 0, 0) returns 0, so the loop simply tries again.
+      int wanted = Math.min(random.nextInt(100), input.available());
+      byte[] buffer = new byte[wanted];
+      int real = input.read(buffer, 0, wanted);
+      assertNotEquals(-1, real);
+      for (int i = 0; i < real; i++) {
+        assertEquals((index + i) & 0x7F, buffer[i]);
+      }
+      index += real;
+    }
+    if (start == end) {
+      assertEquals(0, input.available());
+    }
+    assertEquals(end, index);
+    if (end == BYTES_LEN) {
+      assertEquals(-1, input.read());
+    }
+  }
+}
diff --git 
a/common/src/test/java/org/apache/uniffle/common/serializer/SerializerFactoryTest.java
 
b/common/src/test/java/org/apache/uniffle/common/serializer/SerializerFactoryTest.java
new file mode 100644
index 000000000..b7fd15520
--- /dev/null
+++ 
b/common/src/test/java/org/apache/uniffle/common/serializer/SerializerFactoryTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.serializer;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.serializer.writable.WritableSerializer;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/** Tests how {@link SerializerFactory} resolves and reuses serializers for a given class. */
+public class SerializerFactoryTest {
+
+  private static RssConf conf = new RssConf();
+
+  @Test
+  public void testGetSerializer() {
+    SerializerFactory factory = new SerializerFactory(conf);
+    // 1 Check that a serializer is found for the interface and for a concrete Writable
+    assertNotNull(factory.getSerializer(Writable.class));
+    assertNotNull(factory.getSerializer(IntWritable.class));
+
+    // 2 Check whether the type serializer is right
+    assertInstanceOf(WritableSerializer.class, factory.getSerializer(Writable.class));
+    assertInstanceOf(WritableSerializer.class, factory.getSerializer(IntWritable.class));
+
+    // 3 Check whether the serializer is cached: lookups for related Writable types must yield the
+    // same serializer
+    Serializer serializer = factory.getSerializer(Writable.class);
+    assertEquals(serializer, factory.getSerializer(IntWritable.class));
+  }
+}
diff --git 
a/common/src/test/java/org/apache/uniffle/common/serializer/SerializerUtils.java
 
b/common/src/test/java/org/apache/uniffle/common/serializer/SerializerUtils.java
new file mode 100644
index 000000000..b57114fb1
--- /dev/null
+++ 
b/common/src/test/java/org/apache/uniffle/common/serializer/SerializerUtils.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.serializer;
+
+import java.util.Comparator;
+
+import com.google.common.base.Objects;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+
+/** Shared helpers for serializer tests: sample data generation and matching comparators. */
+public class SerializerUtils {
+
+  /** Simple non-Writable value type wrapping a string, used as test data. */
+  public static class SomeClass {
+
+    private String value;
+
+    public SomeClass() {}
+
+    public static SomeClass create(String value) {
+      SomeClass sc = new SomeClass();
+      sc.value = value;
+      return sc;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      SomeClass someClass = (SomeClass) o;
+      return Objects.equal(value, someClass.value);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(value);
+    }
+
+    @Override
+    public String toString() {
+      return "SomeClass{" + "value='" + value + '\'' + '}';
+    }
+  }
+
+  /**
+   * Generates a deterministic test value of type {@code tClass} derived from {@code index}.
+   *
+   * @return the generated value, or {@code null} if {@code tClass} is not supported
+   */
+  public static Object genData(Class tClass, int index) {
+    if (tClass.equals(Text.class)) {
+      return new Text(String.format("key%08d", index));
+    } else if (tClass.equals(IntWritable.class)) {
+      return new IntWritable(index);
+    } else if (tClass.equals(String.class)) {
+      return String.format("key%05d", index);
+    } else if (tClass.equals(Integer.class)) {
+      return Integer.valueOf(index);
+    } else if (tClass.equals(SomeClass.class)) {
+      return SomeClass.create(String.format("key%05d", index));
+    } else if (tClass.equals(int.class)) {
+      return index;
+    }
+    return null;
+  }
+
+  /** Resolves {@code className} to a class, mapping {@code "int"} to the primitive {@code int}. */
+  public static Class<?> getClassByName(String className) throws ClassNotFoundException {
+    if (className.equals("int")) {
+      return int.class;
+    } else {
+      return Class.forName(className);
+    }
+  }
+
+  /**
+   * Returns a comparator for values of {@code tClass}. The String and SomeClass comparators parse
+   * the number that follows the 3-character "key" prefix.
+   *
+   * <p>Uses {@link Integer#compare} rather than subtraction, which overflows when the operands have
+   * opposite signs and large magnitudes.
+   *
+   * @return the comparator, or {@code null} if {@code tClass} is not supported
+   */
+  public static Comparator getComparator(Class tClass) {
+    if (tClass.equals(Text.class)) {
+      return new Text.Comparator();
+    } else if (tClass.equals(IntWritable.class)) {
+      return new IntWritable.Comparator();
+    } else if (tClass.equals(String.class)) {
+      return new Comparator<String>() {
+        @Override
+        public int compare(String o1, String o2) {
+          return Integer.compare(
+              Integer.parseInt(o1.substring(3)), Integer.parseInt(o2.substring(3)));
+        }
+      };
+    } else if (tClass.equals(Integer.class)) {
+      return new Comparator<Integer>() {
+        @Override
+        public int compare(Integer o1, Integer o2) {
+          return Integer.compare(o1, o2);
+        }
+      };
+    } else if (tClass.equals(SomeClass.class)) {
+      return new Comparator<SomeClass>() {
+        @Override
+        public int compare(SomeClass o1, SomeClass o2) {
+          return Integer.compare(
+              Integer.parseInt(o1.value.substring(3)), Integer.parseInt(o2.value.substring(3)));
+        }
+      };
+    } else if (tClass.equals(int.class)) {
+      return new Comparator<Integer>() {
+        @Override
+        public int compare(Integer o1, Integer o2) {
+          return Integer.compare(o1, o2);
+        }
+      };
+    }
+    return null;
+  }
+}
diff --git 
a/common/src/test/java/org/apache/uniffle/common/serializer/WritableSerializerTest.java
 
b/common/src/test/java/org/apache/uniffle/common/serializer/WritableSerializerTest.java
new file mode 100644
index 000000000..a3f719a0e
--- /dev/null
+++ 
b/common/src/test/java/org/apache/uniffle/common/serializer/WritableSerializerTest.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.serializer;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.OutputStream;
+import java.util.Random;
+
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.serializer.writable.WritableSerializer;
+
+import static org.apache.uniffle.common.serializer.SerializerUtils.genData;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class WritableSerializerTest {
+
+  private static final int LOOP = 1009;
+  private static RssConf rssConf = new RssConf();
+
+  // Test 1: both write and read will use common api
+  @ParameterizedTest
+  @ValueSource(
+      strings = {
+        "org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,mem",
+        "org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,file"
+      })
+  public void testSerDeKeyValues1(String classes, @TempDir File tmpDir) throws 
Exception {
+    // 1 Construct serializer
+    String[] classArray = classes.split(",");
+    Class keyClass = SerializerUtils.getClassByName(classArray[0]);
+    Class valueClass = SerializerUtils.getClassByName(classArray[1]);
+    boolean isFileMode = classArray[2].equals("file");
+    WritableSerializer serializer = new WritableSerializer(rssConf);
+    SerializerInstance instance = serializer.newInstance();
+
+    // 2 Write
+    OutputStream outputStream =
+        isFileMode
+            ? new FileOutputStream(new File(tmpDir, "tmp.data"))
+            : new ByteArrayOutputStream();
+    SerializationStream serializationStream = 
instance.serializeStream(outputStream, false);
+    long[] offsets = new long[LOOP];
+    for (int i = 0; i < LOOP; i++) {
+      serializationStream.writeRecord(genData(keyClass, i), 
genData(valueClass, i));
+      offsets[i] = serializationStream.getTotalBytesWritten();
+    }
+    serializationStream.close();
+
+    // 3 Random read
+    for (int i = 0; i < LOOP; i++) {
+      long off = offsets[i];
+      PartialInputStreamImpl inputStream =
+          isFileMode
+              ? PartialInputStreamImpl.newInputStream(
+                  new File(tmpDir, "tmp.data"), off, Long.MAX_VALUE)
+              : PartialInputStreamImpl.newInputStream(
+                  ((ByteArrayOutputStream) outputStream).toByteArray(), off, 
Long.MAX_VALUE);
+      DeserializationStream deserializationStream =
+          instance.deserializeStream(inputStream, keyClass, valueClass, false);
+      for (int j = i + 1; j < LOOP; j++) {
+        assertTrue(deserializationStream.nextRecord());
+        assertEquals(genData(keyClass, j), 
deserializationStream.getCurrentKey());
+        assertEquals(genData(valueClass, j), 
deserializationStream.getCurrentValue());
+      }
+      deserializationStream.close();
+    }
+  }
+
+  // Test 2: write with common api, read with raw api
+  @ParameterizedTest
+  @ValueSource(
+      strings = {
+        "org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,mem",
+        "org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,file"
+      })
+  public void testSerDeKeyValues2(String classes, @TempDir File tmpDir) throws 
Exception {
+    // 1 Construct serializer
+    String[] classArray = classes.split(",");
+    Class keyClass = SerializerUtils.getClassByName(classArray[0]);
+    Class valueClass = SerializerUtils.getClassByName(classArray[1]);
+    boolean isFileMode = classArray[2].equals("file");
+    WritableSerializer serializer = new WritableSerializer(rssConf);
+    SerializerInstance instance = serializer.newInstance();
+
+    // 2 Write
+    OutputStream outputStream =
+        isFileMode
+            ? new FileOutputStream(new File(tmpDir, "tmp.data"))
+            : new ByteArrayOutputStream();
+    SerializationStream serializationStream = 
instance.serializeStream(outputStream, false);
+    long[] offsets = new long[LOOP];
+    for (int i = 0; i < LOOP; i++) {
+      serializationStream.writeRecord(genData(keyClass, i), 
genData(valueClass, i));
+      offsets[i] = serializationStream.getTotalBytesWritten();
+    }
+    serializationStream.close();
+
+    // 3 Random read
+    for (int i = 0; i < LOOP; i++) {
+      long off = offsets[i];
+      PartialInputStreamImpl inputStream =
+          isFileMode
+              ? PartialInputStreamImpl.newInputStream(
+                  new File(tmpDir, "tmp.data"), off, Long.MAX_VALUE)
+              : PartialInputStreamImpl.newInputStream(
+                  ((ByteArrayOutputStream) outputStream).toByteArray(), off, 
Long.MAX_VALUE);
+
+      DeserializationStream deserializationStream =
+          instance.deserializeStream(inputStream, keyClass, valueClass, true);
+      for (int j = i + 1; j < LOOP; j++) {
+        assertTrue(deserializationStream.nextRecord());
+        DataOutputBuffer keyBuffer = (DataOutputBuffer) 
deserializationStream.getCurrentKey();
+        DataInputBuffer keyInputBuffer = new DataInputBuffer();
+        keyInputBuffer.reset(keyBuffer.getData(), 0, keyBuffer.getLength());
+        assertEquals(genData(keyClass, j), 
instance.deserialize(keyInputBuffer, keyClass));
+        DataOutputBuffer valueBuffer = (DataOutputBuffer) 
deserializationStream.getCurrentValue();
+        DataInputBuffer valueInputBuffer = new DataInputBuffer();
+        valueInputBuffer.reset(valueBuffer.getData(), 0, 
valueBuffer.getLength());
+        assertEquals(genData(valueClass, j), 
instance.deserialize(valueInputBuffer, valueClass));
+      }
+      deserializationStream.close();
+    }
+  }
+
+  // Test 3: write with raw api, read with common api
+  @ParameterizedTest
+  @ValueSource(
+      strings = {
+        "org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,mem",
+        "org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,file"
+      })
+  public void testSerDeKeyValues3(String classes, @TempDir File tmpDir) throws 
Exception {
+    // 1 Construct serializer
+    String[] classArray = classes.split(",");
+    Class keyClass = SerializerUtils.getClassByName(classArray[0]);
+    Class valueClass = SerializerUtils.getClassByName(classArray[1]);
+    boolean isFileMode = classArray[2].equals("file");
+    WritableSerializer serializer = new WritableSerializer(rssConf);
+    SerializerInstance instance = serializer.newInstance();
+
+    // 2 Write
+    OutputStream outputStream =
+        isFileMode
+            ? new FileOutputStream(new File(tmpDir, "tmp.data"))
+            : new ByteArrayOutputStream();
+    SerializationStream serializationStream = 
instance.serializeStream(outputStream, true);
+    long[] offsets = new long[LOOP];
+    for (int i = 0; i < LOOP; i++) {
+      DataOutputBuffer keyBuffer = new DataOutputBuffer();
+      DataOutputBuffer valueBuffer = new DataOutputBuffer();
+      instance.serialize(genData(keyClass, i), keyBuffer);
+      instance.serialize(genData(valueClass, i), valueBuffer);
+      serializationStream.writeRecord(keyBuffer, valueBuffer);
+      offsets[i] = serializationStream.getTotalBytesWritten();
+    }
+    serializationStream.close();
+
+    // 3 Random read
+    for (int i = 0; i < LOOP; i++) {
+      long off = offsets[i];
+      PartialInputStreamImpl inputStream =
+          isFileMode
+              ? PartialInputStreamImpl.newInputStream(
+                  new File(tmpDir, "tmp.data"), off, Long.MAX_VALUE)
+              : PartialInputStreamImpl.newInputStream(
+                  ((ByteArrayOutputStream) outputStream).toByteArray(), off, 
Long.MAX_VALUE);
+      DeserializationStream deserializationStream =
+          instance.deserializeStream(inputStream, keyClass, valueClass, false);
+      for (int j = i + 1; j < LOOP; j++) {
+        assertTrue(deserializationStream.nextRecord());
+        assertEquals(genData(keyClass, j), 
deserializationStream.getCurrentKey());
+        assertEquals(genData(valueClass, j), 
deserializationStream.getCurrentValue());
+      }
+      deserializationStream.close();
+    }
+  }
+
+  // Test 4: both write and read use raw api
+  @ParameterizedTest
+  @ValueSource(
+      strings = {
+        "org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,mem",
+        "org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,file"
+      })
+  public void testSerDeKeyValues4(String classes, @TempDir File tmpDir) throws 
Exception {
+    // 1 Construct serializer
+    String[] classArray = classes.split(",");
+    Class keyClass = SerializerUtils.getClassByName(classArray[0]);
+    Class valueClass = SerializerUtils.getClassByName(classArray[1]);
+    boolean isFileMode = classArray[2].equals("file");
+    WritableSerializer serializer = new WritableSerializer(rssConf);
+    SerializerInstance instance = serializer.newInstance();
+
+    // 2 Write
+    OutputStream outputStream =
+        isFileMode
+            ? new FileOutputStream(new File(tmpDir, "tmp.data"))
+            : new ByteArrayOutputStream();
+    SerializationStream serializationStream = 
instance.serializeStream(outputStream, true);
+    long[] offsets = new long[LOOP];
+    for (int i = 0; i < LOOP; i++) {
+      DataOutputBuffer keyBuffer = new DataOutputBuffer();
+      DataOutputBuffer valueBuffer = new DataOutputBuffer();
+      instance.serialize(genData(keyClass, i), keyBuffer);
+      instance.serialize(genData(valueClass, i), valueBuffer);
+      serializationStream.writeRecord(keyBuffer, valueBuffer);
+      offsets[i] = serializationStream.getTotalBytesWritten();
+    }
+    serializationStream.close();
+
+    // 3 Random read
+    for (int i = 0; i < LOOP; i++) {
+      long off = offsets[i];
+      PartialInputStreamImpl inputStream =
+          isFileMode
+              ? PartialInputStreamImpl.newInputStream(
+                  new File(tmpDir, "tmp.data"), off, Long.MAX_VALUE)
+              : PartialInputStreamImpl.newInputStream(
+                  ((ByteArrayOutputStream) outputStream).toByteArray(), off, 
Long.MAX_VALUE);
+
+      DeserializationStream deserializationStream =
+          instance.deserializeStream(inputStream, keyClass, valueClass, true);
+      for (int j = i + 1; j < LOOP; j++) {
+        assertTrue(deserializationStream.nextRecord());
+        DataOutputBuffer keyBuffer = (DataOutputBuffer) 
deserializationStream.getCurrentKey();
+        DataInputBuffer keyInputBuffer = new DataInputBuffer();
+        keyInputBuffer.reset(keyBuffer.getData(), 0, keyBuffer.getLength());
+        assertEquals(genData(keyClass, j), 
instance.deserialize(keyInputBuffer, keyClass));
+        DataOutputBuffer valueBuffer = (DataOutputBuffer) 
deserializationStream.getCurrentValue();
+        DataInputBuffer valueInputBuffer = new DataInputBuffer();
+        valueInputBuffer.reset(valueBuffer.getData(), 0, 
valueBuffer.getLength());
+        assertEquals(genData(valueClass, j), 
instance.deserialize(valueInputBuffer, valueClass));
+      }
+      deserializationStream.close();
+    }
+  }
+
+  @ParameterizedTest
+  @ValueSource(classes = {Text.class, IntWritable.class})
+  public void testSerDeObject(Class aClass) throws Exception {
+    WritableSerializer serializer = new WritableSerializer(rssConf);
+    SerializerInstance instance = serializer.newInstance();
+    int number = new Random().nextInt(99999);
+    DataOutputBuffer output = new DataOutputBuffer();
+    instance.serialize(genData(aClass, number), output);
+    DataInputBuffer input = new DataInputBuffer();
+    input.reset(output.getData(), 0, output.getData().length);
+    Object obj = instance.deserialize(input, aClass);
+    assertEquals(genData(aClass, number), obj);
+  }
+}

Reply via email to