luoyuxia commented on code in PR #20149:
URL: https://github.com/apache/flink/pull/20149#discussion_r920896827
##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java:
##########
@@ -365,12 +367,124 @@ public void
testSubmitOperationAndCloseOperationManagerInParallel() throws Excep
assertEquals(0, manager.getOperationCount());
}
+ @Test
+ public void testSubmitOperationAndCloseOperationManagerInParallel2()
throws Exception {
+ int count = 3;
+ CountDownLatch startRunning = new CountDownLatch(1);
+ CountDownLatch terminateRunning = new CountDownLatch(1);
+ SessionHandle sessionHandle =
service.openSession(defaultSessionEnvironment);
+ for (int i = 0; i < count; i++) {
+ threadFactory
+ .newThread(
+ () ->
+ service.submitOperation(
+ sessionHandle,
+ OperationType.UNKNOWN,
+ () -> {
+ startRunning.countDown();
+ terminateRunning.await();
+ return null;
+ }))
+ .start();
+ }
+ startRunning.await();
+ // close session should not be blocked
+ service.getSession(sessionHandle).getOperationManager().close();
+ terminateRunning.countDown();
Review Comment:
Nit: seems this line can be removed
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java:
##########
@@ -36,24 +38,39 @@
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
+import static
org.apache.flink.table.gateway.api.results.ResultSet.NOT_READY_RESULTS;
+
/** Manager for the {@link Operation}. */
public class OperationManager {
private static final Logger LOG =
LoggerFactory.getLogger(OperationManager.class);
- private final ReadWriteLock lock = new ReentrantReadWriteLock();
+ /** The lock that controls the visit of the {@link OperationManager}'s
state. */
+ private final ReadWriteLock stateLock = new ReentrantReadWriteLock();
+
private final Map<OperationHandle, Operation> submittedOperations;
private final ExecutorService service;
+ /**
+ * Operation lock is used to control the execution among the {@link
Operation}. The reason why
Review Comment:
minor:
```suggestion
* Operation lock is used to control the execution among the {@link
Operation}s. The reason why
```
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java:
##########
@@ -36,24 +38,39 @@
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
+import static
org.apache.flink.table.gateway.api.results.ResultSet.NOT_READY_RESULTS;
+
/** Manager for the {@link Operation}. */
public class OperationManager {
private static final Logger LOG =
LoggerFactory.getLogger(OperationManager.class);
- private final ReadWriteLock lock = new ReentrantReadWriteLock();
+ /** The lock that controls the visit of the {@link OperationManager}'s
state. */
+ private final ReadWriteLock stateLock = new ReentrantReadWriteLock();
+
private final Map<OperationHandle, Operation> submittedOperations;
private final ExecutorService service;
+ /**
+ * Operation lock is used to control the execution among the {@link
Operation}. The reason why
+ * using the lock to control the execution in sequence is the managers,
e.g. CatalogManager are
Review Comment:
minor:
```suggestion
* using the lock to control the execution in sequence is the managers,
e.g. CatalogManager is
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]