This is an automated email from the ASF dual-hosted git repository.
chaokunyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fury.git
The following commit(s) were added to refs/heads/main by this push:
new 14bad422 feat(java): support thread safe register callback for scala
kotlin (#1895)
14bad422 is described below
commit 14bad4225a45182895df711c721cd1c52db72f32
Author: Shawn Yang <[email protected]>
AuthorDate: Tue Oct 22 16:22:32 2024 +0800
feat(java): support thread safe register callback for scala kotlin (#1895)
## What does this PR do?
support thread safe register callback for scala kotlin
<!-- Describe the purpose of this PR. -->
## Related issues
Closes #1894
## Does this PR introduce any user-facing change?
<!--
If any user-facing interface changes, please [open an
issue](https://github.com/apache/fury/issues/new/choose) describing the
need to do so and update the document if necessary.
-->
- [ ] Does this PR introduce any public API change?
- [ ] Does this PR introduce any binary protocol compatibility change?
## Benchmark
<!--
When the PR has an impact on performance (if you don't know whether the
PR will have an impact on performance, you can submit the PR first, and
if it will have impact on performance, the code reviewer will explain
it), be sure to attach a benchmark data here.
-->
---
.../org/apache/fury/AbstractThreadSafeFury.java | 22 ++++++++++++----------
.../main/java/org/apache/fury/ThreadLocalFury.java | 4 +++-
.../java/org/apache/fury/pool/ThreadPoolFury.java | 4 +++-
kotlin/pom.xml | 4 +++-
.../fury/serializer/kotlin/KotlinSerializers.java | 8 ++++++++
.../fury/serializer/scala/ScalaSerializers.java | 7 +++++++
6 files changed, 36 insertions(+), 13 deletions(-)
diff --git
a/java/fury-core/src/main/java/org/apache/fury/AbstractThreadSafeFury.java
b/java/fury-core/src/main/java/org/apache/fury/AbstractThreadSafeFury.java
index 656f3b9d..23a6aa40 100644
--- a/java/fury-core/src/main/java/org/apache/fury/AbstractThreadSafeFury.java
+++ b/java/fury-core/src/main/java/org/apache/fury/AbstractThreadSafeFury.java
@@ -21,6 +21,7 @@ package org.apache.fury;
import java.util.function.Consumer;
import java.util.function.Function;
+import org.apache.fury.annotation.Internal;
import org.apache.fury.resolver.ClassChecker;
import org.apache.fury.serializer.Serializer;
import org.apache.fury.serializer.SerializerFactory;
@@ -28,48 +29,49 @@ import org.apache.fury.serializer.SerializerFactory;
public abstract class AbstractThreadSafeFury implements ThreadSafeFury {
@Override
public void register(Class<?> clz) {
- processCallback(fury -> fury.register(clz));
+ registerCallback(fury -> fury.register(clz));
}
@Override
public void register(Class<?> cls, boolean createSerializer) {
- processCallback(fury -> fury.register(cls, createSerializer));
+ registerCallback(fury -> fury.register(cls, createSerializer));
}
@Override
public void register(Class<?> cls, Short id) {
- processCallback(fury -> fury.register(cls, id));
+ registerCallback(fury -> fury.register(cls, id));
}
@Override
public void register(Class<?> cls, Short id, boolean createSerializer) {
- processCallback(fury -> fury.register(cls, id, createSerializer));
+ registerCallback(fury -> fury.register(cls, id, createSerializer));
}
@Override
public <T> void registerSerializer(Class<T> type, Class<? extends
Serializer> serializerClass) {
- processCallback(fury -> fury.registerSerializer(type, serializerClass));
+ registerCallback(fury -> fury.registerSerializer(type, serializerClass));
}
@Override
public void registerSerializer(Class<?> type, Serializer<?> serializer) {
- processCallback(fury -> fury.registerSerializer(type, serializer));
+ registerCallback(fury -> fury.registerSerializer(type, serializer));
}
@Override
public void registerSerializer(Class<?> type, Function<Fury, Serializer<?>>
serializerCreator) {
- processCallback(fury -> fury.registerSerializer(type,
serializerCreator.apply(fury)));
+ registerCallback(fury -> fury.registerSerializer(type,
serializerCreator.apply(fury)));
}
@Override
public void setSerializerFactory(SerializerFactory serializerFactory) {
- processCallback(fury -> fury.setSerializerFactory(serializerFactory));
+ registerCallback(fury -> fury.setSerializerFactory(serializerFactory));
}
@Override
public void setClassChecker(ClassChecker classChecker) {
- processCallback(fury ->
fury.getClassResolver().setClassChecker(classChecker));
+ registerCallback(fury ->
fury.getClassResolver().setClassChecker(classChecker));
}
- protected abstract void processCallback(Consumer<Fury> callback);
+ @Internal
+ public abstract void registerCallback(Consumer<Fury> callback);
}
diff --git a/java/fury-core/src/main/java/org/apache/fury/ThreadLocalFury.java
b/java/fury-core/src/main/java/org/apache/fury/ThreadLocalFury.java
index e45453bb..3e2abb86 100644
--- a/java/fury-core/src/main/java/org/apache/fury/ThreadLocalFury.java
+++ b/java/fury-core/src/main/java/org/apache/fury/ThreadLocalFury.java
@@ -27,6 +27,7 @@ import java.util.WeakHashMap;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.annotation.concurrent.ThreadSafe;
+import org.apache.fury.annotation.Internal;
import org.apache.fury.io.FuryInputStream;
import org.apache.fury.io.FuryReadableChannel;
import org.apache.fury.memory.MemoryBuffer;
@@ -68,8 +69,9 @@ public class ThreadLocalFury extends AbstractThreadSafeFury {
Fury fury = bindingThreadLocal.get().get();
}
+ @Internal
@Override
- protected void processCallback(Consumer<Fury> callback) {
+ public void registerCallback(Consumer<Fury> callback) {
factoryCallback = factoryCallback.andThen(callback);
for (LoaderBinding binding : allFury.keySet()) {
binding.visitAllFury(callback);
diff --git
a/java/fury-core/src/main/java/org/apache/fury/pool/ThreadPoolFury.java
b/java/fury-core/src/main/java/org/apache/fury/pool/ThreadPoolFury.java
index 8afcdbd1..ff701e0c 100644
--- a/java/fury-core/src/main/java/org/apache/fury/pool/ThreadPoolFury.java
+++ b/java/fury-core/src/main/java/org/apache/fury/pool/ThreadPoolFury.java
@@ -27,6 +27,7 @@ import java.util.function.Function;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.fury.AbstractThreadSafeFury;
import org.apache.fury.Fury;
+import org.apache.fury.annotation.Internal;
import org.apache.fury.io.FuryInputStream;
import org.apache.fury.io.FuryReadableChannel;
import org.apache.fury.logging.Logger;
@@ -61,8 +62,9 @@ public class ThreadPoolFury extends AbstractThreadSafeFury {
fury -> factoryCallback.accept(fury));
}
+ @Internal
@Override
- protected void processCallback(Consumer<Fury> callback) {
+ public void registerCallback(Consumer<Fury> callback) {
factoryCallback = factoryCallback.andThen(callback);
for (ClassLoaderFuryPooled furyPooled :
furyPooledObjectFactory.classLoaderFuryPooledCache.asMap().values()) {
diff --git a/kotlin/pom.xml b/kotlin/pom.xml
index 24bccb95..f31c8dcc 100644
--- a/kotlin/pom.xml
+++ b/kotlin/pom.xml
@@ -29,7 +29,9 @@
<modelVersion>4.0.0</modelVersion>
<properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <maven.compiler.source>1.8</maven.compiler.source>
+ <maven.compiler.target>1.8</maven.compiler.target>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<kotlin.code.style>official</kotlin.code.style>
<kotlin.compiler.jvmTarget>1.8</kotlin.compiler.jvmTarget>
</properties>
diff --git
a/kotlin/src/main/java/org/apache/fury/serializer/kotlin/KotlinSerializers.java
b/kotlin/src/main/java/org/apache/fury/serializer/kotlin/KotlinSerializers.java
index e1e0b7c0..f87f1476 100644
---
a/kotlin/src/main/java/org/apache/fury/serializer/kotlin/KotlinSerializers.java
+++
b/kotlin/src/main/java/org/apache/fury/serializer/kotlin/KotlinSerializers.java
@@ -23,7 +23,9 @@ import kotlin.UByteArray;
import kotlin.UIntArray;
import kotlin.ULongArray;
import kotlin.UShortArray;
+import org.apache.fury.AbstractThreadSafeFury;
import org.apache.fury.Fury;
+import org.apache.fury.ThreadSafeFury;
import org.apache.fury.resolver.ClassResolver;
import org.apache.fury.serializer.collection.CollectionSerializers;
import org.apache.fury.serializer.collection.MapSerializers;
@@ -34,6 +36,12 @@ import org.apache.fury.serializer.collection.MapSerializers;
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public class KotlinSerializers {
+
+ public static void registerSerializers(ThreadSafeFury fury) {
+ AbstractThreadSafeFury threadSafeFury = (AbstractThreadSafeFury) fury;
+ threadSafeFury.registerCallback(KotlinSerializers::registerSerializers);
+ }
+
public static void registerSerializers(Fury fury) {
ClassResolver resolver = fury.getClassResolver();
diff --git
a/scala/src/main/java/org/apache/fury/serializer/scala/ScalaSerializers.java
b/scala/src/main/java/org/apache/fury/serializer/scala/ScalaSerializers.java
index 5cfdcd89..41d56b5d 100644
--- a/scala/src/main/java/org/apache/fury/serializer/scala/ScalaSerializers.java
+++ b/scala/src/main/java/org/apache/fury/serializer/scala/ScalaSerializers.java
@@ -19,7 +19,9 @@
package org.apache.fury.serializer.scala;
+import org.apache.fury.AbstractThreadSafeFury;
import org.apache.fury.Fury;
+import org.apache.fury.ThreadSafeFury;
import org.apache.fury.resolver.ClassResolver;
import org.apache.fury.serializer.Serializer;
import org.apache.fury.serializer.SerializerFactory;
@@ -31,6 +33,11 @@ import static
org.apache.fury.serializer.scala.ToFactorySerializers.MapToFactory
public class ScalaSerializers {
+ public static void registerSerializers(ThreadSafeFury fury) {
+ AbstractThreadSafeFury threadSafeFury = (AbstractThreadSafeFury) fury;
+ threadSafeFury.registerCallback(ScalaSerializers::registerSerializers);
+ }
+
public static void registerSerializers(Fury fury) {
ClassResolver resolver = setSerializerFactory(fury);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]