This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new dfef88e  [GOBBLIN-988][GOBBLIN-897] Implement LocalFSJobStatusRetriever
dfef88e is described below

commit dfef88e5a820f4153dfcacba4e8b5c4d463d31bf
Author: William Lo <w...@linkedin.com>
AuthorDate: Fri Dec 20 10:55:25 2019 -0800

    [GOBBLIN-988][GOBBLIN-897] Implement LocalFSJobStatusRetriever
    
    Closes #2834 from Will-Lo/fs-jobstatus-monitor
---
 conf/gobblin-as-service/application.conf           |   9 +-
 .../mysql-cluster/gaas-application.conf            |  18 +---
 .../spec_executorInstance/LocalFsSpecProducer.java |  39 ++++---
 .../monitoring/LocalFsJobStatusRetriever.java      | 120 +++++++++++++++++++++
 4 files changed, 153 insertions(+), 33 deletions(-)

diff --git a/conf/gobblin-as-service/application.conf 
b/conf/gobblin-as-service/application.conf
index 8d5f694..5145325 100644
--- a/conf/gobblin-as-service/application.conf
+++ b/conf/gobblin-as-service/application.conf
@@ -29,7 +29,7 @@ topologySpecFactory.localGobblinCluster.version="1"
 topologySpecFactory.localGobblinCluster.uri="gobblinCluster"
 
topologySpecFactory.localGobblinCluster.specExecutorInstance.class="org.apache.gobblin.runtime.spec_executorInstance.LocalFsSpecExecutor"
 
topologySpecFactory.localGobblinCluster.specExecInstance.capabilities="source:dest"
-topologySpecFactory.localGobblinCluster.gobblin.cluster.localSpecProducer.dir=${gobblin.service.work.dir}/jobs
+topologySpecFactory.localGobblinCluster.localFsSpecProducer.dir=${gobblin.service.work.dir}/jobs
 
 # Flow Catalog and Store
 flowSpec.store.dir=${gobblin.service.work.dir}/flowSpecStore
@@ -40,12 +40,13 @@ 
gobblin.service.templateCatalogs.fullyQualifiedPath="file://"
 # JobStatusMonitor
 gobblin.service.jobStatusMonitor.enabled=false
 
-# FsJobStatusRetriever
-fsJobStatusRetriever.state.store.dir=${gobblin.service.work.dir}/state-store
+# JobStatusRetriever
+jobStatusRetriever.class="org.apache.gobblin.service.monitoring.LocalFsJobStatusRetriever"
+localFsJobStatusRetriever.localFsSpecProducer.dir=${gobblin.service.work.dir}/jobs
 
 # DagManager
 gobblin.service.dagManager.enabled=true
-gobblin.service.dagManager.jobStatusRetriever.class="org.apache.gobblin.service.monitoring.FsJobStatusRetriever"
+gobblin.service.dagManager.jobStatusRetriever.class="org.apache.gobblin.service.monitoring.LocalFsJobStatusRetriever"
 
gobblin.service.dagManager.dagStateStoreClass="org.apache.gobblin.service.modules.orchestration.FSDagStateStore"
 
gobblin.service.dagManager.dagStateStoreDir=${gobblin.service.work.dir}/dagStateStoreDir
 
diff --git 
a/gobblin-kubernetes/gobblin-service/mysql-cluster/gaas-application.conf 
b/gobblin-kubernetes/gobblin-service/mysql-cluster/gaas-application.conf
index b2f9366..7c89cbe 100644
--- a/gobblin-kubernetes/gobblin-service/mysql-cluster/gaas-application.conf
+++ b/gobblin-kubernetes/gobblin-service/mysql-cluster/gaas-application.conf
@@ -28,7 +28,7 @@ topologySpecFactory.localGobblinCluster.version="1"
 topologySpecFactory.localGobblinCluster.uri="gobblinCluster"
 
topologySpecFactory.localGobblinCluster.specExecutorInstance.class="org.apache.gobblin.runtime.spec_executorInstance.LocalFsSpecExecutor"
 
