This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 549462f4a [GOBBLIN-1804] Reject flow config updates that would fail 
compilation by returning service error (#3664)
549462f4a is described below

commit 549462f4a90c31f84e7a2aab6b8aee144413a7ce
Author: Kip Kohn <[email protected]>
AuthorDate: Fri Mar 24 13:42:29 2023 -0700

    [GOBBLIN-1804] Reject flow config updates that would fail compilation by 
returning service error (#3664)
---
 .../apache/gobblin/service/FlowConfigV2Test.java   | 85 +++++++++++++++++++++-
 .../service/FlowConfigResourceLocalHandler.java    |  2 +-
 .../service/FlowConfigV2ResourceLocalHandler.java  | 47 ++++++++++++
 3 files changed, 130 insertions(+), 4 deletions(-)

diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
index bc16e3bad..b205ab244 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
@@ -20,10 +20,12 @@ package org.apache.gobblin.service;
 import java.io.File;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.mortbay.jetty.HttpStatus;
+import org.mockito.ArgumentMatchers;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -31,6 +33,7 @@ import org.testng.annotations.Test;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import com.google.common.io.Files;
 import com.google.inject.Binder;
 import com.google.inject.Guice;
@@ -52,6 +55,7 @@ import lombok.Setter;
 import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.restli.EmbeddedRestliServer;
+import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.SpecCatalogListener;
 import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
@@ -70,6 +74,7 @@ public class FlowConfigV2Test {
   private TestRequesterService _requesterService;
   private GroupOwnershipService groupOwnershipService;
   private File groupConfigFile;
+  private Set<String> _compilationFailureFlowPaths = Sets.newHashSet();
 
   private static final String TEST_SPEC_STORE_DIR = "/tmp/flowConfigV2Test/";
   private static final String TEST_GROUP_NAME = "testGroup1";
@@ -82,6 +87,8 @@ public class FlowConfigV2Test {
   private static final String TEST_FLOW_NAME_7 = "testFlow7";
   private static final String TEST_FLOW_NAME_8 = "testFlow8";
   private static final String TEST_FLOW_NAME_9 = "testFlow9";
+  private static final String TEST_FLOW_NAME_10 = "testFlow10";
+  private static final String TEST_FLOW_NAME_11 = "testFlow11";
   private static final String TEST_SCHEDULE = "0 1/0 * ? * *";
   private static final String TEST_TEMPLATE_URI = 
"FS:///templates/test.template";
 
@@ -103,7 +110,11 @@ public class FlowConfigV2Test {
     final FlowCatalog flowCatalog = new FlowCatalog(config);
     final SpecCatalogListener mockListener = mock(SpecCatalogListener.class);
     
when(mockListener.getName()).thenReturn(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS);
-    when(mockListener.onAddSpec(any())).thenReturn(new AddSpecResponse(""));
+    // NOTE: more general `ArgumentMatchers` (indicating compilation 
unsuccessful) must precede the specific
+    when(mockListener.onAddSpec(any())).thenReturn(new AddSpecResponse(null));
+    when(mockListener.onAddSpec(ArgumentMatchers.argThat((FlowSpec flowSpec) 
-> {
+      return 
!_compilationFailureFlowPaths.contains(flowSpec.getUri().getPath());
+    }))).thenReturn(new AddSpecResponse(""));
     flowCatalog.addListener(mockListener);
     flowCatalog.startAsync();
     flowCatalog.awaitRunning();
@@ -168,6 +179,31 @@ public class FlowConfigV2Test {
     
Assert.assertEquals(_client.createFlowConfig(flowConfig).getFlowExecutionId().longValue(),
 -1L);
   }
 
+  @Test
+  public void testCreateRejectedWhenFailsCompilation() throws Exception {
+    FlowId flowId = new 
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME_10);
+    _requesterService.setRequester(TEST_REQUESTER);
+
+    Map<String, String> flowProperties = Maps.newHashMap();
+    flowProperties.put("param1", "value1");
+    flowProperties.put("param2", "value2");
+    flowProperties.put("param3", "value3");
+
+    FlowConfig flowConfig = new FlowConfig().setId(new 
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME_10))
+        .setTemplateUris(TEST_TEMPLATE_URI).setSchedule(new 
Schedule().setCronSchedule(TEST_SCHEDULE).setRunImmediately(false))
+        .setProperties(new StringMap(flowProperties));
+
+    // inform mock that this flow should fail compilation
+    _compilationFailureFlowPaths.add(String.format("/%s/%s", TEST_GROUP_NAME, 
TEST_FLOW_NAME_10));
+    try {
+      _client.createFlowConfig(flowConfig);
+      Assert.fail("create seemingly accepted (despite anticipated flow 
compilation failure)");
+    } catch (RestLiResponseException e) {
+      Assert.assertEquals(e.getStatus(), HttpStatus.ORDINAL_400_Bad_Request);
+      Assert.assertTrue(e.getMessage().contains("Flow was not compiled 
successfully."));
+    }
+  }
+
   @Test
   public void testPartialUpdate() throws Exception {
     FlowId flowId = new 
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME_3);
@@ -202,7 +238,7 @@ public class FlowConfigV2Test {
   }
 
   @Test (expectedExceptions = RestLiResponseException.class)
-  public void testBadPartialUpdate() throws Exception {
+  public void testPartialUpdateNotPossibleWithoutCreateFirst() throws 
Exception {
     FlowId flowId = new 
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME);
 
     String patchJson = "{\"schedule\":{\"$set\":{\"runImmediately\":true}},"
@@ -210,10 +246,53 @@ public class FlowConfigV2Test {
     DataMap dataMap = DataMapUtils.readMap(IOUtils.toInputStream(patchJson));
     PatchRequest<FlowConfig> flowConfigPatch = 
PatchRequest.createFromPatchDocument(dataMap);
 
-    // Throws exception since local handlers don't support partial update
+    // Throws exception since flow was not created first, prior to partial 
update
     _client.partialUpdateFlowConfig(flowId, flowConfigPatch);
   }
 
+  @Test
+  public void testPartialUpdateRejectedWhenFailsCompilation() throws Exception 
{
+    FlowId flowId = new 
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME_11);
+    _requesterService.setRequester(TEST_REQUESTER);
+
+    Map<String, String> flowProperties = Maps.newHashMap();
+    flowProperties.put("param1", "value1");
+    flowProperties.put("param2", "value2");
+    flowProperties.put("param3", "value3");
+
+    FlowConfig flowConfig = new FlowConfig().setId(new 
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME_11))
+        .setTemplateUris(TEST_TEMPLATE_URI).setSchedule(new 
Schedule().setCronSchedule(TEST_SCHEDULE).setRunImmediately(false))
+        .setProperties(new StringMap(flowProperties));
+
+    // Set some initial config
+    _client.createFlowConfig(flowConfig);
+
+    // Change param2 to value4, delete param3, add param5=value5
+    String patchJson = "{\"schedule\":{\"$set\":{\"runImmediately\":true}},"
+        + 
"\"properties\":{\"$set\":{\"param2\":\"value4\",\"param5\":\"value5\"},\"$delete\":[\"param3\"]}}";
+    DataMap dataMap = DataMapUtils.readMap(IOUtils.toInputStream(patchJson));
+    PatchRequest<FlowConfig> flowConfigPatch = 
PatchRequest.createFromPatchDocument(dataMap);
+
+    // inform mock that this flow should hereafter fail compilation
+    _compilationFailureFlowPaths.add(String.format("/%s/%s", TEST_GROUP_NAME, 
TEST_FLOW_NAME_11));
+    try {
+      _client.partialUpdateFlowConfig(flowId, flowConfigPatch);
+      Assert.fail("update seemingly accepted (despite anticipated flow 
compilation failure)");
+    } catch (RestLiResponseException e) {
+      Assert.assertEquals(e.getStatus(), HttpStatus.ORDINAL_400_Bad_Request);
+      Assert.assertTrue(e.getMessage().contains("Flow was not compiled 
successfully."));
+    }
+
+    // verify that prior state of flow config still retained: that updates had 
no effect
+    FlowConfig retrievedFlowConfig = _client.getFlowConfig(flowId);
+
+    Assert.assertTrue(!retrievedFlowConfig.getSchedule().isRunImmediately());
+    Assert.assertEquals(retrievedFlowConfig.getProperties().get("param1"), 
"value1");
+    Assert.assertEquals(retrievedFlowConfig.getProperties().get("param2"), 
"value2");
+    Assert.assertEquals(retrievedFlowConfig.getProperties().get("param3"), 
"value3");
+    
Assert.assertFalse(retrievedFlowConfig.getProperties().containsKey("param5"));
+  }
+
   @Test
   public void testDisallowedRequester() throws Exception {
     try {
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
index f97b1b915..272a8eeab 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
@@ -200,7 +200,7 @@ public class FlowConfigResourceLocalHandler implements 
FlowConfigsResourceHandle
     return new UpdateResponse(HttpStatus.S_200_OK);
   }
 
-  private boolean isUnscheduleRequest(FlowConfig flowConfig) {
+  protected final boolean isUnscheduleRequest(FlowConfig flowConfig) {
     return 
Boolean.parseBoolean(flowConfig.getProperties().getOrDefault(ConfigurationKeys.FLOW_UNSCHEDULE_KEY,
 "false"));
   }
 
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
index b20b83346..791721f60 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
@@ -154,6 +154,53 @@ public class FlowConfigV2ResourceLocalHandler extends 
FlowConfigResourceLocalHan
     }
     return "Could not form JSON in FlowConfigV2ResourceLocalHandler";
   }
+
+  /**
+   * Update flowConfig locally and trigger all listeners iff @param 
triggerListener is set to true
+   */
+  @Override
+  public UpdateResponse updateFlowConfig(FlowId flowId, FlowConfig flowConfig, 
boolean triggerListener, long modifiedWatermark) {
+    log.info("[GAAS-REST] Update called with flowGroup {} flowName {}", 
flowId.getFlowGroup(), flowId.getFlowName());
+
+    if (!flowId.getFlowGroup().equals(flowConfig.getId().getFlowGroup()) || 
!flowId.getFlowName().equals(flowConfig.getId().getFlowName())) {
+      throw new FlowConfigLoggedException(HttpStatus.S_400_BAD_REQUEST,
+          "flowName and flowGroup cannot be changed in update", null);
+    }
+
+    FlowConfig originalFlowConfig = getFlowConfig(flowId);
+
+    if 
(!flowConfig.getProperties().containsKey(RequesterService.REQUESTER_LIST)) {
+      // Carry forward the requester list property if it is not being updated 
since it was added at time of creation
+      flowConfig.getProperties().put(RequesterService.REQUESTER_LIST, 
originalFlowConfig.getProperties().get(RequesterService.REQUESTER_LIST));
+    }
+
+    if (isUnscheduleRequest(flowConfig)) {
+      // flow config is not changed if it is just a request to un-schedule
+      originalFlowConfig.setSchedule(NEVER_RUN_CRON_SCHEDULE);
+      flowConfig = originalFlowConfig;
+    }
+
+    FlowSpec flowSpec = createFlowSpecForConfig(flowConfig);
+    Map<String, AddSpecResponse> responseMap;
+    try {
+      responseMap = this.flowCatalog.update(flowSpec, triggerListener, 
modifiedWatermark);
+    } catch (QuotaExceededException e) {
+      throw new RestLiServiceException(HttpStatus.S_503_SERVICE_UNAVAILABLE, 
e.getMessage());
+    } catch (Throwable e) {
+      // TODO: Compilation errors should fall under throwable exceptions as 
well instead of checking for strings
+      log.warn(String.format("Failed to add flow configuration %s.%s to 
catalog due to", flowId.getFlowGroup(), flowId.getFlowName()), e);
+      throw new RestLiServiceException(HttpStatus.S_500_INTERNAL_SERVER_ERROR, 
e.getMessage());
+    }
+
+    if 
(Boolean.parseBoolean(responseMap.getOrDefault(ServiceConfigKeys.COMPILATION_SUCCESSFUL,
 new AddSpecResponse<>("false")).getValue().toString())) {
+      return new UpdateResponse(HttpStatus.S_200_OK);
+    } else {
+      throw new RestLiServiceException(HttpStatus.S_400_BAD_REQUEST, 
getErrorMessage(flowSpec));
+    }
+  }
+
+
+
   /**
    * Note: this method is only implemented for testing, normally partial 
update would be called in
    * GobblinServiceFlowConfigResourceHandler.partialUpdateFlowConfig

Reply via email to