Repository: helix
Updated Branches:
  refs/heads/master 71e4b6a66 -> a09a18ac5


[HELIX-780] add get/add job user content rest api


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/a09a18ac
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/a09a18ac
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/a09a18ac

Branch: refs/heads/master
Commit: a09a18ac55464c3e399800b4474ccb6e64d168ec
Parents: 71e4b6a
Author: Harry Zhang <hrzh...@linkedin.com>
Authored: Mon Oct 8 15:36:53 2018 -0700
Committer: Harry Zhang <hrzh...@linkedin.com>
Committed: Thu Nov 1 12:10:05 2018 -0700

----------------------------------------------------------------------
 .../server/resources/helix/JobAccessor.java     | 68 ++++++++++++++++++--
 .../helix/rest/server/TestJobAccessor.java      | 38 +++++++++++
 2 files changed, 102 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/a09a18ac/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/JobAccessor.java
----------------------------------------------------------------------
diff --git 
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/JobAccessor.java
 
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/JobAccessor.java
index 9a085f1..a984428 100644
--- 
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/JobAccessor.java
+++ 
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/JobAccessor.java
@@ -19,7 +19,6 @@ package org.apache.helix.rest.server.resources.helix;
  * under the License.
  */
 
-
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
@@ -28,12 +27,13 @@ import java.util.Set;
 import javax.ws.rs.DELETE;
 import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
+import javax.ws.rs.POST;
 import javax.ws.rs.PUT;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Response;
-
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
 import org.apache.helix.HelixException;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.task.JobConfig;
@@ -41,11 +41,12 @@ import org.apache.helix.task.JobContext;
 import org.apache.helix.task.TaskConfig;
 import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.WorkflowConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.codehaus.jackson.node.ArrayNode;
 import org.codehaus.jackson.node.JsonNodeFactory;
 import org.codehaus.jackson.node.ObjectNode;
