This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 58faa9a6a [GOBBLIN-1730] Include flow execution id when try to 
cancel/submit job using SimpleKafkaSpecProducer (#3588)
58faa9a6a is described below

commit 58faa9a6ae32605755baecf4d64c57fc424649e5
Author: Zihan Li <[email protected]>
AuthorDate: Fri Oct 28 10:24:27 2022 -0700

    [GOBBLIN-1730] Include flow execution id when try to cancel/submit job 
using SimpleKafkaSpecProducer (#3588)
    
    * address comments
    
    * use connectionmanager when httpclient is not cloesable
    
    * [GOBBLIN-1730] Include flow execution id when try to cancel/submit job 
using SimpleKafkaSpecProducer
    
    * remove unnecessary dependency
    
    Co-authored-by: Zihan Li <[email protected]>
---
 .../gobblin/service/SimpleKafkaSpecProducer.java   | 40 +++++++++++++++++++---
 .../modules/core/GobblinServiceGuiceModule.java    |  1 -
 .../service/modules/orchestration/DagManager.java  |  6 ++--
 .../modules/orchestration/Orchestrator.java        |  5 ++-
 4 files changed, 44 insertions(+), 8 deletions(-)

diff --git 
a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java
 
b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java
index 0548a20e0..a96ba48ae 100644
--- 
a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java
+++ 
b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java
@@ -17,15 +17,18 @@
 
 package org.apache.gobblin.service;
 
+import com.google.common.base.Joiner;
 import java.io.Closeable;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.List;
 import java.util.concurrent.Future;
 import java.util.Properties;
 
 import org.apache.commons.lang3.reflect.ConstructorUtils;
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.slf4j.Logger;
 
 import com.codahale.metrics.Meter;
@@ -102,11 +105,37 @@ public class SimpleKafkaSpecProducer implements 
SpecProducer<Spec>, Closeable  {
     return 
this.metricContext.meter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
 getClass().getSimpleName(), suffix));
   }
 
+  private Spec addExecutionIdToJobSpecUri(Spec spec) {
+    JobSpec newSpec = (JobSpec)spec;
+    if (newSpec.getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
+      try {
+        newSpec.setUri(new URI(Joiner.on("/").
+            join(spec.getUri().toString(), 
newSpec.getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY))));
+      } catch (URISyntaxException e) {
+        log.error("Cannot create job uri to cancel job", e);
+      }
+    }
+    return newSpec;
+  }
+
+  private URI getURIWithExecutionId(URI originalURI, Properties props) {
+    if (props.contains(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
+      try {
+        originalURI = new URI(Joiner.on("/").
+            join(originalURI.toString(), 
props.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)));
+      } catch (URISyntaxException e) {
+        log.error("Cannot create job uri to cancel job", e);
+      }
+    }
+    return originalURI;
+  }
+
   @Override
   public Future<?> addSpec(Spec addedSpec) {
-    AvroJobSpec avroJobSpec = convertToAvroJobSpec(addedSpec, 
SpecExecutor.Verb.ADD);
+    Spec spec = addExecutionIdToJobSpecUri(addedSpec);
+    AvroJobSpec avroJobSpec = convertToAvroJobSpec(spec, 
SpecExecutor.Verb.ADD);
 
-    log.info("Adding Spec: " + addedSpec + " using Kafka.");
+    log.info("Adding Spec: " + spec + " using Kafka.");
     this.addSpecMeter.mark();
 
     return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec), 
