luoyuxia commented on code in PR #20149:
URL: https://github.com/apache/flink/pull/20149#discussion_r915815057
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java:
##########
@@ -138,21 +153,188 @@ public ResultSet fetchResults(OperationHandle
operationHandle, long token, int m
/** Closes the {@link OperationManager} and all operations. */
public void close() {
- lock.writeLock().lock();
+ stateLock.writeLock().lock();
try {
isRunning = false;
for (Operation operation : submittedOperations.values()) {
operation.close();
}
submittedOperations.clear();
} finally {
- lock.writeLock().unlock();
+ stateLock.writeLock().unlock();
}
LOG.debug("Closes the Operation Manager.");
}
//
-------------------------------------------------------------------------------------------
+ /** Operation to manage the execution, results and so on. */
+ @VisibleForTesting
Review Comment:
Redundant, as the class is already public.
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java:
##########
@@ -82,12 +99,10 @@ public OperationHandle submitOperation(
CloseableIterator.adapterForIterator(rows.iterator()),
rows.size());
});
-
- writeLock(
- () -> {
- submittedOperations.put(handle, operation);
- operation.run(service);
- });
+ // It means to acquire two locks if put the operation.run() and
submittedOperations.put(...)
Review Comment:
Why would acquiring two locks introduce a deadlock?
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java:
##########
@@ -138,21 +153,188 @@ public ResultSet fetchResults(OperationHandle
operationHandle, long token, int m
/** Closes the {@link OperationManager} and all operations. */
public void close() {
- lock.writeLock().lock();
+ stateLock.writeLock().lock();
try {
isRunning = false;
for (Operation operation : submittedOperations.values()) {
operation.close();
}
submittedOperations.clear();
} finally {
- lock.writeLock().unlock();
+ stateLock.writeLock().unlock();
}
LOG.debug("Closes the Operation Manager.");
}
//
-------------------------------------------------------------------------------------------
+ /** Operation to manage the execution, results and so on. */
+ @VisibleForTesting
+ public class Operation {
+
+ private final OperationHandle operationHandle;
+
+ private final OperationType operationType;
+ private final boolean hasResults;
+ private final AtomicReference<OperationStatus> status;
+
+ private final Callable<ResultFetcher> resultSupplier;
+
+ private volatile Future<?> invocation;
+ private volatile ResultFetcher resultFetcher;
+ private volatile SqlExecutionException operationError;
+
+ public Operation(
+ OperationHandle operationHandle,
+ OperationType operationType,
+ Callable<ResultFetcher> resultSupplier) {
+ this.operationHandle = operationHandle;
+ this.status = new AtomicReference<>(OperationStatus.INITIALIZED);
+ this.operationType = operationType;
+ this.hasResults = true;
+ this.resultSupplier = resultSupplier;
+ }
+
+ void runBefore() {
+ updateState(OperationStatus.RUNNING);
+ }
+
+ void runAfter() {
+ updateState(OperationStatus.FINISHED);
+ }
+
+ public void run() {
+ try {
+ operationLock.acquire();
+ LOG.debug(
+ String.format(
+ "Operation %s acquires the operation lock.",
operationHandle));
+ updateState(OperationStatus.PENDING);
+ Runnable work =
+ () -> {
+ try {
+ runBefore();
+ resultFetcher = resultSupplier.call();
+ runAfter();
+ } catch (Throwable t) {
+ processThrowable(t);
+ }
+ };
+ // Please be careful: the returned future by the
ExecutorService will not wrap the
+ // done method.
+ FutureTask<Void> copiedTask =
+ new FutureTask<Void>(work, null) {
+ @Override
+ protected void done() {
+ LOG.debug(
+ String.format(
+ "Release the operation lock:
%s when task completes.",
+ operationHandle));
+ operationLock.release();
+ }
+ };
+ service.submit(copiedTask);
+ invocation = copiedTask;
+ // If it is canceled or closed, terminate the invocation.
+ OperationStatus current = status.get();
+ if (current == OperationStatus.CLOSED || current ==
OperationStatus.CANCELED) {
Review Comment:
In which case would the OperationStatus be `CLOSED` or `CANCELED`? It seems we
only check it once.
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java:
##########
@@ -82,12 +99,10 @@ public OperationHandle submitOperation(
CloseableIterator.adapterForIterator(rows.iterator()),
rows.size());
});
-
- writeLock(
- () -> {
- submittedOperations.put(handle, operation);
- operation.run(service);
- });
+ // It means to acquire two locks if put the operation.run() and
submittedOperations.put(...)
+ // in the writeLock block, which may introduce the deadlock here.
+ writeLock(() -> submittedOperations.put(handle, operation));
+ operation.run();
return handle;
}
Review Comment:
What will happen when `Operation.cancel()` runs concurrently with this (i.e. races with it)?
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java:
##########
@@ -138,21 +153,188 @@ public ResultSet fetchResults(OperationHandle
operationHandle, long token, int m
/** Closes the {@link OperationManager} and all operations. */
public void close() {
- lock.writeLock().lock();
+ stateLock.writeLock().lock();
try {
isRunning = false;
for (Operation operation : submittedOperations.values()) {
operation.close();
}
submittedOperations.clear();
} finally {
- lock.writeLock().unlock();
+ stateLock.writeLock().unlock();
}
LOG.debug("Closes the Operation Manager.");
}
//
-------------------------------------------------------------------------------------------
+ /** Operation to manage the execution, results and so on. */
+ @VisibleForTesting
+ public class Operation {
+
+ private final OperationHandle operationHandle;
+
+ private final OperationType operationType;
+ private final boolean hasResults;
+ private final AtomicReference<OperationStatus> status;
+
+ private final Callable<ResultFetcher> resultSupplier;
+
+ private volatile Future<?> invocation;
+ private volatile ResultFetcher resultFetcher;
+ private volatile SqlExecutionException operationError;
+
+ public Operation(
+ OperationHandle operationHandle,
+ OperationType operationType,
+ Callable<ResultFetcher> resultSupplier) {
+ this.operationHandle = operationHandle;
+ this.status = new AtomicReference<>(OperationStatus.INITIALIZED);
+ this.operationType = operationType;
+ this.hasResults = true;
+ this.resultSupplier = resultSupplier;
+ }
+
+ void runBefore() {
+ updateState(OperationStatus.RUNNING);
+ }
+
+ void runAfter() {
+ updateState(OperationStatus.FINISHED);
+ }
+
+ public void run() {
+ try {
+ operationLock.acquire();
+ LOG.debug(
+ String.format(
+ "Operation %s acquires the operation lock.",
operationHandle));
+ updateState(OperationStatus.PENDING);
+ Runnable work =
+ () -> {
+ try {
+ runBefore();
+ resultFetcher = resultSupplier.call();
+ runAfter();
+ } catch (Throwable t) {
+ processThrowable(t);
+ }
+ };
+ // Please be careful: the returned future by the
ExecutorService will not wrap the
+ // done method.
+ FutureTask<Void> copiedTask =
+ new FutureTask<Void>(work, null) {
+ @Override
+ protected void done() {
+ LOG.debug(
+ String.format(
+ "Release the operation lock:
%s when task completes.",
+ operationHandle));
+ operationLock.release();
+ }
+ };
+ service.submit(copiedTask);
+ invocation = copiedTask;
+ // If it is canceled or closed, terminate the invocation.
+ OperationStatus current = status.get();
+ if (current == OperationStatus.CLOSED || current ==
OperationStatus.CANCELED) {
+ LOG.debug(
+ String.format(
+ "The current status is %s after updating
the operation %s status to %s. Close the resources.",
+ current, operationHandle,
OperationStatus.PENDING));
+ closeResources();
+ }
+ } catch (Throwable t) {
+ processThrowable(t);
+ throw new SqlGatewayException(
+ "Failed to submit the operation to the thread pool.",
t);
+ } finally {
+ if (invocation == null) {
+ // failed to submit to the thread pool and release the
lock.
+ LOG.debug(
+ String.format(
+ "Operation %s releases the operation lock
when failed to submit the operation to the pool.",
+ operationHandle));
+ operationLock.release();
Review Comment:
The operationLock will also be released in copiedTask, which would then cause
it to be released twice.
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java:
##########
@@ -138,21 +153,188 @@ public ResultSet fetchResults(OperationHandle
operationHandle, long token, int m
/** Closes the {@link OperationManager} and all operations. */
public void close() {
- lock.writeLock().lock();
+ stateLock.writeLock().lock();
try {
isRunning = false;
for (Operation operation : submittedOperations.values()) {
operation.close();
}
submittedOperations.clear();
} finally {
- lock.writeLock().unlock();
+ stateLock.writeLock().unlock();
}
LOG.debug("Closes the Operation Manager.");
}
//
-------------------------------------------------------------------------------------------
+ /** Operation to manage the execution, results and so on. */
+ @VisibleForTesting
+ public class Operation {
+
+ private final OperationHandle operationHandle;
+
+ private final OperationType operationType;
+ private final boolean hasResults;
+ private final AtomicReference<OperationStatus> status;
+
+ private final Callable<ResultFetcher> resultSupplier;
+
+ private volatile Future<?> invocation;
+ private volatile ResultFetcher resultFetcher;
+ private volatile SqlExecutionException operationError;
+
+ public Operation(
+ OperationHandle operationHandle,
+ OperationType operationType,
+ Callable<ResultFetcher> resultSupplier) {
+ this.operationHandle = operationHandle;
+ this.status = new AtomicReference<>(OperationStatus.INITIALIZED);
+ this.operationType = operationType;
+ this.hasResults = true;
+ this.resultSupplier = resultSupplier;
+ }
+
+ void runBefore() {
+ updateState(OperationStatus.RUNNING);
+ }
+
+ void runAfter() {
+ updateState(OperationStatus.FINISHED);
+ }
+
+ public void run() {
+ try {
+ operationLock.acquire();
+ LOG.debug(
+ String.format(
+ "Operation %s acquires the operation lock.",
operationHandle));
+ updateState(OperationStatus.PENDING);
+ Runnable work =
+ () -> {
+ try {
+ runBefore();
+ resultFetcher = resultSupplier.call();
Review Comment:
When will the resultFetcher be closed?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]