This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 31d69f2 [GOBBLIN-796] Add support partial updates for flowConfig
31d69f2 is described below
commit 31d69f2730457cef660ddf576d8f695d8ca4571b
Author: Jack Moseley <[email protected]>
AuthorDate: Mon Jul 15 15:02:30 2019 -0700
[GOBBLIN-796] Add support partial updates for flowConfig
Closes #2663 from jack-moseley/flow-trigger
---
...che.gobblin.service.flowconfigsV2.restspec.json | 5 +-
...che.gobblin.service.flowconfigsV2.snapshot.json | 5 +-
.../apache/gobblin/service/FlowConfigV2Client.java | 21 +++++++++
.../apache/gobblin/service/FlowConfigV2Test.java | 54 +++++++++++++++++++++-
.../service/FlowConfigResourceLocalHandler.java | 9 ++++
.../service/FlowConfigV2ResourceLocalHandler.java | 9 ++++
.../gobblin/service/FlowConfigsResource.java | 1 +
.../service/FlowConfigsResourceHandler.java | 6 +++
.../gobblin/service/FlowConfigsV2Resource.java | 16 +++++++
.../GobblinServiceFlowConfigResourceHandler.java | 16 +++++++
10 files changed, 139 insertions(+), 3 deletions(-)
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowconfigsV2.restspec.json
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowconfigsV2.restspec.json
index 8aa0726..0127c42 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowconfigsV2.restspec.json
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowconfigsV2.restspec.json
@@ -10,7 +10,7 @@
"type" : "org.apache.gobblin.service.FlowId",
"params" : "org.apache.gobblin.service.FlowStatusId"
},
- "supports" : [ "create", "delete", "get", "update" ],
+ "supports" : [ "create", "delete", "get", "update", "partial_update" ],
"methods" : [ {
"annotations" : {
"returnEntity" : { }
@@ -24,6 +24,9 @@
"method" : "update",
"doc" : "Update the flow configuration with the specified key. Running
flows are not affected.\n An error is raised if the flow configuration does not
exist."
}, {
+ "method" : "partial_update",
+ "doc" : "Partial update the flowConfig specified"
+ }, {
"method" : "delete",
"doc" : "Delete a configured flow. Running flows are not affected. The
schedule will be removed for scheduled flows."
} ],
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigsV2.snapshot.json
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigsV2.snapshot.json
index 99ea6fd..359e6bc 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigsV2.snapshot.json
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigsV2.snapshot.json
@@ -108,7 +108,7 @@
"type" : "org.apache.gobblin.service.FlowId",
"params" : "org.apache.gobblin.service.FlowStatusId"
},
- "supports" : [ "create", "delete", "get", "update" ],
+ "supports" : [ "create", "delete", "get", "update", "partial_update" ],
"methods" : [ {
"annotations" : {
"returnEntity" : { }
@@ -122,6 +122,9 @@
"method" : "update",
"doc" : "Update the flow configuration with the specified key. Running
flows are not affected.\n An error is raised if the flow configuration does not
exist."
}, {
+ "method" : "partial_update",
+ "doc" : "Partial update the flowConfig specified"
+ }, {
"method" : "delete",
"doc" : "Delete a configured flow. Running flows are not affected. The
schedule will be removed for scheduled flows."
} ],
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java
index 94af118..d1e146a 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java
@@ -38,6 +38,7 @@ import
com.linkedin.r2.transport.http.client.HttpClientFactory;
import com.linkedin.restli.client.CreateIdEntityRequest;
import com.linkedin.restli.client.DeleteRequest;
import com.linkedin.restli.client.GetRequest;
+import com.linkedin.restli.client.PartialUpdateRequest;
import com.linkedin.restli.client.Response;
import com.linkedin.restli.client.ResponseFuture;
import com.linkedin.restli.client.RestClient;
@@ -45,6 +46,7 @@ import com.linkedin.restli.client.UpdateRequest;
import com.linkedin.restli.common.ComplexResourceKey;
import com.linkedin.restli.common.EmptyRecord;
import com.linkedin.restli.common.IdEntityResponse;
+import com.linkedin.restli.common.PatchRequest;
/**
@@ -148,6 +150,25 @@ public class FlowConfigV2Client implements Closeable {
}
/**
+ * Partially update a flow configuration
+ * @param flowId flow ID to update
+ * @param flowConfigPatch {@link PatchRequest} containing changes to the
flowConfig
+ * @throws RemoteInvocationException
+ */
+ public void partialUpdateFlowConfig(FlowId flowId, PatchRequest<FlowConfig>
flowConfigPatch) throws RemoteInvocationException {
+ LOG.debug("partialUpdateFlowConfig with groupName " +
flowId.getFlowGroup() + " flowName " +
+ flowId.getFlowName());
+
+ PartialUpdateRequest<FlowConfig> partialUpdateRequest =
+ _flowconfigsV2RequestBuilders.partialUpdate().id(new
ComplexResourceKey<>(flowId, new FlowStatusId()))
+ .input(flowConfigPatch).build();
+
+ ResponseFuture<EmptyRecord> response =
_restClient.get().sendRequest(partialUpdateRequest);
+
+ response.getResponse();
+ }
+
+ /**
* Get a flow configuration
* @param flowId identifier of flow configuration to get
* @return a {@link FlowConfig} with the flow configuration
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
index 33e8e58..21f862a 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
@@ -21,7 +21,7 @@ import java.io.File;
import java.util.Map;
import org.apache.commons.io.FileUtils;
-import org.apache.gobblin.runtime.spec_store.FSSpecStore;
+import org.apache.commons.io.IOUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -35,8 +35,13 @@ import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.name.Names;
+import com.linkedin.data.DataMap;
import com.linkedin.data.template.StringMap;
+import com.linkedin.restli.client.RestLiResponseException;
+import com.linkedin.restli.common.PatchRequest;
+import com.linkedin.restli.internal.server.util.DataMapUtils;
import com.linkedin.restli.server.resources.BaseResource;
+import com.linkedin.restli.server.util.PatchApplier;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
@@ -44,6 +49,7 @@ import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.restli.EmbeddedRestliServer;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.runtime.spec_store.FSSpecStore;
@Test(groups = { "gobblin.service" })
@@ -122,6 +128,52 @@ public class FlowConfigV2Test {
Assert.assertEquals(_client.createFlowConfig(flowConfig).getFlowExecutionId().longValue(),
-1L);
}
+ @Test
+ public void testPartialUpdate() throws Exception {
+ FlowId flowId = new
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME);
+
+ Map<String, String> flowProperties = Maps.newHashMap();
+ flowProperties.put("param1", "value1");
+ flowProperties.put("param2", "value2");
+ flowProperties.put("param3", "value3");
+
+ FlowConfig flowConfig = new FlowConfig().setId(new
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME))
+ .setTemplateUris(TEST_TEMPLATE_URI).setSchedule(new
Schedule().setCronSchedule(TEST_SCHEDULE).setRunImmediately(false))
+ .setProperties(new StringMap(flowProperties));
+
+ // Set some initial config
+ _client.updateFlowConfig(flowConfig);
+
+ // Change param2 to value4, delete param3
+ String patchJson = "{\"schedule\":{\"$set\":{\"runImmediately\":true}},"
+ +
"\"properties\":{\"$set\":{\"param2\":\"value4\"},\"$delete\":[\"param3\"]}}";
+ DataMap dataMap = DataMapUtils.readMap(IOUtils.toInputStream(patchJson));
+ PatchRequest<FlowConfig> flowConfigPatch =
PatchRequest.createFromPatchDocument(dataMap);
+
+ PatchApplier.applyPatch(flowConfig, flowConfigPatch);
+
+ _client.updateFlowConfig(flowConfig);
+
+ FlowConfig retrievedFlowConfig = _client.getFlowConfig(flowId);
+
+ Assert.assertTrue(retrievedFlowConfig.getSchedule().isRunImmediately());
+ Assert.assertEquals(retrievedFlowConfig.getProperties().get("param1"),
"value1");
+ Assert.assertEquals(retrievedFlowConfig.getProperties().get("param2"),
"value4");
+
Assert.assertFalse(retrievedFlowConfig.getProperties().containsKey("param3"));
+ }
+
+ @Test (expectedExceptions = RestLiResponseException.class)
+ public void testBadPartialUpdate() throws Exception {
+ FlowId flowId = new
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME);
+
+ String patchJson = "{\"schedule\":{\"$set\":{\"runImmediately\":true}},"
+ +
"\"properties\":{\"$set\":{\"param2\":\"value4\"},\"$delete\":[\"param3\"]}}";
+ DataMap dataMap = DataMapUtils.readMap(IOUtils.toInputStream(patchJson));
+ PatchRequest<FlowConfig> flowConfigPatch =
PatchRequest.createFromPatchDocument(dataMap);
+
+ // Throws exception since local handlers don't support partial update
+ _client.partialUpdateFlowConfig(flowId, flowConfigPatch);
+ }
@AfterClass(alwaysRun = true)
public void tearDown() throws Exception {
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
index fcabd63..19544ad 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
@@ -26,15 +26,19 @@ import org.apache.commons.lang.StringUtils;
import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.Maps;
import com.linkedin.data.template.StringMap;
+import com.linkedin.data.transform.DataProcessingException;
import com.linkedin.restli.common.ComplexResourceKey;
import com.linkedin.restli.common.EmptyRecord;
import com.linkedin.restli.common.HttpStatus;
+import com.linkedin.restli.common.PatchRequest;
import com.linkedin.restli.server.CreateResponse;
import com.linkedin.restli.server.RestLiServiceException;
import com.linkedin.restli.server.UpdateResponse;
+import com.linkedin.restli.server.util.PatchApplier;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import javax.naming.OperationNotSupportedException;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -188,6 +192,11 @@ public class FlowConfigResourceLocalHandler implements
FlowConfigsResourceHandle
return updateFlowConfig(flowId, flowConfig, true);
}
+ @Override
+ public UpdateResponse partialUpdateFlowConfig(FlowId flowId,
PatchRequest<FlowConfig> flowConfigPatch) throws FlowConfigLoggedException {
+ throw new UnsupportedOperationException("Partial update only supported by
GobblinServiceFlowConfigResourceHandler");
+ }
+
/**
* Delete flowConfig locally and trigger all listeners iff @param
triggerListener is set to true
*/
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
index b1e8eeb..effc415 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
@@ -20,9 +20,13 @@ import java.util.Map;
import org.apache.commons.lang3.StringEscapeUtils;
import com.linkedin.data.template.StringMap;
+import com.linkedin.data.transform.DataProcessingException;
import com.linkedin.restli.common.ComplexResourceKey;
import com.linkedin.restli.common.HttpStatus;
+import com.linkedin.restli.common.PatchRequest;
import com.linkedin.restli.server.CreateKVResponse;
+import com.linkedin.restli.server.UpdateResponse;
+import com.linkedin.restli.server.util.PatchApplier;
import lombok.extern.slf4j.Slf4j;
@@ -74,4 +78,9 @@ public class FlowConfigV2ResourceLocalHandler extends
FlowConfigResourceLocalHan
}
return new CreateKVResponse(new ComplexResourceKey<>(flowConfig.getId(),
flowStatusId), flowConfig, httpStatus);
}
+
+ @Override
+ public UpdateResponse partialUpdateFlowConfig(FlowId flowId,
PatchRequest<FlowConfig> flowConfigPatch) throws FlowConfigLoggedException {
+ throw new UnsupportedOperationException("Partial update only supported by
GobblinServiceFlowConfigResourceHandler");
+ }
}
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
index ab29c95..d1a5208 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
@@ -33,6 +33,7 @@ import javax.inject.Named;
import com.linkedin.restli.common.ComplexResourceKey;
import com.linkedin.restli.common.EmptyRecord;
import com.linkedin.restli.common.HttpStatus;
+import com.linkedin.restli.common.PatchRequest;
import com.linkedin.restli.server.CreateResponse;
import com.linkedin.restli.server.UpdateResponse;
import com.linkedin.restli.server.annotations.RestLiCollection;
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResourceHandler.java
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResourceHandler.java
index 55850f6..ccab8ff 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResourceHandler.java
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResourceHandler.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.service;
import java.util.Properties;
+import com.linkedin.restli.common.PatchRequest;
import com.linkedin.restli.server.CreateResponse;
import com.linkedin.restli.server.UpdateResponse;
@@ -40,6 +41,11 @@ public interface FlowConfigsResourceHandler {
UpdateResponse updateFlowConfig(FlowId flowId, FlowConfig flowConfig) throws
FlowConfigLoggedException;
/**
+ * Partial update a {@link FlowConfig}
+ */
+ UpdateResponse partialUpdateFlowConfig(FlowId flowId,
PatchRequest<FlowConfig> flowConfig) throws FlowConfigLoggedException;
+
+ /**
* Delete {@link FlowConfig}
*/
UpdateResponse deleteFlowConfig(FlowId flowId, Properties header) throws
FlowConfigLoggedException;
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
index 2adf4a6..14cbaa7 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
@@ -26,7 +26,9 @@ import org.slf4j.LoggerFactory;
import com.google.common.collect.ImmutableSet;
import com.linkedin.restli.common.ComplexResourceKey;
+import com.linkedin.restli.common.EmptyRecord;
import com.linkedin.restli.common.HttpStatus;
+import com.linkedin.restli.common.PatchRequest;
import com.linkedin.restli.server.CreateKVResponse;
import com.linkedin.restli.server.CreateResponse;
import com.linkedin.restli.server.UpdateResponse;
@@ -120,6 +122,20 @@ public class FlowConfigsV2Resource extends
ComplexKeyResourceTemplate<FlowId, Fl
}
/**
+ * Partial update the flowConfig specified
+ * @param key composite key containing group name and flow name that
identifies the flow to update
+ * @param flowConfigPatch patch describing what fields to change
+ * @return {@link UpdateResponse}
+ */
+ @Override
+ public UpdateResponse update(ComplexResourceKey<FlowId, FlowStatusId> key,
PatchRequest<FlowConfig> flowConfigPatch) {
+ String flowGroup = key.getKey().getFlowGroup();
+ String flowName = key.getKey().getFlowName();
+ FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
+ return this.getFlowConfigResourceHandler().partialUpdateFlowConfig(flowId,
flowConfigPatch);
+ }
+
+ /**
* Delete a configured flow. Running flows are not affected. The schedule
will be removed for scheduled flows.
* @param key composite key containing flow group and flow name that
identifies the flow to remove from the flow catalog
* @return {@link UpdateResponse}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
index 12abf3d..d056fe1 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
@@ -25,11 +25,14 @@ import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
import com.google.common.base.Optional;
+import com.linkedin.data.transform.DataProcessingException;
import com.linkedin.restli.common.ComplexResourceKey;
import com.linkedin.restli.common.EmptyRecord;
import com.linkedin.restli.common.HttpStatus;
+import com.linkedin.restli.common.PatchRequest;
import com.linkedin.restli.server.CreateResponse;
import com.linkedin.restli.server.UpdateResponse;
+import com.linkedin.restli.server.util.PatchApplier;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -168,6 +171,19 @@ public class GobblinServiceFlowConfigResourceHandler
implements FlowConfigsResou
}
}
+ @Override
+ public UpdateResponse partialUpdateFlowConfig(FlowId flowId,
PatchRequest<FlowConfig> flowConfigPatch) {
+ FlowConfig flowConfig = getFlowConfig(flowId);
+
+ try {
+ PatchApplier.applyPatch(flowConfig, flowConfigPatch);
+ } catch (DataProcessingException e) {
+ throw new FlowConfigLoggedException(HttpStatus.S_400_BAD_REQUEST,
"Failed to apply partial update", e);
+ }
+
+ return updateFlowConfig(flowId, flowConfig);
+ }
+
/**
* Deleting {@link FlowConfig} should check if current node is active
(master).
* If current node is active, call {@link
FlowConfigResourceLocalHandler#deleteFlowConfig(FlowId, Properties)} directly.