This is an automated email from the ASF dual-hosted git repository.
chaokunyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fory.git
The following commit(s) were added to refs/heads/main by this push:
new 2abe9e4d7 feat(java): support concurrent updates when serializing
collections (#2623)
2abe9e4d7 is described below
commit 2abe9e4d7cf37ed78d35054527e43688429f7fd8
Author: Shawn Yang <[email protected]>
AuthorDate: Wed Sep 17 22:40:28 2025 +0800
feat(java): support concurrent updates when serializing collections (#2623)
## Why?
<!-- Describe the purpose of this PR. -->
## What does this PR do?
<!-- Describe the details of this PR. -->
## Related issues
Fixes #2618
## Does this PR introduce any user-facing change?
<!--
If any user-facing interface changes, please [open an
issue](https://github.com/apache/fory/issues/new/choose) describing the
need to do so and update the document if necessary.
Delete section if not applicable.
-->
- [ ] Does this PR introduce any public API change?
- [ ] Does this PR introduce any binary protocol compatibility change?
## Benchmark
<!--
When the PR has an impact on performance (if you don't know whether the
PR will have an impact on performance, you can submit the PR first, and
if it will have impact on performance, the code reviewer will explain
it), be sure to attach a benchmark data here.
Delete section if not applicable.
-->
---
.../fory/builder/BaseObjectCodecBuilder.java | 5 +-
.../apache/fory/collection/CollectionSnapshot.java | 152 +++++++++++++++++++++
...erableOnceMapSnapshot.java => MapSnapshot.java} | 33 +++--
.../collection/CollectionLikeSerializer.java | 4 +
.../collection/CollectionSerializers.java | 18 ++-
.../collection/ConcurrentCollectionSerializer.java | 105 ++++++++++++++
.../collection/ConcurrentMapSerializer.java | 33 +++--
.../fory/serializer/collection/MapSerializers.java | 6 +-
.../fory/collection/CollectionSnapshotTest.java | 110 +++++++++++++++
...ceMapSnapshotTest.java => MapSnapshotTest.java} | 14 +-
10 files changed, 437 insertions(+), 43 deletions(-)
diff --git
a/java/fory-core/src/main/java/org/apache/fory/builder/BaseObjectCodecBuilder.java
b/java/fory-core/src/main/java/org/apache/fory/builder/BaseObjectCodecBuilder.java
index 3d2bbe0ee..118297138 100644
---
a/java/fory-core/src/main/java/org/apache/fory/builder/BaseObjectCodecBuilder.java
+++
b/java/fory-core/src/main/java/org/apache/fory/builder/BaseObjectCodecBuilder.java
@@ -969,7 +969,10 @@ public abstract class BaseObjectCodecBuilder extends
CodecBuilder {
builder.add(action);
}
walkPath.removeLast();
- return new ListExpression(onCollectionWrite, new If(gt(size, ofInt(0)),
builder));
+ return new ListExpression(
+ onCollectionWrite,
+ new If(gt(size, ofInt(0)), builder),
+ new Invoke(serializer, "onCollectionWriteFinish", collection));
}
/**
diff --git
a/java/fory-core/src/main/java/org/apache/fory/collection/CollectionSnapshot.java
b/java/fory-core/src/main/java/org/apache/fory/collection/CollectionSnapshot.java
new file mode 100644
index 000000000..4f15a909c
--- /dev/null
+++
b/java/fory-core/src/main/java/org/apache/fory/collection/CollectionSnapshot.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.fory.collection;
+
+import java.util.AbstractCollection;
+import java.util.Collection;
+import java.util.Iterator;
+import org.apache.fory.annotation.Internal;
+
+/**
+ * A specialized collection implementation that creates a snapshot of elements
for one iteration at a time.
+ * This class is designed to efficiently handle concurrent collection
serialization by creating a
+ * lightweight snapshot that can be iterated without holding references to the
original collection
+ * elements.
+ *
+ * <p>The implementation uses an array-based storage for elements and provides
optimized iteration
+ * through a custom iterator. It includes memory management features such as
automatic array
+ * reallocation when clearing large collections to prevent memory leaks.
+ *
+ * <p><strong>Important:</strong> The returned iterator from {@link
#iterator()} must be consumed
+ * completely before calling {@code iterator()} again. The iterator maintains
its position through a
+ * shared index, and calling {@code iterator()} again before the previous
iterator is fully consumed
+ * will result in incorrect iteration behavior.
+ *
+ * <p>This class is marked as {@code @Internal} and should not be used
directly by application code.
+ * It's specifically designed for internal serialization purposes.
+ *
+ * @param <E> the type of elements maintained by this collection
+ * @since 1.0
+ */
+@Internal
+public class CollectionSnapshot<E> extends AbstractCollection<E> {
+ /** Threshold for array reallocation during clear operation to prevent
memory leaks. */
+ private static final int CLEAR_ARRAY_SIZE_THRESHOLD = 2048;
+
+ /** Array storing the collection elements for iteration. */
+ ObjectArray<E> array;
+
+ /** Reference to the original collection; not assigned or read anywhere in this class. */
+ Collection<E> collection;
+
+ /** Current number of elements in the snapshot. */
+ int size;
+
+ /** Current iteration index for the iterator. */
+ int iterIndex;
+
+ /** Cached iterator for this collection. */
+ private final CollectionIterator iterator;
+
+ /**
+ * Constructs a new empty CollectionSnapshot. Initializes the internal array with a
default capacity of 16
+ * elements and creates the iterator instance.
+ */
+ public CollectionSnapshot() {
+ array = new ObjectArray<>(16);
+ iterator = new CollectionIterator();
+ }
+
+ @Override
+ public int size() {
+ return size;
+ }
+
+ /**
+ * Returns an iterator over the elements in this collection. The iterator is
designed for
+ * single-pass iteration and maintains its position through a shared index.
+ *
+ * <p><strong>Important:</strong> The returned iterator must be consumed
completely before calling
+ * this method again. The iterator shares its position with other potential
iterators through the
+ * {@code iterIndex} field, and calling {@code iterator()} again before the
current iterator is
+ * fully consumed will result in incorrect iteration behavior.
+ *
+ * @return an iterator over the elements in this collection
+ */
+ @Override
+ public Iterator<E> iterator() {
+ iterIndex = 0;
+ return iterator;
+ }
+
+ /**
+ * Iterator implementation for the CollectionSnapshot. This iterator is
designed for single-pass
+ * iteration and maintains its position through the iterIndex field. It
provides efficient access
+ * to collection elements stored in the underlying array.
+ *
+ * <p><strong>Important:</strong> This iterator must be consumed completely
before calling {@code
+ * iterator()} again on the collection. The iterator shares its position
with other potential
+ * iterators through the {@code iterIndex} field, and calling {@code
iterator()} again before the
+ * current iterator is fully consumed will result in incorrect iteration
behavior.
+ */
+ class CollectionIterator implements Iterator<E> {
+
+ @Override
+ public boolean hasNext() {
+ return iterIndex < size;
+ }
+
+ @Override
+ public E next() {
+ return array.get(iterIndex++);
+ }
+ }
+
+ /**
+ * Creates a snapshot of the specified collection by copying all its
elements into the internal
+ * array. This method is used to create a stable view of a concurrent
collection for serialization
+ * purposes.
+ *
+ * <p>The method iterates through all elements in the provided collection
and appends them to the
+ * internal array. The iteration index is not modified here; it is reset to 0
by {@code iterator()} and {@code clear()}.
+ *
+ * @param collection the collection to create a snapshot of
+ */
+ public void setCollection(Collection<E> collection) {
+ ObjectArray<E> array = this.array;
+ int size = 0;
+ for (E element : collection) {
+ array.add(element);
+ size++;
+ }
+ this.size = size;
+ }
+
+ @Override
+ public void clear() {
+ if (size > CLEAR_ARRAY_SIZE_THRESHOLD) {
+ array = new ObjectArray<>(16);
+ } else {
+ array.clear();
+ }
+ iterIndex = 0;
+ size = 0;
+ }
+}
diff --git
a/java/fory-core/src/main/java/org/apache/fory/collection/IterableOnceMapSnapshot.java
b/java/fory-core/src/main/java/org/apache/fory/collection/MapSnapshot.java
similarity index 71%
rename from
java/fory-core/src/main/java/org/apache/fory/collection/IterableOnceMapSnapshot.java
rename to
java/fory-core/src/main/java/org/apache/fory/collection/MapSnapshot.java
index b903f200d..380569ad4 100644
---
a/java/fory-core/src/main/java/org/apache/fory/collection/IterableOnceMapSnapshot.java
+++ b/java/fory-core/src/main/java/org/apache/fory/collection/MapSnapshot.java
@@ -29,12 +29,17 @@ import org.apache.fory.annotation.Internal;
/**
* A specialized map implementation that creates a snapshot of entries for
single iteration. This
* class is designed to efficiently handle concurrent map serialization by
creating a lightweight
- * snapshot that can be iterated once without holding references to the
original map entries.
+ * snapshot that can be iterated without holding references to the original
map entries.
*
* <p>The implementation uses an array-based storage for entries and provides
optimized iteration
* through a custom iterator. It includes memory management features such as
automatic array
* reallocation when clearing large collections to prevent memory leaks.
*
+ * <p><strong>Important:</strong> The returned iterator from {@link
#entrySet()}{@code .iterator()}
+ * must be consumed completely before calling {@code iterator()} again. The
iterator maintains its
+ * position through a shared index, and calling {@code iterator()} again
before the previous
+ * iterator is fully consumed will result in incorrect iteration behavior.
+ *
* <p>This class is marked as {@code @Internal} and should not be used
directly by application code.
* It's specifically designed for internal serialization purposes.
*
@@ -43,7 +48,7 @@ import org.apache.fory.annotation.Internal;
* @since 1.0
*/
@Internal
-public class IterableOnceMapSnapshot<K, V> extends AbstractMap<K, V> {
+public class MapSnapshot<K, V> extends AbstractMap<K, V> {
/** Threshold for array reallocation during clear operation to prevent
memory leaks. */
private static final int CLEAR_ARRAY_SIZE_THRESHOLD = 2048;
@@ -66,10 +71,10 @@ public class IterableOnceMapSnapshot<K, V> extends
AbstractMap<K, V> {
private final EntryIterator iterator;
/**
- * Constructs a new empty IterableOnceMapSnapshot. Initializes the internal
array with a default
- * capacity of 16 entries and creates the entry set and iterator instances.
+ * Constructs a new empty MapSnapshot. Initializes the internal array with a
default capacity of
+ * 16 entries and creates the entry set and iterator instances.
*/
- public IterableOnceMapSnapshot() {
+ public MapSnapshot() {
array = new ObjectArray<>(16);
entrySet = new EntrySet();
iterator = new EntryIterator();
@@ -86,13 +91,14 @@ public class IterableOnceMapSnapshot<K, V> extends
AbstractMap<K, V> {
}
/**
- * Entry set implementation for the IterableOnceMapSnapshot. Provides a view
of the map entries
- * and supports iteration through the custom EntryIterator.
+ * Entry set implementation for the MapSnapshot. Provides a view of the map
entries and supports
+ * iteration through the custom EntryIterator.
*/
class EntrySet extends AbstractSet<Entry<K, V>> {
@Override
public Iterator<Entry<K, V>> iterator() {
+ iterIndex = 0;
return iterator;
}
@@ -103,9 +109,14 @@ public class IterableOnceMapSnapshot<K, V> extends
AbstractMap<K, V> {
}
/**
- * Iterator implementation for the IterableOnceMapSnapshot. This iterator is
designed for
- * single-pass iteration and maintains its position through the iterIndex
field. It provides
- * efficient access to map entries stored in the underlying array.
+ * Iterator implementation for the MapSnapshot. This iterator is designed
for single-pass
+ * iteration and maintains its position through the iterIndex field. It
provides efficient access
+ * to map entries stored in the underlying array.
+ *
+ * <p><strong>Important:</strong> This iterator must be consumed completely
before calling {@code
+ * iterator()} again on the entry set. The iterator shares its position with
other potential
+ * iterators through the {@code iterIndex} field, and calling {@code
iterator()} again before the
+ * current iterator is fully consumed will result in incorrect iteration
behavior.
*/
class EntryIterator implements Iterator<Entry<K, V>> {
@@ -141,7 +152,7 @@ public class IterableOnceMapSnapshot<K, V> extends
AbstractMap<K, V> {
@Override
public void clear() {
- if (size > 2048) {
+ if (size > CLEAR_ARRAY_SIZE_THRESHOLD) {
array = new ObjectArray<>(16);
} else {
array.clear();
diff --git
a/java/fory-core/src/main/java/org/apache/fory/serializer/collection/CollectionLikeSerializer.java
b/java/fory-core/src/main/java/org/apache/fory/serializer/collection/CollectionLikeSerializer.java
index 4f5397155..1d845b652 100644
---
a/java/fory-core/src/main/java/org/apache/fory/serializer/collection/CollectionLikeSerializer.java
+++
b/java/fory-core/src/main/java/org/apache/fory/serializer/collection/CollectionLikeSerializer.java
@@ -116,10 +116,13 @@ public abstract class CollectionLikeSerializer<T> extends
Serializer<T> {
* <li>write collection size
* <li>onCollectionWrite
* <li>write elements
+ * <li>onCollectionWriteFinish
* </ol>
*/
public abstract Collection onCollectionWrite(MemoryBuffer buffer, T value);
+ public void onCollectionWriteFinish(Collection map) {}
+
/**
* Write elements data header. Keep this consistent with
* `BaseObjectCodecBuilder#writeElementsHeader`.
@@ -313,6 +316,7 @@ public abstract class CollectionLikeSerializer<T> extends
Serializer<T> {
if (len != 0) {
writeElements(fory, buffer, collection);
}
+ onCollectionWriteFinish(collection);
}
protected final void writeElements(Fory fory, MemoryBuffer buffer,
Collection value) {
diff --git
a/java/fory-core/src/main/java/org/apache/fory/serializer/collection/CollectionSerializers.java
b/java/fory-core/src/main/java/org/apache/fory/serializer/collection/CollectionSerializers.java
index 046109055..8265b2084 100644
---
a/java/fory-core/src/main/java/org/apache/fory/serializer/collection/CollectionSerializers.java
+++
b/java/fory-core/src/main/java/org/apache/fory/serializer/collection/CollectionSerializers.java
@@ -46,6 +46,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.fory.Fory;
+import org.apache.fory.collection.CollectionSnapshot;
import org.apache.fory.exception.ForyException;
import org.apache.fory.memory.MemoryBuffer;
import org.apache.fory.memory.Platform;
@@ -271,10 +272,10 @@ public class CollectionSerializers {
}
public static class CopyOnWriteArrayListSerializer
- extends CollectionSerializer<CopyOnWriteArrayList> {
+ extends ConcurrentCollectionSerializer<CopyOnWriteArrayList> {
public CopyOnWriteArrayListSerializer(Fory fory,
Class<CopyOnWriteArrayList> type) {
- super(fory, type);
+ super(fory, type, true);
}
@Override
@@ -407,10 +408,19 @@ public class CollectionSerializers {
}
public static final class ConcurrentSkipListSetSerializer
- extends SortedSetSerializer<ConcurrentSkipListSet> {
+ extends ConcurrentCollectionSerializer<ConcurrentSkipListSet> {
public ConcurrentSkipListSetSerializer(Fory fory,
Class<ConcurrentSkipListSet> cls) {
- super(fory, cls);
+ super(fory, cls, true);
+ }
+
+ @Override
+ public CollectionSnapshot onCollectionWrite(MemoryBuffer buffer,
ConcurrentSkipListSet value) {
+ CollectionSnapshot snapshot = super.onCollectionWrite(buffer, value);
+ if (!fory.isCrossLanguage()) {
+ fory.writeRef(buffer, value.comparator());
+ }
+ return snapshot;
}
@Override
diff --git
a/java/fory-core/src/main/java/org/apache/fory/serializer/collection/ConcurrentCollectionSerializer.java
b/java/fory-core/src/main/java/org/apache/fory/serializer/collection/ConcurrentCollectionSerializer.java
new file mode 100644
index 000000000..be273aaac
--- /dev/null
+++
b/java/fory-core/src/main/java/org/apache/fory/serializer/collection/ConcurrentCollectionSerializer.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.fory.serializer.collection;
+
+import java.util.Collection;
+import org.apache.fory.Fory;
+import org.apache.fory.collection.CollectionSnapshot;
+import org.apache.fory.collection.ObjectArray;
+import org.apache.fory.memory.MemoryBuffer;
+
+/**
+ * Serializer for concurrent collection implementations that require
thread-safe serialization.
+ *
+ * <p>This serializer extends {@link CollectionSerializer} to provide
specialized handling for
+ * concurrent collections such as {@link
java.util.concurrent.ConcurrentLinkedQueue} and other
+ * thread-safe collection implementations. The key feature is the use of
{@link CollectionSnapshot}
+ * to create stable snapshots of concurrent collections during serialization,
avoiding potential
+ * {@link java.util.ConcurrentModificationException} and ensuring thread
safety.
+ *
+ * <p>The serializer maintains a pool of reusable {@link CollectionSnapshot}
instances to minimize
+ * object allocation overhead during serialization.
+ *
+ * <p>This implementation is particularly important for concurrent collections
because:
+ *
+ * <ul>
+ * <li>Concurrent collections can be modified during iteration, causing
exceptions
+ * <li>Creating snapshots ensures consistent serialization state
+ * <li>Object pooling reduces garbage collection pressure
+ * </ul>
+ *
+ * @param <T> the type of concurrent collection being serialized
+ * @since 1.0
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class ConcurrentCollectionSerializer<T extends Collection> extends
CollectionSerializer<T> {
+ /** Pool of reusable CollectionSnapshot instances for efficient
serialization. */
+ protected final ObjectArray<CollectionSnapshot> snapshots = new
ObjectArray<>(1);
+
+ /**
+ * Constructs a new ConcurrentCollectionSerializer for the specified
concurrent collection type.
+ *
+ * @param fory the Fory instance for serialization context
+ * @param type the class type of the concurrent collection to serialize
+ * @param supportCodegen whether code generation is supported for this
serializer
+ */
+ public ConcurrentCollectionSerializer(Fory fory, Class<T> type, boolean
supportCodegen) {
+ super(fory, type, supportCodegen);
+ }
+
+ /**
+ * Creates a snapshot of the concurrent collection for safe serialization.
+ *
+ * <p>This method retrieves a reusable {@link CollectionSnapshot} from the
pool, or creates a new
+ * one if none are available. It then creates a snapshot of the concurrent
collection to avoid
+ * concurrent modification issues during serialization. The collection size
is written to the
+ * buffer before returning the snapshot.
+ *
+ * @param buffer the memory buffer to write serialization data to
+ * @param value the concurrent collection to serialize
+ * @return a snapshot of the collection for safe iteration during
serialization
+ */
+ @Override
+ public CollectionSnapshot onCollectionWrite(MemoryBuffer buffer, T value) {
+ CollectionSnapshot snapshot = snapshots.popOrNull();
+ if (snapshot == null) {
+ snapshot = new CollectionSnapshot();
+ }
+ snapshot.setCollection(value);
+ buffer.writeVarUint32Small7(snapshot.size());
+ return snapshot;
+ }
+
+ /**
+ * Cleans up the snapshot after serialization and returns it to the pool for
reuse.
+ *
+ * <p>This method is called after the collection serialization is complete.
It clears the snapshot
+ * to remove all references to the serialized data and returns the snapshot
instance to the pool
+ * for future reuse, improving memory efficiency.
+ *
+ * @param collection the snapshot that was used for serialization
+ */
+ @Override
+ public void onCollectionWriteFinish(Collection collection) {
+ CollectionSnapshot snapshot = (CollectionSnapshot) collection;
+ snapshot.clear();
+ snapshots.add(snapshot);
+ }
+}
diff --git
a/java/fory-core/src/main/java/org/apache/fory/serializer/collection/ConcurrentMapSerializer.java
b/java/fory-core/src/main/java/org/apache/fory/serializer/collection/ConcurrentMapSerializer.java
index 751b77ab2..03410d32e 100644
---
a/java/fory-core/src/main/java/org/apache/fory/serializer/collection/ConcurrentMapSerializer.java
+++
b/java/fory-core/src/main/java/org/apache/fory/serializer/collection/ConcurrentMapSerializer.java
@@ -20,9 +20,8 @@
package org.apache.fory.serializer.collection;
import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
import org.apache.fory.Fory;
-import org.apache.fory.collection.IterableOnceMapSnapshot;
+import org.apache.fory.collection.MapSnapshot;
import org.apache.fory.collection.ObjectArray;
import org.apache.fory.memory.MemoryBuffer;
@@ -31,11 +30,11 @@ import org.apache.fory.memory.MemoryBuffer;
*
* <p>This serializer extends {@link MapSerializer} to provide specialized
handling for concurrent
* maps such as {@link java.util.concurrent.ConcurrentHashMap}. The key
feature is the use of {@link
- * IterableOnceMapSnapshot} to create stable snapshots of concurrent maps
during serialization,
- * avoiding potential {@link java.util.ConcurrentModificationException} and
ensuring thread safety.
+ * MapSnapshot} to create stable snapshots of concurrent maps during
serialization, avoiding
+ * potential {@link java.util.ConcurrentModificationException} and ensuring
thread safety.
*
- * <p>The serializer maintains a pool of reusable {@link
IterableOnceMapSnapshot} instances to
- * minimize object allocation overhead during serialization.
+ * <p>The serializer maintains a pool of reusable {@link MapSnapshot}
instances to minimize object
+ * allocation overhead during serialization.
*
* <p>This implementation is particularly important for concurrent maps
because:
*
@@ -49,9 +48,9 @@ import org.apache.fory.memory.MemoryBuffer;
* @since 1.0
*/
@SuppressWarnings({"rawtypes", "unchecked"})
-public class ConcurrentMapSerializer<T extends ConcurrentMap> extends
MapSerializer<T> {
- /** Pool of reusable IterableOnceMapSnapshot instances for efficient
serialization. */
- protected final ObjectArray<IterableOnceMapSnapshot> snapshots = new
ObjectArray<>(1);
+public class ConcurrentMapSerializer<T extends Map> extends MapSerializer<T> {
+ /** Pool of reusable MapSnapshot instances for efficient serialization. */
+ protected final ObjectArray<MapSnapshot> snapshots = new ObjectArray<>(1);
/**
* Constructs a new ConcurrentMapSerializer for the specified concurrent map
type.
@@ -67,20 +66,20 @@ public class ConcurrentMapSerializer<T extends
ConcurrentMap> extends MapSeriali
/**
* Creates a snapshot of the concurrent map for safe serialization.
*
- * <p>This method retrieves a reusable {@link IterableOnceMapSnapshot} from
the pool, or creates a
- * new one if none are available. It then creates a snapshot of the
concurrent map to avoid
- * concurrent modification issues during serialization. The map size is
written to the buffer
- * before returning the snapshot.
+ * <p>This method retrieves a reusable {@link MapSnapshot} from the pool, or
creates a new one if
+ * none are available. It then creates a snapshot of the concurrent map to
avoid concurrent
+ * modification issues during serialization. The map size is written to the
buffer before
+ * returning the snapshot.
*
* @param buffer the memory buffer to write serialization data to
* @param value the concurrent map to serialize
* @return a snapshot of the map for safe iteration during serialization
*/
@Override
- public IterableOnceMapSnapshot onMapWrite(MemoryBuffer buffer, T value) {
- IterableOnceMapSnapshot snapshot = snapshots.popOrNull();
+ public MapSnapshot onMapWrite(MemoryBuffer buffer, T value) {
+ MapSnapshot snapshot = snapshots.popOrNull();
if (snapshot == null) {
- snapshot = new IterableOnceMapSnapshot();
+ snapshot = new MapSnapshot();
}
snapshot.setMap(value);
buffer.writeVarUint32Small7(snapshot.size());
@@ -98,7 +97,7 @@ public class ConcurrentMapSerializer<T extends ConcurrentMap>
extends MapSeriali
*/
@Override
public void onMapWriteFinish(Map map) {
- IterableOnceMapSnapshot snapshot = (IterableOnceMapSnapshot) map;
+ MapSnapshot snapshot = (MapSnapshot) map;
snapshot.clear();
snapshots.add(snapshot);
}
diff --git
a/java/fory-core/src/main/java/org/apache/fory/serializer/collection/MapSerializers.java
b/java/fory-core/src/main/java/org/apache/fory/serializer/collection/MapSerializers.java
index 6bbd75908..fe4eb1b9c 100644
---
a/java/fory-core/src/main/java/org/apache/fory/serializer/collection/MapSerializers.java
+++
b/java/fory-core/src/main/java/org/apache/fory/serializer/collection/MapSerializers.java
@@ -32,8 +32,8 @@ import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.fory.Fory;
-import org.apache.fory.collection.IterableOnceMapSnapshot;
import org.apache.fory.collection.LazyMap;
+import org.apache.fory.collection.MapSnapshot;
import org.apache.fory.memory.MemoryBuffer;
import org.apache.fory.memory.Platform;
import org.apache.fory.reflect.ReflectionUtils;
@@ -273,8 +273,8 @@ public class MapSerializers {
}
@Override
- public IterableOnceMapSnapshot onMapWrite(MemoryBuffer buffer,
ConcurrentSkipListMap value) {
- IterableOnceMapSnapshot snapshot = super.onMapWrite(buffer, value);
+ public MapSnapshot onMapWrite(MemoryBuffer buffer, ConcurrentSkipListMap
value) {
+ MapSnapshot snapshot = super.onMapWrite(buffer, value);
if (!fory.isCrossLanguage()) {
fory.writeRef(buffer, value.comparator());
}
diff --git
a/java/fory-core/src/test/java/org/apache/fory/collection/CollectionSnapshotTest.java
b/java/fory-core/src/test/java/org/apache/fory/collection/CollectionSnapshotTest.java
new file mode 100644
index 000000000..e0e05bd6b
--- /dev/null
+++
b/java/fory-core/src/test/java/org/apache/fory/collection/CollectionSnapshotTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.fory.collection;
+
+import static org.testng.Assert.*;
+
+import java.util.*;
+import org.testng.annotations.Test;
+
+public class CollectionSnapshotTest {
+
+ @Test
+ public void testSetCollection() {
+ CollectionSnapshot<String> snapshot = new CollectionSnapshot<>();
+ List<String> source = Arrays.asList("a", "b", "c");
+
+ snapshot.setCollection(source);
+
+ assertEquals(snapshot.size(), 3);
+ List<String> result = new ArrayList<>();
+ for (String item : snapshot) {
+ result.add(item);
+ }
+ assertEquals(result, source);
+ }
+
+ @Test
+ public void testIterator() {
+ CollectionSnapshot<Integer> snapshot = new CollectionSnapshot<>();
+ List<Integer> source = Arrays.asList(1, 2, 3);
+ snapshot.setCollection(source);
+
+ Iterator<Integer> iterator = snapshot.iterator();
+ assertTrue(iterator.hasNext());
+ assertEquals(iterator.next(), Integer.valueOf(1));
+ assertTrue(iterator.hasNext());
+ assertEquals(iterator.next(), Integer.valueOf(2));
+ assertTrue(iterator.hasNext());
+ assertEquals(iterator.next(), Integer.valueOf(3));
+ assertFalse(iterator.hasNext());
+ }
+
+ @Test
+ public void testClearSmallCollection() {
+ CollectionSnapshot<String> snapshot = new CollectionSnapshot<>();
+ snapshot.setCollection(Arrays.asList("a", "b"));
+
+ snapshot.clear();
+
+ assertEquals(snapshot.size(), 0);
+ assertFalse(snapshot.iterator().hasNext());
+ }
+
+ @Test
+ public void testClearLargeCollection() {
+ CollectionSnapshot<Integer> snapshot = new CollectionSnapshot<>();
+ List<Integer> largeList = new ArrayList<>();
+ for (int i = 0; i < 3000; i++) {
+ largeList.add(i);
+ }
+ snapshot.setCollection(largeList);
+
+ snapshot.clear();
+
+ assertEquals(snapshot.size(), 0);
+ assertFalse(snapshot.iterator().hasNext());
+ }
+
+ @Test
+ public void testClearAndReuse() {
+ CollectionSnapshot<Integer> snapshot = new CollectionSnapshot<>();
+
+ // First use
+ snapshot.setCollection(Arrays.asList(1, 2, 3));
+ assertEquals(snapshot.size(), 3);
+
+ // Clear and reuse multiple times
+ for (int i = 0; i < 3; i++) {
+ snapshot.clear();
+ assertEquals(snapshot.size(), 0);
+
+ List<Integer> newData = Arrays.asList(i * 10, i * 10 + 1);
+ snapshot.setCollection(newData);
+ assertEquals(snapshot.size(), 2);
+
+ List<Integer> result = new ArrayList<>();
+ for (Integer item : snapshot) {
+ result.add(item);
+ }
+ assertEquals(result, newData);
+ }
+ }
+}
diff --git
a/java/fory-core/src/test/java/org/apache/fory/collection/IterableOnceMapSnapshotTest.java
b/java/fory-core/src/test/java/org/apache/fory/collection/MapSnapshotTest.java
similarity index 88%
rename from
java/fory-core/src/test/java/org/apache/fory/collection/IterableOnceMapSnapshotTest.java
rename to
java/fory-core/src/test/java/org/apache/fory/collection/MapSnapshotTest.java
index 4ccff48b5..bc971e23c 100644
---
a/java/fory-core/src/test/java/org/apache/fory/collection/IterableOnceMapSnapshotTest.java
+++
b/java/fory-core/src/test/java/org/apache/fory/collection/MapSnapshotTest.java
@@ -30,11 +30,11 @@ import java.util.Map;
import java.util.Set;
import org.testng.annotations.Test;
-public class IterableOnceMapSnapshotTest {
+public class MapSnapshotTest {
@Test
public void testSetMap() {
- IterableOnceMapSnapshot<String, Integer> snapshot = new
IterableOnceMapSnapshot<>();
+ MapSnapshot<String, Integer> snapshot = new MapSnapshot<>();
Map<String, Integer> originalMap = new HashMap<>();
originalMap.put("key1", 1);
originalMap.put("key2", 2);
@@ -59,7 +59,7 @@ public class IterableOnceMapSnapshotTest {
@Test
public void testIterator() {
- IterableOnceMapSnapshot<String, Integer> snapshot = new
IterableOnceMapSnapshot<>();
+ MapSnapshot<String, Integer> snapshot = new MapSnapshot<>();
Map<String, Integer> originalMap = new HashMap<>();
originalMap.put("a", 1);
originalMap.put("b", 2);
@@ -85,7 +85,7 @@ public class IterableOnceMapSnapshotTest {
@Test
public void testClear() {
- IterableOnceMapSnapshot<String, Integer> snapshot = new
IterableOnceMapSnapshot<>();
+ MapSnapshot<String, Integer> snapshot = new MapSnapshot<>();
Map<String, Integer> originalMap = new HashMap<>();
originalMap.put("key1", 1);
originalMap.put("key2", 2);
@@ -102,7 +102,7 @@ public class IterableOnceMapSnapshotTest {
@Test
public void testReuseAfterClear() {
- IterableOnceMapSnapshot<String, Integer> snapshot = new
IterableOnceMapSnapshot<>();
+ MapSnapshot<String, Integer> snapshot = new MapSnapshot<>();
// First use
Map<String, Integer> map1 = new HashMap<>();
@@ -132,7 +132,7 @@ public class IterableOnceMapSnapshotTest {
@Test
public void testEmptyMapSet() {
- IterableOnceMapSnapshot<String, Integer> snapshot = new
IterableOnceMapSnapshot<>();
+ MapSnapshot<String, Integer> snapshot = new MapSnapshot<>();
Map<String, Integer> emptyMap = new HashMap<>();
snapshot.setMap(emptyMap);
@@ -144,7 +144,7 @@ public class IterableOnceMapSnapshotTest {
@Test
public void testEntrySetSize() {
- IterableOnceMapSnapshot<String, Integer> snapshot = new
IterableOnceMapSnapshot<>();
+ MapSnapshot<String, Integer> snapshot = new MapSnapshot<>();
Map<String, Integer> originalMap = new HashMap<>();
originalMap.put("key1", 1);
originalMap.put("key2", 2);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]