This is an automated email from the ASF dual-hosted git repository.
guohao pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new f561138 [3.0] Add serializingExecutor (#8999)
f561138 is described below
commit f561138fe92cc838146b76479fdff70ab163401d
Author: earthchen <[email protected]>
AuthorDate: Sat Oct 9 03:25:32 2021 -0500
[3.0] Add serializingExecutor (#8999)
* add serializingExecutor
* wrapper executor for tri stream
* remove unused set
* remove print out
* add timeout
* add timeout
---
.../threadpool/serial/SerializingExecutor.java | 112 +++++++++++++++++++++
.../threadpool/serial/SerializingExecutorTest.java | 69 +++++++++++++
.../rpc/protocol/tri/AbstractServerStream.java | 21 ++--
.../dubbo/rpc/protocol/tri/AbstractStream.java | 49 +++++----
.../dubbo/rpc/protocol/tri/ServerStream.java | 53 +++++-----
5 files changed, 241 insertions(+), 63 deletions(-)
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/serial/SerializingExecutor.java
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/serial/SerializingExecutor.java
new file mode 100644
index 0000000..e106523
--- /dev/null
+++
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/serial/SerializingExecutor.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.common.threadpool.serial;
+
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Executor ensuring that all {@link Runnable} tasks submitted are executed in
order
+ * using the provided {@link Executor}, and serially such that no two will
ever be
+ * running at the same time.
+ */
+public final class SerializingExecutor implements Executor, Runnable {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SerializingExecutor.class);
+
+ /**
+ * Use false to stop and true to run
+ */
+ private final AtomicBoolean atomicBoolean = new AtomicBoolean();
+
+ private final Executor executor;
+
+ private final Queue<Runnable> runQueue = new ConcurrentLinkedQueue<>();
+
+ /**
+ * Creates a SerializingExecutor, running tasks using {@code executor}.
+ *
+ * @param executor Executor in which tasks should be run. Must not be null.
+ */
+ public SerializingExecutor(Executor executor) {
+ this.executor = executor;
+ }
+
+ /**
+ * Runs the given runnable strictly after all Runnables that were submitted
+ * before it, and using the {@code executor} passed to the constructor.
.
+ */
+ @Override
+ public void execute(Runnable r) {
+ runQueue.add(r);
+ schedule(r);
+ }
+
+ private void schedule(Runnable removable) {
+ if (atomicBoolean.compareAndSet(false, true)) {
+ boolean success = false;
+ try {
+ executor.execute(this);
+ success = true;
+ } finally {
+ // It is possible that at this point that there are still
tasks in
+ // the queue, it would be nice to keep trying but the error
may not
+ // be recoverable. So we update our state and propagate so
that if
+ // our caller deems it recoverable we won't be stuck.
+ if (!success) {
+ if (removable != null) {
+ // This case can only be reached if 'this' was not
currently running, and we failed to
+ // reschedule. The item should still be in the queue
for removal.
+ // ConcurrentLinkedQueue claims that null elements are
not allowed, but seems to not
+ // throw if the item to remove is null. If removable
is present in the queue twice,
+ // the wrong one may be removed. It doesn't seem
possible for this case to exist today.
+ // This is important to run in case of
RejectedExectuionException, so that future calls
+ // to execute don't succeed and accidentally run a
previous runnable.
+ runQueue.remove(removable);
+ }
+ atomicBoolean.set(false);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void run() {
+ Runnable r;
+ try {
+ while ((r = runQueue.poll()) != null) {
+ try {
+ r.run();
+ } catch (RuntimeException e) {
+ LOGGER.error("Exception while executing runnable " + r, e);
+ }
+ }
+ } finally {
+ atomicBoolean.set(false);
+ }
+ if (!runQueue.isEmpty()) {
+ // we didn't enqueue anything but someone else did.
+ schedule(null);
+ }
+ }
+}
diff --git
a/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/serial/SerializingExecutorTest.java
b/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/serial/SerializingExecutorTest.java
new file mode 100644
index 0000000..6d12378
--- /dev/null
+++
b/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/serial/SerializingExecutorTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.common.threadpool.serial;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+public class SerializingExecutorTest {
+
+ protected static SerializingExecutor serializingExecutor;
+
+ @BeforeAll
+ public static void before() {
+ ExecutorService service = Executors.newFixedThreadPool(4);
+ serializingExecutor = new SerializingExecutor(service);
+ }
+
+ @Test
+ public void test1() throws InterruptedException {
+ int n = 2;
+ int eachCount = 1000;
+ int total = n * eachCount;
+ int sleepMillis = 10;
+ Map<String, Integer> map = new HashMap<>();
+ map.put("val", 0);
+ CountDownLatch downLatch = new CountDownLatch(total);
+ for (int i = 0; i < total; i++) {
+ final int index = i;
+ Thread.sleep(ThreadLocalRandom.current().nextInt(sleepMillis));
+ serializingExecutor.execute(() -> {
+ try {
+
Thread.sleep(ThreadLocalRandom.current().nextInt(sleepMillis));
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ int num = map.get("val");
+ map.put("val", num + 1);
+ downLatch.countDown();
+ Assertions.assertEquals(num, index);
+ });
+ }
+ downLatch.await(3, TimeUnit.SECONDS);
+ Assertions.assertEquals(total, map.get("val"));
+ }
+}
\ No newline at end of file
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
index 71891b4..92127e7 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
@@ -19,7 +19,6 @@ package org.apache.dubbo.rpc.protocol.tri;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.constants.CommonConstants;
-import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.rpc.HeaderFilter;
import org.apache.dubbo.rpc.Invoker;
@@ -55,7 +54,7 @@ public abstract class AbstractServerStream extends
AbstractStream implements Str
}
protected AbstractServerStream(URL url, ProviderModel providerModel) {
- this(url, lookupExecutor(url, providerModel), providerModel);
+ this(url, lookupExecutor(providerModel), providerModel);
}
protected AbstractServerStream(URL url, Executor executor, ProviderModel
providerModel) {
@@ -65,20 +64,12 @@ public abstract class AbstractServerStream extends
AbstractStream implements Str
this.headerFilters =
url.getOrDefaultApplicationModel().getExtensionLoader(HeaderFilter.class).getActivateExtension(url,
HEADER_FILTER_KEY);
}
- private static Executor lookupExecutor(URL url, ProviderModel
providerModel) {
- ExecutorService executor = null;
- if (providerModel != null) {
- executor = (ExecutorService) providerModel.getServiceMetadata()
- .getAttribute(CommonConstants.THREADPOOL_KEY);
+ private static Executor lookupExecutor(ProviderModel providerModel) {
+ if (providerModel == null) {
+ return null;
}
- ExecutorRepository executorRepository =
url.getOrDefaultApplicationModel().getExtensionLoader(ExecutorRepository.class).getDefaultExtension();
- if (executor == null) {
- executor = executorRepository.getExecutor(url);
- }
- if (executor == null) {
- executor = executorRepository.createExecutorIfAbsent(url);
- }
- return executor;
+ return (ExecutorService) providerModel.getServiceMetadata()
+ .getAttribute(CommonConstants.THREADPOOL_KEY);
}
public static UnaryServerStream unary(URL url) {
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
index e1f6c74..c6383cd 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
@@ -21,7 +21,8 @@ import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.serialize.MultipleSerialization;
import org.apache.dubbo.common.stream.StreamObserver;
-import org.apache.dubbo.common.threadlocal.NamedInternalThreadFactory;
+import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
+import org.apache.dubbo.common.threadpool.serial.SerializingExecutor;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.config.Constants;
import org.apache.dubbo.remoting.exchange.Request;
@@ -37,32 +38,13 @@ import com.google.rpc.Status;
import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2Headers;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.Executor;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
public abstract class AbstractStream implements Stream {
protected static final String DUPLICATED_DATA = "Duplicated data";
- private static final List<Executor> CALLBACK_EXECUTORS = new
ArrayList<>(4);
-
- static {
- ThreadFactory tripleTF = new
NamedInternalThreadFactory("tri-callback", true);
- for (int i = 0; i < 4; i++) {
- final ThreadPoolExecutor tp = new ThreadPoolExecutor(1, 1, 0,
TimeUnit.DAYS,
- new LinkedBlockingQueue<>(1024),
- tripleTF, new ThreadPoolExecutor.AbortPolicy());
- CALLBACK_EXECUTORS.add(tp);
- }
-
- }
private final URL url;
private final MultipleSerialization multipleSerialization;
@@ -78,14 +60,14 @@ public abstract class AbstractStream implements Stream {
private TransportObserver transportSubscriber;
private final CancellationContext cancellationContext;
- private boolean cancelled = false;
+ private volatile boolean cancelled = false;
public boolean isCancelled() {
return cancelled;
}
protected AbstractStream(URL url) {
- this(url, allocateCallbackExecutor());
+ this(url, null);
}
protected CancellationContext getCancellationContext() {
@@ -94,7 +76,8 @@ public abstract class AbstractStream implements Stream {
protected AbstractStream(URL url, Executor executor) {
this.url = url;
- this.executor = executor;
+ final Executor sourceExecutor = lookupExecutor(url, executor);
+ this.executor = wrapperSerializingExecutor(sourceExecutor);
final String value =
url.getParameter(Constants.MULTI_SERIALIZATION_KEY,
CommonConstants.DEFAULT_KEY);
this.multipleSerialization =
url.getOrDefaultFrameworkModel().getExtensionLoader(MultipleSerialization.class)
.getExtension(value);
@@ -103,8 +86,24 @@ public abstract class AbstractStream implements Stream {
this.streamObserver = createStreamObserver();
}
- private static Executor allocateCallbackExecutor() {
- return CALLBACK_EXECUTORS.get(ThreadLocalRandom.current().nextInt(4));
+
+ private Executor lookupExecutor(URL url, Executor executor) {
+ // only server maybe not null
+ if (executor != null) {
+ return executor;
+ }
+ ExecutorRepository executorRepository =
url.getOrDefaultApplicationModel()
+ .getExtensionLoader(ExecutorRepository.class)
+ .getDefaultExtension();
+ Executor urlExecutor = executorRepository.getExecutor(url);
+ if (urlExecutor == null) {
+ urlExecutor = executorRepository.createExecutorIfAbsent(url);
+ }
+ return urlExecutor;
+ }
+
+ private Executor wrapperSerializingExecutor(Executor executor) {
+ return new SerializingExecutor(executor);
}
public Request getRequest() {
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
index b4374f8..666fb2e 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
@@ -92,35 +92,40 @@ public class ServerStream extends AbstractServerStream
implements Stream {
if (getMethodDescriptor().getRpcType() ==
MethodDescriptor.RpcType.SERVER_STREAM) {
return;
}
- try {
-
RpcContext.restoreCancellationContext(getCancellationContext());
- final RpcInvocation inv = buildInvocation(metadata);
- inv.setArguments(new Object[]{asStreamObserver()});
- final Result result = getInvoker().invoke(inv);
+ execute(() -> {
try {
- subscribe((StreamObserver<Object>) result.getValue());
- } catch (Throwable t) {
-
transportError(GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
- .withDescription("Failed to create server's
observer"));
+
RpcContext.restoreCancellationContext(getCancellationContext());
+ final RpcInvocation inv = buildInvocation(metadata);
+ inv.setArguments(new Object[]{asStreamObserver()});
+ final Result result = getInvoker().invoke(inv);
+ try {
+ subscribe((StreamObserver<Object>) result.getValue());
+ } catch (Throwable t) {
+
transportError(GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
+ .withDescription("Failed to create server's
observer"));
+ }
+ } finally {
+ RpcContext.removeCancellationContext();
}
- } finally {
- RpcContext.removeCancellationContext();
- }
+ });
+
}
@Override
public void onData(byte[] in, boolean endStream) {
- try {
- if (getMethodDescriptor().getRpcType() ==
MethodDescriptor.RpcType.SERVER_STREAM) {
- serverStreamOnData(in);
- return;
+ execute(() -> {
+ try {
+ if (getMethodDescriptor().getRpcType() ==
MethodDescriptor.RpcType.SERVER_STREAM) {
+ serverStreamOnData(in);
+ return;
+ }
+ biStreamOnData(in);
+ } catch (Throwable t) {
+
transportError(GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
+ .withDescription("Deserialize request failed")
+ .withCause(t));
}
- biStreamOnData(in);
- } catch (Throwable t) {
- transportError(GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
- .withDescription("Deserialize request failed")
- .withCause(t));
- }
+ });
}
/**
@@ -168,7 +173,9 @@ public class ServerStream extends AbstractServerStream
implements Stream {
if (getMethodDescriptor().getRpcType() ==
MethodDescriptor.RpcType.SERVER_STREAM) {
return;
}
- getStreamSubscriber().onCompleted();
+ execute(() -> {
+ getStreamSubscriber().onCompleted();
+ });
}
}
}