[
https://issues.apache.org/jira/browse/GOBBLIN-1968?focusedWorklogId=893100&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-893100
]
ASF GitHub Bot logged work on GOBBLIN-1968:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 30/Nov/23 09:02
Start Date: 30/Nov/23 09:02
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #3829:
URL: https://github.com/apache/gobblin/pull/3829#discussion_r1410345417
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java:
##########
@@ -193,39 +189,11 @@ protected void shutDown() throws Exception {
* @throws IOException if it fails to collect the output {@link TaskState}s
*/
private void collectOutputTaskStates() throws IOException {
- List<String> taskStateNames =
taskStateStore.getTableNames(outputTaskStateDir.getName(), new
Predicate<String>() {
- @Override
- public boolean apply(String input) {
- return
input.endsWith(AbstractJobLauncher.TASK_STATE_STORE_TABLE_SUFFIX)
- && !input.startsWith(FsStateStore.TMP_FILE_PREFIX);
- }});
- if (taskStateNames == null || taskStateNames.size() == 0) {
- LOGGER.debug("No output task state files found in " +
this.outputTaskStateDir);
+ final Queue<TaskState> taskStateQueue =
deserializeTaskStatesFromFolder(taskStateStore, outputTaskStateDir,
this.stateSerDeRunnerThreads);
+ if (taskStateQueue == null) {
Review Comment:
I'd lean heavily toward returning an empty queue rather than `null`.
If the issue is that the queue may be built asynchronously, such that
`.isEmpty()` at the start might not always be the case, then use
`Optional` instead.
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.typesafe.config.ConfigFactory;
+import io.temporal.failure.ApplicationFailure;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.commit.DeliverySemantics;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.SafeDatasetCommit;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.runtime.TaskStateCollectorService;
+import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
+import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.util.Either;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.util.executors.IteratorExecutor;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+@Slf4j
+public class CommitActivityImpl implements CommitActivity {
+
+ static int DEFAULT_NUM_DESERIALIZATION_THREADS = 10;
+ static int DEFAULT_NUM_COMMIT_THREADS = 1;
+ @Override
+ public int commit(WUProcessingSpec workSpec) {
+ // TODO: Make this configurable
+ int numDeserializationThreads = DEFAULT_NUM_DESERIALIZATION_THREADS;
+ try {
+ FileSystem fs = Help.loadFileSystem(workSpec);
+ JobState jobState = Help.loadJobState(workSpec, fs);
+ SharedResourcesBroker<GobblinScopeTypes> instanceBroker =
JobStateUtils.getSharedResourcesBroker(jobState);
+ JobContext globalGobblinContext = new
JobContext(jobState.getProperties(), log, instanceBroker, null);
+ // TODO: Task state dir is a stub with the assumption it is always
colocated with the workunits dir (as in the case of MR which generates
workunits)
+ Path jobIdParent = new Path(workSpec.getWorkUnitsDir()).getParent();
+ Path jobOutputPath = new Path(new Path(jobIdParent, "output"),
jobIdParent.getName());
+ log.info("Output path at: " + jobOutputPath + " with fs at " +
fs.getUri());
+ StateStore<TaskState> taskStateStore = Help.openTaskStateStore(workSpec,
fs);
+ Collection<TaskState> taskStateQueue =
+ ImmutableList.copyOf(
+
TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore,
jobOutputPath, numDeserializationThreads));
+ commitTaskStates(jobState, taskStateQueue, globalGobblinContext);
+ return taskStateQueue.size();
+ } catch (Exception e) {
+ //TODO: IMPROVE GRANULARITY OF RETRIES
+ throw ApplicationFailure.newNonRetryableFailureWithCause(
+ "Failed to commit dataset state for some dataset(s) of job
<jobStub>",
+ IOException.class.toString(),
+ new IOException(e),
+ null
+ );
+ }
+ }
+
+ /**
+ * Commit task states to the dataset state store.
+ * @param jobState
+ * @param taskStates
+ * @param jobContext
+ * @throws IOException
+ */
+ void commitTaskStates(State jobState, Collection<TaskState> taskStates,
JobContext jobContext) throws IOException {
Review Comment:
`protected`, or perhaps `private`?
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.typesafe.config.ConfigFactory;
+import io.temporal.failure.ApplicationFailure;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.commit.DeliverySemantics;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.SafeDatasetCommit;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.runtime.TaskStateCollectorService;
+import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
+import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.util.Either;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.util.executors.IteratorExecutor;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+@Slf4j
+public class CommitActivityImpl implements CommitActivity {
+
+ static int DEFAULT_NUM_DESERIALIZATION_THREADS = 10;
+ static int DEFAULT_NUM_COMMIT_THREADS = 1;
+ @Override
+ public int commit(WUProcessingSpec workSpec) {
+ // TODO: Make this configurable
+ int numDeserializationThreads = DEFAULT_NUM_DESERIALIZATION_THREADS;
+ try {
+ FileSystem fs = Help.loadFileSystem(workSpec);
+ JobState jobState = Help.loadJobState(workSpec, fs);
+ SharedResourcesBroker<GobblinScopeTypes> instanceBroker =
JobStateUtils.getSharedResourcesBroker(jobState);
+ JobContext globalGobblinContext = new
JobContext(jobState.getProperties(), log, instanceBroker, null);
+ // TODO: Task state dir is a stub with the assumption it is always
colocated with the workunits dir (as in the case of MR which generates
workunits)
+ Path jobIdParent = new Path(workSpec.getWorkUnitsDir()).getParent();
+ Path jobOutputPath = new Path(new Path(jobIdParent, "output"),
jobIdParent.getName());
+ log.info("Output path at: " + jobOutputPath + " with fs at " +
fs.getUri());
+ StateStore<TaskState> taskStateStore = Help.openTaskStateStore(workSpec,
fs);
+ Collection<TaskState> taskStateQueue =
+ ImmutableList.copyOf(
+
TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore,
jobOutputPath, numDeserializationThreads));
+ commitTaskStates(jobState, taskStateQueue, globalGobblinContext);
+ return taskStateQueue.size();
+ } catch (Exception e) {
+ //TODO: IMPROVE GRANULARITY OF RETRIES
+ throw ApplicationFailure.newNonRetryableFailureWithCause(
+ "Failed to commit dataset state for some dataset(s) of job
<jobStub>",
+ IOException.class.toString(),
+ new IOException(e),
+ null
+ );
+ }
+ }
+
+ /**
+ * Commit task states to the dataset state store.
+ * @param jobState
+ * @param taskStates
+ * @param jobContext
+ * @throws IOException
+ */
+ void commitTaskStates(State jobState, Collection<TaskState> taskStates,
JobContext jobContext) throws IOException {
+ Map<String, JobState.DatasetState> datasetStatesByUrns =
createDatasetStatesByUrns(taskStates);
+ final boolean shouldCommitDataInJob =
JobContext.shouldCommitDataInJob(jobState);
+ final DeliverySemantics deliverySemantics =
DeliverySemantics.AT_LEAST_ONCE;
+ //TODO: Make this configurable
+ final int numCommitThreads = DEFAULT_NUM_COMMIT_THREADS;
+ if (!shouldCommitDataInJob) {
+ log.info("Job will not commit data since data are committed by tasks.");
+ }
+
+ try {
+ if (!datasetStatesByUrns.isEmpty()) {
+ log.info("Persisting {} dataset urns.", datasetStatesByUrns.size());
+ }
+
+ List<Either<Void, ExecutionException>> result = new
IteratorExecutor<>(Iterables
+ .transform(datasetStatesByUrns.entrySet(),
+ new Function<Map.Entry<String, JobState.DatasetState>,
Callable<Void>>() {
+ @Nullable
+ @Override
+ public Callable<Void> apply(final Map.Entry<String,
JobState.DatasetState> entry) {
+ return new SafeDatasetCommit(shouldCommitDataInJob, false,
deliverySemantics, entry.getKey(),
+ entry.getValue(), false, jobContext);
+ }
+ }).iterator(), numCommitThreads,
+ ExecutorsUtils.newThreadFactory(Optional.of(log),
Optional.of("Commit-thread-%d")))
+ .executeAndGetResults();
+
+ IteratorExecutor.logFailures(result, null, 10);
+
+ if (!IteratorExecutor.verifyAllSuccessful(result)) {
+ // TODO: propagate cause of failure
+ String jobName =
jobState.getProperties().getProperty(ConfigurationKeys.JOB_NAME_KEY,
"<job_name_stub>");
+ throw new IOException("Failed to commit dataset state for some
dataset(s) of job " + jobName);
+ }
+ } catch (InterruptedException exc) {
+ throw new IOException(exc);
+ }
+ }
+
+ /**
+ * Organize task states by dataset urns.
+ * @param taskStates
+ * @return
+ */
+ public static Map<String, JobState.DatasetState>
createDatasetStatesByUrns(Collection<TaskState> taskStates) {
+ Map<String, JobState.DatasetState> datasetStatesByUrns = Maps.newHashMap();
+
+ //TODO: handle skipped tasks?
Review Comment:
I'm not clear on which ones might be skipped, so please consider elaborating
in the comment
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.typesafe.config.ConfigFactory;
+import io.temporal.failure.ApplicationFailure;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.commit.DeliverySemantics;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.FsStateStore;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.SafeDatasetCommit;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.runtime.TaskStateCollectorService;
+import org.apache.gobblin.source.extractor.JobCommitPolicy;
+import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
+import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.util.Either;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.SerializationUtils;
+import org.apache.gobblin.util.executors.IteratorExecutor;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+@Slf4j
+public class CommitActivityImpl implements CommitActivity {
+
+ @Override
+ public int commit(WUProcessingSpec workSpec) {
+ int numDeserializationThreads = 1;
+ try {
+ FileSystem fs = Help.loadFileSystem(workSpec);
+ JobState jobState = Help.loadJobState(workSpec, fs);
+ SharedResourcesBroker<GobblinScopeTypes> instanceBroker =
createDefaultInstanceBroker(jobState.getProperties());
+ JobContext globalGobblinContext = new
JobContext(jobState.getProperties(), log, instanceBroker, null);
+ // TODO: Task state dir is a stub with the assumption it is always
colocated with the workunits dir (as in the case of MR which generates
workunits)
+ Path jobIdParent = new Path(workSpec.getWorkUnitsDir()).getParent();
+ Path jobOutputPath = new Path(new Path(jobIdParent, "output"),
jobIdParent.getName());
+ log.info("Output path at: " + jobOutputPath + " with fs at " +
fs.getUri());
+ StateStore<TaskState> taskStateStore = Help.openTaskStateStore(workSpec,
fs);
+ Collection<TaskState> taskStateQueue =
+ ImmutableList.copyOf(
+
TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore,
jobOutputPath, numDeserializationThreads));
+ commitTaskStates(jobState, taskStateQueue, globalGobblinContext);
+
+ } catch (Exception e) {
+ //TODO: IMPROVE GRANULARITY OF RETRIES
+ throw ApplicationFailure.newNonRetryableFailureWithCause(
+ "Failed to commit dataset state for some dataset(s) of job
<jobStub>",
+ IOException.class.toString(),
+ new IOException(e),
+ null
+ );
+ }
+
+ return 0;
+ }
+
+ protected FileSystem loadFileSystemForUri(URI fsUri, State stateConfig)
throws IOException {
+ return HadoopUtils.getFileSystem(fsUri, stateConfig);
+ }
+
+ void commitTaskStates(State jobState, Collection<TaskState> taskStates,
JobContext jobContext) throws IOException {
+ Map<String, JobState.DatasetState> datasetStatesByUrns =
createDatasetStatesByUrns(taskStates);
+ final boolean shouldCommitDataInJob = shouldCommitDataInJob(jobState);
+ final DeliverySemantics deliverySemantics =
DeliverySemantics.AT_LEAST_ONCE;
+ final int numCommitThreads = 1;
+
+ if (!shouldCommitDataInJob) {
+ log.info("Job will not commit data since data are committed by tasks.");
Review Comment:
I agree and understand... yet I was also noting the non-specificity, because
this is based not only on the configured commit policy, but also on whether
`ConfigurationKeys.JOB_DATA_PUBLISHER_TYPE` may have been forgotten.
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java:
##########
@@ -259,6 +227,52 @@ public Void call() throws Exception {
this.eventBus.post(new
NewTaskCompletionEvent(ImmutableList.copyOf(taskStateQueue)));
}
+ /**
+ * Reads in a {@link FsStateStore} folder used to store Task state outputs,
and returns a queue of {@link TaskState}s
+ * Task State files are populated by the {@link GobblinMultiTaskAttempt} to
record the output of remote concurrent tasks (e.g. MR mappers)
+ * @param taskStateStore
+ * @param outputTaskStateDir
+ * @param numDeserializerThreads
+ * @return Queue of TaskStates
Review Comment:
...or `null` apparently (which is very much worth documenting here!)
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java:
##########
@@ -17,19 +17,20 @@
package org.apache.gobblin.temporal.ddm.workflow;
-import io.temporal.workflow.Promise;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
-import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
+/**
+ * Workflow for committing the output of work done by {@link
org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl}
+ */
@WorkflowInterface
public interface CommitStepWorkflow {
/**
- * This is the method that is executed when the Workflow Execution is
started. The Workflow
- * Execution completes when this method finishes execution.
+ * Commit the output of the work done by {@link
org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl}
+ * Returns the number of workunits committed
Review Comment:
nit: `@return`
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java:
##########
@@ -23,30 +23,40 @@
import io.temporal.api.enums.v1.ParentClosePolicy;
import io.temporal.workflow.ChildWorkflowOptions;
import io.temporal.workflow.Workflow;
+import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.temporal.cluster.WorkerConfig;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
import org.apache.gobblin.temporal.ddm.work.assistance.Help;
import org.apache.gobblin.temporal.ddm.work.styles.FileSystemJobStateful;
+import org.apache.gobblin.temporal.ddm.workflow.CommitStepWorkflow;
import org.apache.gobblin.temporal.ddm.workflow.ProcessWorkUnitsWorkflow;
import
org.apache.gobblin.temporal.ddm.work.EagerFsDirBackedWorkUnitClaimCheckWorkload;
import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr;
import org.apache.gobblin.temporal.util.nesting.work.Workload;
import org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow;
+@Slf4j
public class ProcessWorkUnitsWorkflowImpl implements ProcessWorkUnitsWorkflow {
public static final String CHILD_WORKFLOW_ID_BASE = "NestingExecWorkUnits";
+ public static final String COMMIT_STEP_WORKFLOW_ID_BASE =
"CommitStepWorkflow";
@Override
public int process(WUProcessingSpec workSpec) {
Workload<WorkUnitClaimCheck> workload = createWorkload(workSpec);
NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow =
createProcessingWorkflow(workSpec);
- return processingWorkflow.performWorkload(
+ int workunitsProcessed = processingWorkflow.performWorkload(
WorkflowAddr.ROOT, workload, 0,
workSpec.getTuning().getMaxBranchesPerTree(),
workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty()
);
+ if (workunitsProcessed > 0) {
+ CommitStepWorkflow commitWorkflow = createCommitStepWorkflow();
+ int result = commitWorkflow.commit(workSpec);
+ return result;
+ }
+ return workunitsProcessed;
Review Comment:
Are we certain `commitWorkflow.commit()` would always return > 0? If not,
let's log when we started with `workunitsProcessed == 0` vs. when `result == 0`.
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.typesafe.config.ConfigFactory;
+import io.temporal.failure.ApplicationFailure;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.commit.DeliverySemantics;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.SafeDatasetCommit;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.runtime.TaskStateCollectorService;
+import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
+import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.util.Either;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.util.executors.IteratorExecutor;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+@Slf4j
+public class CommitActivityImpl implements CommitActivity {
+
+ static int DEFAULT_NUM_DESERIALIZATION_THREADS = 10;
+ static int DEFAULT_NUM_COMMIT_THREADS = 1;
+ @Override
+ public int commit(WUProcessingSpec workSpec) {
+ // TODO: Make this configurable
+ int numDeserializationThreads = DEFAULT_NUM_DESERIALIZATION_THREADS;
+ try {
+ FileSystem fs = Help.loadFileSystem(workSpec);
+ JobState jobState = Help.loadJobState(workSpec, fs);
+ SharedResourcesBroker<GobblinScopeTypes> instanceBroker =
JobStateUtils.getSharedResourcesBroker(jobState);
+ JobContext globalGobblinContext = new
JobContext(jobState.getProperties(), log, instanceBroker, null);
+ // TODO: Task state dir is a stub with the assumption it is always
colocated with the workunits dir (as in the case of MR which generates
workunits)
+ Path jobIdParent = new Path(workSpec.getWorkUnitsDir()).getParent();
+ Path jobOutputPath = new Path(new Path(jobIdParent, "output"),
jobIdParent.getName());
+ log.info("Output path at: " + jobOutputPath + " with fs at " +
fs.getUri());
+ StateStore<TaskState> taskStateStore = Help.openTaskStateStore(workSpec,
fs);
+ Collection<TaskState> taskStateQueue =
+ ImmutableList.copyOf(
+
TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore,
jobOutputPath, numDeserializationThreads));
+ commitTaskStates(jobState, taskStateQueue, globalGobblinContext);
+ return taskStateQueue.size();
+ } catch (Exception e) {
+ //TODO: IMPROVE GRANULARITY OF RETRIES
+ throw ApplicationFailure.newNonRetryableFailureWithCause(
+ "Failed to commit dataset state for some dataset(s) of job
<jobStub>",
+ IOException.class.toString(),
+ new IOException(e),
+ null
+ );
+ }
+ }
+
+ /**
+ * Commit task states to the dataset state store.
+ * @param jobState
+ * @param taskStates
+ * @param jobContext
+ * @throws IOException
+ */
+ void commitTaskStates(State jobState, Collection<TaskState> taskStates,
JobContext jobContext) throws IOException {
+ Map<String, JobState.DatasetState> datasetStatesByUrns =
createDatasetStatesByUrns(taskStates);
+ final boolean shouldCommitDataInJob =
JobContext.shouldCommitDataInJob(jobState);
+ final DeliverySemantics deliverySemantics =
DeliverySemantics.AT_LEAST_ONCE;
+ //TODO: Make this configurable
+ final int numCommitThreads = DEFAULT_NUM_COMMIT_THREADS;
+ if (!shouldCommitDataInJob) {
+ log.info("Job will not commit data since data are committed by tasks.");
+ }
+
+ try {
+ if (!datasetStatesByUrns.isEmpty()) {
+ log.info("Persisting {} dataset urns.", datasetStatesByUrns.size());
+ }
+
+ List<Either<Void, ExecutionException>> result = new
IteratorExecutor<>(Iterables
+ .transform(datasetStatesByUrns.entrySet(),
+ new Function<Map.Entry<String, JobState.DatasetState>,
Callable<Void>>() {
+ @Nullable
+ @Override
+ public Callable<Void> apply(final Map.Entry<String,
JobState.DatasetState> entry) {
+ return new SafeDatasetCommit(shouldCommitDataInJob, false,
deliverySemantics, entry.getKey(),
+ entry.getValue(), false, jobContext);
+ }
+ }).iterator(), numCommitThreads,
+ ExecutorsUtils.newThreadFactory(Optional.of(log),
Optional.of("Commit-thread-%d")))
+ .executeAndGetResults();
+
+ IteratorExecutor.logFailures(result, null, 10);
+
+ if (!IteratorExecutor.verifyAllSuccessful(result)) {
+ // TODO: propagate cause of failure
+ String jobName =
jobState.getProperties().getProperty(ConfigurationKeys.JOB_NAME_KEY,
"<job_name_stub>");
+ throw new IOException("Failed to commit dataset state for some
dataset(s) of job " + jobName);
+ }
+ } catch (InterruptedException exc) {
+ throw new IOException(exc);
+ }
+ }
+
+ /**
+ * Organize task states by dataset urns.
+ * @param taskStates
+ * @return
Review Comment:
fill in
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.typesafe.config.ConfigFactory;
+import io.temporal.failure.ApplicationFailure;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.commit.DeliverySemantics;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.FsStateStore;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.SafeDatasetCommit;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.runtime.TaskStateCollectorService;
+import org.apache.gobblin.source.extractor.JobCommitPolicy;
+import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
+import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.util.Either;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.SerializationUtils;
+import org.apache.gobblin.util.executors.IteratorExecutor;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+@Slf4j
+public class CommitActivityImpl implements CommitActivity {
+
+ @Override
+ public int commit(WUProcessingSpec workSpec) {
+ int numDeserializationThreads = 1;
+ try {
+ FileSystem fs = Help.loadFileSystem(workSpec);
+ JobState jobState = Help.loadJobState(workSpec, fs);
+ SharedResourcesBroker<GobblinScopeTypes> instanceBroker =
createDefaultInstanceBroker(jobState.getProperties());
+ JobContext globalGobblinContext = new
JobContext(jobState.getProperties(), log, instanceBroker, null);
+ // TODO: Task state dir is a stub with the assumption it is always
colocated with the workunits dir (as in the case of MR which generates
workunits)
+ Path jobIdParent = new Path(workSpec.getWorkUnitsDir()).getParent();
+ Path jobOutputPath = new Path(new Path(jobIdParent, "output"),
jobIdParent.getName());
+ log.info("Output path at: " + jobOutputPath + " with fs at " +
fs.getUri());
+ StateStore<TaskState> taskStateStore = Help.openTaskStateStore(workSpec,
fs);
+ Collection<TaskState> taskStateQueue =
+ ImmutableList.copyOf(
+
TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore,
jobOutputPath, numDeserializationThreads));
+ commitTaskStates(jobState, taskStateQueue, globalGobblinContext);
+
+ } catch (Exception e) {
+ //TODO: IMPROVE GRANULARITY OF RETRIES
+ throw ApplicationFailure.newNonRetryableFailureWithCause(
+ "Failed to commit dataset state for some dataset(s) of job
<jobStub>",
+ IOException.class.toString(),
+ new IOException(e),
+ null
+ );
+ }
+
+ return 0;
+ }
+
+ protected FileSystem loadFileSystemForUri(URI fsUri, State stateConfig)
throws IOException {
+ return HadoopUtils.getFileSystem(fsUri, stateConfig);
+ }
+
+ void commitTaskStates(State jobState, Collection<TaskState> taskStates,
JobContext jobContext) throws IOException {
+ Map<String, JobState.DatasetState> datasetStatesByUrns =
createDatasetStatesByUrns(taskStates);
+ final boolean shouldCommitDataInJob = shouldCommitDataInJob(jobState);
+ final DeliverySemantics deliverySemantics =
DeliverySemantics.AT_LEAST_ONCE;
+ final int numCommitThreads = 1;
+
+ if (!shouldCommitDataInJob) {
+ log.info("Job will not commit data since data are committed by tasks.");
+ }
+
+ try {
+ if (!datasetStatesByUrns.isEmpty()) {
+ log.info("Persisting dataset urns.");
+ }
+
+ List<Either<Void, ExecutionException>> result = new
IteratorExecutor<>(Iterables
+ .transform(datasetStatesByUrns.entrySet(),
+ new Function<Map.Entry<String, JobState.DatasetState>,
Callable<Void>>() {
+ @Nullable
+ @Override
+ public Callable<Void> apply(final Map.Entry<String,
JobState.DatasetState> entry) {
+ return new SafeDatasetCommit(shouldCommitDataInJob, false,
deliverySemantics, entry.getKey(),
+ entry.getValue(), false, jobContext);
+ }
+ }).iterator(), numCommitThreads,
+ ExecutorsUtils.newThreadFactory(Optional.of(log),
Optional.of("Commit-thread-%d")))
+ .executeAndGetResults();
+
+ IteratorExecutor.logFailures(result, null, 10);
+
+ if (!IteratorExecutor.verifyAllSuccessful(result)) {
+ // TODO: propagate cause of failure
+ throw new IOException("Failed to commit dataset state for some
dataset(s) of job <jobStub>");
+ }
+ } catch (InterruptedException exc) {
+ throw new IOException(exc);
+ }
+ }
+
+ public Map<String, JobState.DatasetState>
createDatasetStatesByUrns(Collection<TaskState> taskStates) {
+ Map<String, JobState.DatasetState> datasetStatesByUrns = Maps.newHashMap();
+
+ //TODO: handle skipped tasks?
+ for (TaskState taskState : taskStates) {
+ String datasetUrn = createDatasetUrn(datasetStatesByUrns, taskState);
+ datasetStatesByUrns.get(datasetUrn).incrementTaskCount();
+ datasetStatesByUrns.get(datasetUrn).addTaskState(taskState);
+ }
+
+ return datasetStatesByUrns;
+ }
+
+ private String createDatasetUrn(Map<String, JobState.DatasetState>
datasetStatesByUrns, TaskState taskState) {
+ String datasetUrn = taskState.getProp(ConfigurationKeys.DATASET_URN_KEY,
ConfigurationKeys.DEFAULT_DATASET_URN);
+ if (!datasetStatesByUrns.containsKey(datasetUrn)) {
+ JobState.DatasetState datasetState = new JobState.DatasetState();
+ datasetState.setDatasetUrn(datasetUrn);
+ datasetStatesByUrns.put(datasetUrn, datasetState);
+ }
+ return datasetUrn;
+ }
+
+ private static boolean shouldCommitDataInJob(State state) {
+ boolean jobCommitPolicyIsFull =
+ JobCommitPolicy.getCommitPolicy(state.getProperties()) ==
JobCommitPolicy.COMMIT_ON_FULL_SUCCESS;
+ boolean publishDataAtJobLevel =
state.getPropAsBoolean(ConfigurationKeys.PUBLISH_DATA_AT_JOB_LEVEL,
+ ConfigurationKeys.DEFAULT_PUBLISH_DATA_AT_JOB_LEVEL);
+ boolean jobDataPublisherSpecified =
+
!Strings.isNullOrEmpty(state.getProp(ConfigurationKeys.JOB_DATA_PUBLISHER_TYPE));
+ return jobCommitPolicyIsFull || publishDataAtJobLevel ||
jobDataPublisherSpecified;
+ }
+
+ private static SharedResourcesBroker<GobblinScopeTypes>
createDefaultInstanceBroker(Properties jobProps) {
Review Comment:
Great! This method isn't used then, is it? If not, let's remove it.
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java:
##########
@@ -259,6 +227,52 @@ public Void call() throws Exception {
this.eventBus.post(new
NewTaskCompletionEvent(ImmutableList.copyOf(taskStateQueue)));
}
+ /**
+ * Reads in a {@link FsStateStore} folder used to store Task state outputs,
and returns a queue of {@link TaskState}s
Review Comment:
it's not truly required to be FS-backed, is it?
Issue Time Tracking
-------------------
Worklog Id: (was: 893100)
Time Spent: 40m (was: 0.5h)
> Gobblin Commit Step Runs on Temporal
> ------------------------------------
>
> Key: GOBBLIN-1968
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1968
> Project: Apache Gobblin
> Issue Type: New Feature
> Components: gobblin-core
> Reporter: William Lo
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 40m
> Remaining Estimate: 0h
>
> Temporal is the next-gen distributed computing platform Gobblin is using to
> replace MapReduce for many of its data movement workflows. We want to
> integrate Gobblin's commit step that tracks task states and reports their
> status with Temporal.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)