Repository: helix Updated Branches: refs/heads/master bfaa83995 -> 71e4b6a66
[HELIX-780] add get/add user content for workflow rest api Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/71e4b6a6 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/71e4b6a6 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/71e4b6a6 Branch: refs/heads/master Commit: 71e4b6a66af1ae56a3667d5f6f5ca7ac63080997 Parents: bfaa839 Author: Harry Zhang <[email protected]> Authored: Tue Oct 2 18:01:30 2018 -0700 Committer: Harry Zhang <[email protected]> Committed: Thu Nov 1 11:57:03 2018 -0700 ---------------------------------------------------------------------- .../resources/helix/WorkflowAccessor.java | 66 +++++++++++++++++++- .../helix/rest/server/AbstractTestClass.java | 23 +++++++ .../helix/rest/server/TestWorkflowAccessor.java | 38 +++++++++++ 3 files changed, 124 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/71e4b6a6/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/WorkflowAccessor.java ---------------------------------------------------------------------- diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/WorkflowAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/WorkflowAccessor.java index 9a9a62b..ac6a53c 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/WorkflowAccessor.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/WorkflowAccessor.java @@ -21,6 +21,7 @@ package org.apache.helix.rest.server.resources.helix; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -34,24 +35,26 @@ import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.QueryParam; import javax.ws.rs.core.Response; - +import org.I0Itec.zkclient.exception.ZkNoNodeException; import org.apache.helix.HelixException; import org.apache.helix.ZNRecord; import org.apache.helix.task.JobConfig; import org.apache.helix.task.JobDag; import org.apache.helix.task.JobQueue; import org.apache.helix.task.TaskDriver; +import org.apache.helix.task.UserContentStore; import org.apache.helix.task.Workflow; import org.apache.helix.task.WorkflowConfig; import org.apache.helix.task.WorkflowContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.codehaus.jackson.JsonNode; import org.codehaus.jackson.map.type.TypeFactory; import org.codehaus.jackson.node.ArrayNode; import org.codehaus.jackson.node.JsonNodeFactory; import org.codehaus.jackson.node.ObjectNode; import org.codehaus.jackson.node.TextNode; +import org.codehaus.jackson.type.TypeReference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @Path("/clusters/{clusterId}/workflows") public class WorkflowAccessor extends AbstractHelixResource { @@ -263,6 +266,63 @@ public class WorkflowAccessor extends AbstractHelixResource { } @GET + @Path("{workflowId}/userContent") + public Response getWorkflowUserContent( + @PathParam("clusterId") String clusterId, + @PathParam("workflowId") String workflowId + ) { + TaskDriver taskDriver = getTaskDriver(clusterId); + try { + Map<String, String> contentStore = + taskDriver.getWorkflowUserContentMap(workflowId); + if (contentStore == null) { + return JSONRepresentation(Collections.emptyMap()); + } + return JSONRepresentation(contentStore); + } catch (ZkNoNodeException e) { + return notFound("Unable to find content store"); + } catch (Exception e) { + return serverError(e); + } + } + + @POST + @Path("{workflowId}/userContent") + public Response updateWorkflowUserContent( + @PathParam("clusterId") String clusterId, + @PathParam("workflowId") String workflowId, + @QueryParam("command") String commandStr, + String content + ) { + Command cmd; + Map<String, String> contentMap = Collections.emptyMap(); + try { + contentMap = OBJECT_MAPPER.readValue(content, new TypeReference<Map<String, String>>() {}); + cmd = Command.valueOf(commandStr); + } catch (IOException e) { + return badRequest(String.format("Content %s cannot be deserialized to Map<String, String>. Err: %s", content, e.getMessage())); + } catch (IllegalArgumentException ie) { + return badRequest(String.format("Invalid command: %s. Err: %s", commandStr, ie.getMessage())); + } catch (NullPointerException npe) { + cmd = Command.update; + } + + TaskDriver driver = getTaskDriver(clusterId); + try { + switch (cmd) { + case update: + driver.addOrUpdateWorkflowUserContentMap(workflowId, contentMap); + return OK(); + default: + return badRequest(String.format("Command \"%s\" is not supported!", cmd)); + } + } catch (Exception e) { + _logger.error("Failed to update user content store", e); + return serverError(e); + } + } + + @GET @Path("{workflowId}/context") public Response getWorkflowContext(@PathParam("clusterId") String clusterId, @PathParam("workflowId") String workflowId) { http://git-wip-us.apache.org/repos/asf/helix/blob/71e4b6a6/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java ---------------------------------------------------------------------- diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java index 2596f30..f59db1a 100644 --- a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java +++ b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java @@ -23,6 +23,7 @@ import org.I0Itec.zkclient.ZkServer; import org.apache.helix.AccessOption; import org.apache.helix.BaseDataAccessor; import org.apache.helix.ConfigAccessor; +import org.apache.helix.PropertyPathBuilder; import org.apache.helix.PropertyType; import org.apache.helix.TestHelper; import org.apache.helix.ZNRecord; @@ -39,12 +40,15 @@ import org.apache.helix.rest.server.auditlog.AuditLog; import org.apache.helix.rest.server.auditlog.AuditLogger; import org.apache.helix.rest.server.filters.AuditLogFilter; import org.apache.helix.rest.server.resources.AbstractResource; +import org.apache.helix.store.HelixPropertyStore; +import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.task.JobConfig; import org.apache.helix.task.JobContext; import org.apache.helix.task.TaskConstants; import org.apache.helix.task.TaskDriver; import org.apache.helix.task.TaskPartitionState; import org.apache.helix.task.TaskState; +import org.apache.helix.task.TaskUtil; import org.apache.helix.task.Workflow; import org.apache.helix.task.WorkflowContext; import org.apache.helix.tools.ClusterSetup; @@ -61,6 +65,7 @@ import org.testng.Assert; import org.testng.annotations.AfterSuite; import org.testng.annotations.BeforeSuite; +import com.google.common.base.Joiner; import javax.ws.rs.client.Entity; import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.Application; @@ -337,6 +342,9 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest { protected Map<String, Workflow> createWorkflows(String cluster, int numWorkflows) { Map<String, Workflow> workflows = new HashMap<>(); + HelixPropertyStore<ZNRecord> propertyStore = new ZkHelixPropertyStore<>((ZkBaseDataAccessor<ZNRecord>) _baseAccessor, + PropertyPathBuilder.propertyStore(cluster), null); + for (int i = 0; i < numWorkflows; i++) { Workflow.Builder workflow = new Workflow.Builder(WORKFLOW_PREFIX + i); int j = 0; @@ -352,11 +360,20 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest { TaskConstants.REBALANCER_CONTEXT_ROOT, WORKFLOW_PREFIX + i, TaskConstants.CONTEXT_NODE), workflowContext.getRecord(), AccessOption.PERSISTENT); _configAccessor.setResourceConfig(cluster, WORKFLOW_PREFIX + i, workflow.getWorkflowConfig()); + + // Add workflow user content + propertyStore.create(Joiner.on("/") + .join(TaskConstants.REBALANCER_CONTEXT_ROOT, WORKFLOW_PREFIX + i, + TaskUtil.USER_CONTENT_NODE), new ZNRecord(TaskUtil.USER_CONTENT_NODE), + AccessOption.PERSISTENT); } return workflows; } protected Set<JobConfig.Builder> createJobs(String cluster, String workflowName, int numJobs) { + HelixPropertyStore<ZNRecord> propertyStore = + new ZkHelixPropertyStore<>((ZkBaseDataAccessor<ZNRecord>) _baseAccessor, + PropertyPathBuilder.propertyStore(cluster), null); Set<JobConfig.Builder> jobCfgs = new HashSet<>(); for (int i = 0; i < numJobs; i++) { JobConfig.Builder job = @@ -370,6 +387,12 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest { TaskConstants.REBALANCER_CONTEXT_ROOT, workflowName + "_" + JOB_PREFIX + i, TaskConstants.CONTEXT_NODE), jobContext.getRecord(), AccessOption.PERSISTENT); _configAccessor.setResourceConfig(cluster, workflowName + "_" + JOB_PREFIX + i, job.build()); + + // add job content stores + propertyStore.create(Joiner.on("/") + .join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowName + "_" + JOB_PREFIX + i, + TaskUtil.USER_CONTENT_NODE), new ZNRecord(TaskUtil.USER_CONTENT_NODE), + AccessOption.PERSISTENT); } return jobCfgs; } http://git-wip-us.apache.org/repos/asf/helix/blob/71e4b6a6/helix-rest/src/test/java/org/apache/helix/rest/server/TestWorkflowAccessor.java ---------------------------------------------------------------------- diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestWorkflowAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestWorkflowAccessor.java index 3e3b8ae..d622066 100644 --- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestWorkflowAccessor.java +++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestWorkflowAccessor.java @@ -2,6 +2,8 @@ package org.apache.helix.rest.server; import java.io.IOException; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.Set; import javax.ws.rs.client.Entity; import javax.ws.rs.core.MediaType; @@ -15,6 +17,7 @@ import org.apache.helix.task.TaskExecutionInfo; import org.apache.helix.task.TaskState; import org.apache.helix.task.WorkflowConfig; import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.type.TypeReference; import org.testng.Assert; import org.testng.annotations.Test; @@ -137,6 +140,41 @@ public class TestWorkflowAccessor extends AbstractTestClass { TargetState.START); } + @Test(dependsOnMethods = "testCreateWorkflow") + public void testGetAndUpdateWorkflowContentStore() throws IOException, InterruptedException { + System.out.println("Start test :" + TestHelper.getTestMethodName()); + String workflowName = "Workflow_0"; + TaskDriver driver = getTaskDriver(CLUSTER_NAME); + // Wait for workflow to start processing + driver.pollForWorkflowState(workflowName, TaskState.IN_PROGRESS, TaskState.COMPLETED, TaskState.FAILED); + String uri = "clusters/" + CLUSTER_NAME + "/workflows/Workflow_0/userContent"; + + String body = + get(uri, Response.Status.OK.getStatusCode(), true); + Map<String, String> contentStore = OBJECT_MAPPER.readValue(body, new TypeReference<Map<String, String>>() {}); + Assert.assertTrue(contentStore.isEmpty()); + + Map<String, String> map1 = new HashMap<>(); + map1.put("k1", "v1"); + Entity entity = Entity.entity(OBJECT_MAPPER.writeValueAsString(map1), MediaType.APPLICATION_JSON_TYPE); + post(uri, ImmutableMap.of("command", "delete"), entity, Response.Status.BAD_REQUEST.getStatusCode()); + post(uri, ImmutableMap.of("command", "update"), entity, Response.Status.OK.getStatusCode()); + + // update (add items) workflow content store + body = get(uri, Response.Status.OK.getStatusCode(), true); + contentStore = OBJECT_MAPPER.readValue(body, new TypeReference<Map<String, String>>() {}); + Assert.assertEquals(contentStore, map1); + + // modify map1 and verify + map1.put("k1", "v2"); + map1.put("k2", "v2"); + entity = Entity.entity(OBJECT_MAPPER.writeValueAsString(map1), MediaType.APPLICATION_JSON_TYPE); + post(uri, ImmutableMap.of("command", "update"), entity, Response.Status.OK.getStatusCode()); + body = get(uri, Response.Status.OK.getStatusCode(), true); + contentStore = OBJECT_MAPPER.readValue(body, new TypeReference<Map<String, String>>() {}); + Assert.assertEquals(contentStore, map1); + } + @Test(dependsOnMethods = "testUpdateWorkflow") public void testDeleteWorkflow() throws InterruptedException { System.out.println("Start test :" + TestHelper.getTestMethodName());
