deniskuzZ commented on code in PR #5649:
URL: https://github.com/apache/hive/pull/5649#discussion_r2017452000


##########
ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java:
##########
@@ -341,6 +345,29 @@ public enum AuthorizationMode{V1, V2};
    */
   private boolean compaction = false;
 
+  // A thread-safe set to hold all active session states
+  private static final Set<SessionState> sessionStates = 
Collections.newSetFromMap(new ConcurrentHashMap<>());
+
+  // Thread pool for concurrent cleanup
+  private static final ExecutorService cleanupExecutor = 
Executors.newCachedThreadPool();

Review Comment:
   Should it be a SingleThreadExecutor?



##########
ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java:
##########
@@ -1969,6 +2001,24 @@ public void close() throws IOException {
     dynamicVars.clear();
   }
 
+  private static void deregister(SessionState ss) {
+    // remove from the sessionStates list to avoid cleaning the closed session 
state again
+    sessionStates.remove(ss);
+  }
+
+  private static void cleanUpAllSessions() {
+    for (SessionState sessionState : sessionStates) {
+      cleanupExecutor.submit(() -> {

Review Comment:
   where do you wait for completion?
   ````
   private static void cleanUpAllSessions() {
     ExecutorService cleanupExecutor = Executors.newCachedThreadPool();
     try {
       List<CompletableFuture<Void>> asyncTasks = new ArrayList<>();
       for (SessionState sessionState : sessionStates) {
         Runnable task = () -> {
           try {
             LOG.info("Closing session state: {}", sessionState.getSessionId());
             sessionState.close();
           } catch (IOException e) {
             LOG.error("Problem closing session state", e);
           }
         };
         CompletableFuture<Void> asyncTask = CompletableFuture.runAsync(
             task, cleanupExecutor)
           .exceptionally(e -> {
             LOG.error("Error closing session state", e);
             return null;
           });
         asyncTasks.add(asyncTask);
       }
       CompletableFuture<Void> allTasks = 
CompletableFuture.allOf(asyncTasks.toArray(new CompletableFuture[0]));
       try {
         allTasks.get(60, TimeUnit.SECONDS);
       } catch (Exception e) {
         LOG.error("Failed to close all session states", e);
       }
     } finally {
       cleanupExecutor.shutdownNow();
     }
   }
   ````



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org

Reply via email to