This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 31d69f2  [GOBBLIN-796] Add support partial updates for flowConfig
31d69f2 is described below

commit 31d69f2730457cef660ddf576d8f695d8ca4571b
Author: Jack Moseley <[email protected]>
AuthorDate: Mon Jul 15 15:02:30 2019 -0700

    [GOBBLIN-796] Add support partial updates for flowConfig
    
    Closes #2663 from jack-moseley/flow-trigger
---
 ...che.gobblin.service.flowconfigsV2.restspec.json |  5 +-
 ...che.gobblin.service.flowconfigsV2.snapshot.json |  5 +-
 .../apache/gobblin/service/FlowConfigV2Client.java | 21 +++++++++
 .../apache/gobblin/service/FlowConfigV2Test.java   | 54 +++++++++++++++++++++-
 .../service/FlowConfigResourceLocalHandler.java    |  9 ++++
 .../service/FlowConfigV2ResourceLocalHandler.java  |  9 ++++
 .../gobblin/service/FlowConfigsResource.java       |  1 +
 .../service/FlowConfigsResourceHandler.java        |  6 +++
 .../gobblin/service/FlowConfigsV2Resource.java     | 16 +++++++
 .../GobblinServiceFlowConfigResourceHandler.java   | 16 +++++++
 10 files changed, 139 insertions(+), 3 deletions(-)

diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowconfigsV2.restspec.json
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowconfigsV2.restspec.json
index 8aa0726..0127c42 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowconfigsV2.restspec.json
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowconfigsV2.restspec.json
@@ -10,7 +10,7 @@
       "type" : "org.apache.gobblin.service.FlowId",
       "params" : "org.apache.gobblin.service.FlowStatusId"
     },
-    "supports" : [ "create", "delete", "get", "update" ],
+    "supports" : [ "create", "delete", "get", "update", "partial_update" ],
     "methods" : [ {
       "annotations" : {
         "returnEntity" : { }
@@ -24,6 +24,9 @@
       "method" : "update",
       "doc" : "Update the flow configuration with the specified key. Running 
flows are not affected.\n An error is raised if the flow configuration does not 
exist."
     }, {
+      "method" : "partial_update",
+      "doc" : "Partial update the flowConfig specified"
+    }, {
       "method" : "delete",
       "doc" : "Delete a configured flow. Running flows are not affected. The 
schedule will be removed for scheduled flows."
     } ],
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigsV2.snapshot.json
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigsV2.snapshot.json
index 99ea6fd..359e6bc 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigsV2.snapshot.json
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigsV2.snapshot.json
@@ -108,7 +108,7 @@
         "type" : "org.apache.gobblin.service.FlowId",
         "params" : "org.apache.gobblin.service.FlowStatusId"
       },