topologySpecFactory.localGobblinCluster.specExecInstance.capabilities="source:dest"
-topologySpecFactory.localGobblinCluster.gobblin.cluster.localSpecProducer.dir=${gobblin.service.work.dir}/jobs
+topologySpecFactory.localGobblinCluster.localFsSpecProducer.dir=${gobblin.service.work.dir}/jobs
 
 # Flow Catalog and Store
 flowSpec.store.dir=${gobblin.service.work.dir}/flowSpecStore
@@ -39,12 +39,13 @@ 
gobblin.service.templateCatalogs.fullyQualifiedPath="file://"
 # JobStatusMonitor
 gobblin.service.jobStatusMonitor.enabled=false
 
-# FsJobStatusRetriever
-fsJobStatusRetriever.state.store.dir=${gobblin.service.work.dir}/state-store
+# JobStatusRetriever
+jobStatusRetriever.class="org.apache.gobblin.service.monitoring.LocalFsJobStatusRetriever"
+localFsJobStatusRetriever.localFsSpecProducer.dir=${gobblin.service.work.dir}/jobs
 
 # DagManager
 gobblin.service.dagManager.enabled=true
-gobblin.service.dagManager.jobStatusRetriever.class="org.apache.gobblin.service.monitoring.FsJobStatusRetriever"
+gobblin.service.dagManager.jobStatusRetriever.class="org.apache.gobblin.service.monitoring.LocalFsJobStatusRetriever"
 
gobblin.service.dagManager.dagStateStoreClass="org.apache.gobblin.service.modules.orchestration.FSDagStateStore"
 
gobblin.service.dagManager.dagStateStoreDir=${gobblin.service.work.dir}/dagStateStoreDir
 
@@ -62,12 +63,3 @@ mysqlSpecStore.state.store.db.table="flow_spec_store"
 
mysqlSpecStore.state.store.db.url="jdbc:mysql://mysql.default.svc.cluster.local:3306/gaas_db"
 mysqlSpecStore.state.store.db.user=${mysqlCredentials.user}
 mysqlSpecStore.state.store.db.password=${mysqlCredentials.password}
-
-# MySQL Job Status Retriever
-jobStatusRetriever.class="org.apache.gobblin.service.monitoring.MysqlJobStatusRetriever"
-mysqlJobStatusRetriever.state.store.db.table="gaas_job_status"
-
-# Assuming default namespace. URL of the service takes the form of 
<service>.<namespace>.cluster.local
-mysqlJobStatusRetriever.state.store.db.url="jdbc:mysql://mysql.default.svc.cluster.local:3306/gaas_db"
-mysqlJobStatusRetriever.state.store.db.user=${mysqlCredentials.user}
-mysqlJobStatusRetriever.state.store.db.password=${mysqlCredentials.password}
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/LocalFsSpecProducer.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/LocalFsSpecProducer.java
index 0f96cd9..74b534f 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/LocalFsSpecProducer.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/LocalFsSpecProducer.java
@@ -16,9 +16,6 @@
  */
 
 package org.apache.gobblin.runtime.spec_executorInstance;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
 import com.typesafe.config.Config;
 import java.io.File;
 import java.io.FileOutputStream;
@@ -28,12 +25,12 @@ import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.Future;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.runtime.api.SpecProducer;
 import org.apache.gobblin.util.CompletedFuture;
