This is an automated email from the ASF dual-hosted git repository. gian pushed a commit to branch 0.12.2 in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/0.12.2 by this push: new f3aeccf [Backport] Fix missing task type in task payload API (#5941) f3aeccf is described below commit f3aeccf79a7427086e9084292ec9479a8c7fa7a2 Author: Jihoon Son <jihoon...@apache.org> AuthorDate: Mon Jul 9 11:23:34 2018 -0700 [Backport] Fix missing task type in task payload API (#5941) * Fix missing task type in task payload API. (#5399) * Fix missing task type in task payload API. Apparently embedding a polymorphic object inside a Map<String, Object> is a bit too much for Jackson to serialize properly. Fix this by using wrapper classes. * Fix OverlordTest casts. * Remove import. * Remove unused imports. * Clarify comments. * fix test --- .../java/io/druid/indexing/common/TaskStatus.java | 21 ++++++ .../indexing/overlord/http/OverlordResource.java | 41 +++++++---- .../overlord/http/TaskPayloadResponse.java | 83 +++++++++++++++++++++ .../indexing/overlord/http/TaskStatusResponse.java | 84 ++++++++++++++++++++++ .../overlord/http/OverlordResourceTest.java | 66 ++++++++++++++++- .../druid/indexing/overlord/http/OverlordTest.java | 13 ++-- 6 files changed, 284 insertions(+), 24 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskStatus.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskStatus.java index e353595..10ffacc 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/TaskStatus.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskStatus.java @@ -153,4 +153,25 @@ public class TaskStatus .add("duration", duration) .toString(); } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TaskStatus that = (TaskStatus) o; + return getDuration() == that.getDuration() && + java.util.Objects.equals(getId(), that.getId()) && + status == that.status; + } + + @Override + public int hashCode() + { + return java.util.Objects.hash(getId(), status, getDuration()); + } } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java index 4fa507e..8e4be31 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java @@ -169,7 +169,12 @@ public class OverlordResource } catch (EntryExistsException e) { return Response.status(Response.Status.BAD_REQUEST) - .entity(ImmutableMap.of("error", StringUtils.format("Task[%s] already exists!", task.getId()))) + .entity( + ImmutableMap.of( + "error", + StringUtils.format("Task[%s] already exists!", task.getId()) + ) + ) .build(); } } @@ -209,7 +214,16 @@ public class OverlordResource @ResourceFilters(TaskResourceFilter.class) public Response getTaskPayload(@PathParam("taskid") String taskid) { - return optionalTaskResponse(taskid, "payload", taskStorageQueryAdapter.getTask(taskid)); + final TaskPayloadResponse response = new TaskPayloadResponse( + taskid, + taskStorageQueryAdapter.getTask(taskid).orNull() + ); + + final Response.Status status = response.getPayload() == null + ? Response.Status.NOT_FOUND + : Response.Status.OK; + + return Response.status(status).entity(response).build(); } @GET @@ -218,7 +232,16 @@ public class OverlordResource @ResourceFilters(TaskResourceFilter.class) public Response getTaskStatus(@PathParam("taskid") String taskid) { - return optionalTaskResponse(taskid, "status", taskStorageQueryAdapter.getStatus(taskid)); + final TaskStatusResponse response = new TaskStatusResponse( + taskid, + taskStorageQueryAdapter.getStatus(taskid).orNull() + ); + + final Response.Status status = response.getStatus() == null + ? Response.Status.NOT_FOUND + : Response.Status.OK; + + return Response.status(status).entity(response).build(); } @GET @@ -654,18 +677,6 @@ public class OverlordResource ); } - private <T> Response optionalTaskResponse(String taskid, String objectType, Optional<T> x) - { - final Map<String, Object> results = Maps.newHashMap(); - results.put("task", taskid); - if (x.isPresent()) { - results.put(objectType, x.get()); - return Response.status(Response.Status.OK).entity(results).build(); - } else { - return Response.status(Response.Status.NOT_FOUND).entity(results).build(); - } - } - private <T> Response asLeaderWith(Optional<T> x, Function<T, Response> f) { if (x.isPresent()) { diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/http/TaskPayloadResponse.java b/indexing-service/src/main/java/io/druid/indexing/overlord/http/TaskPayloadResponse.java new file mode 100644 index 0000000..1e7b4e9 --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/http/TaskPayloadResponse.java @@ -0,0 +1,83 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.overlord.http; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.druid.indexing.common.task.Task; + +import java.util.Objects; + +public class TaskPayloadResponse +{ + private final String task; // Task ID, named "task" in the JSONification of this class. + private final Task payload; + + @JsonCreator + public TaskPayloadResponse( + @JsonProperty("task") final String task, + @JsonProperty("payload") final Task payload + ) + { + this.task = task; + this.payload = payload; + } + + @JsonProperty + public String getTask() + { + return task; + } + + @JsonProperty + public Task getPayload() + { + return payload; + } + + @Override + public boolean equals(final Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final TaskPayloadResponse that = (TaskPayloadResponse) o; + return Objects.equals(task, that.task) && + Objects.equals(payload, that.payload); + } + + @Override + public int hashCode() + { + return Objects.hash(task, payload); + } + + @Override + public String toString() + { + return "TaskPayloadResponse{" + + "task='" + task + '\'' + + ", payload=" + payload + + '}'; + } +} diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/http/TaskStatusResponse.java b/indexing-service/src/main/java/io/druid/indexing/overlord/http/TaskStatusResponse.java new file mode 100644 index 0000000..b15aeb0 --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/http/TaskStatusResponse.java @@ -0,0 +1,84 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.overlord.http; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.druid.indexing.common.TaskStatus; + +import java.util.Objects; + +public class TaskStatusResponse +{ + private final String task; // Task ID, named "task" in the JSONification of this class. + private final TaskStatus status; + + @JsonCreator + public TaskStatusResponse( + @JsonProperty("task") final String task, + @JsonProperty("status") final TaskStatus status + ) + { + this.task = task; + this.status = status; + } + + @JsonProperty + public String getTask() + { + return task; + } + + @JsonProperty + public TaskStatus getStatus() + { + return status; + } + + @Override + public boolean equals(final Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final TaskStatusResponse that = (TaskStatusResponse) o; + return Objects.equals(task, that.task) && + Objects.equals(status, that.status); + } + + @Override + public int hashCode() + { + + return Objects.hash(task, status); + } + + @Override + public String toString() + { + return "TaskstatusResponse{" + + "task='" + task + '\'' + + ", status=" + status + + '}'; + } +} diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java index 22b9e51..4264186 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java @@ -39,6 +39,7 @@ import io.druid.indexing.overlord.TaskRunner; import io.druid.indexing.overlord.TaskRunnerWorkItem; import io.druid.indexing.overlord.TaskStorageQueryAdapter; import io.druid.java.util.common.DateTimes; +import io.druid.segment.TestHelper; import io.druid.server.security.Access; import io.druid.server.security.Action; import io.druid.server.security.AuthConfig; @@ -87,7 +88,8 @@ public class OverlordResourceTest Optional.of(taskRunner) ).anyTimes(); - AuthorizerMapper authMapper = new AuthorizerMapper(null) { + AuthorizerMapper authMapper = new AuthorizerMapper(null) + { @Override public Authorizer getAuthorizer(String name) { @@ -280,7 +282,12 @@ public class OverlordResourceTest EasyMock.expect(taskMaster.isLeader()).andReturn(true); EasyMock - .expect(indexerMetadataStorageAdapter.deletePendingSegments(EasyMock.eq("allow"), EasyMock.anyObject(Interval.class))) + .expect( + indexerMetadataStorageAdapter.deletePendingSegments( + EasyMock.eq("allow"), + EasyMock.anyObject(Interval.class) + ) + ) .andReturn(2); EasyMock.replay(taskRunner, taskMaster, taskStorageQueryAdapter, indexerMetadataStorageAdapter, req); @@ -291,6 +298,61 @@ public class OverlordResourceTest Assert.assertEquals(2, response.get("numDeleted").intValue()); } + @Test + public void testGetTaskPayload() throws Exception + { + expectAuthorizationTokenCheck(); + final NoopTask task = NoopTask.create("mydatasource"); + EasyMock.expect(taskStorageQueryAdapter.getTask("mytask")) + .andReturn(Optional.of(task)); + + EasyMock.expect(taskStorageQueryAdapter.getTask("othertask")) + .andReturn(Optional.absent()); + + EasyMock.replay(taskRunner, taskMaster, taskStorageQueryAdapter, indexerMetadataStorageAdapter, req); + + final Response response1 = overlordResource.getTaskPayload("mytask"); + final TaskPayloadResponse taskPayloadResponse1 = TestHelper.makeJsonMapper().readValue( + TestHelper.makeJsonMapper().writeValueAsString(response1.getEntity()), + TaskPayloadResponse.class + ); + Assert.assertEquals(new TaskPayloadResponse("mytask", task), taskPayloadResponse1); + + final Response response2 = overlordResource.getTaskPayload("othertask"); + final TaskPayloadResponse taskPayloadResponse2 = TestHelper.makeJsonMapper().readValue( + TestHelper.makeJsonMapper().writeValueAsString(response2.getEntity()), + TaskPayloadResponse.class + ); + Assert.assertEquals(new TaskPayloadResponse("othertask", null), taskPayloadResponse2); + } + + @Test + public void testGetTaskStatus() throws Exception + { + expectAuthorizationTokenCheck(); + EasyMock.expect(taskStorageQueryAdapter.getStatus("mytask")) + .andReturn(Optional.of(TaskStatus.success("mytask"))); + + EasyMock.expect(taskStorageQueryAdapter.getStatus("othertask")) + .andReturn(Optional.absent()); + + EasyMock.replay(taskRunner, taskMaster, taskStorageQueryAdapter, indexerMetadataStorageAdapter, req); + + final Response response1 = overlordResource.getTaskStatus("mytask"); + final TaskStatusResponse taskStatusResponse1 = TestHelper.makeJsonMapper().readValue( + TestHelper.makeJsonMapper().writeValueAsString(response1.getEntity()), + TaskStatusResponse.class + ); + Assert.assertEquals(new TaskStatusResponse("mytask", TaskStatus.success("mytask")), taskStatusResponse1); + + final Response response2 = overlordResource.getTaskStatus("othertask"); + final TaskStatusResponse taskStatusResponse2 = TestHelper.makeJsonMapper().readValue( + TestHelper.makeJsonMapper().writeValueAsString(response2.getEntity()), + TaskStatusResponse.class + ); + Assert.assertEquals(new TaskStatusResponse("othertask", null), taskStatusResponse2); + } + @After public void tearDown() { diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java index 9d95c65..8bfdc3c 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java @@ -26,8 +26,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; -import io.druid.java.util.emitter.EmittingLogger; -import io.druid.java.util.emitter.service.ServiceEmitter; import io.druid.curator.PotentiallyGzippedCompressionProvider; import io.druid.curator.discovery.NoopServiceAnnouncer; import io.druid.discovery.DruidLeaderSelector; @@ -56,6 +54,8 @@ import io.druid.indexing.overlord.supervisor.SupervisorManager; import io.druid.java.util.common.Pair; import io.druid.java.util.common.concurrent.Execs; import io.druid.java.util.common.guava.CloseQuietly; +import io.druid.java.util.emitter.EmittingLogger; +import io.druid.java.util.emitter.service.ServiceEmitter; import io.druid.server.DruidNode; import io.druid.server.coordinator.CoordinatorOverlordServiceConfig; import io.druid.server.metrics.NoopServiceEmitter; @@ -80,7 +80,6 @@ import javax.ws.rs.core.Response; import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; @@ -237,7 +236,7 @@ public class OverlordTest // Task payload for task_0 should be present in taskStorage response = overlordResource.getTaskPayload(taskId_0); - Assert.assertEquals(task_0, ((Map) response.getEntity()).get("payload")); + Assert.assertEquals(task_0, ((TaskPayloadResponse) response.getEntity()).getPayload()); // Task not present in taskStorage - should fail response = overlordResource.getTaskPayload("whatever"); @@ -245,10 +244,10 @@ public class OverlordTest // Task status of the submitted task should be running response = overlordResource.getTaskStatus(taskId_0); - Assert.assertEquals(taskId_0, ((Map) response.getEntity()).get("task")); + Assert.assertEquals(taskId_0, ((TaskStatusResponse) response.getEntity()).getTask()); Assert.assertEquals( TaskStatus.running(taskId_0).getStatusCode(), - ((TaskStatus) ((Map) response.getEntity()).get("status")).getStatusCode() + ((TaskStatusResponse) response.getEntity()).getStatus().getStatusCode() ); // Simulate completion of task_0 @@ -296,7 +295,7 @@ public class OverlordTest { while (true) { Response response = overlordResource.getTaskStatus(taskId); - if (status.equals(((TaskStatus) ((Map) response.getEntity()).get("status")).getStatusCode())) { + if (status.equals(((TaskStatusResponse) response.getEntity()).getStatus().getStatusCode())) { break; } Thread.sleep(10); --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@druid.apache.org For additional commands, e-mail: dev-h...@druid.apache.org