phet commented on code in PR #3829:
URL: https://github.com/apache/gobblin/pull/3829#discussion_r1410345417
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java:
##########
@@ -193,39 +189,11 @@ protected void shutDown() throws Exception {
* @throws IOException if it fails to collect the output {@link TaskState}s
*/
private void collectOutputTaskStates() throws IOException {
- List<String> taskStateNames = taskStateStore.getTableNames(outputTaskStateDir.getName(), new Predicate<String>() {
- @Override
- public boolean apply(String input) {
- return input.endsWith(AbstractJobLauncher.TASK_STATE_STORE_TABLE_SUFFIX)
- && !input.startsWith(FsStateStore.TMP_FILE_PREFIX);
- }});
- if (taskStateNames == null || taskStateNames.size() == 0) {
- LOGGER.debug("No output task state files found in " + this.outputTaskStateDir);
+ final Queue<TaskState> taskStateQueue = deserializeTaskStatesFromFolder(taskStateStore, outputTaskStateDir, this.stateSerDeRunnerThreads);
+ if (taskStateQueue == null) {
Review Comment:
I'd lean heavily toward returning an empty queue rather than `null`. If the issue is that the queue may be built asynchronously, such that `.isEmpty()` at the start might not always hold, then in that case use `Optional`.
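
For illustration, a minimal sketch of the two alternatives (the empty-queue variant keeps the method's existing signature from this PR; the `Optional`-returning variant and its name are hypothetical):

```java
import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import com.google.common.base.Optional;

import org.apache.hadoop.fs.Path;

import org.apache.gobblin.metastore.StateStore;
import org.apache.gobblin.runtime.TaskState;

public class TaskStateQueueSketch {

  // Alternative 1: never return null -- callers can always iterate/drain the (possibly empty) queue.
  public static Queue<TaskState> deserializeTaskStatesFromFolder(StateStore<TaskState> taskStateStore,
      Path outputTaskStateDir, int numDeserializerThreads) throws IOException {
    Queue<TaskState> taskStateQueue = new ConcurrentLinkedQueue<>();
    // ... deserialize each task state file found under outputTaskStateDir into taskStateQueue ...
    return taskStateQueue; // simply empty when no task state files were found
  }

  // Alternative 2 (hypothetical name): make "nothing found" explicit in the return type.
  public static Optional<Queue<TaskState>> maybeDeserializeTaskStatesFromFolder(StateStore<TaskState> taskStateStore,
      Path outputTaskStateDir, int numDeserializerThreads) throws IOException {
    Queue<TaskState> taskStateQueue = deserializeTaskStatesFromFolder(taskStateStore, outputTaskStateDir, numDeserializerThreads);
    return taskStateQueue.isEmpty() ? Optional.<Queue<TaskState>>absent() : Optional.of(taskStateQueue);
  }
}
```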
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.typesafe.config.ConfigFactory;
+import io.temporal.failure.ApplicationFailure;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.commit.DeliverySemantics;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.SafeDatasetCommit;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.runtime.TaskStateCollectorService;
+import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
+import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.util.Either;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.util.executors.IteratorExecutor;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+@Slf4j
+public class CommitActivityImpl implements CommitActivity {
+
+ static int DEFAULT_NUM_DESERIALIZATION_THREADS = 10;
+ static int DEFAULT_NUM_COMMIT_THREADS = 1;
+ @Override
+ public int commit(WUProcessingSpec workSpec) {
+ // TODO: Make this configurable
+ int numDeserializationThreads = DEFAULT_NUM_DESERIALIZATION_THREADS;
+ try {
+ FileSystem fs = Help.loadFileSystem(workSpec);
+ JobState jobState = Help.loadJobState(workSpec, fs);
+ SharedResourcesBroker<GobblinScopeTypes> instanceBroker = JobStateUtils.getSharedResourcesBroker(jobState);
+ JobContext globalGobblinContext = new JobContext(jobState.getProperties(), log, instanceBroker, null);
+ // TODO: Task state dir is a stub with the assumption it is always colocated with the workunits dir (as in the case of MR which generates workunits)
+ Path jobIdParent = new Path(workSpec.getWorkUnitsDir()).getParent();
+ Path jobOutputPath = new Path(new Path(jobIdParent, "output"), jobIdParent.getName());
+ log.info("Output path at: " + jobOutputPath + " with fs at " + fs.getUri());
+ StateStore<TaskState> taskStateStore = Help.openTaskStateStore(workSpec, fs);
+ Collection<TaskState> taskStateQueue =
+ ImmutableList.copyOf(
+ TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore, jobOutputPath, numDeserializationThreads));
+ commitTaskStates(jobState, taskStateQueue, globalGobblinContext);
+ return taskStateQueue.size();
+ } catch (Exception e) {
+ //TODO: IMPROVE GRANULARITY OF RETRIES
+ throw ApplicationFailure.newNonRetryableFailureWithCause(
+ "Failed to commit dataset state for some dataset(s) of job <jobStub>",
+ IOException.class.toString(),
+ new IOException(e),
+ null
+ );
+ }
+ }
+
+ /**
+ * Commit task states to the dataset state store.
+ * @param jobState
+ * @param taskStates
+ * @param jobContext
+ * @throws IOException
+ */
+ void commitTaskStates(State jobState, Collection<TaskState> taskStates, JobContext jobContext) throws IOException {
Review Comment:
`protected`, or perhaps `private`?
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.typesafe.config.ConfigFactory;
+import io.temporal.failure.ApplicationFailure;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.commit.DeliverySemantics;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.SafeDatasetCommit;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.runtime.TaskStateCollectorService;
+import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
+import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.util.Either;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.util.executors.IteratorExecutor;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+@Slf4j
+public class CommitActivityImpl implements CommitActivity {
+
+ static int DEFAULT_NUM_DESERIALIZATION_THREADS = 10;
+ static int DEFAULT_NUM_COMMIT_THREADS = 1;
+ @Override
+ public int commit(WUProcessingSpec workSpec) {
+ // TODO: Make this configurable
+ int numDeserializationThreads = DEFAULT_NUM_DESERIALIZATION_THREADS;
+ try {
+ FileSystem fs = Help.loadFileSystem(workSpec);
+ JobState jobState = Help.loadJobState(workSpec, fs);
+ SharedResourcesBroker<GobblinScopeTypes> instanceBroker = JobStateUtils.getSharedResourcesBroker(jobState);
+ JobContext globalGobblinContext = new JobContext(jobState.getProperties(), log, instanceBroker, null);
+ // TODO: Task state dir is a stub with the assumption it is always colocated with the workunits dir (as in the case of MR which generates workunits)
+ Path jobIdParent = new Path(workSpec.getWorkUnitsDir()).getParent();
+ Path jobOutputPath = new Path(new Path(jobIdParent, "output"), jobIdParent.getName());
+ log.info("Output path at: " + jobOutputPath + " with fs at " + fs.getUri());
+ StateStore<TaskState> taskStateStore = Help.openTaskStateStore(workSpec, fs);
+ Collection<TaskState> taskStateQueue =
+ ImmutableList.copyOf(
+ TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore, jobOutputPath, numDeserializationThreads));
+ commitTaskStates(jobState, taskStateQueue, globalGobblinContext);
+ return taskStateQueue.size();
+ } catch (Exception e) {
+ //TODO: IMPROVE GRANULARITY OF RETRIES
+ throw ApplicationFailure.newNonRetryableFailureWithCause(
+ "Failed to commit dataset state for some dataset(s) of job <jobStub>",
+ IOException.class.toString(),
+ new IOException(e),
+ null
+ );
+ }
+ }
+
+ /**
+ * Commit task states to the dataset state store.
+ * @param jobState
+ * @param taskStates
+ * @param jobContext
+ * @throws IOException
+ */
+ void commitTaskStates(State jobState, Collection<TaskState> taskStates, JobContext jobContext) throws IOException {
+ Map<String, JobState.DatasetState> datasetStatesByUrns = createDatasetStatesByUrns(taskStates);
+ final boolean shouldCommitDataInJob = JobContext.shouldCommitDataInJob(jobState);
+ final DeliverySemantics deliverySemantics = DeliverySemantics.AT_LEAST_ONCE;
+ //TODO: Make this configurable
+ final int numCommitThreads = DEFAULT_NUM_COMMIT_THREADS;
+ if (!shouldCommitDataInJob) {
+ log.info("Job will not commit data since data are committed by tasks.");
+ }
+
+ try {
+ if (!datasetStatesByUrns.isEmpty()) {
+ log.info("Persisting {} dataset urns.", datasetStatesByUrns.size());
+ }
+
+ List<Either<Void, ExecutionException>> result = new IteratorExecutor<>(Iterables
+ .transform(datasetStatesByUrns.entrySet(),
+ new Function<Map.Entry<String, JobState.DatasetState>, Callable<Void>>() {
+ @Nullable
+ @Override
+ public Callable<Void> apply(final Map.Entry<String, JobState.DatasetState> entry) {
+ return new SafeDatasetCommit(shouldCommitDataInJob, false, deliverySemantics, entry.getKey(),
+ entry.getValue(), false, jobContext);
+ }
+ }).iterator(), numCommitThreads,
+ ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("Commit-thread-%d")))
+ .executeAndGetResults();
+
+ IteratorExecutor.logFailures(result, null, 10);
+
+ if (!IteratorExecutor.verifyAllSuccessful(result)) {
+ // TODO: propagate cause of failure
+ String jobName = jobState.getProperties().getProperty(ConfigurationKeys.JOB_NAME_KEY, "<job_name_stub>");
+ throw new IOException("Failed to commit dataset state for some dataset(s) of job " + jobName);
+ }
+ } catch (InterruptedException exc) {
+ throw new IOException(exc);
+ }
+ }
+
+ /**
+ * Organize task states by dataset urns.
+ * @param taskStates
+ * @return
+ */
+ public static Map<String, JobState.DatasetState> createDatasetStatesByUrns(Collection<TaskState> taskStates) {
+ Map<String, JobState.DatasetState> datasetStatesByUrns = Maps.newHashMap();
+
+ //TODO: handle skipped tasks?
Review Comment:
I'm not clear on which ones might be skipped, so please consider elaborating
in the comment
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.typesafe.config.ConfigFactory;
+import io.temporal.failure.ApplicationFailure;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.commit.DeliverySemantics;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.FsStateStore;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.SafeDatasetCommit;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.runtime.TaskStateCollectorService;
+import org.apache.gobblin.source.extractor.JobCommitPolicy;
+import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
+import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.util.Either;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.SerializationUtils;
+import org.apache.gobblin.util.executors.IteratorExecutor;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+@Slf4j
+public class CommitActivityImpl implements CommitActivity {
+
+ @Override
+ public int commit(WUProcessingSpec workSpec) {
+ int numDeserializationThreads = 1;
+ try {
+ FileSystem fs = Help.loadFileSystem(workSpec);
+ JobState jobState = Help.loadJobState(workSpec, fs);
+ SharedResourcesBroker<GobblinScopeTypes> instanceBroker = createDefaultInstanceBroker(jobState.getProperties());
+ JobContext globalGobblinContext = new JobContext(jobState.getProperties(), log, instanceBroker, null);
+ // TODO: Task state dir is a stub with the assumption it is always colocated with the workunits dir (as in the case of MR which generates workunits)
+ Path jobIdParent = new Path(workSpec.getWorkUnitsDir()).getParent();
+ Path jobOutputPath = new Path(new Path(jobIdParent, "output"), jobIdParent.getName());
+ log.info("Output path at: " + jobOutputPath + " with fs at " + fs.getUri());
+ StateStore<TaskState> taskStateStore = Help.openTaskStateStore(workSpec, fs);
+ Collection<TaskState> taskStateQueue =
+ ImmutableList.copyOf(
+ TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore, jobOutputPath, numDeserializationThreads));
+ commitTaskStates(jobState, taskStateQueue, globalGobblinContext);
+
+ } catch (Exception e) {
+ //TODO: IMPROVE GRANULARITY OF RETRIES
+ throw ApplicationFailure.newNonRetryableFailureWithCause(
+ "Failed to commit dataset state for some dataset(s) of job <jobStub>",
+ IOException.class.toString(),
+ new IOException(e),
+ null
+ );
+ }
+
+ return 0;
+ }
+
+ protected FileSystem loadFileSystemForUri(URI fsUri, State stateConfig) throws IOException {
+ return HadoopUtils.getFileSystem(fsUri, stateConfig);
+ }
+
+ void commitTaskStates(State jobState, Collection<TaskState> taskStates, JobContext jobContext) throws IOException {
+ Map<String, JobState.DatasetState> datasetStatesByUrns = createDatasetStatesByUrns(taskStates);
+ final boolean shouldCommitDataInJob = shouldCommitDataInJob(jobState);
+ final DeliverySemantics deliverySemantics = DeliverySemantics.AT_LEAST_ONCE;
+ final int numCommitThreads = 1;
+
+ if (!shouldCommitDataInJob) {
+ log.info("Job will not commit data since data are committed by tasks.");
Review Comment:
I agree and understand... yet was also noting the non-specificity: the decision rests not only on the configured commit policy, but also on whether `ConfigurationKeys.JOB_DATA_PUBLISHER_TYPE` may have been forgotten.
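
For instance, the log could spell out each input to the decision; a purely illustrative sketch (the message wording and format are only suggestions, reusing the same keys this class already reads):

```java
if (!shouldCommitDataInJob) {
  // Enumerate the inputs to shouldCommitDataInJob() so the "why" is visible in the log
  log.info("Job will not commit data (tasks commit instead): commitPolicy={}, {}={}, {} specified={}",
      JobCommitPolicy.getCommitPolicy(jobState.getProperties()),
      ConfigurationKeys.PUBLISH_DATA_AT_JOB_LEVEL,
      jobState.getPropAsBoolean(ConfigurationKeys.PUBLISH_DATA_AT_JOB_LEVEL,
          ConfigurationKeys.DEFAULT_PUBLISH_DATA_AT_JOB_LEVEL),
      ConfigurationKeys.JOB_DATA_PUBLISHER_TYPE,
      !Strings.isNullOrEmpty(jobState.getProp(ConfigurationKeys.JOB_DATA_PUBLISHER_TYPE)));
}
```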
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java:
##########
@@ -259,6 +227,52 @@ public Void call() throws Exception {
this.eventBus.post(new NewTaskCompletionEvent(ImmutableList.copyOf(taskStateQueue)));
}
+ /**
+ * Reads in a {@link FsStateStore} folder used to store Task state outputs, and returns a queue of {@link TaskState}s
+ * Task State files are populated by the {@link GobblinMultiTaskAttempt} to record the output of remote concurrent tasks (e.g. MR mappers)
+ * @param taskStateStore
+ * @param outputTaskStateDir
+ * @param numDeserializerThreads
+ * @return Queue of TaskStates
Review Comment:
...or `null` apparently (which is very much worth documenting here!)
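
e.g., suggested Javadoc wording for the current null-returning behavior (the parameter descriptions are illustrative):

```java
/**
 * Reads in a {@link FsStateStore} folder used to store Task state outputs, and returns a queue of {@link TaskState}s.
 * Task State files are populated by the {@link GobblinMultiTaskAttempt} to record the output of remote concurrent tasks (e.g. MR mappers).
 * @param taskStateStore the state store backing the task state files
 * @param outputTaskStateDir the folder holding the serialized task states
 * @param numDeserializerThreads parallelism used while deserializing
 * @return a queue of the deserialized {@link TaskState}s, or {@code null} when no task state files are found
 * @throws IOException if reading or deserializing the task states fails
 */
```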
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java:
##########
@@ -17,19 +17,20 @@
package org.apache.gobblin.temporal.ddm.workflow;
-import io.temporal.workflow.Promise;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
-import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
+/**
+ * Workflow for committing the output of work done by {@link org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl}
+ */
@WorkflowInterface
public interface CommitStepWorkflow {
/**
- * This is the method that is executed when the Workflow Execution is started. The Workflow
- * Execution completes when this method finishes execution.
+ * Commit the output of the work done by {@link org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl}
+ * Returns the number of workunits committed
Review Comment:
nit: `@return`
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java:
##########
@@ -23,30 +23,40 @@
import io.temporal.api.enums.v1.ParentClosePolicy;
import io.temporal.workflow.ChildWorkflowOptions;
import io.temporal.workflow.Workflow;
+import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.temporal.cluster.WorkerConfig;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
import org.apache.gobblin.temporal.ddm.work.assistance.Help;
import org.apache.gobblin.temporal.ddm.work.styles.FileSystemJobStateful;
+import org.apache.gobblin.temporal.ddm.workflow.CommitStepWorkflow;
import org.apache.gobblin.temporal.ddm.workflow.ProcessWorkUnitsWorkflow;
import org.apache.gobblin.temporal.ddm.work.EagerFsDirBackedWorkUnitClaimCheckWorkload;
import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr;
import org.apache.gobblin.temporal.util.nesting.work.Workload;
import org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow;
+@Slf4j
public class ProcessWorkUnitsWorkflowImpl implements ProcessWorkUnitsWorkflow {
public static final String CHILD_WORKFLOW_ID_BASE = "NestingExecWorkUnits";
+ public static final String COMMIT_STEP_WORKFLOW_ID_BASE = "CommitStepWorkflow";
@Override
public int process(WUProcessingSpec workSpec) {
Workload<WorkUnitClaimCheck> workload = createWorkload(workSpec);
NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow =
createProcessingWorkflow(workSpec);
- return processingWorkflow.performWorkload(
+ int workunitsProcessed = processingWorkflow.performWorkload(
WorkflowAddr.ROOT, workload, 0,
workSpec.getTuning().getMaxBranchesPerTree(),
workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty()
);
+ if (workunitsProcessed > 0) {
+ CommitStepWorkflow commitWorkflow = createCommitStepWorkflow();
+ int result = commitWorkflow.commit(workSpec);
+ return result;
+ }
+ return workunitsProcessed;
Review Comment:
are we certain `commitWorkflow.commit()` would always return > 0? if not,
let's log when we started w/ `workunitsProcessed == 0` vs. when `result == 0`
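
For instance, a minimal sketch of the `process(...)` body with that logging added (the log messages themselves are only suggestions; the rest mirrors the PR):

```java
int workunitsProcessed = processingWorkflow.performWorkload(
    WorkflowAddr.ROOT, workload, 0,
    workSpec.getTuning().getMaxBranchesPerTree(),
    workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty()
);
if (workunitsProcessed > 0) {
  CommitStepWorkflow commitWorkflow = createCommitStepWorkflow();
  int result = commitWorkflow.commit(workSpec);
  if (result == 0) {
    // commit step ran but reported nothing committed
    log.warn("No work units committed, despite {} being processed", workunitsProcessed);
  }
  return result;
}
// nothing was processed, so the commit step was never attempted
log.warn("No work units processed, so no commit attempted");
return workunitsProcessed;
```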
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.typesafe.config.ConfigFactory;
+import io.temporal.failure.ApplicationFailure;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.commit.DeliverySemantics;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.SafeDatasetCommit;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.runtime.TaskStateCollectorService;
+import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
+import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.util.Either;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.util.executors.IteratorExecutor;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+@Slf4j
+public class CommitActivityImpl implements CommitActivity {
+
+ static int DEFAULT_NUM_DESERIALIZATION_THREADS = 10;
+ static int DEFAULT_NUM_COMMIT_THREADS = 1;
+ @Override
+ public int commit(WUProcessingSpec workSpec) {
+ // TODO: Make this configurable
+ int numDeserializationThreads = DEFAULT_NUM_DESERIALIZATION_THREADS;
+ try {
+ FileSystem fs = Help.loadFileSystem(workSpec);
+ JobState jobState = Help.loadJobState(workSpec, fs);
+ SharedResourcesBroker<GobblinScopeTypes> instanceBroker = JobStateUtils.getSharedResourcesBroker(jobState);
+ JobContext globalGobblinContext = new JobContext(jobState.getProperties(), log, instanceBroker, null);
+ // TODO: Task state dir is a stub with the assumption it is always colocated with the workunits dir (as in the case of MR which generates workunits)
+ Path jobIdParent = new Path(workSpec.getWorkUnitsDir()).getParent();
+ Path jobOutputPath = new Path(new Path(jobIdParent, "output"), jobIdParent.getName());
+ log.info("Output path at: " + jobOutputPath + " with fs at " + fs.getUri());
+ StateStore<TaskState> taskStateStore = Help.openTaskStateStore(workSpec, fs);
+ Collection<TaskState> taskStateQueue =
+ ImmutableList.copyOf(
+ TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore, jobOutputPath, numDeserializationThreads));
+ commitTaskStates(jobState, taskStateQueue, globalGobblinContext);
+ return taskStateQueue.size();
+ } catch (Exception e) {
+ //TODO: IMPROVE GRANULARITY OF RETRIES
+ throw ApplicationFailure.newNonRetryableFailureWithCause(
+ "Failed to commit dataset state for some dataset(s) of job <jobStub>",
+ IOException.class.toString(),
+ new IOException(e),
+ null
+ );
+ }
+ }
+
+ /**
+ * Commit task states to the dataset state store.
+ * @param jobState
+ * @param taskStates
+ * @param jobContext
+ * @throws IOException
+ */
+ void commitTaskStates(State jobState, Collection<TaskState> taskStates, JobContext jobContext) throws IOException {
+ Map<String, JobState.DatasetState> datasetStatesByUrns = createDatasetStatesByUrns(taskStates);
+ final boolean shouldCommitDataInJob = JobContext.shouldCommitDataInJob(jobState);
+ final DeliverySemantics deliverySemantics = DeliverySemantics.AT_LEAST_ONCE;
+ //TODO: Make this configurable
+ final int numCommitThreads = DEFAULT_NUM_COMMIT_THREADS;
+ if (!shouldCommitDataInJob) {
+ log.info("Job will not commit data since data are committed by tasks.");
+ }
+
+ try {
+ if (!datasetStatesByUrns.isEmpty()) {
+ log.info("Persisting {} dataset urns.", datasetStatesByUrns.size());
+ }
+
+ List<Either<Void, ExecutionException>> result = new IteratorExecutor<>(Iterables
+ .transform(datasetStatesByUrns.entrySet(),
+ new Function<Map.Entry<String, JobState.DatasetState>, Callable<Void>>() {
+ @Nullable
+ @Override
+ public Callable<Void> apply(final Map.Entry<String, JobState.DatasetState> entry) {
+ return new SafeDatasetCommit(shouldCommitDataInJob, false, deliverySemantics, entry.getKey(),
+ entry.getValue(), false, jobContext);
+ }
+ }).iterator(), numCommitThreads,
+ ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("Commit-thread-%d")))
+ .executeAndGetResults();
+
+ IteratorExecutor.logFailures(result, null, 10);
+
+ if (!IteratorExecutor.verifyAllSuccessful(result)) {
+ // TODO: propagate cause of failure
+ String jobName = jobState.getProperties().getProperty(ConfigurationKeys.JOB_NAME_KEY, "<job_name_stub>");
+ throw new IOException("Failed to commit dataset state for some dataset(s) of job " + jobName);
+ }
+ } catch (InterruptedException exc) {
+ throw new IOException(exc);
+ }
+ }
+
+ /**
+ * Organize task states by dataset urns.
+ * @param taskStates
+ * @return
Review Comment:
fill in
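
e.g., suggested wording, based on what the method does:

```java
 * @param taskStates the task states to group
 * @return a map from dataset URN to the {@link JobState.DatasetState} aggregating that dataset's {@link TaskState}s
```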
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.typesafe.config.ConfigFactory;
+import io.temporal.failure.ApplicationFailure;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.commit.DeliverySemantics;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.FsStateStore;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.SafeDatasetCommit;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.runtime.TaskStateCollectorService;
+import org.apache.gobblin.source.extractor.JobCommitPolicy;
+import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
+import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.util.Either;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.SerializationUtils;
+import org.apache.gobblin.util.executors.IteratorExecutor;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+@Slf4j
+public class CommitActivityImpl implements CommitActivity {
+
+ @Override
+ public int commit(WUProcessingSpec workSpec) {
+ int numDeserializationThreads = 1;
+ try {
+ FileSystem fs = Help.loadFileSystem(workSpec);
+ JobState jobState = Help.loadJobState(workSpec, fs);
+ SharedResourcesBroker<GobblinScopeTypes> instanceBroker = createDefaultInstanceBroker(jobState.getProperties());
+ JobContext globalGobblinContext = new JobContext(jobState.getProperties(), log, instanceBroker, null);
+ // TODO: Task state dir is a stub with the assumption it is always colocated with the workunits dir (as in the case of MR which generates workunits)
+ Path jobIdParent = new Path(workSpec.getWorkUnitsDir()).getParent();
+ Path jobOutputPath = new Path(new Path(jobIdParent, "output"), jobIdParent.getName());
+ log.info("Output path at: " + jobOutputPath + " with fs at " + fs.getUri());
+ StateStore<TaskState> taskStateStore = Help.openTaskStateStore(workSpec, fs);
+ Collection<TaskState> taskStateQueue =
+ ImmutableList.copyOf(
+ TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore, jobOutputPath, numDeserializationThreads));
+ commitTaskStates(jobState, taskStateQueue, globalGobblinContext);
+
+ } catch (Exception e) {
+ //TODO: IMPROVE GRANULARITY OF RETRIES
+ throw ApplicationFailure.newNonRetryableFailureWithCause(
+ "Failed to commit dataset state for some dataset(s) of job <jobStub>",
+ IOException.class.toString(),
+ new IOException(e),
+ null
+ );
+ }
+
+ return 0;
+ }
+
+ protected FileSystem loadFileSystemForUri(URI fsUri, State stateConfig) throws IOException {
+ return HadoopUtils.getFileSystem(fsUri, stateConfig);
+ }
+
+ void commitTaskStates(State jobState, Collection<TaskState> taskStates, JobContext jobContext) throws IOException {
+ Map<String, JobState.DatasetState> datasetStatesByUrns = createDatasetStatesByUrns(taskStates);
+ final boolean shouldCommitDataInJob = shouldCommitDataInJob(jobState);
+ final DeliverySemantics deliverySemantics = DeliverySemantics.AT_LEAST_ONCE;
+ final int numCommitThreads = 1;
+
+ if (!shouldCommitDataInJob) {
+ log.info("Job will not commit data since data are committed by tasks.");
+ }
+
+ try {
+ if (!datasetStatesByUrns.isEmpty()) {
+ log.info("Persisting dataset urns.");
+ }
+
+ List<Either<Void, ExecutionException>> result = new IteratorExecutor<>(Iterables
+ .transform(datasetStatesByUrns.entrySet(),
+ new Function<Map.Entry<String, JobState.DatasetState>, Callable<Void>>() {
+ @Nullable
+ @Override
+ public Callable<Void> apply(final Map.Entry<String, JobState.DatasetState> entry) {
+ return new SafeDatasetCommit(shouldCommitDataInJob, false, deliverySemantics, entry.getKey(),
+ entry.getValue(), false, jobContext);
+ }
+ }).iterator(), numCommitThreads,
+ ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("Commit-thread-%d")))
+ .executeAndGetResults();
+
+ IteratorExecutor.logFailures(result, null, 10);
+
+ if (!IteratorExecutor.verifyAllSuccessful(result)) {
+ // TODO: propagate cause of failure
+ throw new IOException("Failed to commit dataset state for some dataset(s) of job <jobStub>");
+ }
+ } catch (InterruptedException exc) {
+ throw new IOException(exc);
+ }
+ }
+
+ public Map<String, JobState.DatasetState> createDatasetStatesByUrns(Collection<TaskState> taskStates) {
+ Map<String, JobState.DatasetState> datasetStatesByUrns = Maps.newHashMap();
+
+ //TODO: handle skipped tasks?
+ for (TaskState taskState : taskStates) {
+ String datasetUrn = createDatasetUrn(datasetStatesByUrns, taskState);
+ datasetStatesByUrns.get(datasetUrn).incrementTaskCount();
+ datasetStatesByUrns.get(datasetUrn).addTaskState(taskState);
+ }
+
+ return datasetStatesByUrns;
+ }
+
+ private String createDatasetUrn(Map<String, JobState.DatasetState> datasetStatesByUrns, TaskState taskState) {
+ String datasetUrn = taskState.getProp(ConfigurationKeys.DATASET_URN_KEY, ConfigurationKeys.DEFAULT_DATASET_URN);
+ if (!datasetStatesByUrns.containsKey(datasetUrn)) {
+ JobState.DatasetState datasetState = new JobState.DatasetState();
+ datasetState.setDatasetUrn(datasetUrn);
+ datasetStatesByUrns.put(datasetUrn, datasetState);
+ }
+ return datasetUrn;
+ }
+
+ private static boolean shouldCommitDataInJob(State state) {
+ boolean jobCommitPolicyIsFull =
+ JobCommitPolicy.getCommitPolicy(state.getProperties()) == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS;
+ boolean publishDataAtJobLevel = state.getPropAsBoolean(ConfigurationKeys.PUBLISH_DATA_AT_JOB_LEVEL,
+ ConfigurationKeys.DEFAULT_PUBLISH_DATA_AT_JOB_LEVEL);
+ boolean jobDataPublisherSpecified =
+ !Strings.isNullOrEmpty(state.getProp(ConfigurationKeys.JOB_DATA_PUBLISHER_TYPE));
+ return jobCommitPolicyIsFull || publishDataAtJobLevel || jobDataPublisherSpecified;
+ }
+
+ private static SharedResourcesBroker<GobblinScopeTypes> createDefaultInstanceBroker(Properties jobProps) {
Review Comment:
great! this method isn't used then, is it? if not, let's remove
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java:
##########
@@ -259,6 +227,52 @@ public Void call() throws Exception {
this.eventBus.post(new NewTaskCompletionEvent(ImmutableList.copyOf(taskStateQueue)));
}
+ /**
+ * Reads in a {@link FsStateStore} folder used to store Task state outputs, and returns a queue of {@link TaskState}s
Review Comment:
it's not truly required to be FS-backed, is it?
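
i.e., perhaps reference the general interface instead; a suggested phrasing:

```java
 * Reads in a {@link StateStore} folder used to store Task state outputs, and returns a queue of {@link TaskState}s
```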