jihoonson closed pull request #5871: fix push supervisor error
URL: https://github.com/apache/incubator-druid/pull/5871
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it is merged, the diff is reproduced below for
the sake of provenance:
diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/io/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/io/druid/indexing/materializedview/MaterializedViewSupervisor.java
index 1da29ba58b8..2451fbc1b80 100644
--- a/extensions-contrib/materialized-view-maintenance/src/main/java/io/druid/indexing/materializedview/MaterializedViewSupervisor.java
+++ b/extensions-contrib/materialized-view-maintenance/src/main/java/io/druid/indexing/materializedview/MaterializedViewSupervisor.java
@@ -249,6 +249,12 @@ public void checkpoint(
// do nothing
}
+ @Override
+ public boolean isPendingTaskGroupEmpty()
+ {
+ return true; // unconditionally reports "empty": this implementation inspects no task-group state
+ }
+
/**
* Find intervals in which derived dataSource should rebuild the segments.
* Choose the latest intervals to create new HadoopIndexTask and submit it.
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index f592724b42b..07e87639f3d 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -537,6 +537,19 @@ public void checkpoint(
));
}
+ @Override
+ public boolean isPendingTaskGroupEmpty() // true only when no group in pendingCompletionTaskGroups holds a task
+ {
+ for (List<TaskGroup> taskGroups : pendingCompletionTaskGroups.values()) { // each map value is a list of task groups
+ for (TaskGroup taskGroup : taskGroups) {
+ if (taskGroup.tasks.size() > 0) { // a single non-empty group is enough to answer "not empty"
+ return false;
+ }
+ }
+ }
+ return true; // also reached when the map itself has no entries
+ }
+
public void possiblyRegisterListener()
{
// getTaskRunner() sometimes fails if the task queue is still being initialized so retry later until we succeed
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java
index bc0dd7b0d29..c87b68bc4a8 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -75,6 +75,13 @@ public boolean createOrUpdateAndStartSupervisor(SupervisorSpec spec)
synchronized (lock) {
Preconditions.checkState(started, "SupervisorManager not started");
+ // check pending task group is empty
+ if (supervisors.containsKey(spec.getId())) {
+ Supervisor supervisor = supervisors.get(spec.getId()).lhs;
+ if (!supervisor.isPendingTaskGroupEmpty()) {
+ return false;
+ }
+ }
possiblyStopAndRemoveSupervisorInternal(spec.getId(), false);
return createAndStartSupervisorInternal(spec, true);
}
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorResource.java b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorResource.java
index 1518590eee3..91d92732801 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorResource.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorResource.java
@@ -110,8 +110,14 @@ public Response apply(SupervisorManager manager)
throw new ForbiddenException(authResult.toString());
}
- manager.createOrUpdateAndStartSupervisor(spec);
- return Response.ok(ImmutableMap.of("id", spec.getId())).build();
+ boolean success = manager.createOrUpdateAndStartSupervisor(spec);
+ if (success) {
+ return Response.ok(ImmutableMap.of("id", spec.getId())).build();
+ } else {
+ return Response.status(Response.Status.BAD_REQUEST)
+ .entity(ImmutableMap.of("error", "please push supervisor later!"))
+ .build();
+ }
}
}
);
diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/supervisor/SupervisorManagerTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
index be75fb1e9cf..15e627da648 100644
--- a/indexing-service/src/test/java/io/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
@@ -99,21 +99,16 @@ public void testCreateUpdateAndRemoveSupervisor()
supervisor1.stop(true);
replayAll();
- manager.createOrUpdateAndStartSupervisor(spec2);
- Assert.assertEquals(2, manager.getSupervisorIds().size());
- Assert.assertEquals(spec2, manager.getSupervisorSpec("id1").get());
- verifyAll();
-
resetAll();
metadataSupervisorManager.insert(eq("id1"),
anyObject(NoopSupervisorSpec.class));
supervisor2.stop(true);
replayAll();
+ resetAll();
boolean retVal = manager.stopAndRemoveSupervisor("id1");
Assert.assertTrue(retVal);
Assert.assertEquals(1, manager.getSupervisorIds().size());
Assert.assertEquals(Optional.absent(), manager.getSupervisorSpec("id1"));
- verifyAll();
resetAll();
supervisor3.stop(false);
diff --git a/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java b/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
index 380861d9fb8..ee189e16a9a 100644
--- a/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
+++ b/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
@@ -90,6 +90,11 @@ public void checkpoint(
{
}
+ @Override
+ public boolean isPendingTaskGroupEmpty()
+ {
+ return true; // constant answer: this no-op supervisor never reports a non-empty pending task group
+ }
};
}
diff --git a/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java
index d5efd8e7444..77d33838779 100644
--- a/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java
+++ b/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java
@@ -61,4 +61,6 @@ void checkpoint(
@Nullable DataSourceMetadata previousCheckPoint,
@Nullable DataSourceMetadata currentCheckPoint
);
+
+ boolean isPendingTaskGroupEmpty(); // when false for an existing supervisor, SupervisorManager.createOrUpdateAndStartSupervisor returns false without replacing it
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]