This is an automated email from the ASF dual-hosted git repository.

guohao pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new f561138  [3.0] Add serializingExecutor (#8999)
f561138 is described below

commit f561138fe92cc838146b76479fdff70ab163401d
Author: earthchen <[email protected]>
AuthorDate: Sat Oct 9 03:25:32 2021 -0500

    [3.0] Add serializingExecutor (#8999)
    
    * add serializingExecutor
    
    * wrapper executor for tri stream
    
    * remove unused set
    
    * remove print out
    
    * add timeout
    
    * add timeout
---
 .../threadpool/serial/SerializingExecutor.java     | 112 +++++++++++++++++++++
 .../threadpool/serial/SerializingExecutorTest.java |  69 +++++++++++++
 .../rpc/protocol/tri/AbstractServerStream.java     |  21 ++--
 .../dubbo/rpc/protocol/tri/AbstractStream.java     |  49 +++++----
 .../dubbo/rpc/protocol/tri/ServerStream.java       |  53 +++++-----
 5 files changed, 241 insertions(+), 63 deletions(-)

diff --git 
a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/serial/SerializingExecutor.java
 
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/serial/SerializingExecutor.java
new file mode 100644
index 0000000..e106523
--- /dev/null
+++ 
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/serial/SerializingExecutor.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.common.threadpool.serial;
+
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Executor ensuring that all {@link Runnable} tasks submitted are executed in 
order
+ * using the provided {@link Executor}, and serially such that no two will 
ever be
+ * running at the same time.
+ */
+public final class SerializingExecutor implements Executor, Runnable {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SerializingExecutor.class);
+
+    /**
+     * Use false to stop and true to run
+     */
+    private final AtomicBoolean atomicBoolean = new AtomicBoolean();
+
+    private final Executor executor;
+
+    private final Queue<Runnable> runQueue = new ConcurrentLinkedQueue<>();
+
+    /**
+     * Creates a SerializingExecutor, running tasks using {@code executor}.
+     *
+     * @param executor Executor in which tasks should be run. Must not be null.
+     */
+    public SerializingExecutor(Executor executor) {
+        this.executor = executor;
+    }
+
+    /**
+     * Runs the given runnable strictly after all Runnables that were submitted
+     * before it, and using the {@code executor} passed to the constructor.    
 .
+     */
+    @Override
+    public void execute(Runnable r) {
+        runQueue.add(r);
+        schedule(r);
+    }
+
+    private void schedule(Runnable removable) {
+        if (atomicBoolean.compareAndSet(false, true)) {
+            boolean success = false;
+            try {
+                executor.execute(this);
+                success = true;
+            } finally {
+                // It is possible that at this point that there are still 
tasks in
+                // the queue, it would be nice to keep trying but the error 
may not
+                // be recoverable.  So we update our state and propagate so 
that if
+                // our caller deems it recoverable we won't be stuck.
+                if (!success) {
+                    if (removable != null) {
+                        // This case can only be reached if 'this' was not 
currently running, and we failed to
+                        // reschedule.  The item should still be in the queue 
for removal.
+                        // ConcurrentLinkedQueue claims that null elements are 
not allowed, but seems to not
+                        // throw if the item to remove is null.  If removable 
is present in the queue twice,
+                        // the wrong one may be removed.  It doesn't seem 
possible for this case to exist today.
+                        // This is important to run in case of 
RejectedExectuionException, so that future calls
+                        // to execute don't succeed and accidentally run a 
previous runnable.
+                        runQueue.remove(removable);
+                    }
+                    atomicBoolean.set(false);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void run() {
+        Runnable r;
+        try {
+            while ((r = runQueue.poll()) != null) {
+                try {
+                    r.run();
+                } catch (RuntimeException e) {
+                    LOGGER.error("Exception while executing runnable " + r, e);
+                }
+            }
+        } finally {
+            atomicBoolean.set(false);
+        }
+        if (!runQueue.isEmpty()) {
+            // we didn't enqueue anything but someone else did.
+            schedule(null);
+        }
+    }
+}
diff --git 
a/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/serial/SerializingExecutorTest.java
 
b/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/serial/SerializingExecutorTest.java
new file mode 100644
index 0000000..6d12378
--- /dev/null
+++ 
b/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/serial/SerializingExecutorTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.common.threadpool.serial;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+public class SerializingExecutorTest {
+
+    protected static SerializingExecutor serializingExecutor;
+
+    @BeforeAll
+    public static void before() {
+        ExecutorService service = Executors.newFixedThreadPool(4);
+        serializingExecutor = new SerializingExecutor(service);
+    }
+
+    @Test
+    public void test1() throws InterruptedException {
+        int n = 2;
+        int eachCount = 1000;
+        int total = n * eachCount;
+        int sleepMillis = 10;
+        Map<String, Integer> map = new HashMap<>();
+        map.put("val", 0);
+        CountDownLatch downLatch = new CountDownLatch(total);
+        for (int i = 0; i < total; i++) {
+            final int index = i;
+            Thread.sleep(ThreadLocalRandom.current().nextInt(sleepMillis));
+            serializingExecutor.execute(() -> {
+                try {
+                    
Thread.sleep(ThreadLocalRandom.current().nextInt(sleepMillis));
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+                int num = map.get("val");
+                map.put("val", num + 1);
+                downLatch.countDown();
+                Assertions.assertEquals(num, index);
+            });
+        }
+        downLatch.await(3, TimeUnit.SECONDS);
+        Assertions.assertEquals(total, map.get("val"));
+    }
+}
\ No newline at end of file
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
index 71891b4..92127e7 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
@@ -19,7 +19,6 @@ package org.apache.dubbo.rpc.protocol.tri;
 
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.constants.CommonConstants;
-import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
 import org.apache.dubbo.remoting.Constants;
 import org.apache.dubbo.rpc.HeaderFilter;
 import org.apache.dubbo.rpc.Invoker;
@@ -55,7 +54,7 @@ public abstract class AbstractServerStream extends 
AbstractStream implements Str
     }
 
     protected AbstractServerStream(URL url, ProviderModel providerModel) {
-        this(url, lookupExecutor(url, providerModel), providerModel);
+        this(url, lookupExecutor(providerModel), providerModel);
     }
 
     protected AbstractServerStream(URL url, Executor executor, ProviderModel 
providerModel) {
@@ -65,20 +64,12 @@ public abstract class AbstractServerStream extends 
AbstractStream implements Str
         this.headerFilters = 
url.getOrDefaultApplicationModel().getExtensionLoader(HeaderFilter.class).getActivateExtension(url,
 HEADER_FILTER_KEY);
     }
 
-    private static Executor lookupExecutor(URL url, ProviderModel 
providerModel) {
-        ExecutorService executor = null;
-        if (providerModel != null) {
-            executor = (ExecutorService) providerModel.getServiceMetadata()
-                    .getAttribute(CommonConstants.THREADPOOL_KEY);
+    private static Executor lookupExecutor(ProviderModel providerModel) {
+        if (providerModel == null) {
+            return null;
         }
-        ExecutorRepository executorRepository = 
url.getOrDefaultApplicationModel().getExtensionLoader(ExecutorRepository.class).getDefaultExtension();
-        if (executor == null) {
-            executor = executorRepository.getExecutor(url);
-        }
-        if (executor == null) {
-            executor = executorRepository.createExecutorIfAbsent(url);
-        }
-        return executor;
+        return (ExecutorService) providerModel.getServiceMetadata()
+                .getAttribute(CommonConstants.THREADPOOL_KEY);
     }
 
     public static UnaryServerStream unary(URL url) {
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
index e1f6c74..c6383cd 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
@@ -21,7 +21,8 @@ import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.constants.CommonConstants;
 import org.apache.dubbo.common.serialize.MultipleSerialization;
 import org.apache.dubbo.common.stream.StreamObserver;
-import org.apache.dubbo.common.threadlocal.NamedInternalThreadFactory;
+import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
+import org.apache.dubbo.common.threadpool.serial.SerializingExecutor;
 import org.apache.dubbo.common.utils.StringUtils;
 import org.apache.dubbo.config.Constants;
 import org.apache.dubbo.remoting.exchange.Request;
@@ -37,32 +38,13 @@ import com.google.rpc.Status;
 import io.netty.handler.codec.http2.Http2Error;
 import io.netty.handler.codec.http2.Http2Headers;
 
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.Executor;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 
 public abstract class AbstractStream implements Stream {
     protected static final String DUPLICATED_DATA = "Duplicated data";
-    private static final List<Executor> CALLBACK_EXECUTORS = new 
ArrayList<>(4);
-
-    static {
-        ThreadFactory tripleTF = new 
NamedInternalThreadFactory("tri-callback", true);
-        for (int i = 0; i < 4; i++) {
-            final ThreadPoolExecutor tp = new ThreadPoolExecutor(1, 1, 0, 
TimeUnit.DAYS,
-                    new LinkedBlockingQueue<>(1024),
-                    tripleTF, new ThreadPoolExecutor.AbortPolicy());
-            CALLBACK_EXECUTORS.add(tp);
-        }
-
-    }
 
     private final URL url;
     private final MultipleSerialization multipleSerialization;
@@ -78,14 +60,14 @@ public abstract class AbstractStream implements Stream {
     private TransportObserver transportSubscriber;
 
     private final CancellationContext cancellationContext;
-    private boolean cancelled = false;
+    private volatile boolean cancelled = false;
 
     public boolean isCancelled() {
         return cancelled;
     }
 
     protected AbstractStream(URL url) {
-        this(url, allocateCallbackExecutor());
+        this(url, null);
     }
 
     protected CancellationContext getCancellationContext() {
@@ -94,7 +76,8 @@ public abstract class AbstractStream implements Stream {
 
     protected AbstractStream(URL url, Executor executor) {
         this.url = url;
-        this.executor = executor;
+        final Executor sourceExecutor = lookupExecutor(url, executor);
+        this.executor = wrapperSerializingExecutor(sourceExecutor);
         final String value = 
url.getParameter(Constants.MULTI_SERIALIZATION_KEY, 
CommonConstants.DEFAULT_KEY);
         this.multipleSerialization = 
url.getOrDefaultFrameworkModel().getExtensionLoader(MultipleSerialization.class)
                 .getExtension(value);
@@ -103,8 +86,24 @@ public abstract class AbstractStream implements Stream {
         this.streamObserver = createStreamObserver();
     }
 
-    private static Executor allocateCallbackExecutor() {
-        return CALLBACK_EXECUTORS.get(ThreadLocalRandom.current().nextInt(4));
+
+    private Executor lookupExecutor(URL url, Executor executor) {
+        // only server maybe not null
+        if (executor != null) {
+            return executor;
+        }
+        ExecutorRepository executorRepository = 
url.getOrDefaultApplicationModel()
+                .getExtensionLoader(ExecutorRepository.class)
+                .getDefaultExtension();
+        Executor urlExecutor = executorRepository.getExecutor(url);
+        if (urlExecutor == null) {
+            urlExecutor = executorRepository.createExecutorIfAbsent(url);
+        }
+        return urlExecutor;
+    }
+
+    private Executor wrapperSerializingExecutor(Executor executor) {
+        return new SerializingExecutor(executor);
     }
 
     public Request getRequest() {
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
index b4374f8..666fb2e 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
@@ -92,35 +92,40 @@ public class ServerStream extends AbstractServerStream 
implements Stream {
             if (getMethodDescriptor().getRpcType() == 
MethodDescriptor.RpcType.SERVER_STREAM) {
                 return;
             }
-            try {
-                
RpcContext.restoreCancellationContext(getCancellationContext());
-                final RpcInvocation inv = buildInvocation(metadata);
-                inv.setArguments(new Object[]{asStreamObserver()});
-                final Result result = getInvoker().invoke(inv);
+            execute(() -> {
                 try {
-                    subscribe((StreamObserver<Object>) result.getValue());
-                } catch (Throwable t) {
-                    
transportError(GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
-                            .withDescription("Failed to create server's 
observer"));
+                    
RpcContext.restoreCancellationContext(getCancellationContext());
+                    final RpcInvocation inv = buildInvocation(metadata);
+                    inv.setArguments(new Object[]{asStreamObserver()});
+                    final Result result = getInvoker().invoke(inv);
+                    try {
+                        subscribe((StreamObserver<Object>) result.getValue());
+                    } catch (Throwable t) {
+                        
transportError(GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
+                                .withDescription("Failed to create server's 
observer"));
+                    }
+                } finally {
+                    RpcContext.removeCancellationContext();
                 }
-            } finally {
-                RpcContext.removeCancellationContext();
-            }
+            });
+
         }
 
         @Override
         public void onData(byte[] in, boolean endStream) {
-            try {
-                if (getMethodDescriptor().getRpcType() == 
MethodDescriptor.RpcType.SERVER_STREAM) {
-                    serverStreamOnData(in);
-                    return;
+            execute(() -> {
+                try {
+                    if (getMethodDescriptor().getRpcType() == 
MethodDescriptor.RpcType.SERVER_STREAM) {
+                        serverStreamOnData(in);
+                        return;
+                    }
+                    biStreamOnData(in);
+                } catch (Throwable t) {
+                    
transportError(GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
+                            .withDescription("Deserialize request failed")
+                            .withCause(t));
                 }
-                biStreamOnData(in);
-            } catch (Throwable t) {
-                transportError(GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
-                        .withDescription("Deserialize request failed")
-                        .withCause(t));
-            }
+            });
         }
 
         /**
@@ -168,7 +173,9 @@ public class ServerStream extends AbstractServerStream 
implements Stream {
             if (getMethodDescriptor().getRpcType() == 
MethodDescriptor.RpcType.SERVER_STREAM) {
                 return;
             }
-            getStreamSubscriber().onCompleted();
+            execute(() -> {
+                getStreamSubscriber().onCompleted();
+            });
         }
     }
 }

Reply via email to