-import org.apache.gobblin.util.ConfigUtils;
 
 
 /**
@@ -42,7 +39,7 @@ import org.apache.gobblin.util.ConfigUtils;
 @Slf4j
 public class LocalFsSpecProducer implements SpecProducer<Spec> {
   private String specProducerPath;
-  public static final String LOCAL_FS_PRODUCER_PATH_KEY = 
"gobblin.cluster.localSpecProducer.dir";
+  public static final String LOCAL_FS_PRODUCER_PATH_KEY = 
"localFsSpecProducer.dir";
 
   public LocalFsSpecProducer(Config config) {
     this.specProducerPath = config.getString(LOCAL_FS_PRODUCER_PATH_KEY);
@@ -72,9 +69,9 @@ public class LocalFsSpecProducer implements 
SpecProducer<Spec> {
 
   private Future<?> writeSpec(Spec spec, SpecExecutor.Verb verb) {
     if (spec instanceof JobSpec) {
-      URI specUri = spec.getUri();
       // format the JobSpec to have file of <flowGroup>_<flowName>.job
-      String jobFileName = getJobFileName(specUri);
+      String flowExecutionId = ((JobSpec) 
spec).getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+      String jobFileName = getJobFileName(spec.getUri(), flowExecutionId);
       try (
         FileOutputStream fStream = new FileOutputStream(this.specProducerPath 
+ File.separatorChar + jobFileName);
       ) {
@@ -92,16 +89,26 @@ public class LocalFsSpecProducer implements 
SpecProducer<Spec> {
 
   /** Delete a {@link Spec} being executed on {@link 
org.apache.gobblin.runtime.api.SpecExecutor}.
    * @param deletedSpecURI
-   * @param headers*/
+   * @param headers
+   */
   @Override
   public Future<?> deleteSpec(URI deletedSpecURI, Properties headers) {
-    String jobFileName = getJobFileName(deletedSpecURI);
-    File file = new File(jobFileName);
-    if (file.delete()) {
-      log.info("Deleted spec: {}", jobFileName);
-      return new CompletedFuture<>(Boolean.TRUE, null);
+    String prefix = String.join("_", deletedSpecURI.getPath().split("/"));
+    // delete all of the jobs related to the spec
+    File dir = new File(this.specProducerPath);
+    File[] foundFiles = dir.listFiles((File file, String name) -> {
+        // only delete the jobs in progress
+        return name.startsWith(prefix) && name.endsWith(".job");
+    });
+
+    for (int i = 0; i < foundFiles.length; i++) {
+        Boolean didDelete = foundFiles[i].delete();
+        if (!didDelete) {
+          return new CompletedFuture<>(Boolean.TRUE, new 
RuntimeException(String.format("Failed to delete file with uri %s", 
deletedSpecURI)));
+        }
     }
-    throw new RuntimeException(String.format("Failed to delete file with uri 
%s", deletedSpecURI));
+
+    return new CompletedFuture<>(Boolean.TRUE, null);
   }
 
   /** List all {@link Spec} being executed on {@link 
org.apache.gobblin.runtime.api.SpecExecutor}. */
@@ -110,9 +117,9 @@ public class LocalFsSpecProducer implements 
SpecProducer<Spec> {
     throw new UnsupportedOperationException();
   }
 
-  private String getJobFileName(URI specUri) {
+  public static String getJobFileName(URI specUri, String flowExecutionId) {
     String[] uriTokens = specUri.getPath().split("/");
-    return String.join("_", uriTokens) + ".job";
+    return String.join("_", uriTokens) + "_" + flowExecutionId + ".job";
   }
 
 }
\ No newline at end of file
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/LocalFsJobStatusRetriever.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/LocalFsJobStatusRetriever.java
new file mode 100644
index 0000000..c9bb3e1
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/LocalFsJobStatusRetriever.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import com.google.common.base.Preconditions;
+
+import com.google.common.collect.Iterators;
+import com.typesafe.config.Config;
+import java.io.File;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.spec_executorInstance.LocalFsSpecProducer;
+import org.apache.gobblin.service.ExecutionStatus;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * A job status retriever for jobs completed by a Gobblin Standalone instance running on the same machine. Mainly used for sandboxing/testing.
+ * Considers a job complete when Gobblin standalone appends ".done" to the job file name. Otherwise, if the job file exists, the job is reported as pending.
+ */
+@Slf4j
+public class LocalFsJobStatusRetriever extends JobStatusRetriever {
+
+  public static final String CONF_PREFIX = "localFsJobStatusRetriever.";
+  private String JOB_DONE_SUFFIX = ".done";
+  private String specProducerPath;
+
+  // Do not use a state store for this implementation, just look at the job 
folder that @LocalFsSpecProducer writes to
+  public LocalFsJobStatusRetriever(Config config) {
+    this.specProducerPath = config.getString(CONF_PREFIX + 
LocalFsSpecProducer.LOCAL_FS_PRODUCER_PATH_KEY);
+  }
+
+  private Boolean doesJobExist(String flowName, String flowGroup, long 
flowExecutionId, String suffix) {
+    // Local FS has no monitor to update job state yet. For now, check whether standalone has finished the job, and if so mark it as done.
+    // Otherwise the job is pending.
+    try {
+      String fileName = LocalFsSpecProducer.getJobFileName(new 
URI(File.separatorChar + flowGroup + File.separatorChar + flowName), 
String.valueOf(flowExecutionId)) + suffix;
+      return new File(this.specProducerPath + File.separatorChar + 
fileName).exists();
+    } catch (URISyntaxException e) {
+      log.error("URISyntaxException occurred when retrieving job status for 
flow: {},{}", flowGroup, flowName, e);
+    }
+    return false;
+  }
+
+  @Override
+  public Iterator<JobStatus> getJobStatusesForFlowExecution(String flowName, 
String flowGroup, long flowExecutionId) {
+    Preconditions.checkArgument(flowName != null, "FlowName cannot be null");
+    Preconditions.checkArgument(flowGroup != null, "FlowGroup cannot be null");
+
+    // For the FS use case, JobExecutionID == FlowExecutionID
+    return getJobStatusesForFlowExecution(flowName, flowGroup, 
flowExecutionId, flowName, flowGroup);
+  }
+
+  @Override
+  public Iterator<JobStatus> getJobStatusesForFlowExecution(String flowName, 
String flowGroup, long flowExecutionId,
+      String jobName, String jobGroup) {
+    Preconditions.checkArgument(flowName != null, "flowName cannot be null");
+    Preconditions.checkArgument(flowGroup != null, "flowGroup cannot be null");
+    Preconditions.checkArgument(jobName != null, "jobName cannot be null");
+    Preconditions.checkArgument(jobGroup != null, "jobGroup cannot be null");
+    List<JobStatus> jobStatuses = new ArrayList<>();
+    JobStatus jobStatus;
+
+    if (this.doesJobExist(flowName, flowGroup, flowExecutionId, 
JOB_DONE_SUFFIX)) {
+      jobStatus = 
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).flowExecutionId(flowExecutionId).
+          
jobName(jobName).jobGroup(jobGroup).jobExecutionId(flowExecutionId).eventName(ExecutionStatus.COMPLETE.name()).build();
+    } else if (this.doesJobExist(flowName, flowGroup, flowExecutionId, "")) {
+      jobStatus = 
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).flowExecutionId(flowExecutionId).
+          
jobName(jobName).jobGroup(jobGroup).jobExecutionId(flowExecutionId).eventName(ExecutionStatus.PENDING.name()).build();
+    } else {
+      return Iterators.emptyIterator();
+    }
+
+    jobStatuses.add(jobStatus);
+    return jobStatuses.iterator();
+  }
+
+  /**
+   * @param flowName
+   * @param flowGroup
+   * @return the last <code>count</code> flow execution ids with the given flowName and flowGroup. Not yet implemented: currently always returns null.
+   */
+  @Override
+  public List<Long> getLatestExecutionIdsForFlow(String flowName, String 
flowGroup, int count) {
+    Preconditions.checkArgument(flowName != null, "flowName cannot be null");
+    Preconditions.checkArgument(flowGroup != null, "flowGroup cannot be null");
+    Preconditions.checkArgument(count > 0, "Number of execution ids must be at 
least 1.");
+
+    //TODO: implement this
+
+    return null;
+  }
+
+  public StateStore<State> getStateStore() {
+    // this jobstatus retriever does not have a state store
+    // only used in tests so this is okay
+    return null;
+  }
+}

Reply via email to