This is an automated email from the ASF dual-hosted git repository.
chaokunyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-fury.git
The following commit(s) were added to refs/heads/main by this push:
new f7446403 fix(java): ThreadPoolFury#factoryCallback don't work when
create new classLoaderFuryPooled (#1628)
f7446403 is described below
commit f74464030313ca61e81d324024f4b6b55c143f4c
Author: Shuchang Li <[email protected]>
AuthorDate: Tue May 14 00:01:53 2024 +0800
fix(java): ThreadPoolFury#factoryCallback don't work when create new
classLoaderFuryPooled (#1628)
## What does this PR do?
<!-- Describe the purpose of this PR. -->
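Fix ThreadPoolFury#factoryCallback not being applied when a new
ClassLoaderFuryPooled is created. Previously the callback was only set on the
ClassLoaderFuryPooled instances already present in the cache, so a pool created
later (for example, after switching to a new class loader) never received it.
The callback is now passed to FuryPooledObjectFactory, which sets it on every
ClassLoaderFuryPooled it creates.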
## Related issues
<!--
Is there any related issue? Please attach here.
- #xxxx0
- #xxxx1
- #xxxx2
-->
## Does this PR introduce any user-facing change?
<!--
If any user-facing interface changes, please [open an
issue](https://github.com/apache/incubator-fury/issues/new/choose)
describing the need to do so and update the document if necessary.
-->
- [ ] Does this PR introduce any public API change?
- [ ] Does this PR introduce any binary protocol compatibility change?
## Benchmark
<!--
When the PR has an impact on performance (if you don't know whether the
PR will have an impact on performance, you can submit the PR first, and
if it will have impact on performance, the code reviewer will explain
it), be sure to attach a benchmark data here.
-->
---
.../apache/fury/pool/FuryPooledObjectFactory.java | 9 ++++-
.../java/org/apache/fury/pool/ThreadPoolFury.java | 9 ++++-
.../java/org/apache/fury/ThreadSafeFuryTest.java | 46 ++++++++++++++++++++++
3 files changed, 61 insertions(+), 3 deletions(-)
diff --git
a/java/fury-core/src/main/java/org/apache/fury/pool/FuryPooledObjectFactory.java
b/java/fury-core/src/main/java/org/apache/fury/pool/FuryPooledObjectFactory.java
index 9efc584a..1bc03403 100644
---
a/java/fury-core/src/main/java/org/apache/fury/pool/FuryPooledObjectFactory.java
+++
b/java/fury-core/src/main/java/org/apache/fury/pool/FuryPooledObjectFactory.java
@@ -22,6 +22,7 @@ package org.apache.fury.pool;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.fury.Fury;
import org.apache.fury.logging.Logger;
@@ -62,15 +63,20 @@ public class FuryPooledObjectFactory {
*/
private final int maxPoolSize;
+ /** factoryCallback is set on every new ClassLoaderFuryPooled so that it
can be applied to every fury. */
+ private final Consumer<Fury> factoryCallback;
+
public FuryPooledObjectFactory(
Function<ClassLoader, Fury> furyFactory,
int minPoolSize,
int maxPoolSize,
long expireTime,
- TimeUnit timeUnit) {
+ TimeUnit timeUnit,
+ Consumer<Fury> factoryCallback) {
this.minPoolSize = minPoolSize;
this.maxPoolSize = maxPoolSize;
this.furyFactory = furyFactory;
+ this.factoryCallback = factoryCallback;
classLoaderFuryPooledCache =
CacheBuilder.newBuilder()
.weakKeys()
@@ -138,6 +144,7 @@ public class FuryPooledObjectFactory {
if (classLoaderFuryPooled == null) {
classLoaderFuryPooled =
new ClassLoaderFuryPooled(classLoader, furyFactory, minPoolSize,
maxPoolSize);
+ classLoaderFuryPooled.setFactoryCallback(factoryCallback);
classLoaderFuryPooledCache.put(classLoader, classLoaderFuryPooled);
}
return classLoaderFuryPooled;
diff --git
a/java/fury-core/src/main/java/org/apache/fury/pool/ThreadPoolFury.java
b/java/fury-core/src/main/java/org/apache/fury/pool/ThreadPoolFury.java
index e9367dcf..f1c4bd6b 100644
--- a/java/fury-core/src/main/java/org/apache/fury/pool/ThreadPoolFury.java
+++ b/java/fury-core/src/main/java/org/apache/fury/pool/ThreadPoolFury.java
@@ -47,7 +47,13 @@ public class ThreadPoolFury extends AbstractThreadSafeFury {
long expireTime,
TimeUnit timeUnit) {
this.furyPooledObjectFactory =
- new FuryPooledObjectFactory(furyFactory, minPoolSize, maxPoolSize,
expireTime, timeUnit);
+ new FuryPooledObjectFactory(
+ furyFactory,
+ minPoolSize,
+ maxPoolSize,
+ expireTime,
+ timeUnit,
+ fury -> factoryCallback.accept(fury));
}
@Override
@@ -56,7 +62,6 @@ public class ThreadPoolFury extends AbstractThreadSafeFury {
for (ClassLoaderFuryPooled furyPooled :
furyPooledObjectFactory.classLoaderFuryPooledCache.asMap().values()) {
furyPooled.allFury.keySet().forEach(callback);
- furyPooled.setFactoryCallback(factoryCallback);
}
}
diff --git
a/java/fury-core/src/test/java/org/apache/fury/ThreadSafeFuryTest.java
b/java/fury-core/src/test/java/org/apache/fury/ThreadSafeFuryTest.java
index 464ea6f5..29458572 100644
--- a/java/fury-core/src/test/java/org/apache/fury/ThreadSafeFuryTest.java
+++ b/java/fury-core/src/test/java/org/apache/fury/ThreadSafeFuryTest.java
@@ -30,12 +30,14 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import lombok.Data;
import org.apache.fury.config.Language;
import org.apache.fury.memory.MemoryBuffer;
import org.apache.fury.resolver.MetaContext;
import org.apache.fury.serializer.Serializer;
import org.apache.fury.test.bean.BeanA;
import org.apache.fury.test.bean.BeanB;
+import org.apache.fury.test.bean.Foo;
import org.apache.fury.test.bean.Struct;
import org.apache.fury.util.LoaderBinding.StagingType;
import org.testng.Assert;
@@ -319,4 +321,48 @@ public class ThreadSafeFuryTest extends FuryTestBase {
Assert.assertEquals(fury.deserializeJavaObject(buffer, String.class),
"abc");
}
}
+
+ @Data
+ static class Foo {
+ int f1;
+ }
+
+ public static class FooSerializer extends Serializer<Foo> {
+ public FooSerializer(Fury fury, Class<Foo> type) {
+ super(fury, type);
+ }
+
+ @Override
+ public void write(MemoryBuffer buffer, Foo value) {
+ buffer.writeInt32(value.f1);
+ }
+
+ @Override
+ public Foo read(MemoryBuffer buffer) {
+ final Foo foo = new Foo();
+ foo.f1 = buffer.readInt32();
+ return foo;
+ }
+ }
+
+ public static class CustomClassLoader extends ClassLoader {
+ public CustomClassLoader(ClassLoader parent) {
+ super(parent);
+ }
+ }
+
+ @Test
+ public void testSerializerRegister() {
+ final ThreadSafeFury threadSafeFury =
+
Fury.builder().requireClassRegistration(false).buildThreadSafeFuryPool(0, 2);
+ threadSafeFury.registerSerializer(Foo.class, FooSerializer.class);
+ // create a new classLoader
+ threadSafeFury.setClassLoader(new
CustomClassLoader(ClassLoader.getSystemClassLoader()));
+ threadSafeFury.execute(
+ fury -> {
+ Assert.assertEquals(
+ fury.getClassResolver().getSerializer(Foo.class).getClass(),
FooSerializer.class);
+ return null;
+ });
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]