-      "supports" : [ "create", "delete", "get", "update" ],
+      "supports" : [ "create", "delete", "get", "update", "partial_update" ],
       "methods" : [ {
         "annotations" : {
           "returnEntity" : { }
@@ -122,6 +122,9 @@
         "method" : "update",
         "doc" : "Update the flow configuration with the specified key. Running 
flows are not affected.\n An error is raised if the flow configuration does not 
exist."
       }, {
+        "method" : "partial_update",
+        "doc" : "Partial update the flowConfig specified"
+      }, {
         "method" : "delete",
         "doc" : "Delete a configured flow. Running flows are not affected. The 
schedule will be removed for scheduled flows."
       } ],
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java
index 94af118..d1e146a 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java
@@ -38,6 +38,7 @@ import 
com.linkedin.r2.transport.http.client.HttpClientFactory;
 import com.linkedin.restli.client.CreateIdEntityRequest;
 import com.linkedin.restli.client.DeleteRequest;
 import com.linkedin.restli.client.GetRequest;
+import com.linkedin.restli.client.PartialUpdateRequest;
 import com.linkedin.restli.client.Response;
 import com.linkedin.restli.client.ResponseFuture;
 import com.linkedin.restli.client.RestClient;
@@ -45,6 +46,7 @@ import com.linkedin.restli.client.UpdateRequest;
 import com.linkedin.restli.common.ComplexResourceKey;
 import com.linkedin.restli.common.EmptyRecord;
 import com.linkedin.restli.common.IdEntityResponse;
+import com.linkedin.restli.common.PatchRequest;
 
 
 /**
@@ -148,6 +150,25 @@ public class FlowConfigV2Client implements Closeable {
   }
 
   /**
+   * Partially update a flow configuration
+   * @param flowId flow ID to update
+   * @param flowConfigPatch {@link PatchRequest} containing changes to the 
flowConfig
+   * @throws RemoteInvocationException
+   */
+  public void partialUpdateFlowConfig(FlowId flowId, PatchRequest<FlowConfig> 
flowConfigPatch) throws RemoteInvocationException {
+    LOG.debug("partialUpdateFlowConfig with groupName " + 
flowId.getFlowGroup() + " flowName " +
+        flowId.getFlowName());
+
+    PartialUpdateRequest<FlowConfig> partialUpdateRequest =
+        _flowconfigsV2RequestBuilders.partialUpdate().id(new 
ComplexResourceKey<>(flowId, new FlowStatusId()))
+            .input(flowConfigPatch).build();
+
+    ResponseFuture<EmptyRecord> response = 
_restClient.get().sendRequest(partialUpdateRequest);
+
+    response.getResponse();
+  }
+
+  /**
    * Get a flow configuration
    * @param flowId identifier of flow configuration to get
    * @return a {@link FlowConfig} with the flow configuration
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
index 33e8e58..21f862a 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
@@ -21,7 +21,7 @@ import java.io.File;
 import java.util.Map;
 
 import org.apache.commons.io.FileUtils;
-import org.apache.gobblin.runtime.spec_store.FSSpecStore;
+import org.apache.commons.io.IOUtils;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -35,8 +35,13 @@ import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.Module;
 import com.google.inject.name.Names;
+import com.linkedin.data.DataMap;
 import com.linkedin.data.template.StringMap;
+import com.linkedin.restli.client.RestLiResponseException;
+import com.linkedin.restli.common.PatchRequest;
+import com.linkedin.restli.internal.server.util.DataMapUtils;
 import com.linkedin.restli.server.resources.BaseResource;
+import com.linkedin.restli.server.util.PatchApplier;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
@@ -44,6 +49,7 @@ import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.restli.EmbeddedRestliServer;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.runtime.spec_store.FSSpecStore;
 
 
 @Test(groups = { "gobblin.service" })
@@ -122,6 +128,52 @@ public class FlowConfigV2Test {
     
Assert.assertEquals(_client.createFlowConfig(flowConfig).getFlowExecutionId().longValue(),
 -1L);
   }
 
+  @Test
+  public void testPartialUpdate() throws Exception {
+    FlowId flowId = new 
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME);
+
+    Map<String, String> flowProperties = Maps.newHashMap();
+    flowProperties.put("param1", "value1");
+    flowProperties.put("param2", "value2");
+    flowProperties.put("param3", "value3");
+
+    FlowConfig flowConfig = new FlowConfig().setId(new 
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME))
+        .setTemplateUris(TEST_TEMPLATE_URI).setSchedule(new 
Schedule().setCronSchedule(TEST_SCHEDULE).setRunImmediately(false))
+        .setProperties(new StringMap(flowProperties));
+
+    // Set some initial config
+    _client.updateFlowConfig(flowConfig);
+
+    // Change param2 to value4, delete param3
+    String patchJson = "{\"schedule\":{\"$set\":{\"runImmediately\":true}},"
+        + 
"\"properties\":{\"$set\":{\"param2\":\"value4\"},\"$delete\":[\"param3\"]}}";
+    DataMap dataMap = DataMapUtils.readMap(IOUtils.toInputStream(patchJson));
+    PatchRequest<FlowConfig> flowConfigPatch = 
PatchRequest.createFromPatchDocument(dataMap);
+
+    PatchApplier.applyPatch(flowConfig, flowConfigPatch);
+
+    _client.updateFlowConfig(flowConfig);
+
+    FlowConfig retrievedFlowConfig = _client.getFlowConfig(flowId);
+
+    Assert.assertTrue(retrievedFlowConfig.getSchedule().isRunImmediately());
+    Assert.assertEquals(retrievedFlowConfig.getProperties().get("param1"), 
"value1");
+    Assert.assertEquals(retrievedFlowConfig.getProperties().get("param2"), 
"value4");
+    
Assert.assertFalse(retrievedFlowConfig.getProperties().containsKey("param3"));
+  }
+
+  @Test (expectedExceptions = RestLiResponseException.class)
+  public void testBadPartialUpdate() throws Exception {
+    FlowId flowId = new 
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME);
+
+    String patchJson = "{\"schedule\":{\"$set\":{\"runImmediately\":true}},"
+        + 
"\"properties\":{\"$set\":{\"param2\":\"value4\"},\"$delete\":[\"param3\"]}}";
+    DataMap dataMap = DataMapUtils.readMap(IOUtils.toInputStream(patchJson));
+    PatchRequest<FlowConfig> flowConfigPatch = 
PatchRequest.createFromPatchDocument(dataMap);
+
+    // Throws exception since local handlers don't support partial update
+    _client.partialUpdateFlowConfig(flowId, flowConfigPatch);
+  }
 
   @AfterClass(alwaysRun = true)
   public void tearDown() throws Exception {
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
index fcabd63..19544ad 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
@@ -26,15 +26,19 @@ import org.apache.commons.lang.StringUtils;
 import com.codahale.metrics.MetricRegistry;
 import com.google.common.collect.Maps;
 import com.linkedin.data.template.StringMap;
+import com.linkedin.data.transform.DataProcessingException;
 import com.linkedin.restli.common.ComplexResourceKey;
 import com.linkedin.restli.common.EmptyRecord;
 import com.linkedin.restli.common.HttpStatus;
+import com.linkedin.restli.common.PatchRequest;
 import com.linkedin.restli.server.CreateResponse;
 import com.linkedin.restli.server.RestLiServiceException;
 import com.linkedin.restli.server.UpdateResponse;
+import com.linkedin.restli.server.util.PatchApplier;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
+import javax.naming.OperationNotSupportedException;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
@@ -188,6 +192,11 @@ public class FlowConfigResourceLocalHandler implements 
FlowConfigsResourceHandle
     return updateFlowConfig(flowId, flowConfig, true);
   }
 
+  @Override
+  public UpdateResponse partialUpdateFlowConfig(FlowId flowId, 
PatchRequest<FlowConfig> flowConfigPatch) throws FlowConfigLoggedException {
+    throw new UnsupportedOperationException("Partial update only supported by 
GobblinServiceFlowConfigResourceHandler");
+  }
+
   /**
    * Delete flowConfig locally and trigger all listeners iff @param 
triggerListener is set to true
    */
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
index b1e8eeb..effc415 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
@@ -20,9 +20,13 @@ import java.util.Map;
 import org.apache.commons.lang3.StringEscapeUtils;
 
 import com.linkedin.data.template.StringMap;
+import com.linkedin.data.transform.DataProcessingException;
 import com.linkedin.restli.common.ComplexResourceKey;
 import com.linkedin.restli.common.HttpStatus;
+import com.linkedin.restli.common.PatchRequest;
 import com.linkedin.restli.server.CreateKVResponse;
+import com.linkedin.restli.server.UpdateResponse;
+import com.linkedin.restli.server.util.PatchApplier;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -74,4 +78,9 @@ public class FlowConfigV2ResourceLocalHandler extends 
FlowConfigResourceLocalHan
     }
     return new CreateKVResponse(new ComplexResourceKey<>(flowConfig.getId(), 
flowStatusId), flowConfig, httpStatus);
   }
