jerrypeng commented on a change in pull request #12178:
URL: https://github.com/apache/pulsar/pull/12178#discussion_r716906045
##########
File path:
pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerApiV2Resource.java
##########
@@ -156,6 +160,65 @@ public void rebalance() {
workers().rebalance(uri.getRequestUri(), clientAppId());
}
+ @PUT
+ @ApiOperation(
+ value = "Drains the specified worker, i.e., moves its
work-assignments to other workers"
+ )
+ @ApiResponses(value = {
+ @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 403, message = "The requester doesn't have
admin permissions"),
+ @ApiResponse(code = 408, message = "Request timeout"),
+ @ApiResponse(code = 409, message = "Drain already in progress"),
+ @ApiResponse(code = 503, message = "Worker service is not ready")
+ })
+ @Path("/leader/{drain}")
+ public void drainAtLeader(@PathParam("drain") String workerId) {
+ workers().drain(uri.getRequestUri(), clientAppId(), true);
+ }
+
+ @PUT
+ @ApiOperation(
+ value = "Drains this worker, i.e., moves its work-assignments to
other workers"
+ )
+ @ApiResponses(value = {
+ @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 403, message = "The requester doesn't have
admin permissions"),
+ @ApiResponse(code = 408, message = "Request timeout"),
+ @ApiResponse(code = 409, message = "Drain already in progress"),
+ @ApiResponse(code = 503, message = "Worker service is not ready")
+ })
+ @Path("/drain")
+ public void drain() {
+ workers().drain(uri.getRequestUri(), clientAppId(), false);
+ }
+
+ @GET
+ @ApiOperation(
+ value = "Get the status of any ongoing drain operation at the
specified worker",
+ response = LongRunningProcessStatus.class
+ )
+ @ApiResponses(value = {
+ @ApiResponse(code = 403, message = "The requester doesn't have
admin permissions"),
+ @ApiResponse(code = 503, message = "Worker service is not ready")
+ })
+ @Path("/leader/{drain}")
Review comment:
I think a better name for the URL parameter is "workerId".
##########
File path:
pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerApiV2Resource.java
##########
@@ -156,6 +160,65 @@ public void rebalance() {
workers().rebalance(uri.getRequestUri(), clientAppId());
}
+ @PUT
+ @ApiOperation(
+ value = "Drains the specified worker, i.e., moves its
work-assignments to other workers"
+ )
+ @ApiResponses(value = {
+ @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 403, message = "The requester doesn't have
admin permissions"),
+ @ApiResponse(code = 408, message = "Request timeout"),
+ @ApiResponse(code = 409, message = "Drain already in progress"),
+ @ApiResponse(code = 503, message = "Worker service is not ready")
+ })
+ @Path("/leader/{drain}")
+ public void drainAtLeader(@PathParam("drain") String workerId) {
+ workers().drain(uri.getRequestUri(), clientAppId(), true);
+ }
+
+ @PUT
+ @ApiOperation(
+ value = "Drains this worker, i.e., moves its work-assignments to
other workers"
+ )
+ @ApiResponses(value = {
+ @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 403, message = "The requester doesn't have
admin permissions"),
+ @ApiResponse(code = 408, message = "Request timeout"),
+ @ApiResponse(code = 409, message = "Drain already in progress"),
+ @ApiResponse(code = 503, message = "Worker service is not ready")
+ })
+ @Path("/drain")
+ public void drain() {
+ workers().drain(uri.getRequestUri(), clientAppId(), false);
+ }
+
+ @GET
+ @ApiOperation(
+ value = "Get the status of any ongoing drain operation at the
specified worker",
+ response = LongRunningProcessStatus.class
+ )
+ @ApiResponses(value = {
+ @ApiResponse(code = 403, message = "The requester doesn't have
admin permissions"),
+ @ApiResponse(code = 503, message = "Worker service is not ready")
+ })
+ @Path("/leader/{drain}")
+ public LongRunningProcessStatus getDrainStatus(@PathParam("drain") String
workerId) {
+ return workers().getDrainStatus(uri.getRequestUri(), clientAppId(),
true);
Review comment:
Why are you passing the URI into "getDrainStatus()" and later trying to
parse the workerId from the URI, instead of just passing the workerId
directly to "getDrainStatus()"?
##########
File path:
pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerApiV2Resource.java
##########
@@ -156,6 +160,65 @@ public void rebalance() {
workers().rebalance(uri.getRequestUri(), clientAppId());
}
+ @PUT
+ @ApiOperation(
+ value = "Drains the specified worker, i.e., moves its
work-assignments to other workers"
+ )
+ @ApiResponses(value = {
+ @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 403, message = "The requester doesn't have
admin permissions"),
+ @ApiResponse(code = 408, message = "Request timeout"),
+ @ApiResponse(code = 409, message = "Drain already in progress"),
+ @ApiResponse(code = 503, message = "Worker service is not ready")
+ })
+ @Path("/leader/{drain}")
+ public void drainAtLeader(@PathParam("drain") String workerId) {
Review comment:
I think a better name for the URL parameter is "workerId".
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java
##########
@@ -127,6 +129,65 @@ public void rebalance() {
workers().rebalance(uri.getRequestUri(), clientAppId());
}
+ @PUT
+ @ApiOperation(
+ value = "Drains the specified worker, i.e., moves its
work-assignments to other workers"
+ )
+ @ApiResponses(value = {
+ @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 403, message = "The requester doesn't have
admin permissions"),
+ @ApiResponse(code = 408, message = "Request timeout"),
+ @ApiResponse(code = 409, message = "Drain already in progress"),
+ @ApiResponse(code = 503, message = "Worker service is not ready")
+ })
+ @Path("/leader/{drain}")
Review comment:
Please refer to my comments in WorkerApiV2Resource.java
##########
File path:
pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
##########
@@ -415,6 +633,34 @@ private void compactAssignmentTopic() {
}
}
+ protected int updateWorkerDrainMap() {
Review comment:
Since this runs periodically, let's make sure this method cannot be
invoked concurrently.
##########
File path:
pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
##########
@@ -415,6 +633,34 @@ private void compactAssignmentTopic() {
}
}
+ protected int updateWorkerDrainMap() {
+ long startTime = System.nanoTime();
+ int numRemovedWorkerIds = 0;
+
+ if (drainOpStatusMap.size() > 0) {
Review comment:
I know drainOpStatusMap is a concurrent map, but we are reading and
writing it from different threads. This makes me nervous.
##########
File path:
pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
##########
@@ -415,6 +633,34 @@ private void compactAssignmentTopic() {
}
}
+ protected int updateWorkerDrainMap() {
+ long startTime = System.nanoTime();
+ int numRemovedWorkerIds = 0;
+
+ if (drainOpStatusMap.size() > 0) {
Review comment:
I know drainOpStatusMap is a concurrent map, but we are reading and
writing it from different threads. This makes me nervous. Should we
synchronize this with the drain operation?
##########
File path:
pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
##########
@@ -215,28 +239,147 @@ public synchronized void initialize(Producer<byte[]>
exclusiveProducer) {
}
public Future<?> rebalanceIfNotInprogress() {
- if (rebalanceInProgess.compareAndSet(false, true)) {
+ if (rebalanceInProgress.compareAndSet(false, true)) {
+ val numWorkers = getCurrentAvailableNumWorkers();
+ if (numWorkers <= 1) {
+ rebalanceInProgress.set(false);
+ throw new TooFewWorkersException();
+ }
currentRebalanceFuture = rebalance();
return currentRebalanceFuture;
} else {
throw new RebalanceInProgressException();
}
}
- @VisibleForTesting
- void invokeScheduler() {
+ private Future<?> drain(String workerId) {
+ return scheduleInternal(() -> {
+ workerStatsManager.drainTotalExecTimeStart();
+ currentPostDrainAssignments = invokeDrain(workerId);
+ workerStatsManager.drainTotalExecTimeEnd();
+ }, "Encountered error when invoking drain");
+ }
+
+ public Future<?> drainIfNotInProgress(String workerId) {
+ if (drainInProgressFlag.compareAndSet(false, true)) {
+ try {
+ val availableWorkers = getCurrentAvailableWorkers();
+ if (availableWorkers.size() <= 1) {
+ throw new TooFewWorkersException();
+ }
+
+ // A worker must be specified at this point. This would be set
up by the caller.
+ Preconditions.checkNotNull(workerId);
+
+ // [We can get stricter, and require that every drain op be
followed up with a cleanup of the
+ // corresponding worker before any other drain op, so that the
drainOpStatusMap should be empty
+ // at the next drain operation.]
+ if (drainOpStatusMap.containsKey(workerId)) {
+ String warnString = "Worker " + workerId
+ + " was not removed yet from SchedulerManager
after previous drain op";
+ log.warn(warnString);
+ throw new WorkerNotRemovedAfterPriorDrainException();
+ }
+
+ if (!availableWorkers.contains(workerId)) {
+ log.info("invokeDrain was called for a worker={} which is
not currently active", workerId);
+ throw new UnknownWorkerException();
+ }
+
+ currentDrainFuture = drain(workerId);
+ return currentDrainFuture;
+ } finally {
+ drainInProgressFlag.set(false);
+ }
+ } else {
+ throw new DrainInProgressException();
+ }
+ }
+
+ public LongRunningProcessStatus getDrainStatus(String workerId) {
long startTime = System.nanoTime();
+ String errString;
+ LongRunningProcessStatus retVal = new LongRunningProcessStatus();
+ try {
+ val workerStatus = drainOpStatusMap.get(workerId);
+ if (workerStatus == null) {
+ errString = "Worker " + workerId + " not found in drain
records";
+ retVal = LongRunningProcessStatus.forError(errString);
+ } else {
+ switch (workerStatus) {
+ default:
+ errString = "getDrainStatus: Unexpected status " +
workerStatus + " found for worker " + workerId;
+ retVal = LongRunningProcessStatus.forError(errString);
+ break;
+ case DrainCompleted:
+ retVal =
LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.SUCCESS);
+ break;
+ case DrainInProgress:
+ retVal =
LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.RUNNING);
+ break;
+ case DrainNotInProgress:
+ retVal =
LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.NOT_RUN);
+ break;
+ }
+ }
+ } finally {
+ log.info("Get drain status for worker {} - execution time: {} sec;
returning status={}, error={}",
+ workerId, (System.nanoTime() - startTime) / Math.pow(10,
9), retVal.status, retVal.lastError);
+ return retVal;
+ }
+ }
+
+ @VisibleForTesting
+ void clearDrainOpsStatus() {
+ drainOpStatusMap.clear();
+ log.warn("Cleared drain op status map");
+ }
+
+ @VisibleForTesting
+ void setDrainOpsStatus(final String workerId, final DrainOpStatus dStatus)
{
+ drainOpStatusMap.put(workerId, dStatus);
+ log.warn("setDrainOpsStatus: updated drain status of worker {} to {}",
workerId, dStatus);
+ }
+ @VisibleForTesting
+ ConcurrentHashMap<String, DrainOpStatus> getDrainOpsStatusMap() {
+ val retVal = new ConcurrentHashMap<String,
DrainOpStatus>(drainOpStatusMap);
+ return retVal;
+ }
+
+ private synchronized int getCurrentAvailableNumWorkers() {
+ return getCurrentAvailableWorkers().size();
+ }
+
+ private synchronized Set <String> getCurrentAvailableWorkers() {
Set<String> currentMembership =
membershipManager.getCurrentMembership()
.stream().map(workerInfo ->
workerInfo.getWorkerId()).collect(Collectors.toSet());
+ val unavailableWorkers = new ArrayList<String>();
+ // iterate the set, instead of the concurrent hashmap
+ for (String worker : currentMembership) {
Review comment:
We don't need two loops to remove these. You can use an iterator to remove
elements while iterating over the set.
https://mkyong.com/java/java-how-to-remove-items-from-a-list-while-iterating/
##########
File path:
pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
##########
@@ -215,28 +239,147 @@ public synchronized void initialize(Producer<byte[]>
exclusiveProducer) {
}
public Future<?> rebalanceIfNotInprogress() {
- if (rebalanceInProgess.compareAndSet(false, true)) {
+ if (rebalanceInProgress.compareAndSet(false, true)) {
+ val numWorkers = getCurrentAvailableNumWorkers();
+ if (numWorkers <= 1) {
+ rebalanceInProgress.set(false);
+ throw new TooFewWorkersException();
+ }
currentRebalanceFuture = rebalance();
return currentRebalanceFuture;
} else {
throw new RebalanceInProgressException();
}
}
- @VisibleForTesting
- void invokeScheduler() {
+ private Future<?> drain(String workerId) {
+ return scheduleInternal(() -> {
+ workerStatsManager.drainTotalExecTimeStart();
+ currentPostDrainAssignments = invokeDrain(workerId);
+ workerStatsManager.drainTotalExecTimeEnd();
+ }, "Encountered error when invoking drain");
+ }
+
+ public Future<?> drainIfNotInProgress(String workerId) {
+ if (drainInProgressFlag.compareAndSet(false, true)) {
+ try {
+ val availableWorkers = getCurrentAvailableWorkers();
+ if (availableWorkers.size() <= 1) {
+ throw new TooFewWorkersException();
+ }
+
+ // A worker must be specified at this point. This would be set
up by the caller.
+ Preconditions.checkNotNull(workerId);
+
+ // [We can get stricter, and require that every drain op be
followed up with a cleanup of the
+ // corresponding worker before any other drain op, so that the
drainOpStatusMap should be empty
+ // at the next drain operation.]
+ if (drainOpStatusMap.containsKey(workerId)) {
+ String warnString = "Worker " + workerId
+ + " was not removed yet from SchedulerManager
after previous drain op";
+ log.warn(warnString);
+ throw new WorkerNotRemovedAfterPriorDrainException();
+ }
+
+ if (!availableWorkers.contains(workerId)) {
+ log.info("invokeDrain was called for a worker={} which is
not currently active", workerId);
+ throw new UnknownWorkerException();
+ }
+
+ currentDrainFuture = drain(workerId);
+ return currentDrainFuture;
+ } finally {
+ drainInProgressFlag.set(false);
+ }
+ } else {
+ throw new DrainInProgressException();
+ }
+ }
+
+ public LongRunningProcessStatus getDrainStatus(String workerId) {
long startTime = System.nanoTime();
+ String errString;
+ LongRunningProcessStatus retVal = new LongRunningProcessStatus();
+ try {
+ val workerStatus = drainOpStatusMap.get(workerId);
+ if (workerStatus == null) {
+ errString = "Worker " + workerId + " not found in drain
records";
+ retVal = LongRunningProcessStatus.forError(errString);
+ } else {
+ switch (workerStatus) {
+ default:
+ errString = "getDrainStatus: Unexpected status " +
workerStatus + " found for worker " + workerId;
+ retVal = LongRunningProcessStatus.forError(errString);
+ break;
+ case DrainCompleted:
+ retVal =
LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.SUCCESS);
+ break;
+ case DrainInProgress:
+ retVal =
LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.RUNNING);
+ break;
+ case DrainNotInProgress:
+ retVal =
LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.NOT_RUN);
+ break;
+ }
+ }
+ } finally {
+ log.info("Get drain status for worker {} - execution time: {} sec;
returning status={}, error={}",
+ workerId, (System.nanoTime() - startTime) / Math.pow(10,
9), retVal.status, retVal.lastError);
+ return retVal;
+ }
+ }
+
+ @VisibleForTesting
Review comment:
Can you add a comment to "clearDrainOpsStatus", "setDrainOpsStatus", and
"getDrainOpsStatusMap" stating that these methods are only used for testing?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]