luoyuxia commented on code in PR #20149:
URL: https://github.com/apache/flink/pull/20149#discussion_r920896827


##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java:
##########
@@ -365,12 +367,124 @@ public void 
testSubmitOperationAndCloseOperationManagerInParallel() throws Excep
         assertEquals(0, manager.getOperationCount());
     }
 
+    @Test
+    public void testSubmitOperationAndCloseOperationManagerInParallel2() 
throws Exception {
+        int count = 3;
+        CountDownLatch startRunning = new CountDownLatch(1);
+        CountDownLatch terminateRunning = new CountDownLatch(1);
+        SessionHandle sessionHandle = 
service.openSession(defaultSessionEnvironment);
+        for (int i = 0; i < count; i++) {
+            threadFactory
+                    .newThread(
+                            () ->
+                                    service.submitOperation(
+                                            sessionHandle,
+                                            OperationType.UNKNOWN,
+                                            () -> {
+                                                startRunning.countDown();
+                                                terminateRunning.await();
+                                                return null;
+                                            }))
+                    .start();
+        }
+        startRunning.await();
+        // close session should not be blocked
+        service.getSession(sessionHandle).getOperationManager().close();
+        terminateRunning.countDown();

Review Comment:
   Nit: seems this line can be removed



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java:
##########
@@ -36,24 +38,39 @@
 import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Supplier;
 
+import static 
org.apache.flink.table.gateway.api.results.ResultSet.NOT_READY_RESULTS;
+
 /** Manager for the {@link Operation}. */
 public class OperationManager {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(OperationManager.class);
 
-    private final ReadWriteLock lock = new ReentrantReadWriteLock();
+    /** The lock that controls the visit of the {@link OperationManager}'s 
state. */
+    private final ReadWriteLock stateLock = new ReentrantReadWriteLock();
+
     private final Map<OperationHandle, Operation> submittedOperations;
     private final ExecutorService service;
+    /**
+     * Operation lock is used to control the execution among the {@link 
Operation}. The reason why

Review Comment:
   minor:
   ```suggestion
        * Operation lock is used to control the execution among the {@link 
Operation}s. The reason why
   ```



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java:
##########
@@ -36,24 +38,39 @@
 import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Supplier;
 
+import static 
org.apache.flink.table.gateway.api.results.ResultSet.NOT_READY_RESULTS;
+
 /** Manager for the {@link Operation}. */
 public class OperationManager {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(OperationManager.class);
 
-    private final ReadWriteLock lock = new ReentrantReadWriteLock();
+    /** The lock that controls the visit of the {@link OperationManager}'s 
state. */
+    private final ReadWriteLock stateLock = new ReentrantReadWriteLock();
+
     private final Map<OperationHandle, Operation> submittedOperations;
     private final ExecutorService service;
+    /**
+     * Operation lock is used to control the execution among the {@link 
Operation}. The reason why
+     * using the lock to control the execution in sequence is the managers, 
e.g. CatalogManager are

Review Comment:
   minor: 
   ```suggestion
        * using the lock to control the execution in sequence is the managers, 
e.g. CatalogManager is
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to