This is an automated email from the ASF dual-hosted git repository.

chaokunyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-fury.git


The following commit(s) were added to refs/heads/main by this push:
     new b2fdd68f refactor(java): Remove Guava's Concurrency utils usages (#1614)
b2fdd68f is described below

commit b2fdd68fbfdc785f2bbad088224295a768ff9bcc
Author: Nikita Ivchenko <[email protected]>
AuthorDate: Wed May 8 19:59:47 2024 +0300

    refactor(java): Remove Guava's Concurrency utils usages (#1614)
    
    <!--
    **Thanks for contributing to Fury.**
    
    **If this is your first time opening a PR on fury, you can refer to
    [CONTRIBUTING.md](https://github.com/apache/incubator-fury/blob/main/CONTRIBUTING.md).**
    
    Contribution Checklist
    
    - The **Apache Fury (incubating)** community has restrictions on the
    naming of pr titles. You can also find instructions in
    [CONTRIBUTING.md](https://github.com/apache/incubator-fury/blob/main/CONTRIBUTING.md).
    
    - Fury has a strong focus on performance. If the PR you submit will have
    an impact on performance, please benchmark it first and provide the
    benchmark result here.
    -->
    
    ## What does this PR do?
    
    Removes Guava's Concurrency utils usages
    
    ## Related issues
    
    #1113
    
    
    ## Does this PR introduce any user-facing change?
    
    <!--
    If any user-facing interface changes, please [open an
    issue](https://github.com/apache/incubator-fury/issues/new/choose)
    describing the need to do so and update the document if necessary.
    -->
    
    - [ ] Does this PR introduce any public API change?
    - [ ] Does this PR introduce any binary protocol compatibility change?
    
    
    
    
    <!--
    When the PR has an impact on performance (if you don't know whether the
    PR will have an impact on performance, you can submit the PR first, and
    if it will have impact on performance, the code reviewer will explain
    it), be sure to attach a benchmark data here.
    -->
---
 LICENSE                                            |  1 +
 .../java/org/apache/fury/builder/JITContext.java   | 26 ++----
 .../org/apache/fury/codegen/CodeGenerator.java     | 53 ++++++------
 .../util/concurrency/DirectExecutorService.java    | 98 ++++++++++++++++++++++
 .../concurrency/FuryJitCompilerThreadFactory.java  | 36 ++++++++
 java/fury-core/src/main/resources/META-INF/LICENSE |  1 +
 licenserc.toml                                     |  1 +
 7 files changed, 172 insertions(+), 44 deletions(-)

diff --git a/LICENSE b/LICENSE
index d0549103..a5d943ce 100644
--- a/LICENSE
+++ b/LICENSE
@@ -220,6 +220,7 @@ The text of each license is the standard Apache 2.0 license.
       java/fury-core/src/main/java/org/apache/fury/reflect/TypeParameter.java
       java/fury-core/src/main/java/org/apache/fury/reflect/Types.java
       java/fury-core/src/main/java/org/apache/fury/reflect/TypeRef.java
+      
java/fury-core/src/main/java/org/apache/fury/util/concurrency/DirectExecutorService.java
 
 * spark (https://github.com/apache/spark)
     Files:
diff --git 
a/java/fury-core/src/main/java/org/apache/fury/builder/JITContext.java 
b/java/fury-core/src/main/java/org/apache/fury/builder/JITContext.java
index b06775b6..a19efd4a 100644
--- a/java/fury-core/src/main/java/org/apache/fury/builder/JITContext.java
+++ b/java/fury-core/src/main/java/org/apache/fury/builder/JITContext.java
@@ -19,15 +19,12 @@
 
 package org.apache.fury.builder;
 
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Function;
 import org.apache.fury.Fury;
@@ -85,16 +82,13 @@ public class JITContext {
         if (fury.getConfig().isAsyncCompilationEnabled() && 
!isAsyncVisitingFury()) {
           // TODO(chaokunyang) stash callbacks and submit jit task if the 
serialization speed
           // is really needed.
-          ListeningExecutorService compilationService = 
CodeGenerator.getCompilationService();
-          ListenableFuture<T> future;
+          ExecutorService compilationService = 
CodeGenerator.getCompilationService();
           hasJITResult.put(callback.id(), new ArrayList<>());
           numRunningTask++;
-          future = compilationService.submit(jitAction);
-          Futures.addCallback(
-              future,
-              new FutureCallback<T>() {
-                @Override
-                public void onSuccess(T result) {
+          compilationService.execute(
+              () -> {
+                try {
+                  T result = jitAction.call();
                   try {
                     lock();
                     callback.onSuccess(result);
@@ -108,10 +102,7 @@ public class JITContext {
                     }
                     unlock();
                   }
-                }
-
-                @Override
-                public void onFailure(Throwable t) {
+                } catch (Throwable t) {
                   try {
                     lock();
                     callback.onFailure(t);
@@ -124,8 +115,7 @@ public class JITContext {
                     unlock();
                   }
                 }
-              },
-              compilationService);
+              });
           return interpreterModeAction.call();
         } else {
           return jitAction.call();
diff --git 
a/java/fury-core/src/main/java/org/apache/fury/codegen/CodeGenerator.java 
b/java/fury-core/src/main/java/org/apache/fury/codegen/CodeGenerator.java
index 2d03b5d7..776df27d 100644
--- a/java/fury-core/src/main/java/org/apache/fury/codegen/CodeGenerator.java
+++ b/java/fury-core/src/main/java/org/apache/fury/codegen/CodeGenerator.java
@@ -19,10 +19,6 @@
 
 package org.apache.fury.codegen;
 
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
@@ -30,7 +26,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.StringJoiner;
 import java.util.WeakHashMap;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -47,6 +45,8 @@ import org.apache.fury.util.DelayedRef;
 import org.apache.fury.util.GraalvmSupport;
 import org.apache.fury.util.Preconditions;
 import org.apache.fury.util.StringUtils;
+import org.apache.fury.util.concurrency.DirectExecutorService;
+import org.apache.fury.util.concurrency.FuryJitCompilerThreadFactory;
 
 /**
  * Code generator will take a list of {@link CompileUnit} and compile it into 
a list of classes.
@@ -80,7 +80,7 @@ public class CodeGenerator {
   private static final String FALLBACK_PACKAGE = 
Generated.class.getPackage().getName();
   public static final boolean ENABLE_FURY_GENERATED_CLASS_UNIQUE_ID;
   private static int maxPoolSize = Math.max(1, 
Runtime.getRuntime().availableProcessors() / 2);
-  private static ListeningExecutorService compilationExecutorService;
+  private static ExecutorService compilationExecutorService;
 
   static {
     boolean useUniqueId = StringUtils.isBlank(CodeGenerator.getCodeDir());
@@ -205,34 +205,35 @@ public class CodeGenerator {
     return resultClassLoader;
   }
 
-  public ListenableFuture<Class<?>[]> asyncCompile(CompileUnit... 
compileUnits) {
-    return getCompilationService()
-        .submit(
-            () -> {
-              ClassLoader loader = compile(compileUnits);
-              return Arrays.stream(compileUnits)
-                  .map(
-                      compileUnit -> {
-                        try {
-                          return (Class<?>) 
loader.loadClass(compileUnit.getQualifiedClassName());
-                        } catch (ClassNotFoundException e) {
-                          throw new IllegalStateException(
-                              "Impossible because we just compiled class", e);
-                        }
-                      })
-                  .toArray(Class<?>[]::new);
-            });
+  public CompletableFuture<Class<?>[]> asyncCompile(CompileUnit... 
compileUnits) {
+    ExecutorService executorService = getCompilationService();
+    return CompletableFuture.supplyAsync(
+        () -> {
+          ClassLoader loader = compile(compileUnits);
+          return Arrays.stream(compileUnits)
+              .map(
+                  compileUnit -> {
+                    try {
+                      return (Class<?>) 
loader.loadClass(compileUnit.getQualifiedClassName());
+                    } catch (ClassNotFoundException e) {
+                      throw new IllegalStateException(
+                          "Impossible because we just compiled class", e);
+                    }
+                  })
+              .toArray(Class<?>[]::new);
+        },
+        executorService);
   }
 
   public static void seMaxCompilationThreadPoolSize(int 
maxCompilationThreadPoolSize) {
     maxPoolSize = maxCompilationThreadPoolSize;
   }
 
-  public static synchronized ListeningExecutorService getCompilationService() {
+  public static synchronized ExecutorService getCompilationService() {
     if (compilationExecutorService == null) {
       if (GraalvmSupport.isGraalBuildtime()) {
         // GraalVM build time can't reachable thread.
-        return compilationExecutorService = 
MoreExecutors.newDirectExecutorService();
+        return compilationExecutorService = new DirectExecutorService();
       }
       ThreadPoolExecutor executor =
           new ThreadPoolExecutor(
@@ -241,13 +242,13 @@ public class CodeGenerator {
               5L,
               TimeUnit.SECONDS,
               new LinkedBlockingQueue<>(),
-              new 
ThreadFactoryBuilder().setNameFormat("fury-jit-compiler-%d").build(),
+              new FuryJitCompilerThreadFactory(),
               (r, e) -> LOG.warn("Task {} rejected from {}", r.toString(), e));
       // Normally task won't be rejected by executor, since we used an unbound 
queue.
       // But when we shut down executor for debug, it'll be rejected by 
executor,
       // in such cases we just ignore the reject exception by log it.
       executor.allowCoreThreadTimeOut(true);
-      compilationExecutorService = MoreExecutors.listeningDecorator(executor);
+      compilationExecutorService = executor;
     }
     return compilationExecutorService;
   }
diff --git 
a/java/fury-core/src/main/java/org/apache/fury/util/concurrency/DirectExecutorService.java
 
b/java/fury-core/src/main/java/org/apache/fury/util/concurrency/DirectExecutorService.java
new file mode 100644
index 00000000..2aa904fe
--- /dev/null
+++ 
b/java/fury-core/src/main/java/org/apache/fury/util/concurrency/DirectExecutorService.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright (C) 2007 The Guava Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not 
use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.fury.util.concurrency;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+
+// Mostly derived from Guava 32.1.2
+// com.google.common.util.concurrent.MoreExecutors.DirectExecutorService
+// 
https://github.com/google/guava/blob/9f6a3840/guava/src/com/google/common/util/concurrent/MoreExecutors.java
+public class DirectExecutorService extends AbstractExecutorService {
+  private final Object lock = new Object();
+  private int runningTasks = 0;
+  private boolean shutdown = false;
+
+  @Override
+  public void execute(Runnable command) {
+    synchronized (lock) {
+      if (shutdown) {
+        throw new RejectedExecutionException("Executor already shutdown");
+      }
+      runningTasks++;
+    }
+    try {
+      command.run();
+    } finally {
+      synchronized (lock) {
+        int numRunning = --runningTasks;
+        if (numRunning == 0) {
+          lock.notifyAll();
+        }
+      }
+    }
+  }
+
+  @Override
+  public boolean isShutdown() {
+    synchronized (lock) {
+      return shutdown;
+    }
+  }
+
+  @Override
+  public void shutdown() {
+    synchronized (lock) {
+      shutdown = true;
+      if (runningTasks == 0) {
+        lock.notifyAll();
+      }
+    }
+  }
+
+  @Override
+  public List<Runnable> shutdownNow() {
+    shutdown();
+    return Collections.emptyList();
+  }
+
+  @Override
+  public boolean isTerminated() {
+    synchronized (lock) {
+      return shutdown && runningTasks == 0;
+    }
+  }
+
+  @Override
+  public boolean awaitTermination(long timeout, TimeUnit unit) throws 
InterruptedException {
+    long nanos = unit.toNanos(timeout);
+    synchronized (lock) {
+      while (true) {
+        if (shutdown && runningTasks == 0) {
+          return true;
+        } else if (nanos <= 0) {
+          return false;
+        } else {
+          long now = System.nanoTime();
+          TimeUnit.NANOSECONDS.timedWait(lock, nanos);
+          nanos -= System.nanoTime() - now; // subtract the actual time we 
waited
+        }
+      }
+    }
+  }
+}
diff --git 
a/java/fury-core/src/main/java/org/apache/fury/util/concurrency/FuryJitCompilerThreadFactory.java
 
b/java/fury-core/src/main/java/org/apache/fury/util/concurrency/FuryJitCompilerThreadFactory.java
new file mode 100644
index 00000000..1ec883c0
--- /dev/null
+++ 
b/java/fury-core/src/main/java/org/apache/fury/util/concurrency/FuryJitCompilerThreadFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.fury.util.concurrency;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class FuryJitCompilerThreadFactory implements ThreadFactory {
+  private final ThreadFactory backingThreadFactory = 
Executors.defaultThreadFactory();
+  private final AtomicInteger threadNumber = new AtomicInteger(0);
+
+  @Override
+  public Thread newThread(Runnable task) {
+    Thread thread = backingThreadFactory.newThread(task);
+    thread.setName("fury-jit-compiler-" + threadNumber.incrementAndGet());
+    return thread;
+  }
+}
diff --git a/java/fury-core/src/main/resources/META-INF/LICENSE 
b/java/fury-core/src/main/resources/META-INF/LICENSE
index 83adf1aa..187a8239 100644
--- a/java/fury-core/src/main/resources/META-INF/LICENSE
+++ b/java/fury-core/src/main/resources/META-INF/LICENSE
@@ -220,6 +220,7 @@ The text of each license is the standard Apache 2.0 license.
       java/fury-core/src/main/java/org/apache/fury/reflect/TypeParameter.java
       java/fury-core/src/main/java/org/apache/fury/reflect/Types.java
       java/fury-core/src/main/java/org/apache/fury/reflect/TypeRef.java
+      
java/fury-core/src/main/java/org/apache/fury/util/concurrency/DirectExecutorService.java
 
 * spark (https://github.com/apache/spark)
     Files:
diff --git a/licenserc.toml b/licenserc.toml
index 6ae262df..46cd3e58 100644
--- a/licenserc.toml
+++ b/licenserc.toml
@@ -48,6 +48,7 @@ excludes = [
     "java/fury-core/src/main/java/org/apache/fury/reflect/Types.java",
     "java/fury-core/src/main/java/org/apache/fury/type/Generics.java",
     "java/fury-core/src/main/java/org/apache/fury/util/MurmurHash3.java",
+    
"java/fury-core/src/main/java/org/apache/fury/util/concurrency/DirectExecutorService.java",
     "java/fury-core/src/main/java/org/apache/fury/memory/Platform.java",
     "java/fury-core/src/main/java/org/apache/fury/util/Preconditions.java",
     "java/fury-core/src/test/java/org/apache/fury/type/GenericsTest.java",


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to