+
+  @Override
+  public UpdateResponse partialUpdateFlowConfig(FlowId flowId, 
PatchRequest<FlowConfig> flowConfigPatch) throws FlowConfigLoggedException {
+    throw new UnsupportedOperationException("Partial update only supported by 
GobblinServiceFlowConfigResourceHandler");
+  }
 }
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
index ab29c95..d1a5208 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
@@ -33,6 +33,7 @@ import javax.inject.Named;
 import com.linkedin.restli.common.ComplexResourceKey;
 import com.linkedin.restli.common.EmptyRecord;
 import com.linkedin.restli.common.HttpStatus;
+import com.linkedin.restli.common.PatchRequest;
 import com.linkedin.restli.server.CreateResponse;
 import com.linkedin.restli.server.UpdateResponse;
 import com.linkedin.restli.server.annotations.RestLiCollection;
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResourceHandler.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResourceHandler.java
index 55850f6..ccab8ff 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResourceHandler.java
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResourceHandler.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.service;
 
 import java.util.Properties;
 
+import com.linkedin.restli.common.PatchRequest;
 import com.linkedin.restli.server.CreateResponse;
 import com.linkedin.restli.server.UpdateResponse;
 
@@ -40,6 +41,11 @@ public interface FlowConfigsResourceHandler {
   UpdateResponse updateFlowConfig(FlowId flowId, FlowConfig flowConfig) throws 
FlowConfigLoggedException;
 
   /**
+   * Partial update a {@link FlowConfig}
+   */
+  UpdateResponse partialUpdateFlowConfig(FlowId flowId, 
PatchRequest<FlowConfig> flowConfig) throws FlowConfigLoggedException;
+
+  /**
    * Delete {@link FlowConfig}
    */
   UpdateResponse deleteFlowConfig(FlowId flowId, Properties header) throws 
FlowConfigLoggedException;
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
index 2adf4a6..14cbaa7 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
@@ -26,7 +26,9 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.ImmutableSet;
 import com.linkedin.restli.common.ComplexResourceKey;
+import com.linkedin.restli.common.EmptyRecord;
 import com.linkedin.restli.common.HttpStatus;
+import com.linkedin.restli.common.PatchRequest;
 import com.linkedin.restli.server.CreateKVResponse;
 import com.linkedin.restli.server.CreateResponse;
 import com.linkedin.restli.server.UpdateResponse;
@@ -120,6 +122,20 @@ public class FlowConfigsV2Resource extends 
ComplexKeyResourceTemplate<FlowId, Fl
   }
 
   /**
+   * Partial update the flowConfig specified
+   * @param key composite key containing group name and flow name that 
identifies the flow to update
+   * @param flowConfigPatch patch describing what fields to change
+   * @return {@link UpdateResponse}
+   */
+  @Override
+  public UpdateResponse update(ComplexResourceKey<FlowId, FlowStatusId> key, 
PatchRequest<FlowConfig> flowConfigPatch) {
+    String flowGroup = key.getKey().getFlowGroup();
+    String flowName = key.getKey().getFlowName();
+    FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
+    return this.getFlowConfigResourceHandler().partialUpdateFlowConfig(flowId, 
flowConfigPatch);
+  }
+
+  /**
    * Delete a configured flow. Running flows are not affected. The schedule 
will be removed for scheduled flows.
    * @param key composite key containing flow group and flow name that 
identifies the flow to remove from the flow catalog
    * @return {@link UpdateResponse}
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
index 12abf3d..d056fe1 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
@@ -25,11 +25,14 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.InstanceType;
 
 import com.google.common.base.Optional;
+import com.linkedin.data.transform.DataProcessingException;
 import com.linkedin.restli.common.ComplexResourceKey;
 import com.linkedin.restli.common.EmptyRecord;
 import com.linkedin.restli.common.HttpStatus;
+import com.linkedin.restli.common.PatchRequest;
 import com.linkedin.restli.server.CreateResponse;
 import com.linkedin.restli.server.UpdateResponse;
+import com.linkedin.restli.server.util.PatchApplier;
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
@@ -168,6 +171,19 @@ public class GobblinServiceFlowConfigResourceHandler 
implements FlowConfigsResou
     }
   }
 
+  @Override
+  public UpdateResponse partialUpdateFlowConfig(FlowId flowId, 
PatchRequest<FlowConfig> flowConfigPatch) {
+    FlowConfig flowConfig = getFlowConfig(flowId);
+
+    try {
+      PatchApplier.applyPatch(flowConfig, flowConfigPatch);
+    } catch (DataProcessingException e) {
+      throw new FlowConfigLoggedException(HttpStatus.S_400_BAD_REQUEST, 
"Failed to apply partial update", e);
+    }
+
+    return updateFlowConfig(flowId, flowConfig);
+  }
+
   /**
    * Deleting {@link FlowConfig} should check if current node is active 
(master).
    * If current node is active, call {@link 
FlowConfigResourceLocalHandler#deleteFlowConfig(FlowId, Properties)} directly.

Reply via email to