new KafkaWriteCallback(avroJobSpec));
@@ -114,9 +143,10 @@ public class SimpleKafkaSpecProducer implements 
SpecProducer<Spec>, Closeable  {
 
   @Override
   public Future<?> updateSpec(Spec updatedSpec) {
-    AvroJobSpec avroJobSpec = convertToAvroJobSpec(updatedSpec, 
SpecExecutor.Verb.UPDATE);
+    Spec spec = addExecutionIdToJobSpecUri(updatedSpec);
+    AvroJobSpec avroJobSpec = convertToAvroJobSpec(spec, 
SpecExecutor.Verb.UPDATE);
 
-    log.info("Updating Spec: " + updatedSpec + " using Kafka.");
+    log.info("Updating Spec: " + spec + " using Kafka.");
     this.updateSpecMeter.mark();
 
     return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec), 
new KafkaWriteCallback(avroJobSpec));
@@ -124,6 +154,7 @@ public class SimpleKafkaSpecProducer implements 
SpecProducer<Spec>, Closeable  {
 
   @Override
   public Future<?> deleteSpec(URI deletedSpecURI, Properties headers) {
+    deletedSpecURI = getURIWithExecutionId(deletedSpecURI, headers);
 
     AvroJobSpec avroJobSpec = 
AvroJobSpec.newBuilder().setUri(deletedSpecURI.toString())
         .setMetadata(ImmutableMap.of(SpecExecutor.VERB_KEY, 
SpecExecutor.Verb.DELETE.name()))
@@ -137,6 +168,7 @@ public class SimpleKafkaSpecProducer implements 
SpecProducer<Spec>, Closeable  {
 
   @Override
   public Future<?> cancelJob(URI deletedSpecURI, Properties properties) {
+    deletedSpecURI = getURIWithExecutionId(deletedSpecURI, properties);
     AvroJobSpec avroJobSpec = 
AvroJobSpec.newBuilder().setUri(deletedSpecURI.toString())
         .setMetadata(ImmutableMap.of(SpecExecutor.VERB_KEY, 
SpecExecutor.Verb.CANCEL.name()))
         .setProperties(Maps.fromProperties(properties)).build();
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
index 16ef015a8..642f78f11 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
@@ -22,7 +22,6 @@ import java.util.Objects;
 import org.apache.gobblin.runtime.api.DagActionStore;
 import org.apache.gobblin.runtime.dag_action_store.MysqlDagActionStore;
 import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
-//import 
org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigV2ResourceHandlerWithWarmStandby;
 import 
org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigV2ResourceHandlerWithWarmStandby;
 import 
org.apache.gobblin.service.modules.restli.GobblinServiceFlowExecutionResourceHandlerWithWarmStandby;
 import org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor;
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 0bcceaf62..8532ec9b8 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -666,8 +666,10 @@ public class DagManager extends AbstractIdleService {
         props.put(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE, 
serializedFuture);
         sendCancellationEvent(dagNodeToCancel.getValue());
       }
-      props.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
-          
ConfigUtils.getString(dagNodeToCancel.getValue().getJobSpec().getConfig(), 
ConfigurationKeys.FLOW_EXECUTION_ID_KEY, ""));
+      if 
(dagNodeToCancel.getValue().getJobSpec().getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY))
 {
+        props.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
+            
dagNodeToCancel.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
+      }
       
DagManagerUtils.getSpecProducer(dagNodeToCancel).cancelJob(dagNodeToCancel.getValue().getJobSpec().getUri(),
 props);
     }
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 19a1acdc5..9ba8abd38 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -398,9 +398,12 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
     // Delete all compiled JobSpecs on their respective Executor
     for (Dag.DagNode<JobExecutionPlan> dagNode : 
jobExecutionPlanDag.getNodes()) {
       JobExecutionPlan jobExecutionPlan = dagNode.getValue();
-      Spec jobSpec = jobExecutionPlan.getJobSpec();
+      JobSpec jobSpec = jobExecutionPlan.getJobSpec();
       try {
         SpecProducer<Spec> producer = 
jobExecutionPlan.getSpecExecutor().getProducer().get();
+        if 
(jobSpec.getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
+          headers.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
jobSpec.getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
+        }
         _log.info(String.format("Going to delete JobSpec: %s on Executor: %s", 
jobSpec, producer));
         producer.deleteSpec(jobSpec.getUri(), headers);
       } catch (Exception e) {

Reply via email to