This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 98b65a1ba4 Fix Dead Lock Bug (#6331)
98b65a1ba4 is described below
commit 98b65a1ba4a38d05588fa572dd78be804146ef4c
Author: Jackie Tien <[email protected]>
AuthorDate: Sun Jun 19 18:38:48 2022 +0800
Fix Dead Lock Bug (#6331)
---
.../execution/datatransfer/LocalSinkHandle.java | 72 ++++++++++++++--------
.../execution/datatransfer/LocalSourceHandle.java | 41 ++++++++----
.../execution/datatransfer/SharedTsBlockQueue.java | 23 +++----
3 files changed, 84 insertions(+), 52 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/LocalSinkHandle.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/LocalSinkHandle.java
index fa59e7c98e..40b5760b82 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/LocalSinkHandle.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/LocalSinkHandle.java
@@ -85,32 +85,43 @@ public class LocalSinkHandle implements ISinkHandle {
@Override
public boolean isFinished() {
- return queue.hasNoMoreTsBlocks() && queue.isEmpty();
+ synchronized (queue) {
+ return queue.hasNoMoreTsBlocks() && queue.isEmpty();
+ }
}
public void checkAndInvokeOnFinished() {
- if (isFinished()) {
- synchronized (this) {
- sinkHandleListener.onFinish(this);
+ synchronized (queue) {
+ if (isFinished()) {
+ synchronized (this) {
+ sinkHandleListener.onFinish(this);
+ }
}
}
}
@Override
- public synchronized void send(List<TsBlock> tsBlocks) {
+ public void send(List<TsBlock> tsBlocks) {
Validate.notNull(tsBlocks, "tsBlocks is null");
- if (aborted) {
- throw new IllegalStateException("Sink handle is aborted.");
- }
- if (!blocked.isDone()) {
- throw new IllegalStateException("Sink handle is blocked.");
- }
- if (queue.hasNoMoreTsBlocks()) {
- return;
+ synchronized (this) {
+ if (aborted) {
+ throw new IllegalStateException("Sink handle is aborted.");
+ }
+ if (!blocked.isDone()) {
+ throw new IllegalStateException("Sink handle is blocked.");
+ }
}
- logger.info("send TsBlocks. Size: {}", tsBlocks.size());
- for (TsBlock tsBlock : tsBlocks) {
- blocked = queue.add(tsBlock);
+
+ synchronized (queue) {
+ if (queue.hasNoMoreTsBlocks()) {
+ return;
+ }
+ logger.info("send TsBlocks. Size: {}", tsBlocks.size());
+ synchronized (this) {
+ for (TsBlock tsBlock : tsBlocks) {
+ blocked = queue.add(tsBlock);
+ }
+ }
}
}
@@ -121,24 +132,33 @@ public class LocalSinkHandle implements ISinkHandle {
@Override
public void setNoMoreTsBlocks() {
- synchronized (this) {
- logger.info("set noMoreTsBlocks.");
- if (aborted) {
- return;
+ synchronized (queue) {
+ synchronized (this) {
+ logger.info("set noMoreTsBlocks.");
+ if (aborted) {
+ return;
+ }
+ queue.setNoMoreTsBlocks(true);
+ sinkHandleListener.onEndOfBlocks(this);
}
- queue.setNoMoreTsBlocks(true);
- sinkHandleListener.onEndOfBlocks(this);
}
checkAndInvokeOnFinished();
logger.info("noMoreTsBlocks has been set.");
}
@Override
- public synchronized void abort() {
+ public void abort() {
logger.info("Sink handle is being aborted.");
- aborted = true;
- queue.destroy();
- sinkHandleListener.onAborted(this);
+ synchronized (queue) {
+ synchronized (this) {
+ if (aborted) {
+ return;
+ }
+ aborted = true;
+ queue.destroy();
+ sinkHandleListener.onAborted(this);
+ }
+ }
logger.info("Sink handle is aborted");
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/LocalSourceHandle.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/LocalSourceHandle.java
index ac91afacda..644a21ccb9 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/LocalSourceHandle.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/LocalSourceHandle.java
@@ -26,11 +26,16 @@ import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.concurrent.SetThreadName;
import org.apache.commons.lang3.Validate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static
com.google.common.util.concurrent.Futures.nonCancellationPropagating;
import static
org.apache.iotdb.db.mpp.execution.datatransfer.DataBlockManager.createFullIdFrom;
public class LocalSourceHandle implements ISourceHandle {
+
+ private static final Logger logger =
LoggerFactory.getLogger(LocalSourceHandle.class);
+
private final TFragmentInstanceId remoteFragmentInstanceId;
private final TFragmentInstanceId localFragmentInstanceId;
private final String localPlanNodeId;
@@ -81,7 +86,7 @@ public class LocalSourceHandle implements ISourceHandle {
throw new IllegalStateException("Source handle is blocked.");
}
TsBlock tsBlock;
- synchronized (this) {
+ synchronized (queue) {
tsBlock = queue.remove();
}
checkAndInvokeOnFinished();
@@ -91,16 +96,20 @@ public class LocalSourceHandle implements ISourceHandle {
@Override
public boolean isFinished() {
- return queue.hasNoMoreTsBlocks() && queue.isEmpty();
+ synchronized (queue) {
+ return queue.hasNoMoreTsBlocks() && queue.isEmpty();
+ }
}
public void checkAndInvokeOnFinished() {
- if (isFinished()) {
- // Putting synchronized here rather than marking in method is to avoid
deadlock.
- // There are two locks need to invoke this method. One is lock of
SharedTsBlockQueue,
- // the other is lock of LocalSourceHandle.
- synchronized (this) {
- sourceHandleListener.onFinished(this);
+ synchronized (queue) {
+ if (isFinished()) {
+ // Putting synchronized here rather than marking in method is to avoid
deadlock.
+ // There are two locks need to invoke this method. One is lock of
SharedTsBlockQueue,
+ // the other is lock of LocalSourceHandle.
+ synchronized (this) {
+ sourceHandleListener.onFinished(this);
+ }
}
}
}
@@ -120,14 +129,20 @@ public class LocalSourceHandle implements ISourceHandle {
@Override
public synchronized void abort() {
+ logger.info("Source handle is being aborted.");
try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
- if (aborted) {
- return;
+ synchronized (queue) {
+ synchronized (this) {
+ if (aborted) {
+ return;
+ }
+ queue.destroy();
+ aborted = true;
+ sourceHandleListener.onAborted(this);
+ }
}
- queue.destroy();
- aborted = true;
- sourceHandleListener.onAborted(this);
}
+ logger.info("Source handle is aborted");
}
public TFragmentInstanceId getRemoteFragmentInstanceId() {
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SharedTsBlockQueue.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SharedTsBlockQueue.java
index 61b94f3ecd..729e7323b5 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SharedTsBlockQueue.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SharedTsBlockQueue.java
@@ -29,11 +29,13 @@ import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.NotThreadSafe;
import java.util.LinkedList;
import java.util.Queue;
+/** This is not thread safe class, the caller should ensure multi-threads
safety. */
+@NotThreadSafe
public class SharedTsBlockQueue {
private static final Logger logger =
LoggerFactory.getLogger(SharedTsBlockQueue.class);
@@ -41,22 +43,16 @@ public class SharedTsBlockQueue {
private final TFragmentInstanceId localFragmentInstanceId;
private final LocalMemoryManager localMemoryManager;
- @GuardedBy("this")
private boolean noMoreTsBlocks = false;
- @GuardedBy("this")
private long bufferRetainedSizeInBytes = 0L;
- @GuardedBy("this")
private final Queue<TsBlock> queue = new LinkedList<>();
- @GuardedBy("this")
private SettableFuture<Void> blocked = SettableFuture.create();
- @GuardedBy("this")
private ListenableFuture<Void> blockedOnMemory;
- @GuardedBy("this")
private boolean destroyed = false;
private LocalSourceHandle sourceHandle;
@@ -70,7 +66,7 @@ public class SharedTsBlockQueue {
Validate.notNull(localMemoryManager, "local memory manager cannot be
null");
}
- public synchronized boolean hasNoMoreTsBlocks() {
+ public boolean hasNoMoreTsBlocks() {
return noMoreTsBlocks;
}
@@ -82,7 +78,7 @@ public class SharedTsBlockQueue {
return blocked;
}
- public synchronized boolean isEmpty() {
+ public boolean isEmpty() {
return queue.isEmpty();
}
@@ -95,7 +91,8 @@ public class SharedTsBlockQueue {
}
/** Notify no more tsblocks will be added to the queue. */
- public synchronized void setNoMoreTsBlocks(boolean noMoreTsBlocks) {
+ public void setNoMoreTsBlocks(boolean noMoreTsBlocks) {
+ logger.info("SharedTsBlockQueue receive no more TsBlocks signal.");
if (destroyed) {
throw new IllegalStateException("queue has been destroyed");
}
@@ -112,7 +109,7 @@ public class SharedTsBlockQueue {
* Remove a tsblock from the head of the queue and return. Should be invoked
only when the future
* returned by {@link #isBlocked()} completes.
*/
- public synchronized TsBlock remove() {
+ public TsBlock remove() {
if (destroyed) {
throw new IllegalStateException("queue has been destroyed");
}
@@ -136,7 +133,7 @@ public class SharedTsBlockQueue {
* Add tsblocks to the queue. Except the first invocation, this method
should be invoked only when
* the returned future of last invocation completes.
*/
- public synchronized ListenableFuture<Void> add(TsBlock tsBlock) {
+ public ListenableFuture<Void> add(TsBlock tsBlock) {
if (destroyed) {
throw new IllegalStateException("queue has been destroyed");
}
@@ -156,7 +153,7 @@ public class SharedTsBlockQueue {
}
/** Destroy the queue and cancel the future. */
- public synchronized void destroy() {
+ public void destroy() {
if (destroyed) {
return;
}