jerrypeng commented on a change in pull request #12178:
URL: https://github.com/apache/pulsar/pull/12178#discussion_r716906045



##########
File path: 
pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerApiV2Resource.java
##########
@@ -156,6 +160,65 @@ public void rebalance() {
         workers().rebalance(uri.getRequestUri(), clientAppId());
     }
 
+    @PUT
+    @ApiOperation(
+            value = "Drains the specified worker, i.e., moves its work-assignments to other workers"
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 408, message = "Request timeout"),
+            @ApiResponse(code = 409, message = "Drain already in progress"),
+            @ApiResponse(code = 503, message = "Worker service is not ready")
+    })
+    @Path("/leader/{drain}")
+    public void drainAtLeader(@PathParam("drain") String workerId) {
+        workers().drain(uri.getRequestUri(), clientAppId(), true);
+    }
+
+    @PUT
+    @ApiOperation(
+            value = "Drains this worker, i.e., moves its work-assignments to other workers"
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 408, message = "Request timeout"),
+            @ApiResponse(code = 409, message = "Drain already in progress"),
+            @ApiResponse(code = 503, message = "Worker service is not ready")
+    })
+    @Path("/drain")
+    public void drain() {
+        workers().drain(uri.getRequestUri(), clientAppId(), false);
+    }
+
+    @GET
+    @ApiOperation(
+            value = "Get the status of any ongoing drain operation at the specified worker",
+            response = LongRunningProcessStatus.class
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 503, message = "Worker service is not ready")
+    })
+    @Path("/leader/{drain}")

Review comment:
       I think a better name for the URL parameter is "workerId".

##########
File path: 
pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerApiV2Resource.java
##########
@@ -156,6 +160,65 @@ public void rebalance() {
         workers().rebalance(uri.getRequestUri(), clientAppId());
     }
 
+    @PUT
+    @ApiOperation(
+            value = "Drains the specified worker, i.e., moves its work-assignments to other workers"
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 408, message = "Request timeout"),
+            @ApiResponse(code = 409, message = "Drain already in progress"),
+            @ApiResponse(code = 503, message = "Worker service is not ready")
+    })
+    @Path("/leader/{drain}")
+    public void drainAtLeader(@PathParam("drain") String workerId) {
+        workers().drain(uri.getRequestUri(), clientAppId(), true);
+    }
+
+    @PUT
+    @ApiOperation(
+            value = "Drains this worker, i.e., moves its work-assignments to other workers"
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 408, message = "Request timeout"),
+            @ApiResponse(code = 409, message = "Drain already in progress"),
+            @ApiResponse(code = 503, message = "Worker service is not ready")
+    })
+    @Path("/drain")
+    public void drain() {
+        workers().drain(uri.getRequestUri(), clientAppId(), false);
+    }
+
+    @GET
+    @ApiOperation(
+            value = "Get the status of any ongoing drain operation at the specified worker",
+            response = LongRunningProcessStatus.class
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 503, message = "Worker service is not ready")
+    })
+    @Path("/leader/{drain}")
+    public LongRunningProcessStatus getDrainStatus(@PathParam("drain") String workerId) {
+        return workers().getDrainStatus(uri.getRequestUri(), clientAppId(), true);

Review comment:
       Why are you passing the URI into "getDrainStatus()" and later trying to 
parse the workerId from the URI, instead of just passing the workerId 
directly to "getDrainStatus()"?
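
To illustrate the question, here is a minimal, self-contained sketch that contrasts the two approaches. The class `DrainStatusLookup`, its methods, and the string statuses are hypothetical stand-ins, not the PR's actual code, and the URL layout is assumed for the example. The resource method already receives the worker ID through `@PathParam`, so recovering it from the URI later duplicates work and ties the service layer to the path layout.

```java
import java.net.URI;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the worker service; not the PR's actual class.
final class DrainStatusLookup {
    private final Map<String, String> drainStatusByWorker = new ConcurrentHashMap<>();

    void recordStatus(String workerId, String status) {
        drainStatusByWorker.put(workerId, status);
    }

    // Approach questioned in the review: recover the worker ID from the request URI.
    // This depends on the worker ID being the last path segment (an assumption here).
    String statusFromUri(URI requestUri) {
        String path = requestUri.getPath();
        String workerId = path.substring(path.lastIndexOf('/') + 1);
        return statusFromWorkerId(workerId);
    }

    // Suggested approach: the caller passes the worker ID it already has.
    String statusFromWorkerId(String workerId) {
        return drainStatusByWorker.getOrDefault(workerId, "NOT_FOUND");
    }
}
```

Both methods return the same answer for a well-formed URL, but only the second one keeps working if the path layout changes.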

##########
File path: 
pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerApiV2Resource.java
##########
@@ -156,6 +160,65 @@ public void rebalance() {
         workers().rebalance(uri.getRequestUri(), clientAppId());
     }
 
+    @PUT
+    @ApiOperation(
+            value = "Drains the specified worker, i.e., moves its work-assignments to other workers"
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 408, message = "Request timeout"),
+            @ApiResponse(code = 409, message = "Drain already in progress"),
+            @ApiResponse(code = 503, message = "Worker service is not ready")
+    })
+    @Path("/leader/{drain}")
+    public void drainAtLeader(@PathParam("drain") String workerId) {

Review comment:
       I think a better name for the URL parameter is "workerId".
   
   

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java
##########
@@ -127,6 +129,65 @@ public void rebalance() {
         workers().rebalance(uri.getRequestUri(), clientAppId());
     }
 
+    @PUT
+    @ApiOperation(
+            value = "Drains the specified worker, i.e., moves its work-assignments to other workers"
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 408, message = "Request timeout"),
+            @ApiResponse(code = 409, message = "Drain already in progress"),
+            @ApiResponse(code = 503, message = "Worker service is not ready")
+    })
+    @Path("/leader/{drain}")

Review comment:
       Please refer to my comments in WorkerApiV2Resource.java

##########
File path: 
pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
##########
@@ -415,6 +633,34 @@ private void compactAssignmentTopic() {
         }
     }
 
+    protected int updateWorkerDrainMap() {

Review comment:
       Since this gets run periodically, let's make sure the method cannot 
somehow get invoked concurrently.
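
One possible way to do this, sketched under assumptions: guard the periodic task with an `AtomicBoolean` so that an overlapping invocation is skipped rather than run in parallel. The class `PeriodicDrainMapUpdater` and its methods are hypothetical names for illustration only. Marking the method `synchronized` would be the simpler alternative if blocking the second caller is acceptable.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch; none of these names come from the PR.
final class PeriodicDrainMapUpdater {
    private final AtomicBoolean updateInProgress = new AtomicBoolean(false);
    private final AtomicInteger completedRuns = new AtomicInteger();

    /**
     * Runs the update unless another invocation is already running.
     * Returns true if this call performed the update, false if it was skipped.
     */
    boolean updateIfNotRunning(Runnable update) {
        if (!updateInProgress.compareAndSet(false, true)) {
            return false; // another invocation is mid-update; skip this tick
        }
        try {
            update.run();
            completedRuns.incrementAndGet();
            return true;
        } finally {
            updateInProgress.set(false); // always release, even if the update throws
        }
    }

    int completedRuns() {
        return completedRuns.get();
    }
}
```

The guard is not reentrant, so a call made while an update is running (from any thread, including the same one) is skipped.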

##########
File path: 
pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
##########
@@ -415,6 +633,34 @@ private void compactAssignmentTopic() {
         }
     }
 
+    protected int updateWorkerDrainMap() {
+        long startTime = System.nanoTime();
+        int numRemovedWorkerIds = 0;
+
+        if (drainOpStatusMap.size() > 0) {

Review comment:
       I know drainOpStatusMap is a concurrent map, but we are reading and 
writing from different threads.  This makes me nervous.  Is there a 
reason we should synchronize this with the drain operation?
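
To make the concern concrete: a `ConcurrentHashMap` keeps individual calls thread-safe, but a check followed by a separate write (for example `containsKey` and then `put`) is not atomic as a pair. The sketch below shows one way to avoid that without an extra lock, using the map's single-call atomic operations. It assumes the periodic task prunes records for workers that have left the cluster (the diff shows it counting removed worker IDs). `DrainStatusRegistry` and its two-value `Status` enum are hypothetical simplifications, not the PR's types.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch; simplified relative to the PR's drainOpStatusMap.
final class DrainStatusRegistry {
    enum Status { IN_PROGRESS, COMPLETED }

    private final ConcurrentHashMap<String, Status> statusByWorker = new ConcurrentHashMap<>();

    // Atomic check-and-insert: returns false if a record for the worker already exists.
    boolean tryStartDrain(String workerId) {
        return statusByWorker.putIfAbsent(workerId, Status.IN_PROGRESS) == null;
    }

    // Atomic transition; does nothing unless the worker is currently IN_PROGRESS.
    boolean markCompleted(String workerId) {
        return statusByWorker.replace(workerId, Status.IN_PROGRESS, Status.COMPLETED);
    }

    // Periodic cleanup: drops COMPLETED records of workers that are no longer members.
    // remove(key, value) only removes while the status is still COMPLETED, so a record
    // that another thread changed in the meantime is left alone.
    int removeDepartedWorkers(Set<String> currentMembers) {
        int removed = 0;
        for (String workerId : statusByWorker.keySet()) {
            if (!currentMembers.contains(workerId)
                    && statusByWorker.remove(workerId, Status.COMPLETED)) {
                removed++;
            }
        }
        return removed;
    }

    Status statusOf(String workerId) {
        return statusByWorker.get(workerId);
    }
}
```

If the drain and cleanup paths need to agree on more than one map entry at a time, a shared lock around both would be the more conservative choice.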

##########
File path: 
pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
##########
@@ -215,28 +239,147 @@ public synchronized void initialize(Producer<byte[]> exclusiveProducer) {
     }
 
     public Future<?> rebalanceIfNotInprogress() {
-        if (rebalanceInProgess.compareAndSet(false, true)) {
+        if (rebalanceInProgress.compareAndSet(false, true)) {
+            val numWorkers = getCurrentAvailableNumWorkers();
+            if (numWorkers <= 1) {
+                rebalanceInProgress.set(false);
+                throw new TooFewWorkersException();
+            }
             currentRebalanceFuture = rebalance();
             return currentRebalanceFuture;
         } else {
             throw new RebalanceInProgressException();
         }
     }
 
-    @VisibleForTesting
-    void invokeScheduler() {
+    private Future<?> drain(String workerId) {
+        return scheduleInternal(() -> {
+            workerStatsManager.drainTotalExecTimeStart();
+            currentPostDrainAssignments = invokeDrain(workerId);
+            workerStatsManager.drainTotalExecTimeEnd();
+        }, "Encountered error when invoking drain");
+    }
+
+    public Future<?> drainIfNotInProgress(String workerId) {
+        if (drainInProgressFlag.compareAndSet(false, true)) {
+            try {
+                val availableWorkers = getCurrentAvailableWorkers();
+                if (availableWorkers.size() <= 1) {
+                    throw new TooFewWorkersException();
+                }
+
+                // A worker must be specified at this point. This would be set up by the caller.
+                Preconditions.checkNotNull(workerId);
+
+                // [We can get stricter, and require that every drain op be followed up with a cleanup of the
+                // corresponding worker before any other drain op, so that the drainOpStatusMap should be empty
+                // at the next drain operation.]
+                if (drainOpStatusMap.containsKey(workerId)) {
+                    String warnString = "Worker " + workerId
+                            + " was not removed yet from SchedulerManager after previous drain op";
+                    log.warn(warnString);
+                    throw new WorkerNotRemovedAfterPriorDrainException();
+                }
+
+                if (!availableWorkers.contains(workerId)) {
+                    log.info("invokeDrain was called for a worker={} which is not currently active", workerId);
+                    throw new UnknownWorkerException();
+                }
+
+                currentDrainFuture = drain(workerId);
+                return currentDrainFuture;
+            } finally {
+                drainInProgressFlag.set(false);
+            }
+        } else {
+            throw new DrainInProgressException();
+        }
+    }
+
+    public LongRunningProcessStatus getDrainStatus(String workerId) {
         long startTime = System.nanoTime();
+        String errString;
+        LongRunningProcessStatus retVal = new LongRunningProcessStatus();
+        try {
+            val workerStatus = drainOpStatusMap.get(workerId);
+            if (workerStatus == null) {
+                errString = "Worker " + workerId + " not found in drain records";
+                retVal = LongRunningProcessStatus.forError(errString);
+            } else {
+                switch (workerStatus) {
+                    default:
+                        errString = "getDrainStatus: Unexpected status " + workerStatus + " found for worker " + workerId;
+                        retVal = LongRunningProcessStatus.forError(errString);
+                        break;
+                    case DrainCompleted:
+                        retVal = LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.SUCCESS);
+                        break;
+                    case DrainInProgress:
+                        retVal = LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.RUNNING);
+                        break;
+                    case DrainNotInProgress:
+                        retVal = LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.NOT_RUN);
+                        break;
+                }
+            }
+        } finally {
+            log.info("Get drain status for worker {} - execution time: {} sec; returning status={}, error={}",
+                    workerId, (System.nanoTime() - startTime) / Math.pow(10, 9), retVal.status, retVal.lastError);
+            return retVal;
+        }
+    }
+
+    @VisibleForTesting
+    void clearDrainOpsStatus() {
+        drainOpStatusMap.clear();
+        log.warn("Cleared drain op status map");
+    }
+
+    @VisibleForTesting
+    void setDrainOpsStatus(final String workerId, final DrainOpStatus dStatus) {
+        drainOpStatusMap.put(workerId, dStatus);
+        log.warn("setDrainOpsStatus: updated drain status of worker {} to {}", workerId, dStatus);
+    }
 
+    @VisibleForTesting
+    ConcurrentHashMap<String, DrainOpStatus> getDrainOpsStatusMap() {
+        val retVal = new ConcurrentHashMap<String, DrainOpStatus>(drainOpStatusMap);
+        return retVal;
+    }
+
+    private synchronized int getCurrentAvailableNumWorkers() {
+        return getCurrentAvailableWorkers().size();
+    }
+
+    private synchronized Set <String> getCurrentAvailableWorkers() {
         Set<String> currentMembership = membershipManager.getCurrentMembership()
                 .stream().map(workerInfo -> workerInfo.getWorkerId()).collect(Collectors.toSet());
+        val unavailableWorkers = new ArrayList<String>();
+        // iterate the set, instead of the concurrent hashmap
+        for (String worker : currentMembership) {

Review comment:
       We don't need two loops to remove.  You can use an iterator to remove 
while iterating the set.
   
   https://mkyong.com/java/java-how-to-remove-items-from-a-list-while-iterating/
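
A minimal sketch of the single-pass version, assuming the loop's purpose is to drop members that already have a drain record. The diff is cut off before the loop body, so that condition is an assumption, and `AvailableWorkers` and `withoutDrainedWorkers` are hypothetical names.

```java
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

// Hypothetical helper; the filtering condition is assumed, not taken from the PR.
final class AvailableWorkers {
    // Returns the members that have no entry in the drain-status map, in one pass.
    static Set<String> withoutDrainedWorkers(Set<String> membership, Map<String, ?> drainOpStatusMap) {
        Set<String> available = new HashSet<>(membership); // copy so the input is untouched
        Iterator<String> it = available.iterator();
        while (it.hasNext()) {
            if (drainOpStatusMap.containsKey(it.next())) {
                it.remove(); // safe: the removal goes through the iterator itself
            }
        }
        return available;
    }
}
```

On Java 8 and later, `available.removeIf(drainOpStatusMap::containsKey)` expresses the same thing in one line.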

##########
File path: 
pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
##########
@@ -215,28 +239,147 @@ public synchronized void initialize(Producer<byte[]> exclusiveProducer) {
     }
 
     public Future<?> rebalanceIfNotInprogress() {
-        if (rebalanceInProgess.compareAndSet(false, true)) {
+        if (rebalanceInProgress.compareAndSet(false, true)) {
+            val numWorkers = getCurrentAvailableNumWorkers();
+            if (numWorkers <= 1) {
+                rebalanceInProgress.set(false);
+                throw new TooFewWorkersException();
+            }
             currentRebalanceFuture = rebalance();
             return currentRebalanceFuture;
         } else {
             throw new RebalanceInProgressException();
         }
     }
 
-    @VisibleForTesting
-    void invokeScheduler() {
+    private Future<?> drain(String workerId) {
+        return scheduleInternal(() -> {
+            workerStatsManager.drainTotalExecTimeStart();
+            currentPostDrainAssignments = invokeDrain(workerId);
+            workerStatsManager.drainTotalExecTimeEnd();
+        }, "Encountered error when invoking drain");
+    }
+
+    public Future<?> drainIfNotInProgress(String workerId) {
+        if (drainInProgressFlag.compareAndSet(false, true)) {
+            try {
+                val availableWorkers = getCurrentAvailableWorkers();
+                if (availableWorkers.size() <= 1) {
+                    throw new TooFewWorkersException();
+                }
+
+                // A worker must be specified at this point. This would be set up by the caller.
+                Preconditions.checkNotNull(workerId);
+
+                // [We can get stricter, and require that every drain op be followed up with a cleanup of the
+                // corresponding worker before any other drain op, so that the drainOpStatusMap should be empty
+                // at the next drain operation.]
+                if (drainOpStatusMap.containsKey(workerId)) {
+                    String warnString = "Worker " + workerId
+                            + " was not removed yet from SchedulerManager after previous drain op";
+                    log.warn(warnString);
+                    throw new WorkerNotRemovedAfterPriorDrainException();
+                }
+
+                if (!availableWorkers.contains(workerId)) {
+                    log.info("invokeDrain was called for a worker={} which is not currently active", workerId);
+                    throw new UnknownWorkerException();
+                }
+
+                currentDrainFuture = drain(workerId);
+                return currentDrainFuture;
+            } finally {
+                drainInProgressFlag.set(false);
+            }
+        } else {
+            throw new DrainInProgressException();
+        }
+    }
+
+    public LongRunningProcessStatus getDrainStatus(String workerId) {
         long startTime = System.nanoTime();
+        String errString;
+        LongRunningProcessStatus retVal = new LongRunningProcessStatus();
+        try {
+            val workerStatus = drainOpStatusMap.get(workerId);
+            if (workerStatus == null) {
+                errString = "Worker " + workerId + " not found in drain records";
+                retVal = LongRunningProcessStatus.forError(errString);
+            } else {
+                switch (workerStatus) {
+                    default:
+                        errString = "getDrainStatus: Unexpected status " + workerStatus + " found for worker " + workerId;
+                        retVal = LongRunningProcessStatus.forError(errString);
+                        break;
+                    case DrainCompleted:
+                        retVal = LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.SUCCESS);
+                        break;
+                    case DrainInProgress:
+                        retVal = LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.RUNNING);
+                        break;
+                    case DrainNotInProgress:
+                        retVal = LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.NOT_RUN);
+                        break;
+                }
+            }
+        } finally {
+            log.info("Get drain status for worker {} - execution time: {} sec; returning status={}, error={}",
+                    workerId, (System.nanoTime() - startTime) / Math.pow(10, 9), retVal.status, retVal.lastError);
+            return retVal;
+        }
+    }
+
+    @VisibleForTesting

Review comment:
       Can you add a comment to "clearDrainOpsStatus", "setDrainOpsStatus", and 
"getDrainOpsStatusMap" stating that these methods are only used for testing?



