Repository: samza Updated Branches: refs/heads/master d399d6f3c -> 26280cac2
http://git-wip-us.apache.org/repos/asf/samza/blob/26280cac/samza-rest/src/main/java/org/apache/samza/rest/resources/Responses.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/main/java/org/apache/samza/rest/resources/Responses.java b/samza-rest/src/main/java/org/apache/samza/rest/resources/Responses.java new file mode 100644 index 0000000..9194d5a --- /dev/null +++ b/samza-rest/src/main/java/org/apache/samza/rest/resources/Responses.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.rest.resources; + +import java.util.Collections; +import javax.ws.rs.core.Response; + +/** + * This is a helper class that holds the methods that are reusable + * across the different samza-rest resource endpoints. + */ +public class Responses { + + private Responses() { + } + + /** + * Constructs a server error (HTTP 500) response with a consistent body format. + * + * @param message the error message to report. + * @return the {@link Response} whose entity maps the key {@code message} to the error message. 
+ */ + public static Response errorResponse(String message) { + return Response.serverError().entity(Collections.singletonMap("message", message)).build(); + } + + /** + * Constructs a bad request (HTTP 400) response. + * + * @param message the bad request message to report. + * @return the {@link Response} whose entity maps the key {@code message} to the given message. + */ + public static Response badRequestResponse(String message) { + return Response.status(Response.Status.BAD_REQUEST).entity(Collections.singletonMap("message", message)).build(); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/26280cac/samza-rest/src/main/java/org/apache/samza/rest/resources/TasksResource.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/main/java/org/apache/samza/rest/resources/TasksResource.java b/samza-rest/src/main/java/org/apache/samza/rest/resources/TasksResource.java new file mode 100644 index 0000000..301c202 --- /dev/null +++ b/samza-rest/src/main/java/org/apache/samza/rest/resources/TasksResource.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.rest.resources; + +import com.google.common.base.Preconditions; +import javax.inject.Singleton; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import org.apache.commons.lang.StringUtils; +import org.apache.samza.SamzaException; +import org.apache.samza.rest.model.Task; +import org.apache.samza.rest.proxy.job.JobInstance; +import org.apache.samza.rest.proxy.task.TaskProxyFactory; +import org.apache.samza.rest.proxy.task.SamzaTaskProxy; +import org.apache.samza.rest.proxy.task.TaskProxy; +import org.apache.samza.rest.proxy.task.TaskResourceConfig; +import org.apache.samza.util.ClassLoaderHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The REST resource for tasks. Handles the requests that are at the tasks scope. + */ +@Singleton +@Path("/v1/jobs") +public class TasksResource { + + private static final Logger LOG = LoggerFactory.getLogger(TasksResource.class); + + private final TaskProxy taskProxy; + + /** + * Initializes a TasksResource with the {@link TaskProxy} built by the + * {@link TaskProxyFactory} class specified in the configuration. Any failure while building the proxy is logged and rethrown as a {@link SamzaException}. + * + * @param config the configuration containing the {@link TaskProxyFactory} class. + */ + public TasksResource(TaskResourceConfig config) { + String taskProxyFactory = config.getTaskProxyFactory(); + Preconditions.checkArgument(StringUtils.isNotEmpty(taskProxyFactory), + String.format("Missing config: %s", TaskResourceConfig.CONFIG_TASK_PROXY_FACTORY)); + try { + TaskProxyFactory factory = ClassLoaderHelper.fromClassName(taskProxyFactory); + taskProxy = factory.getTaskProxy(config); + } catch (Exception e) { + LOG.error(String.format("Exception in building TasksResource with config: %s.", config), e); + throw new SamzaException(e); + } + } + + /** + * Gets the list of {@link Task} for the job instance specified by jobName and jobId. 
+ * @param jobName the name of the job as configured in {@link org.apache.samza.config.JobConfig#JOB_NAME}. + * @param jobId the id of the job as configured in {@link org.apache.samza.config.JobConfig#JOB_ID}. + * @return a {@link javax.ws.rs.core.Response.Status#OK} {@link javax.ws.rs.core.Response} + * containing the list of {@link Task} returned by the task proxy for the + * job instance. {@link javax.ws.rs.core.Response.Status#BAD_REQUEST} is returned for invalid + * job instances, and a server error response for any other failure. + */ + @GET + @Path("/{jobName}/{jobId}/tasks") + @Produces(MediaType.APPLICATION_JSON) + public Response getTasks( + @PathParam("jobName") final String jobName, + @PathParam("jobId") final String jobId) { + try { + return Response.ok(taskProxy.getTasks(new JobInstance(jobName, jobId))).build(); + } catch (IllegalArgumentException e) { + String message = String.format("Invalid arguments for getTasks. jobName: %s, jobId: %s.", jobName, jobId); + LOG.error(message, e); + return Responses.badRequestResponse(message); + } catch (Exception e) { + LOG.error(String.format("Error in getTasks with arguments jobName: %s, jobId: %s.", jobName, jobId), e); + return Responses.errorResponse(e.getMessage()); + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/26280cac/samza-rest/src/test/java/org/apache/samza/rest/resources/TestJobsResource.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/test/java/org/apache/samza/rest/resources/TestJobsResource.java b/samza-rest/src/test/java/org/apache/samza/rest/resources/TestJobsResource.java index 7db437b..2a051c4 100644 --- a/samza-rest/src/test/java/org/apache/samza/rest/resources/TestJobsResource.java +++ b/samza-rest/src/test/java/org/apache/samza/rest/resources/TestJobsResource.java @@ -18,8 +18,8 @@ */ package org.apache.samza.rest.resources; +import com.google.common.collect.ImmutableMap; import java.io.IOException; -import java.util.HashMap; import java.util.Map; import javax.ws.rs.client.Entity; 
import javax.ws.rs.core.Application; @@ -31,6 +31,8 @@ import org.apache.samza.rest.SamzaRestConfig; import org.apache.samza.rest.model.Job; import org.apache.samza.rest.model.JobStatus; import org.apache.samza.rest.resources.mock.MockJobProxy; +import org.apache.samza.rest.resources.mock.MockJobProxyFactory; +import org.apache.samza.rest.resources.mock.MockResourceFactory; import org.apache.samza.serializers.model.SamzaObjectMapper; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.type.TypeReference; @@ -48,10 +50,11 @@ public class TestJobsResource extends JerseyTest { @Override protected Application configure() { - Map<String, String> map = new HashMap<>(); - map.put(JobsResourceConfig.CONFIG_JOB_PROXY_FACTORY, "org.apache.samza.rest.resources.mock.MockJobProxyFactory"); - map.put(JobsResourceConfig.CONFIG_JOB_INSTALLATIONS_PATH, "."); - SamzaRestConfig config = new SamzaRestConfig(new MapConfig(map)); + Map<String, String> configMap = ImmutableMap.of(JobsResourceConfig.CONFIG_JOB_PROXY_FACTORY, + MockJobProxyFactory.class.getName(), + SamzaRestConfig.CONFIG_REST_RESOURCE_FACTORIES, + MockResourceFactory.class.getName()); + SamzaRestConfig config = new SamzaRestConfig(new MapConfig(configMap)); return new SamzaRestApplication(config); } http://git-wip-us.apache.org/repos/asf/samza/blob/26280cac/samza-rest/src/test/java/org/apache/samza/rest/resources/TestTasksResource.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/test/java/org/apache/samza/rest/resources/TestTasksResource.java b/samza-rest/src/test/java/org/apache/samza/rest/resources/TestTasksResource.java new file mode 100644 index 0000000..63a9958 --- /dev/null +++ b/samza-rest/src/test/java/org/apache/samza/rest/resources/TestTasksResource.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.rest.resources; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import javax.ws.rs.core.Application; +import javax.ws.rs.core.Response; +import org.apache.samza.config.MapConfig; +import org.apache.samza.rest.SamzaRestApplication; +import org.apache.samza.rest.SamzaRestConfig; +import org.apache.samza.rest.model.Partition; +import org.apache.samza.rest.model.Task; +import org.apache.samza.rest.proxy.task.TaskResourceConfig; +import org.apache.samza.rest.resources.mock.MockJobProxy; +import org.apache.samza.rest.resources.mock.MockResourceFactory; +import org.apache.samza.rest.resources.mock.MockTaskProxy; +import org.apache.samza.rest.resources.mock.MockTaskProxyFactory; +import org.apache.samza.serializers.model.SamzaObjectMapper; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.type.TypeReference; +import org.glassfish.jersey.test.JerseyTest; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + + +public class TestTasksResource extends JerseyTest { + ObjectMapper objectMapper = SamzaObjectMapper.getObjectMapper(); + + 
@Override + protected Application configure() { + Map<String, String> configMap = ImmutableMap.of(TaskResourceConfig.CONFIG_TASK_PROXY_FACTORY, + MockTaskProxyFactory.class.getName(), + SamzaRestConfig.CONFIG_REST_RESOURCE_FACTORIES, + MockResourceFactory.class.getName()); + SamzaRestConfig config = new SamzaRestConfig(new MapConfig(configMap)); + return new SamzaRestApplication(config); + } + + @Test + public void testGetTasks() + throws IOException { + String requestUrl = String.format("v1/jobs/%s/%s/tasks", "testJobName", "testJobId"); + Response response = target(requestUrl).request().get(); + assertEquals(200, response.getStatus()); + Task[] tasks = objectMapper.readValue(response.readEntity(String.class), Task[].class); + assertEquals(2, tasks.length); + List<Partition> partitionList = ImmutableList.of(new Partition(MockTaskProxy.SYSTEM_NAME, + MockTaskProxy.STREAM_NAME, + MockTaskProxy.PARTITION_ID)); + + assertEquals(null, tasks[0].getPreferredHost()); + assertEquals(MockTaskProxy.TASK_1_CONTAINER_ID, tasks[0].getContainerId()); + assertEquals(MockTaskProxy.TASK_1_NAME, tasks[0].getTaskName()); + assertEquals(partitionList, tasks[0].getPartitions()); + + assertEquals(null, tasks[1].getPreferredHost()); + assertEquals(MockTaskProxy.TASK_2_CONTAINER_ID, tasks[1].getContainerId()); + assertEquals(MockTaskProxy.TASK_2_NAME, tasks[1].getTaskName()); + assertEquals(partitionList, tasks[1].getPartitions()); + } + + @Test + public void testGetTasksWithInvalidJobName() + throws IOException { + String requestUrl = String.format("v1/jobs/%s/%s/tasks", "BadJobName", MockJobProxy.JOB_INSTANCE_4_ID); + Response resp = target(requestUrl).request().get(); + assertEquals(400, resp.getStatus()); + final Map<String, String> errorMessage = objectMapper.readValue(resp.readEntity(String.class), new TypeReference<Map<String, String>>() {}); + assertTrue(errorMessage.get("message"), errorMessage.get("message").contains("Invalid arguments for getTasks. 
")); + resp.close(); + } + + @Test + public void testGetTasksWithInvalidJobId() + throws IOException { + String requestUrl = String.format("v1/jobs/%s/%s/tasks", MockJobProxy.JOB_INSTANCE_1_NAME, "BadJobId"); + Response resp = target(requestUrl).request().get(); + assertEquals(400, resp.getStatus()); + final Map<String, String> errorMessage = objectMapper.readValue(resp.readEntity(String.class), new TypeReference<Map<String, String>>() {}); + assertTrue(errorMessage.get("message"), errorMessage.get("message").contains("Invalid arguments for getTasks. ")); + resp.close(); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/26280cac/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockInstallationFinder.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockInstallationFinder.java b/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockInstallationFinder.java new file mode 100644 index 0000000..aa6a0ba --- /dev/null +++ b/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockInstallationFinder.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.rest.resources.mock; + +import java.util.HashMap; +import java.util.Map; +import org.apache.samza.rest.proxy.installation.InstallationFinder; +import org.apache.samza.rest.proxy.installation.InstallationRecord; +import org.apache.samza.rest.proxy.job.JobInstance; + + +public class MockInstallationFinder implements InstallationFinder { + + @Override + public boolean isInstalled(JobInstance jobInstance) { + return true; + } + + @Override + public Map<JobInstance, InstallationRecord> getAllInstalledJobs() { + return new HashMap<>(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/26280cac/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockResourceFactory.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockResourceFactory.java b/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockResourceFactory.java new file mode 100644 index 0000000..c4c0e94 --- /dev/null +++ b/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockResourceFactory.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.rest.resources.mock; + +import java.util.ArrayList; +import java.util.List; +import org.apache.samza.config.Config; +import org.apache.samza.rest.proxy.task.TaskResourceConfig; +import org.apache.samza.rest.resources.JobsResource; +import org.apache.samza.rest.resources.JobsResourceConfig; +import org.apache.samza.rest.resources.ResourceFactory; +import org.apache.samza.rest.resources.TasksResource; + + +public class MockResourceFactory implements ResourceFactory { + + @Override + public List<? extends Object> getResourceInstances(Config config) { + List<Object> resources = new ArrayList<>(); + if (config.containsKey(JobsResourceConfig.CONFIG_JOB_PROXY_FACTORY)) { + resources.add(new JobsResource(new JobsResourceConfig(config))); + } + if (config.containsKey(TaskResourceConfig.CONFIG_TASK_PROXY_FACTORY)) { + resources.add(new TasksResource(new TaskResourceConfig(config))); + } + return resources; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/26280cac/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockTaskProxy.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockTaskProxy.java b/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockTaskProxy.java new file mode 100644 index 0000000..45f252a --- /dev/null +++ b/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockTaskProxy.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.rest.resources.mock; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import java.util.Set; +import org.apache.samza.Partition; +import org.apache.samza.config.MapConfig; +import org.apache.samza.container.TaskName; +import org.apache.samza.job.model.ContainerModel; +import org.apache.samza.job.model.JobModel; +import org.apache.samza.job.model.TaskModel; +import org.apache.samza.rest.proxy.job.JobInstance; +import org.apache.samza.rest.proxy.task.SamzaTaskProxy; +import org.apache.samza.rest.proxy.task.TaskResourceConfig; +import org.apache.samza.system.SystemStreamPartition; + + +public class MockTaskProxy extends SamzaTaskProxy { + public static final String SYSTEM_NAME = "testSystem"; + public static final String STREAM_NAME = "testStream"; + public static final Integer PARTITION_ID = 1; + public static final Set<SystemStreamPartition> SYSTEM_STREAM_PARTITIONS = ImmutableSet.of( + new SystemStreamPartition(SYSTEM_NAME, STREAM_NAME, new Partition(PARTITION_ID))); + + public static final String TASK_1_NAME = "Task1"; + public static final int TASK_1_CONTAINER_ID = 1; + public static final Partition CHANGE_LOG_PARTITION = new Partition(0); + + public static final String TASK_2_NAME = "Task2"; + public static final int TASK_2_CONTAINER_ID = 2; + + public MockTaskProxy() { + super(new TaskResourceConfig(new MapConfig()), + new MockInstallationFinder()); + } + + @Override + protected JobModel getJobModel(JobInstance jobInstance) { + if 
(jobInstance.getJobId().contains("Bad") + || jobInstance.getJobName().contains("Bad")) { + throw new IllegalArgumentException("No tasks found."); + } + TaskModel task1Model = new TaskModel(new TaskName(TASK_1_NAME), SYSTEM_STREAM_PARTITIONS, CHANGE_LOG_PARTITION); + TaskModel task2Model = new TaskModel(new TaskName(TASK_2_NAME), SYSTEM_STREAM_PARTITIONS, CHANGE_LOG_PARTITION); + ContainerModel task1ContainerModel = new ContainerModel(TASK_1_CONTAINER_ID, + ImmutableMap.of(new TaskName(TASK_1_NAME), + task1Model)); + ContainerModel task2ContainerModel = new ContainerModel(TASK_2_CONTAINER_ID, + ImmutableMap.of(new TaskName(TASK_2_NAME), + task2Model)); + return new JobModel(new MapConfig(), ImmutableMap.of(TASK_1_CONTAINER_ID, task1ContainerModel, + TASK_2_CONTAINER_ID, task2ContainerModel)); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/26280cac/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockTaskProxyFactory.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockTaskProxyFactory.java b/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockTaskProxyFactory.java new file mode 100644 index 0000000..353ff29 --- /dev/null +++ b/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockTaskProxyFactory.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.rest.resources.mock; + +import org.apache.samza.rest.proxy.task.TaskProxy; +import org.apache.samza.rest.proxy.task.TaskProxyFactory; +import org.apache.samza.rest.proxy.task.TaskResourceConfig; + + +public class MockTaskProxyFactory implements TaskProxyFactory { + + @Override + public TaskProxy getTaskProxy(TaskResourceConfig config) { + return new MockTaskProxy(); + } +}
