[ 
https://issues.apache.org/jira/browse/HIVE-24201?focusedWorklogId=568948&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-568948
 ]

ASF GitHub Bot logged work on HIVE-24201:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/Mar/21 13:31
            Start Date: 19/Mar/21 13:31
    Worklog Time Spent: 10m 
      Work Description: sankarh commented on a change in pull request #2065:
URL: https://github.com/apache/hive/pull/2065#discussion_r597613584



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
##########
@@ -363,6 +352,21 @@ public MoveSession(final WmTezSession srcSession, final 
String destPool) {
     public String toString() {
       return srcSession.getSessionId() + " moving from " + 
srcSession.getPoolName() + " to " + destPool;
     }
+
+    @Override
+    public boolean equals(Object o) {
+      if (o==this) return true;

Review comment:
       nit: 
   1. Need space before and after binary operators. 
   2. Even single statement block should be put under {}.
   3. Space after keywords such as "if"

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
##########
@@ -657,7 +661,37 @@ private void processCurrentEvents(EventState e, 
WmThreadSyncWork syncWork) throw
     }
     e.toReuse.clear();
 
-    // 9. Resolve all the kill query requests in flight. Nothing below can 
affect them.
+    // 9. If delayed move is set to true, for pools which have queued 
requests, process the "delayed moves" now.
+    // Incoming requests have more claim upon the pool than the delayed moves
+    if (HiveConf.getBoolVar(conf, ConfVars.HIVE_SERVER2_WM_DELAYED_MOVE)) {
+      for (String poolName : poolsToRedistribute) {
+        PoolState pool = pools.get(poolName);
+        if (pool == null)
+          return;
+        int queueSize = pool.queue.size();
+        int remainingCapacity = pool.queryParallelism - 
pool.getTotalActiveSessions();
+        int delayedMovesToProcess = queueSize > remainingCapacity ? queueSize 
- remainingCapacity : 0;

Review comment:
       nit: Use () to clearly mark the boundary. 
   int delayedMovesToProcess = (queueSize > remainingCapacity) ? (queueSize - 
remainingCapacity) : 0;
   
   if ((delayedMovesToProcess > 0) && (pool.delayedMoveSessions.size() > 0)) {

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
##########
@@ -794,17 +828,17 @@ private void handleMoveSessionOnMasterThread(final 
MoveSession moveSession,
     final WmThreadSyncWork syncWork,
     final HashSet<String> poolsToRedistribute,
     final Map<WmTezSession, GetRequest> toReuse,
-    final Map<WmTezSession, WmEvent> recordMoveEvents) {
+    final Map<WmTezSession, WmEvent> recordMoveEvents, final boolean 
moveImmediately) {
     String destPoolName = moveSession.destPool;
-    LOG.info("Handling move session event: {}", moveSession);
+    LOG.info("Handling move session event: {}, move immediately: {}", 
moveSession, moveImmediately);
     if (validMove(moveSession.srcSession, destPoolName)) {
       WmEvent moveEvent = new WmEvent(WmEvent.EventType.MOVE);
-      // remove from src pool
-      RemoveSessionResult rr = checkAndRemoveSessionFromItsPool(
+      // check if there is capacity in dest pool
+      if (capacityAvailable(destPoolName)) {

Review comment:
       We shouldn't change the sequence of validation here. Modify the flow as 
follows.
   ```
   RemoveSessionResult rr = checkAndRemoveSessionFromItsPool(
   if (rr == RemoveSessionResult.OK) {
     if (capacityAvailable(destPoolName)) {
       // Existing code to move the session
     } else {
       if (HiveConf.getBoolVar(conf, ConfVars.HIVE_SERVER2_WM_DELAYED_MOVE) && 
!moveImmediately) {
         // new code to move it to delayed sessions list
       } else {
         // Existing code to kill the query
       }
     }
   } else {
     // Existing code to log error msg.
   }
   ```

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
##########
@@ -32,18 +32,7 @@
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.IdentityHashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;

Review comment:
       Expand the imports instead of using *.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
##########
@@ -657,7 +661,37 @@ private void processCurrentEvents(EventState e, 
WmThreadSyncWork syncWork) throw
     }
     e.toReuse.clear();
 
-    // 9. Resolve all the kill query requests in flight. Nothing below can 
affect them.
+    // 9. If delayed move is set to true, for pools which have queued 
requests, process the "delayed moves" now.
+    // Incoming requests have more claim upon the pool than the delayed moves
+    if (HiveConf.getBoolVar(conf, ConfVars.HIVE_SERVER2_WM_DELAYED_MOVE)) {
+      for (String poolName : poolsToRedistribute) {
+        PoolState pool = pools.get(poolName);
+        if (pool == null)

Review comment:
       nit: Need {}

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
##########
@@ -641,7 +645,7 @@ private void processCurrentEvents(EventState e, 
WmThreadSyncWork syncWork) throw
     // as possible
     Map<WmTezSession, WmEvent> recordMoveEvents = new HashMap<>();
     for (MoveSession moveSession : e.moveSessions) {
-      handleMoveSessionOnMasterThread(moveSession, syncWork, 
poolsToRedistribute, e.toReuse, recordMoveEvents);
+      handleMoveSessionOnMasterThread(moveSession, syncWork, 
poolsToRedistribute, e.toReuse, recordMoveEvents, false);

Review comment:
       The last argument can be set based on the new config, instead of 
passing false here and then taking the "true" flow when the config is false.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
##########
@@ -657,7 +661,37 @@ private void processCurrentEvents(EventState e, 
WmThreadSyncWork syncWork) throw
     }
     e.toReuse.clear();
 
-    // 9. Resolve all the kill query requests in flight. Nothing below can 
affect them.
+    // 9. If delayed move is set to true, for pools which have queued 
requests, process the "delayed moves" now.
+    // Incoming requests have more claim upon the pool than the delayed moves
+    if (HiveConf.getBoolVar(conf, ConfVars.HIVE_SERVER2_WM_DELAYED_MOVE)) {
+      for (String poolName : poolsToRedistribute) {
+        PoolState pool = pools.get(poolName);
+        if (pool == null)
+          return;
+        int queueSize = pool.queue.size();
+        int remainingCapacity = pool.queryParallelism - 
pool.getTotalActiveSessions();
+        int delayedMovesToProcess = queueSize > remainingCapacity ? queueSize 
- remainingCapacity : 0;
+        if (delayedMovesToProcess > 0 && pool.delayedMoveSessions.size() > 0) {
+          int i = 0;
+          Iterator<MoveSession> itr = pool.delayedMoveSessions.iterator();
+          while (i < delayedMovesToProcess && itr.hasNext()) {
+            MoveSession moveSession = itr.next();
+            itr.remove();
+            WmTezSession srcSession = moveSession.srcSession;
+            if (pool.sessions.contains(srcSession)) {
+              LOG.info("Processing delayed move {} for pool {} in wm main 
thread as the pool has queued requests",
+                  moveSession, poolName);
+              i++;
+              handleMoveSessionOnMasterThread(moveSession, syncWork, 
poolsToRedistribute, e.toReuse, recordMoveEvents,
+                  true);
+            } else

Review comment:
       The else block should be removed; it is redundant.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
##########
@@ -816,18 +850,34 @@ private void handleMoveSessionOnMasterThread(final 
MoveSession moveSession,
             LOG.error("Failed to move session: {}. Session is not added to 
destination.", moveSession);
           }
         } else {
-          WmTezSession session = moveSession.srcSession;
-          KillQueryContext killQueryContext = new KillQueryContext(session, 
"Destination pool " + destPoolName +
-            " is full. Killing query.");
-          resetAndQueueKill(syncWork.toKillQuery, killQueryContext, toReuse);
+        LOG.error("Failed to move session: {}. Session is not removed from its 
pool.", moveSession);

Review comment:
       nit: Misaligned statement. Add 2 spaces.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
##########
@@ -657,7 +661,37 @@ private void processCurrentEvents(EventState e, 
WmThreadSyncWork syncWork) throw
     }
     e.toReuse.clear();
 
-    // 9. Resolve all the kill query requests in flight. Nothing below can 
affect them.
+    // 9. If delayed move is set to true, for pools which have queued 
requests, process the "delayed moves" now.
+    // Incoming requests have more claim upon the pool than the delayed moves
+    if (HiveConf.getBoolVar(conf, ConfVars.HIVE_SERVER2_WM_DELAYED_MOVE)) {
+      for (String poolName : poolsToRedistribute) {
+        PoolState pool = pools.get(poolName);
+        if (pool == null)
+          return;
+        int queueSize = pool.queue.size();
+        int remainingCapacity = pool.queryParallelism - 
pool.getTotalActiveSessions();
+        int delayedMovesToProcess = queueSize > remainingCapacity ? queueSize 
- remainingCapacity : 0;
+        if (delayedMovesToProcess > 0 && pool.delayedMoveSessions.size() > 0) {
+          int i = 0;
+          Iterator<MoveSession> itr = pool.delayedMoveSessions.iterator();
+          while (i < delayedMovesToProcess && itr.hasNext()) {

Review comment:
       i < delayedMovesToProcess can be moved inside the loop to avoid 
redundant checks if i is not incremented.
   
   i++;
   handleMoveSessionOnMasterThread(moveSession, syncWork, poolsToRedistribute, 
e.toReuse, recordMoveEvents,
                     true);
   if (i >= delayedMovesToProcess) {
     break;
   }

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
##########
@@ -794,17 +828,17 @@ private void handleMoveSessionOnMasterThread(final 
MoveSession moveSession,
     final WmThreadSyncWork syncWork,
     final HashSet<String> poolsToRedistribute,
     final Map<WmTezSession, GetRequest> toReuse,
-    final Map<WmTezSession, WmEvent> recordMoveEvents) {
+    final Map<WmTezSession, WmEvent> recordMoveEvents, final boolean 
moveImmediately) {

Review comment:
       To keep it uniform with the new config, can we rename the variable as 
"delayedMove" instead of "moveImmediately" and use it accordingly?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
##########
@@ -657,7 +661,37 @@ private void processCurrentEvents(EventState e, 
WmThreadSyncWork syncWork) throw
     }
     e.toReuse.clear();
 
-    // 9. Resolve all the kill query requests in flight. Nothing below can 
affect them.
+    // 9. If delayed move is set to true, for pools which have queued 
requests, process the "delayed moves" now.
+    // Incoming requests have more claim upon the pool than the delayed moves
+    if (HiveConf.getBoolVar(conf, ConfVars.HIVE_SERVER2_WM_DELAYED_MOVE)) {
+      for (String poolName : poolsToRedistribute) {
+        PoolState pool = pools.get(poolName);
+        if (pool == null)
+          return;
+        int queueSize = pool.queue.size();
+        int remainingCapacity = pool.queryParallelism - 
pool.getTotalActiveSessions();
+        int delayedMovesToProcess = queueSize > remainingCapacity ? queueSize 
- remainingCapacity : 0;
+        if (delayedMovesToProcess > 0 && pool.delayedMoveSessions.size() > 0) {
+          int i = 0;

Review comment:
       nit: Shall use the name "movedCount" instead of "i".




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 568948)
    Time Spent: 1h  (was: 50m)

> WorkloadManager kills query being moved to different pool if destination pool 
> does not have enough sessions
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: HIVE-24201
>                 URL: https://issues.apache.org/jira/browse/HIVE-24201
>             Project: Hive
>          Issue Type: Improvement
>          Components: HiveServer2, llap
>    Affects Versions: 4.0.0
>            Reporter: Adesh Kumar Rao
>            Assignee: Pritha Dawn
>            Priority: Minor
>              Labels: pull-request-available
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
> To reproduce, create a resource plan with move trigger, like below:
> {code:java}
> +----------------------------------------------------+
> |                        line                        |
> +----------------------------------------------------+
> | experiment[status=DISABLED,parallelism=null,defaultPool=default] |
> |  +  default[allocFraction=0.888,schedulingPolicy=null,parallelism=1] |
> |      |  mapped for default                         |
> |  +  pool2[allocFraction=0.1,schedulingPolicy=fair,parallelism=1] |
> |      |  trigger t1: if (ELAPSED_TIME > 20) { MOVE TO pool1 } |
> |      |  mapped for users: abcd                   |
> |  +  pool1[allocFraction=0.012,schedulingPolicy=null,parallelism=1] |
> |      |  mapped for users: efgh                   |
>  
> {code}
> Now, run two queries in pool1 and pool2 using different users. The query 
> running in pool2 will try to move to pool1 and it will get killed because 
> pool1 will not have a session to handle the query.
> Currently, the Workload management move trigger kills the query being moved 
> to a different pool if the destination pool does not have enough capacity.  We 
> could have a "delayed move" configuration which lets the query run in the 
> source pool as long as possible, if the destination pool is full. It will 
> attempt the move to the destination pool only when there is a claim upon the 
> source pool. If the destination pool is not full, delayed move behaves as a 
> normal move, i.e. the move will happen immediately.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to