maytasm commented on a change in pull request #9610: Fix NPE in
RemoteTaskRunner event handler causes JVM shutdown
URL: https://github.com/apache/druid/pull/9610#discussion_r404595527
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
##########
@@ -969,116 +970,129 @@ private boolean cancelWorkerCleanup(String workerHost)
);
// Add status listener to the watcher for status changes
- zkWorker.addListener(
- (client, event) -> {
- final String taskId;
- final RemoteTaskRunnerWorkItem taskRunnerWorkItem;
- synchronized (statusLock) {
- try {
- switch (event.getType()) {
- case CHILD_ADDED:
- case CHILD_UPDATED:
- taskId =
ZKPaths.getNodeFromPath(event.getData().getPath());
- final TaskAnnouncement announcement = jsonMapper.readValue(
- event.getData().getData(), TaskAnnouncement.class
- );
-
- log.info(
- "Worker[%s] wrote %s status for task [%s] on [%s]",
- zkWorker.getWorker().getHost(),
- announcement.getTaskStatus().getStatusCode(),
- taskId,
- announcement.getTaskLocation()
- );
-
- // Synchronizing state with ZK
- statusLock.notifyAll();
-
- final RemoteTaskRunnerWorkItem tmp;
- if ((tmp = runningTasks.get(taskId)) != null) {
- taskRunnerWorkItem = tmp;
- } else {
- final RemoteTaskRunnerWorkItem newTaskRunnerWorkItem =
new RemoteTaskRunnerWorkItem(
- taskId,
- announcement.getTaskType(),
- zkWorker.getWorker(),
- TaskLocation.unknown(),
- announcement.getTaskDataSource()
- );
- final RemoteTaskRunnerWorkItem existingItem =
runningTasks.putIfAbsent(
- taskId,
- newTaskRunnerWorkItem
- );
- if (existingItem == null) {
- log.warn(
- "Worker[%s] announced a status for a task I didn't
know about, adding to runningTasks: %s",
- zkWorker.getWorker().getHost(),
- taskId
- );
- taskRunnerWorkItem = newTaskRunnerWorkItem;
- } else {
- taskRunnerWorkItem = existingItem;
- }
- }
-
- if
(!announcement.getTaskLocation().equals(taskRunnerWorkItem.getLocation())) {
-
taskRunnerWorkItem.setLocation(announcement.getTaskLocation());
- TaskRunnerUtils.notifyLocationChanged(listeners, taskId,
announcement.getTaskLocation());
- }
+ zkWorker.addListener(getStatusListener(worker, zkWorker, retVal));
+ zkWorker.start();
+ return retVal;
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
- if (announcement.getTaskStatus().isComplete()) {
- taskComplete(taskRunnerWorkItem, zkWorker,
announcement.getTaskStatus());
- runPendingTasks();
- }
- break;
- case CHILD_REMOVED:
- taskId =
ZKPaths.getNodeFromPath(event.getData().getPath());
- taskRunnerWorkItem = runningTasks.remove(taskId);
- if (taskRunnerWorkItem != null) {
- log.info("Task[%s] just disappeared!", taskId);
- taskRunnerWorkItem.setResult(TaskStatus.failure(taskId));
- TaskRunnerUtils.notifyStatusChanged(listeners, taskId,
TaskStatus.failure(taskId));
- } else {
- log.info("Task[%s] went bye bye.", taskId);
- }
- break;
- case INITIALIZED:
- if (zkWorkers.putIfAbsent(worker.getHost(), zkWorker) ==
null) {
- retVal.set(zkWorker);
- } else {
- final String message = StringUtils.format(
- "WTF?! Tried to add already-existing worker[%s]",
- worker.getHost()
- );
- log.makeAlert(message)
- .addData("workerHost", worker.getHost())
- .addData("workerIp", worker.getIp())
- .emit();
- retVal.setException(new IllegalStateException(message));
- }
- runPendingTasks();
- break;
- case CONNECTION_SUSPENDED:
- case CONNECTION_RECONNECTED:
- case CONNECTION_LOST:
- // do nothing
+ @VisibleForTesting
+ PathChildrenCacheListener getStatusListener(final Worker worker, final
ZkWorker zkWorker, final SettableFuture<ZkWorker> retVal)
+ {
+ return (client, event) -> {
+ final String taskId;
+ final RemoteTaskRunnerWorkItem taskRunnerWorkItem;
+ synchronized (statusLock) {
+ try {
+ switch (event.getType()) { // lgtm
[java/dereferenced-value-may-be-null]
+ case CHILD_ADDED:
+ case CHILD_UPDATED:
+ taskId = ZKPaths.getNodeFromPath(event.getData().getPath()); //
lgtm [java/dereferenced-value-may-be-null]
+ final TaskAnnouncement announcement = jsonMapper.readValue(
+ event.getData().getData(), TaskAnnouncement.class // lgtm
[java/dereferenced-value-may-be-null]
+ );
+
+ log.info(
+ "Worker[%s] wrote %s status for task [%s] on [%s]",
+ zkWorker.getWorker().getHost(),
+ announcement.getTaskStatus().getStatusCode(),
+ taskId,
+ announcement.getTaskLocation()
+ );
+
+ // Synchronizing state with ZK
+ statusLock.notifyAll();
+
+ final RemoteTaskRunnerWorkItem tmp;
+ if ((tmp = runningTasks.get(taskId)) != null) {
+ taskRunnerWorkItem = tmp;
+ } else {
+ final RemoteTaskRunnerWorkItem newTaskRunnerWorkItem = new
RemoteTaskRunnerWorkItem(
+ taskId,
+ announcement.getTaskType(),
+ zkWorker.getWorker(),
+ TaskLocation.unknown(),
+ announcement.getTaskDataSource()
+ );
+ final RemoteTaskRunnerWorkItem existingItem =
runningTasks.putIfAbsent(
+ taskId,
+ newTaskRunnerWorkItem
+ );
+ if (existingItem == null) {
+ log.warn(
+ "Worker[%s] announced a status for a task I didn't know
about, adding to runningTasks: %s",
+ zkWorker.getWorker().getHost(),
+ taskId
+ );
+ taskRunnerWorkItem = newTaskRunnerWorkItem;
+ } else {
+ taskRunnerWorkItem = existingItem;
}
}
- catch (Exception e) {
- log.makeAlert(e, "Failed to handle new worker status")
- .addData("worker", zkWorker.getWorker().getHost())
- .addData("znode", event.getData().getPath())
+
+ if
(!announcement.getTaskLocation().equals(taskRunnerWorkItem.getLocation())) {
+ taskRunnerWorkItem.setLocation(announcement.getTaskLocation());
+ TaskRunnerUtils.notifyLocationChanged(listeners, taskId,
announcement.getTaskLocation());
+ }
+
+ if (announcement.getTaskStatus().isComplete()) {
+ taskComplete(taskRunnerWorkItem, zkWorker,
announcement.getTaskStatus());
+ runPendingTasks();
+ }
+ break;
+ case CHILD_REMOVED:
+ taskId = ZKPaths.getNodeFromPath(event.getData().getPath()); //
lgtm [java/dereferenced-value-may-be-null]
+ taskRunnerWorkItem = runningTasks.remove(taskId);
+ if (taskRunnerWorkItem != null) {
+ log.info("Task[%s] just disappeared!", taskId);
+ taskRunnerWorkItem.setResult(TaskStatus.failure(taskId));
+ TaskRunnerUtils.notifyStatusChanged(listeners, taskId,
TaskStatus.failure(taskId));
+ } else {
+ log.info("Task[%s] went bye bye.", taskId);
+ }
+ break;
+ case INITIALIZED:
+ if (zkWorkers.putIfAbsent(worker.getHost(), zkWorker) == null) {
+ retVal.set(zkWorker);
+ } else {
+ final String message = StringUtils.format(
+ "This should not happen...tried to add already-existing
worker[%s]",
+ worker.getHost()
+ );
+ log.makeAlert(message)
+ .addData("workerHost", worker.getHost())
+ .addData("workerIp", worker.getIp())
.emit();
+ retVal.setException(new IllegalStateException(message));
}
+ runPendingTasks();
+ break;
+ case CONNECTION_SUSPENDED:
+ case CONNECTION_RECONNECTED:
+ case CONNECTION_LOST:
+ // do nothing
+ }
+ }
+ catch (Exception e) {
+ String znode = null;
+ String eventType = null;
+ if (event != null) {
Review comment:
`event` cannot be null here. I had added the check defensively, in case a
change in upstream code or a dependency library ever allowed it to be null, so
that an NPE in this handler could not shut down the JVM. I have removed it for now.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]