fsk119 commented on code in PR #20149:
URL: https://github.com/apache/flink/pull/20149#discussion_r921087978
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java:
##########
@@ -138,21 +153,188 @@ public ResultSet fetchResults(OperationHandle
operationHandle, long token, int m
/** Closes the {@link OperationManager} and all operations. */
public void close() {
- lock.writeLock().lock();
+ stateLock.writeLock().lock();
try {
isRunning = false;
for (Operation operation : submittedOperations.values()) {
operation.close();
}
submittedOperations.clear();
} finally {
- lock.writeLock().unlock();
+ stateLock.writeLock().unlock();
}
LOG.debug("Closes the Operation Manager.");
}
//
-------------------------------------------------------------------------------------------
+ /** Operation to manage the execution, results and so on. */
+ @VisibleForTesting
+ public class Operation {
+
+ private final OperationHandle operationHandle;
+
+ private final OperationType operationType;
+ private final boolean hasResults;
+ private final AtomicReference<OperationStatus> status;
+
+ private final Callable<ResultFetcher> resultSupplier;
+
+ private volatile Future<?> invocation;
+ private volatile ResultFetcher resultFetcher;
+ private volatile SqlExecutionException operationError;
+
+ public Operation(
+ OperationHandle operationHandle,
+ OperationType operationType,
+ Callable<ResultFetcher> resultSupplier) {
+ this.operationHandle = operationHandle;
+ this.status = new AtomicReference<>(OperationStatus.INITIALIZED);
+ this.operationType = operationType;
+ this.hasResults = true;
+ this.resultSupplier = resultSupplier;
+ }
+
+ void runBefore() {
+ updateState(OperationStatus.RUNNING);
+ }
+
+ void runAfter() {
+ updateState(OperationStatus.FINISHED);
+ }
+
+ public void run() {
+ try {
+ operationLock.acquire();
+ LOG.debug(
+ String.format(
+ "Operation %s acquires the operation lock.",
operationHandle));
+ updateState(OperationStatus.PENDING);
+ Runnable work =
+ () -> {
+ try {
+ runBefore();
+ resultFetcher = resultSupplier.call();
+ runAfter();
+ } catch (Throwable t) {
+ processThrowable(t);
+ }
+ };
+ // Please be careful: the returned future by the
ExecutorService will not wrap the
+ // done method.
+ FutureTask<Void> copiedTask =
+ new FutureTask<Void>(work, null) {
+ @Override
+ protected void done() {
+ LOG.debug(
+ String.format(
+ "Release the operation lock:
%s when task completes.",
+ operationHandle));
+ operationLock.release();
+ }
+ };
+ service.submit(copiedTask);
+ invocation = copiedTask;
+ // If it is canceled or closed, terminate the invocation.
+ OperationStatus current = status.get();
+ if (current == OperationStatus.CLOSED || current ==
OperationStatus.CANCELED) {
+ LOG.debug(
+ String.format(
+ "The current status is %s after updating
the operation %s status to %s. Close the resources.",
+ current, operationHandle,
OperationStatus.PENDING));
+ closeResources();
+ }
+ } catch (Throwable t) {
+ processThrowable(t);
+ throw new SqlGatewayException(
+ "Failed to submit the operation to the thread pool.",
t);
+ } finally {
+ if (invocation == null) {
+ // failed to submit to the thread pool and release the
lock.
+ LOG.debug(
+ String.format(
+ "Operation %s releases the operation lock
when failed to submit the operation to the pool.",
+ operationHandle));
+ operationLock.release();
Review Comment:
When execution reaches this point, it means the Operation failed to submit itself
to the thread pool, so the copiedTask will never execute.
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java:
##########
@@ -138,21 +153,188 @@ public ResultSet fetchResults(OperationHandle
operationHandle, long token, int m
/** Closes the {@link OperationManager} and all operations. */
public void close() {
- lock.writeLock().lock();
+ stateLock.writeLock().lock();
try {
isRunning = false;
for (Operation operation : submittedOperations.values()) {
operation.close();
}
submittedOperations.clear();
} finally {
- lock.writeLock().unlock();
+ stateLock.writeLock().unlock();
}
LOG.debug("Closes the Operation Manager.");
}
//
-------------------------------------------------------------------------------------------
+ /** Operation to manage the execution, results and so on. */
+ @VisibleForTesting
+ public class Operation {
+
+ private final OperationHandle operationHandle;
+
+ private final OperationType operationType;
+ private final boolean hasResults;
+ private final AtomicReference<OperationStatus> status;
+
+ private final Callable<ResultFetcher> resultSupplier;
+
+ private volatile Future<?> invocation;
+ private volatile ResultFetcher resultFetcher;
+ private volatile SqlExecutionException operationError;
+
+ public Operation(
+ OperationHandle operationHandle,
+ OperationType operationType,
+ Callable<ResultFetcher> resultSupplier) {
+ this.operationHandle = operationHandle;
+ this.status = new AtomicReference<>(OperationStatus.INITIALIZED);
+ this.operationType = operationType;
+ this.hasResults = true;
+ this.resultSupplier = resultSupplier;
+ }
+
+ void runBefore() {
+ updateState(OperationStatus.RUNNING);
+ }
+
+ void runAfter() {
+ updateState(OperationStatus.FINISHED);
+ }
+
+ public void run() {
+ try {
+ operationLock.acquire();
+ LOG.debug(
+ String.format(
+ "Operation %s acquires the operation lock.",
operationHandle));
+ updateState(OperationStatus.PENDING);
+ Runnable work =
+ () -> {
+ try {
+ runBefore();
+ resultFetcher = resultSupplier.call();
Review Comment:
When the operation is closed or canceled by the user, the fetcher will be
closed.
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java:
##########
@@ -138,21 +153,188 @@ public ResultSet fetchResults(OperationHandle
operationHandle, long token, int m
/** Closes the {@link OperationManager} and all operations. */
public void close() {
- lock.writeLock().lock();
+ stateLock.writeLock().lock();
try {
isRunning = false;
for (Operation operation : submittedOperations.values()) {
operation.close();
}
submittedOperations.clear();
} finally {
- lock.writeLock().unlock();
+ stateLock.writeLock().unlock();
}
LOG.debug("Closes the Operation Manager.");
}
//
-------------------------------------------------------------------------------------------
+ /** Operation to manage the execution, results and so on. */
+ @VisibleForTesting
+ public class Operation {
+
+ private final OperationHandle operationHandle;
+
+ private final OperationType operationType;
+ private final boolean hasResults;
+ private final AtomicReference<OperationStatus> status;
+
+ private final Callable<ResultFetcher> resultSupplier;
+
+ private volatile Future<?> invocation;
+ private volatile ResultFetcher resultFetcher;
+ private volatile SqlExecutionException operationError;
+
+ public Operation(
+ OperationHandle operationHandle,
+ OperationType operationType,
+ Callable<ResultFetcher> resultSupplier) {
+ this.operationHandle = operationHandle;
+ this.status = new AtomicReference<>(OperationStatus.INITIALIZED);
+ this.operationType = operationType;
+ this.hasResults = true;
+ this.resultSupplier = resultSupplier;
+ }
+
+ void runBefore() {
+ updateState(OperationStatus.RUNNING);
+ }
+
+ void runAfter() {
+ updateState(OperationStatus.FINISHED);
+ }
+
+ public void run() {
+ try {
+ operationLock.acquire();
+ LOG.debug(
+ String.format(
+ "Operation %s acquires the operation lock.",
operationHandle));
+ updateState(OperationStatus.PENDING);
+ Runnable work =
+ () -> {
+ try {
+ runBefore();
+ resultFetcher = resultSupplier.call();
+ runAfter();
+ } catch (Throwable t) {
+ processThrowable(t);
+ }
+ };
+ // Please be careful: the returned future by the
ExecutorService will not wrap the
+ // done method.
+ FutureTask<Void> copiedTask =
+ new FutureTask<Void>(work, null) {
+ @Override
+ protected void done() {
+ LOG.debug(
+ String.format(
+ "Release the operation lock:
%s when task completes.",
+ operationHandle));
+ operationLock.release();
+ }
+ };
+ service.submit(copiedTask);
+ invocation = copiedTask;
+ // If it is canceled or closed, terminate the invocation.
+ OperationStatus current = status.get();
+ if (current == OperationStatus.CLOSED || current ==
OperationStatus.CANCELED) {
Review Comment:
This happens when the user invokes cancel/closeOperation.
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java:
##########
@@ -82,12 +99,10 @@ public OperationHandle submitOperation(
CloseableIterator.adapterForIterator(rows.iterator()),
rows.size());
});
-
- writeLock(
- () -> {
- submittedOperations.put(handle, operation);
- operation.run(service);
- });
+ // It means to acquire two locks if put the operation.run() and
submittedOperations.put(...)
+ // in the writeLock block, which may introduce the deadlock here.
+ writeLock(() -> submittedOperations.put(handle, operation));
+ operation.run();
return handle;
}
Review Comment:
Only one of the threads will succeed; the others will get an exception, e.g.
"Operation has been canceled".
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java:
##########
@@ -138,21 +153,188 @@ public ResultSet fetchResults(OperationHandle
operationHandle, long token, int m
/** Closes the {@link OperationManager} and all operations. */
public void close() {
- lock.writeLock().lock();
+ stateLock.writeLock().lock();
try {
isRunning = false;
for (Operation operation : submittedOperations.values()) {
operation.close();
}
submittedOperations.clear();
} finally {
- lock.writeLock().unlock();
+ stateLock.writeLock().unlock();
}
LOG.debug("Closes the Operation Manager.");
}
//
-------------------------------------------------------------------------------------------
+ /** Operation to manage the execution, results and so on. */
+ @VisibleForTesting
Review Comment:
Because I don't want others to instantiate Operation from outside, except in tests.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]