jerrypeng commented on a change in pull request #12178:
URL: https://github.com/apache/pulsar/pull/12178#discussion_r716938186



##########
File path: 
pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
##########
@@ -215,28 +239,147 @@ public synchronized void initialize(Producer<byte[]> 
exclusiveProducer) {
     }
 
     public Future<?> rebalanceIfNotInprogress() {
-        if (rebalanceInProgess.compareAndSet(false, true)) {
+        if (rebalanceInProgress.compareAndSet(false, true)) {
+            val numWorkers = getCurrentAvailableNumWorkers();
+            if (numWorkers <= 1) {
+                rebalanceInProgress.set(false);
+                throw new TooFewWorkersException();
+            }
             currentRebalanceFuture = rebalance();
             return currentRebalanceFuture;
         } else {
             throw new RebalanceInProgressException();
         }
     }
 
-    @VisibleForTesting
-    void invokeScheduler() {
+    private Future<?> drain(String workerId) {
+        return scheduleInternal(() -> {
+            workerStatsManager.drainTotalExecTimeStart();
+            currentPostDrainAssignments = invokeDrain(workerId);
+            workerStatsManager.drainTotalExecTimeEnd();
+        }, "Encountered error when invoking drain");
+    }
+
+    public Future<?> drainIfNotInProgress(String workerId) {
+        if (drainInProgressFlag.compareAndSet(false, true)) {
+            try {
+                val availableWorkers = getCurrentAvailableWorkers();
+                if (availableWorkers.size() <= 1) {
+                    throw new TooFewWorkersException();
+                }
+
+                // A worker must be specified at this point. This would be set 
up by the caller.
+                Preconditions.checkNotNull(workerId);
+
+                // [We can get stricter, and require that every drain op be 
followed up with a cleanup of the
+                // corresponding worker before any other drain op, so that the 
drainOpStatusMap should be empty
+                // at the next drain operation.]
+                if (drainOpStatusMap.containsKey(workerId)) {
+                    String warnString = "Worker " + workerId
+                            + " was not removed yet from SchedulerManager 
after previous drain op";
+                    log.warn(warnString);
+                    throw new WorkerNotRemovedAfterPriorDrainException();
+                }
+
+                if (!availableWorkers.contains(workerId)) {
+                    log.info("invokeDrain was called for a worker={} which is 
not currently active", workerId);
+                    throw new UnknownWorkerException();
+                }
+
+                currentDrainFuture = drain(workerId);
+                return currentDrainFuture;
+            } finally {
+                drainInProgressFlag.set(false);
+            }
+        } else {
+            throw new DrainInProgressException();
+        }
+    }
+
+    public LongRunningProcessStatus getDrainStatus(String workerId) {
         long startTime = System.nanoTime();
+        String errString;
+        LongRunningProcessStatus retVal = new LongRunningProcessStatus();
+        try {
+            val workerStatus = drainOpStatusMap.get(workerId);
+            if (workerStatus == null) {
+                errString = "Worker " + workerId + " not found in drain 
records";
+                retVal = LongRunningProcessStatus.forError(errString);
+            } else {
+                switch (workerStatus) {
+                    default:
+                        errString = "getDrainStatus: Unexpected status " + 
workerStatus + " found for worker " + workerId;
+                        retVal = LongRunningProcessStatus.forError(errString);
+                        break;
+                    case DrainCompleted:
+                        retVal = 
LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.SUCCESS);
+                        break;
+                    case DrainInProgress:
+                        retVal = 
LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.RUNNING);
+                        break;
+                    case DrainNotInProgress:
+                        retVal = 
LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.NOT_RUN);
+                        break;
+                }
+            }
+        } finally {
+            log.info("Get drain status for worker {} - execution time: {} sec; 
returning status={}, error={}",
+                    workerId, (System.nanoTime() - startTime) / Math.pow(10, 
9), retVal.status, retVal.lastError);
+            return retVal;
+        }
+    }
+
+    @VisibleForTesting
+    void clearDrainOpsStatus() {
+        drainOpStatusMap.clear();
+        log.warn("Cleared drain op status map");
+    }
+
+    @VisibleForTesting
+    void setDrainOpsStatus(final String workerId, final DrainOpStatus dStatus) 
{
+        drainOpStatusMap.put(workerId, dStatus);
+        log.warn("setDrainOpsStatus: updated drain status of worker {} to {}", 
workerId, dStatus);
+    }
 
+    @VisibleForTesting
+    ConcurrentHashMap<String, DrainOpStatus> getDrainOpsStatusMap() {
+        val retVal = new ConcurrentHashMap<String, 
DrainOpStatus>(drainOpStatusMap);
+        return retVal;
+    }
+
+    private synchronized int getCurrentAvailableNumWorkers() {
+        return getCurrentAvailableWorkers().size();
+    }
+
+    private synchronized Set <String> getCurrentAvailableWorkers() {
         Set<String> currentMembership = 
membershipManager.getCurrentMembership()
                 .stream().map(workerInfo -> 
workerInfo.getWorkerId()).collect(Collectors.toSet());
+        val unavailableWorkers = new ArrayList<String>();
+        // iterate the set, instead of the concurrent hashmap
+        for (String worker : currentMembership) {

Review comment:
       We don't need two loops to remove.  You can use an iterator to remove 
with iterating the set.
   
   https://mkyong.com/java/java-how-to-remove-items-from-a-list-while-iterating/




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to