This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 9e2a8f7 fix
new 2455b69 Merge pull request #2252 from LebronAl/cluster_dispatcher_build_request_optimize
9e2a8f7 is described below
commit 9e2a8f7dce1b5e673e3e8655a4eccc780bb66867
Author: LebronAl <[email protected]>
AuthorDate: Fri Dec 11 15:40:56 2020 +0800
fix
---
cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java | 4 +++-
.../org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java | 4 ++--
2 files changed, 5 insertions(+), 3 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index 751d3e8..21820d5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -91,7 +91,9 @@ public class LogDispatcher {
public void offer(SendLogRequest log) {
// do serialization here to avoid taking LogManager for too long
- log.serializedLogFuture = serializationService.submit(() -> log.getLog().serialize());
+ if (!nodeLogQueues.isEmpty()) {
+ log.serializedLogFuture = serializationService.submit(() -> log.getLog().serialize());
+ }
for (int i = 0; i < nodeLogQueues.size(); i++) {
BlockingQueue<SendLogRequest> nodeLogQueue = nodeLogQueues.get(i);
try {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
index 0958432..bfe57a1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
@@ -19,10 +19,10 @@
package org.apache.iotdb.cluster.log.applier;
+import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@@ -63,7 +63,7 @@ public class AsyncDataLogApplier implements LogApplier {
public AsyncDataLogApplier(LogApplier embeddedApplier, String name) {
this.embeddedApplier = embeddedApplier;
- consumerMap = new ConcurrentHashMap<>();
+ consumerMap = new HashMap<>();
consumerPool = new ThreadPoolExecutor(CONCURRENT_CONSUMER_NUM,
Integer.MAX_VALUE, 0, TimeUnit.SECONDS, new SynchronousQueue<>());
this.name = name;