This is an automated email from the ASF dual-hosted git repository.

chaokunyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fury.git


The following commit(s) were added to refs/heads/main by this push:
     new 1e2a5284 fix(java): fix streaming classdef read (#1758)
1e2a5284 is described below

commit 1e2a52843ad2540eb0bd444d970f8cfa3cc8cee3
Author: Shawn Yang <[email protected]>
AuthorDate: Wed Jul 24 20:02:15 2024 +0800

    fix(java): fix streaming classdef read (#1758)
    
    ## What does this PR do?
    
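    Fix streaming classdef read: after deserialization, move the buffer reader
    index to the end of the class definitions, so that the next object in the
    same stream is read from the correct offset.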
    
    ## Related issues
    
    Closes #1757
    
    ## Does this PR introduce any user-facing change?
    
    <!--
    If any user-facing interface changes, please [open an
    issue](https://github.com/apache/fury/issues/new/choose) describing the
    need to do so and update the document if necessary.
    -->
    
    - [ ] Does this PR introduce any public API change?
    - [ ] Does this PR introduce any binary protocol compatibility change?
    
    
    ## Benchmark
    
    <!--
    When the PR has an impact on performance (if you don't know whether the
    PR will have an impact on performance, you can submit the PR first, and
    if it will have impact on performance, the code reviewer will explain
    it), be sure to attach a benchmark data here.
    -->
---
 .../src/main/java/org/apache/fury/Fury.java        | 34 +++++++++++++++++-----
 .../org/apache/fury/resolver/ClassResolver.java    |  4 ---
 .../src/test/java/org/apache/fury/StreamTest.java  | 31 ++++++++++++++++++++
 3 files changed, 58 insertions(+), 11 deletions(-)

diff --git a/java/fury-core/src/main/java/org/apache/fury/Fury.java b/java/fury-core/src/main/java/org/apache/fury/Fury.java
index 87c635d7..9f3e84e1 100644
--- a/java/fury-core/src/main/java/org/apache/fury/Fury.java
+++ b/java/fury-core/src/main/java/org/apache/fury/Fury.java
@@ -102,6 +102,7 @@ public final class Fury implements BaseFury {
 
   private final Config config;
   private final boolean refTracking;
+  private final boolean shareMeta;
   private final RefResolver refResolver;
   private final ClassResolver classResolver;
   private final MetaStringResolver metaStringResolver;
@@ -125,6 +126,7 @@ public final class Fury implements BaseFury {
   private int copyDepth;
   private final boolean copyRefTracking;
   private final IdentityMap<Object, Object> originToCopyMap;
+  private int classDefEndOffset;
 
   public Fury(FuryBuilder builder, ClassLoader classLoader) {
     // Avoid set classLoader in `FuryBuilder`, which won't be clear when
@@ -133,6 +135,7 @@ public final class Fury implements BaseFury {
     this.language = config.getLanguage();
     this.refTracking = config.trackingRef();
     this.copyRefTracking = config.copyTrackingRef();
+    this.shareMeta = config.isMetaShareEnabled();
     compressInt = config.compressInt();
     longEncoding = config.longEncoding();
     if (refTracking) {
@@ -332,7 +335,6 @@ public final class Fury implements BaseFury {
 
   private void write(MemoryBuffer buffer, Object obj) {
     int startOffset = buffer.writerIndex();
-    boolean shareMeta = config.isMetaShareEnabled();
     if (shareMeta) {
       buffer.writeInt32(-1); // preserve 4-byte for nativeObjects start offsets.
     }
@@ -781,8 +783,8 @@ public final class Fury implements BaseFury {
       if (isTargetXLang) {
         obj = xdeserializeInternal(buffer);
       } else {
-        if (config.isMetaShareEnabled()) {
-          classResolver.readClassDefs(buffer);
+        if (shareMeta) {
+          readClassDefs(buffer);
         }
         obj = readRef(buffer);
       }
@@ -790,6 +792,9 @@ public final class Fury implements BaseFury {
     } catch (Throwable t) {
       throw ExceptionUtils.handleReadFailed(this, t);
     } finally {
+      if (shareMeta) {
+        buffer.readerIndex(classDefEndOffset);
+      }
       resetRead();
       jitContext.unlock();
     }
@@ -1097,8 +1102,8 @@ public final class Fury implements BaseFury {
       if (depth != 0) {
         throwDepthDeserializationException();
       }
-      if (config.isMetaShareEnabled()) {
-        classResolver.readClassDefs(buffer);
+      if (shareMeta) {
+        readClassDefs(buffer);
       }
       T obj;
       int nextReadRefId = refResolver.tryPreserveRefId(buffer);
@@ -1111,6 +1116,9 @@ public final class Fury implements BaseFury {
     } catch (Throwable t) {
       throw ExceptionUtils.handleReadFailed(this, t);
     } finally {
+      if (shareMeta) {
+        buffer.readerIndex(classDefEndOffset);
+      }
       resetRead();
       jitContext.unlock();
     }
@@ -1211,13 +1219,16 @@ public final class Fury implements BaseFury {
       if (depth != 0) {
         throwDepthDeserializationException();
       }
-      if (config.isMetaShareEnabled()) {
-        classResolver.readClassDefs(buffer);
+      if (shareMeta) {
+        readClassDefs(buffer);
       }
       return readRef(buffer);
     } catch (Throwable t) {
       throw ExceptionUtils.handleReadFailed(this, t);
     } finally {
+      if (shareMeta) {
+        buffer.readerIndex(classDefEndOffset);
+      }
       resetRead();
       jitContext.unlock();
     }
@@ -1409,6 +1420,15 @@ public final class Fury implements BaseFury {
     }
   }
 
+  private void readClassDefs(MemoryBuffer buffer) {
+    int classDefOffset = buffer.readInt32();
+    int readerIndex = buffer.readerIndex();
+    buffer.readerIndex(classDefOffset);
+    classResolver.readClassDefs(buffer);
+    classDefEndOffset = buffer.readerIndex();
+    buffer.readerIndex(readerIndex);
+  }
+
   public void reset() {
     refResolver.reset();
     classResolver.reset();
diff --git a/java/fury-core/src/main/java/org/apache/fury/resolver/ClassResolver.java b/java/fury-core/src/main/java/org/apache/fury/resolver/ClassResolver.java
index 93a3701e..ce4dfa27 100644
--- a/java/fury-core/src/main/java/org/apache/fury/resolver/ClassResolver.java
+++ b/java/fury-core/src/main/java/org/apache/fury/resolver/ClassResolver.java
@@ -1470,9 +1470,6 @@ public class ClassResolver {
    */
   public void readClassDefs(MemoryBuffer buffer) {
     MetaContext metaContext = fury.getSerializationContext().getMetaContext();
-    int classDefOffset = buffer.readInt32();
-    int readerIndex = buffer.readerIndex();
-    buffer.readerIndex(classDefOffset);
     int numClassDefs = buffer.readVarUint32Small14();
     for (int i = 0; i < numClassDefs; i++) {
       long id = buffer.readInt64();
@@ -1491,7 +1488,6 @@ public class ClassResolver {
       // can be created still.
       metaContext.readClassInfos.add(null);
     }
-    buffer.readerIndex(readerIndex);
   }
 
   private Tuple2<ClassDef, ClassInfo> readClassDef(MemoryBuffer buffer, long header) {
diff --git a/java/fury-core/src/test/java/org/apache/fury/StreamTest.java 
b/java/fury-core/src/test/java/org/apache/fury/StreamTest.java
index 918e7005..ea81e605 100644
--- a/java/fury-core/src/test/java/org/apache/fury/StreamTest.java
+++ b/java/fury-core/src/test/java/org/apache/fury/StreamTest.java
@@ -23,6 +23,7 @@ import static org.apache.fury.io.FuryStreamReader.of;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 
+import com.google.common.collect.Lists;
 import java.io.BufferedInputStream;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -30,15 +31,20 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashMap;
+import org.apache.fury.config.CompatibleMode;
 import org.apache.fury.io.FuryInputStream;
 import org.apache.fury.io.FuryReadableChannel;
 import org.apache.fury.io.FuryStreamReader;
 import org.apache.fury.memory.MemoryBuffer;
 import org.apache.fury.reflect.ReflectionUtils;
 import org.apache.fury.test.bean.BeanA;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
 public class StreamTest {
+
   @Test
   public void testBufferStream() {
     MemoryBuffer buffer0 = MemoryBuffer.newHeapBuffer(10);
@@ -320,4 +326,29 @@ public class StreamTest {
       }
     }
   }
+
+  @Test
+  public void testScopedMetaShare() throws IOException {
+    Fury fury =
+        Fury.builder()
+            .requireClassRegistration(false)
+            .withCompatibleMode(CompatibleMode.COMPATIBLE)
+            .withScopedMetaShare(true)
+            .build();
+    ByteArrayOutputStream bas = new ByteArrayOutputStream();
+    ArrayList<Integer> list = Lists.newArrayList(1, 2, 3);
+    fury.serialize(bas, list);
+    HashMap<String, String> map = new HashMap<>();
+    map.put("key", "value");
+    fury.serialize(bas, map);
+    ArrayList<Integer> list2 = Lists.newArrayList(10, 9, 7);
+    fury.serialize(bas, list2);
+    bas.flush();
+
+    InputStream bis = new ByteArrayInputStream(bas.toByteArray());
+    FuryInputStream stream = of(bis);
+    Assert.assertEquals(fury.deserialize(stream), list);
+    Assert.assertEquals(fury.deserialize(stream), map);
+    Assert.assertEquals(fury.deserialize(stream), list2);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to