jerrypeng commented on a change in pull request #12178:
URL: https://github.com/apache/pulsar/pull/12178#discussion_r716939382



##########
File path: 
pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
##########
@@ -215,28 +239,147 @@ public synchronized void initialize(Producer<byte[]> 
exclusiveProducer) {
     }
 
     public Future<?> rebalanceIfNotInprogress() {
-        if (rebalanceInProgess.compareAndSet(false, true)) {
+        if (rebalanceInProgress.compareAndSet(false, true)) {
+            val numWorkers = getCurrentAvailableNumWorkers();
+            if (numWorkers <= 1) {
+                rebalanceInProgress.set(false);
+                throw new TooFewWorkersException();
+            }
             currentRebalanceFuture = rebalance();
             return currentRebalanceFuture;
         } else {
             throw new RebalanceInProgressException();
         }
     }
 
-    @VisibleForTesting
-    void invokeScheduler() {
+    private Future<?> drain(String workerId) {
+        return scheduleInternal(() -> {
+            workerStatsManager.drainTotalExecTimeStart();
+            currentPostDrainAssignments = invokeDrain(workerId);
+            workerStatsManager.drainTotalExecTimeEnd();
+        }, "Encountered error when invoking drain");
+    }
+
+    public Future<?> drainIfNotInProgress(String workerId) {
+        if (drainInProgressFlag.compareAndSet(false, true)) {
+            try {
+                val availableWorkers = getCurrentAvailableWorkers();
+                if (availableWorkers.size() <= 1) {
+                    throw new TooFewWorkersException();
+                }
+
+                // A worker must be specified at this point. This would be set 
up by the caller.
+                Preconditions.checkNotNull(workerId);
+
+                // [We can get stricter, and require that every drain op be 
followed up with a cleanup of the
+                // corresponding worker before any other drain op, so that the 
drainOpStatusMap should be empty
+                // at the next drain operation.]
+                if (drainOpStatusMap.containsKey(workerId)) {
+                    String warnString = "Worker " + workerId
+                            + " was not removed yet from SchedulerManager 
after previous drain op";
+                    log.warn(warnString);
+                    throw new WorkerNotRemovedAfterPriorDrainException();
+                }
+
+                if (!availableWorkers.contains(workerId)) {
+                    log.info("invokeDrain was called for a worker={} which is 
not currently active", workerId);
+                    throw new UnknownWorkerException();
+                }
+
+                currentDrainFuture = drain(workerId);
+                return currentDrainFuture;
+            } finally {
+                drainInProgressFlag.set(false);
+            }
+        } else {
+            throw new DrainInProgressException();
+        }
+    }
+
+    public LongRunningProcessStatus getDrainStatus(String workerId) {
         long startTime = System.nanoTime();
+        String errString;
+        LongRunningProcessStatus retVal = new LongRunningProcessStatus();
+        try {
+            val workerStatus = drainOpStatusMap.get(workerId);
+            if (workerStatus == null) {
+                errString = "Worker " + workerId + " not found in drain 
records";
+                retVal = LongRunningProcessStatus.forError(errString);
+            } else {
+                switch (workerStatus) {
+                    default:
+                        errString = "getDrainStatus: Unexpected status " + 
workerStatus + " found for worker " + workerId;
+                        retVal = LongRunningProcessStatus.forError(errString);
+                        break;
+                    case DrainCompleted:
+                        retVal = 
LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.SUCCESS);
+                        break;
+                    case DrainInProgress:
+                        retVal = 
LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.RUNNING);
+                        break;
+                    case DrainNotInProgress:
+                        retVal = 
LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.NOT_RUN);
+                        break;
+                }
+            }
+        } finally {
+            log.info("Get drain status for worker {} - execution time: {} sec; 
returning status={}, error={}",
+                    workerId, (System.nanoTime() - startTime) / Math.pow(10, 
9), retVal.status, retVal.lastError);
+            return retVal;
+        }
+    }
+
+    @VisibleForTesting

Review comment:
       Can you add a comment to "clearDrainOpsStatus", "setDrainOpsStatus", and 
"getDrainOpsStatusMap" that these methods are only used for testing




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to