This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git

The following commit(s) were added to refs/heads/master by this push:
     new dfef88e  [GOBBLIN-988][GOBBLIN-897] Implement LocalFSJobStatusRetriever
dfef88e is described below

commit dfef88e5a820f4153dfcacba4e8b5c4d463d31bf
Author: William Lo <w...@linkedin.com>
AuthorDate: Fri Dec 20 10:55:25 2019 -0800

    [GOBBLIN-988][GOBBLIN-897] Implement LocalFSJobStatusRetriever

    Closes #2834 from Will-Lo/fs-jobstatus-monitor
---
 conf/gobblin-as-service/application.conf           |   9 +-
 .../mysql-cluster/gaas-application.conf            |  18 +---
 .../spec_executorInstance/LocalFsSpecProducer.java |  39 ++++---
 .../monitoring/LocalFsJobStatusRetriever.java      | 120 +++++++++++++++++++++
 4 files changed, 153 insertions(+), 33 deletions(-)

diff --git a/conf/gobblin-as-service/application.conf b/conf/gobblin-as-service/application.conf
index 8d5f694..5145325 100644
--- a/conf/gobblin-as-service/application.conf
+++ b/conf/gobblin-as-service/application.conf
@@ -29,7 +29,7 @@ topologySpecFactory.localGobblinCluster.version="1"
 topologySpecFactory.localGobblinCluster.uri="gobblinCluster"
 topologySpecFactory.localGobblinCluster.specExecutorInstance.class="org.apache.gobblin.runtime.spec_executorInstance.LocalFsSpecExecutor"
 topologySpecFactory.localGobblinCluster.specExecInstance.capabilities="source:dest"
-topologySpecFactory.localGobblinCluster.gobblin.cluster.localSpecProducer.dir=${gobblin.service.work.dir}/jobs
+topologySpecFactory.localGobblinCluster.localFsSpecProducer.dir=${gobblin.service.work.dir}/jobs
 
 # Flow Catalog and Store
 flowSpec.store.dir=${gobblin.service.work.dir}/flowSpecStore
@@ -40,12 +40,13 @@ gobblin.service.templateCatalogs.fullyQualifiedPath="file://"
 
 # JobStatusMonitor
 gobblin.service.jobStatusMonitor.enabled=false
 
-# FsJobStatusRetriever
-fsJobStatusRetriever.state.store.dir=${gobblin.service.work.dir}/state-store
+# JobStatusRetriever
+jobStatusRetriever.class="org.apache.gobblin.service.monitoring.LocalFsJobStatusRetriever"
+localFsJobStatusRetriever.localFsSpecProducer.dir=${gobblin.service.work.dir}/jobs
 
 # DagManager
 gobblin.service.dagManager.enabled=true
-gobblin.service.dagManager.jobStatusRetriever.class="org.apache.gobblin.service.monitoring.FsJobStatusRetriever"
+gobblin.service.dagManager.jobStatusRetriever.class="org.apache.gobblin.service.monitoring.LocalFsJobStatusRetriever"
 gobblin.service.dagManager.dagStateStoreClass="org.apache.gobblin.service.modules.orchestration.FSDagStateStore"
 gobblin.service.dagManager.dagStateStoreDir=${gobblin.service.work.dir}/dagStateStoreDir
 
diff --git a/gobblin-kubernetes/gobblin-service/mysql-cluster/gaas-application.conf b/gobblin-kubernetes/gobblin-service/mysql-cluster/gaas-application.conf
index b2f9366..7c89cbe 100644
--- a/gobblin-kubernetes/gobblin-service/mysql-cluster/gaas-application.conf
+++ b/gobblin-kubernetes/gobblin-service/mysql-cluster/gaas-application.conf
@@ -28,7 +28,7 @@ topologySpecFactory.localGobblinCluster.version="1"
 topologySpecFactory.localGobblinCluster.uri="gobblinCluster"
 topologySpecFactory.localGobblinCluster.specExecutorInstance.class="org.apache.gobblin.runtime.spec_executorInstance.LocalFsSpecExecutor"
 topologySpecFactory.localGobblinCluster.specExecInstance.capabilities="source:dest"
-topologySpecFactory.localGobblinCluster.gobblin.cluster.localSpecProducer.dir=${gobblin.service.work.dir}/jobs
+topologySpecFactory.localGobblinCluster.localFsSpecProducer.dir=${gobblin.service.work.dir}/jobs
 
 # Flow Catalog and Store
 flowSpec.store.dir=${gobblin.service.work.dir}/flowSpecStore
@@ -39,12 +39,13 @@ gobblin.service.templateCatalogs.fullyQualifiedPath="file://"
 
 # JobStatusMonitor
 gobblin.service.jobStatusMonitor.enabled=false
 
-# FsJobStatusRetriever
-fsJobStatusRetriever.state.store.dir=${gobblin.service.work.dir}/state-store
+# JobStatusRetriever
+jobStatusRetriever.class="org.apache.gobblin.service.monitoring.LocalFsJobStatusRetriever"
+localFsJobStatusRetriever.localFsSpecProducer.dir=${gobblin.service.work.dir}/jobs
 
 # DagManager
 gobblin.service.dagManager.enabled=true
-gobblin.service.dagManager.jobStatusRetriever.class="org.apache.gobblin.service.monitoring.FsJobStatusRetriever"
+gobblin.service.dagManager.jobStatusRetriever.class="org.apache.gobblin.service.monitoring.LocalFsJobStatusRetriever"
 gobblin.service.dagManager.dagStateStoreClass="org.apache.gobblin.service.modules.orchestration.FSDagStateStore"
 gobblin.service.dagManager.dagStateStoreDir=${gobblin.service.work.dir}/dagStateStoreDir
 
@@ -62,12 +63,3 @@ mysqlSpecStore.state.store.db.table="flow_spec_store"
 mysqlSpecStore.state.store.db.url="jdbc:mysql://mysql.default.svc.cluster.local:3306/gaas_db"
 mysqlSpecStore.state.store.db.user=${mysqlCredentials.user}
 mysqlSpecStore.state.store.db.password=${mysqlCredentials.password}
-
-# MySQL Job Status Retriever
-jobStatusRetriever.class="org.apache.gobblin.service.monitoring.MysqlJobStatusRetriever"
-mysqlJobStatusRetriever.state.store.db.table="gaas_job_status"
-
-# Assuming default namespace. URL of the service takes the form of <service>.<namespace>.cluster.local
-mysqlJobStatusRetriever.state.store.db.url="jdbc:mysql://mysql.default.svc.cluster.local:3306/gaas_db"
-mysqlJobStatusRetriever.state.store.db.user=${mysqlCredentials.user}
-mysqlJobStatusRetriever.state.store.db.password=${mysqlCredentials.password}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/LocalFsSpecProducer.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/LocalFsSpecProducer.java
index 0f96cd9..74b534f 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/LocalFsSpecProducer.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/LocalFsSpecProducer.java
@@ -16,9 +16,6 @@
  */
 package org.apache.gobblin.runtime.spec_executorInstance;
 
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
 import com.typesafe.config.Config;
 import java.io.File;
 import java.io.FileOutputStream;
@@ -28,12 +25,12 @@ import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.Future;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.runtime.api.SpecProducer;
 import org.apache.gobblin.util.CompletedFuture;
-import org.apache.gobblin.util.ConfigUtils;
 
 
 /**
@@ -42,7 +39,7 @@ import org.apache.gobblin.util.ConfigUtils;
 @Slf4j
 public class LocalFsSpecProducer implements SpecProducer<Spec> {
   private String specProducerPath;
-  public static final String LOCAL_FS_PRODUCER_PATH_KEY = "gobblin.cluster.localSpecProducer.dir";
+  public static final String LOCAL_FS_PRODUCER_PATH_KEY = "localFsSpecProducer.dir";
 
   public LocalFsSpecProducer(Config config) {
     this.specProducerPath = config.getString(LOCAL_FS_PRODUCER_PATH_KEY);
@@ -72,9 +69,9 @@ public class LocalFsSpecProducer implements SpecProducer<Spec> {
   private Future<?> writeSpec(Spec spec, SpecExecutor.Verb verb) {
     if (spec instanceof JobSpec) {
-      URI specUri = spec.getUri();
       // format the JobSpec to have file of <flowGroup>_<flowName>.job
-      String jobFileName = getJobFileName(specUri);
+      String flowExecutionId = ((JobSpec) spec).getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+      String jobFileName = getJobFileName(spec.getUri(), flowExecutionId);
       try (
           FileOutputStream fStream = new FileOutputStream(this.specProducerPath + File.separatorChar + jobFileName);
       ) {
@@ -92,16 +89,26 @@ public class LocalFsSpecProducer implements SpecProducer<Spec> {
 
   /** Delete a {@link Spec} being executed on {@link org.apache.gobblin.runtime.api.SpecExecutor}.
    * @param deletedSpecURI
-   * @param headers*/
+   * @param headers
+   */
   @Override
   public Future<?> deleteSpec(URI deletedSpecURI, Properties headers) {
-    String jobFileName = getJobFileName(deletedSpecURI);
-    File file = new File(jobFileName);
-    if (file.delete()) {
-      log.info("Deleted spec: {}", jobFileName);
-      return new CompletedFuture<>(Boolean.TRUE, null);
+    String prefix = String.join("_", deletedSpecURI.getPath().split("/"));
+    // delete all of the jobs related to the spec
+    File dir = new File(this.specProducerPath);
+    File[] foundFiles = dir.listFiles((File file, String name) -> {
+      // only delete the jobs in progress
+      return name.startsWith(prefix) && name.endsWith(".job");
+    });
+
+    for (int i = 0; i < foundFiles.length; i++) {
+      Boolean didDelete = foundFiles[i].delete();
+      if (!didDelete) {
+        return new CompletedFuture<>(Boolean.TRUE, new RuntimeException(String.format("Failed to delete file with uri %s", deletedSpecURI)));
+      }
     }
-    throw new RuntimeException(String.format("Failed to delete file with uri %s", deletedSpecURI));
+
+    return new CompletedFuture<>(Boolean.TRUE, null);
   }
 
   /** List all {@link Spec} being executed on {@link org.apache.gobblin.runtime.api.SpecExecutor}. */
@@ -110,9 +117,9 @@ public class LocalFsSpecProducer implements SpecProducer<Spec> {
     throw new UnsupportedOperationException();
   }
 
-  private String getJobFileName(URI specUri) {
+  public static String getJobFileName(URI specUri, String flowExecutionId) {
     String[] uriTokens = specUri.getPath().split("/");
-    return String.join("_", uriTokens) + ".job";
+    return String.join("_", uriTokens) + "_" + flowExecutionId + ".job";
   }
 }
\ No newline at end of file
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/LocalFsJobStatusRetriever.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/LocalFsJobStatusRetriever.java
new file mode 100644
index 0000000..c9bb3e1
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/LocalFsJobStatusRetriever.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import com.google.common.base.Preconditions;
+
+import com.google.common.collect.Iterators;
+import com.typesafe.config.Config;
+import java.io.File;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.spec_executorInstance.LocalFsSpecProducer;
+import org.apache.gobblin.service.ExecutionStatus;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * A job status monitor for jobs completed by a Gobblin Standalone instance running on the same machine. Mainly used for sandboxing/testing
+ * Considers a job done when Gobblin standalone appends ".done" to the job. Otherwise it will assume the job is in progress
+ */
+@Slf4j
+public class LocalFsJobStatusRetriever extends JobStatusRetriever {
+
+  public static final String CONF_PREFIX = "localFsJobStatusRetriever.";
+  private String JOB_DONE_SUFFIX = ".done";
+  private String specProducerPath;
+
+  // Do not use a state store for this implementation, just look at the job folder that @LocalFsSpecProducer writes to
+  public LocalFsJobStatusRetriever(Config config) {
+    this.specProducerPath = config.getString(CONF_PREFIX + LocalFsSpecProducer.LOCAL_FS_PRODUCER_PATH_KEY);
+  }
+
+  private Boolean doesJobExist(String flowName, String flowGroup, long flowExecutionId, String suffix) {
+    // Local FS has no monitor to update job state yet, for now check if standalone is completed with job, and mark as done
+    // Otherwise the job is pending
+    try {
+      String fileName = LocalFsSpecProducer.getJobFileName(new URI(File.separatorChar + flowGroup + File.separatorChar + flowName), String.valueOf(flowExecutionId)) + suffix;
+      return new File(this.specProducerPath + File.separatorChar + fileName).exists();
+    } catch (URISyntaxException e) {
+      log.error("URISyntaxException occurred when retrieving job status for flow: {},{}", flowGroup, flowName, e);
+    }
+    return false;
+  }
+
+  @Override
+  public Iterator<JobStatus> getJobStatusesForFlowExecution(String flowName, String flowGroup, long flowExecutionId) {
+    Preconditions.checkArgument(flowName != null, "FlowName cannot be null");
+    Preconditions.checkArgument(flowGroup != null, "FlowGroup cannot be null");
+
+    // For the FS use case, JobExecutionID == FlowExecutionID
+    return getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId, flowName, flowGroup);
+  }
+
+  @Override
+  public Iterator<JobStatus> getJobStatusesForFlowExecution(String flowName, String flowGroup, long flowExecutionId,
+      String jobName, String jobGroup) {
+    Preconditions.checkArgument(flowName != null, "flowName cannot be null");
+    Preconditions.checkArgument(flowGroup != null, "flowGroup cannot be null");
+    Preconditions.checkArgument(jobName != null, "jobName cannot be null");
+    Preconditions.checkArgument(jobGroup != null, "jobGroup cannot be null");
+    List<JobStatus> jobStatuses = new ArrayList<>();
+    JobStatus jobStatus;
+
+    if (this.doesJobExist(flowName, flowGroup, flowExecutionId, JOB_DONE_SUFFIX)) {
+      jobStatus = JobStatus.builder().flowName(flowName).flowGroup(flowGroup).flowExecutionId(flowExecutionId).
+          jobName(jobName).jobGroup(jobGroup).jobExecutionId(flowExecutionId).eventName(ExecutionStatus.COMPLETE.name()).build();
+    } else if (this.doesJobExist(flowName, flowGroup, flowExecutionId, "")) {
+      jobStatus = JobStatus.builder().flowName(flowName).flowGroup(flowGroup).flowExecutionId(flowExecutionId).
+          jobName(jobName).jobGroup(jobGroup).jobExecutionId(flowExecutionId).eventName(ExecutionStatus.PENDING.name()).build();
+    } else {
+      return Iterators.emptyIterator();
+    }
+
+    jobStatuses.add(jobStatus);
+    return jobStatuses.iterator();
+  }
+
+  /**
+   * @param flowName
+   * @param flowGroup
+   * @return the last <code>count</code> flow execution ids with the given flowName and flowGroup. -1 will be returned if no such execution found.
+   */
+  @Override
+  public List<Long> getLatestExecutionIdsForFlow(String flowName, String flowGroup, int count) {
+    Preconditions.checkArgument(flowName != null, "flowName cannot be null");
+    Preconditions.checkArgument(flowGroup != null, "flowGroup cannot be null");
+    Preconditions.checkArgument(count > 0, "Number of execution ids must be at least 1.");
+
+    //TODO: implement this
+
+    return null;
+  }
+
+  public StateStore<State> getStateStore() {
+    // this jobstatus retriever does not have a state store
+    // only used in tests so this is okay
+    return null;
+  }
+}
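
For reference, a rough sketch (not from the patch; the flow group, flow name, and execution id are made-up values) of the job-file naming contract that ties LocalFsSpecProducer to the new retriever, using the now-public getJobFileName helper:

import java.net.URI;
import java.net.URISyntaxException;

import org.apache.gobblin.runtime.spec_executorInstance.LocalFsSpecProducer;

public class JobFileNameSketch {
  public static void main(String[] args) throws URISyntaxException {
    // Spec URIs take the form /<flowGroup>/<flowName>; the path tokens are joined
    // with '_' and the flow execution id is appended before the ".job" extension.
    String fileName = LocalFsSpecProducer.getJobFileName(
        new URI("/myFlowGroup/myFlowName"), "1576864525000");
    // Expected to print something like "_myFlowGroup_myFlowName_1576864525000.job";
    // the retriever looks for this file (PENDING) or the same name with ".done"
    // appended by standalone (COMPLETE).
    System.out.println(fileName);
  }
}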
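
And a minimal sketch of wiring up the retriever itself, assuming only what the patch shows (the localFsJobStatusRetriever.localFsSpecProducer.dir key read by the constructor) plus a Lombok-generated getEventName() accessor on JobStatus; the directory and flow coordinates below are hypothetical:

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import java.util.Iterator;

import org.apache.gobblin.service.monitoring.JobStatus;
import org.apache.gobblin.service.monitoring.LocalFsJobStatusRetriever;

public class LocalFsJobStatusRetrieverSketch {
  public static void main(String[] args) {
    // Point the retriever at the same directory LocalFsSpecProducer writes to
    // (the shipped configs resolve this to ${gobblin.service.work.dir}/jobs).
    Config config = ConfigFactory.parseString(
        "localFsJobStatusRetriever.localFsSpecProducer.dir=\"/tmp/gobblin-service/jobs\"");
    LocalFsJobStatusRetriever retriever = new LocalFsJobStatusRetriever(config);

    // For this retriever, jobExecutionId == flowExecutionId and the job name/group
    // default to the flow name/group.
    Iterator<JobStatus> statuses =
        retriever.getJobStatusesForFlowExecution("myFlowName", "myFlowGroup", 1576864525000L);
    while (statuses.hasNext()) {
      // PENDING while the .job file exists as-is, COMPLETE once standalone appends ".done".
      System.out.println(statuses.next().getEventName());
    }
  }
}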