+import org.codehaus.jackson.type.TypeReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @Path("/clusters/{clusterId}/workflows/{workflowName}/jobs")
 public class JobAccessor extends AbstractHelixResource {
@@ -171,6 +172,65 @@ public class JobAccessor extends AbstractHelixResource {
     return badRequest("Job context for " + jobName + " does not exists");
   }
 
+  @GET
+  @Path("{jobName}/userContent")
+  public Response getJobUserContent(@PathParam("clusterId") String clusterId,
+      @PathParam("workflowName") String workflowName, @PathParam("jobName") 
String jobName) {
+    TaskDriver taskDriver = getTaskDriver(clusterId);
+    try {
+      Map<String, String> contentStore =
+          taskDriver.getJobUserContentMap(workflowName, jobName);
+      if (contentStore == null) {
+        return JSONRepresentation(Collections.emptyMap());
+      }
+      return JSONRepresentation(contentStore);
+    } catch (ZkNoNodeException e) {
+      return notFound("Unable to find content store");
+    } catch (Exception e) {
+      return serverError(e);
+    }
+  }
+
+  @POST
+  @Path("{jobName}/userContent")
+  public Response updateWorkflowUserContent(
+      @PathParam("clusterId") String clusterId,
+      @PathParam("workflowName") String workflowName,
+      @PathParam("jobName") String jobName,
+      @QueryParam("command") String commandStr,
+      String content
+  ) {
+    Command cmd;
+    Map<String, String> contentMap = Collections.emptyMap();
+    try {
+      contentMap = OBJECT_MAPPER.readValue(content, new 
TypeReference<Map<String, String>>() {
+      });
+      cmd = (commandStr == null || commandStr.isEmpty())
+          ? Command.update
+          : Command.valueOf(commandStr);
+    } catch (IOException e) {
+      return badRequest(String
+          .format("Content %s cannot be deserialized to Map<String, String>. 
Err: %s", content,
+              e.getMessage()));
+    } catch (IllegalArgumentException ie) {
+      return badRequest(String.format("Invalid command: %s. Err: %s", 
commandStr, ie.getMessage()));
+    }
+
+    TaskDriver driver = getTaskDriver(clusterId);
+    try {
+      switch (cmd) {
+      case update:
+        driver.addOrUpdateJobUserContentMap(workflowName, jobName, contentMap);
+        return OK();
+      default:
+        return badRequest(String.format("Command \"%s\" is not supported!", 
cmd));
+      }
+    } catch (Exception e) {
+      _logger.error("Failed to update user content store", e);
+      return serverError(e);
+    }
+  }
+
   protected static JobConfig.Builder getJobConfig(Map<String, String> cfgMap) {
     return new JobConfig.Builder().fromMap(cfgMap);
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/a09a18ac/helix-rest/src/test/java/org/apache/helix/rest/server/TestJobAccessor.java
----------------------------------------------------------------------
diff --git 
a/helix-rest/src/test/java/org/apache/helix/rest/server/TestJobAccessor.java 
b/helix-rest/src/test/java/org/apache/helix/rest/server/TestJobAccessor.java
index 682039d..1cf377b 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestJobAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestJobAccessor.java
@@ -19,8 +19,11 @@ package org.apache.helix.rest.server;
  * under the License.
  */
 
+import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Set;
 import javax.ws.rs.client.Entity;
 import javax.ws.rs.core.MediaType;
@@ -36,6 +39,7 @@ import org.apache.helix.task.TaskPartitionState;
 import org.apache.helix.task.TaskUtil;
 import org.apache.helix.task.WorkflowConfig;
 import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.type.TypeReference;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -135,6 +139,40 @@ public class TestJobAccessor extends AbstractTestClass {
   }
 
   @Test(dependsOnMethods = "testCreateJob")
+  public void testGetAddJobContent() throws IOException {
+    System.out.println("Start test :" + TestHelper.getTestMethodName());
+    String uri = "clusters/" + CLUSTER_NAME + 
"/workflows/Workflow_0/jobs/Job_0/userContent";
+
+    // Empty user content
+    String body =
+        get(uri, Response.Status.OK.getStatusCode(), true);
+    Map<String, String> contentStore = OBJECT_MAPPER.readValue(body, new 
TypeReference<Map<String, String>>() {});
+    Assert.assertTrue(contentStore.isEmpty());
+
+    // Post user content
+    Map<String, String> map1 = new HashMap<>();
+    map1.put("k1", "v1");
+    Entity entity = Entity.entity(OBJECT_MAPPER.writeValueAsString(map1), 
MediaType.APPLICATION_JSON_TYPE);
+    post(uri, ImmutableMap.of("command", "delete"), entity, 
Response.Status.BAD_REQUEST.getStatusCode());
+    post(uri, ImmutableMap.of("command", "update"), entity, 
Response.Status.OK.getStatusCode());
+
+    // update (add items) workflow content store
+    body = get(uri, Response.Status.OK.getStatusCode(), true);
+    contentStore = OBJECT_MAPPER.readValue(body, new TypeReference<Map<String, 
String>>() {});
+    Assert.assertEquals(contentStore, map1);
+
+    // modify map1 and verify
+    map1.put("k1", "v2");
+    map1.put("k2", "v2");
+    entity = Entity.entity(OBJECT_MAPPER.writeValueAsString(map1), 
MediaType.APPLICATION_JSON_TYPE);
+    post(uri, ImmutableMap.of("command", "update"), entity, 
Response.Status.OK.getStatusCode());
+    body = get(uri, Response.Status.OK.getStatusCode(), true);
+    contentStore = OBJECT_MAPPER.readValue(body, new TypeReference<Map<String, 
String>>() {});
+    Assert.assertEquals(contentStore, map1);
+
+  }
+
+  @Test(dependsOnMethods = "testCreateJob")
   public void testDeleteJob() {
     System.out.println("Start test :" + TestHelper.getTestMethodName());
     TaskDriver driver = getTaskDriver(CLUSTER_NAME);

Reply via email to