phet commented on code in PR #3829:
URL: https://github.com/apache/gobblin/pull/3829#discussion_r1399930421
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java:
##########
@@ -259,6 +227,42 @@ public Void call() throws Exception {
this.eventBus.post(new
NewTaskCompletionEvent(ImmutableList.copyOf(taskStateQueue)));
}
+ public static Queue<TaskState>
deserializeTaskStatesFromFolder(StateStore<TaskState> taskStateStore, Path
outputTaskStateDir,
Review Comment:
given `public static`, I presume we anticipate calling this from outside
this class. if so, it deserves javadoc
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java:
##########
@@ -316,6 +316,7 @@ private void
finalizeDatasetStateBeforeCommit(JobState.DatasetState datasetState
if (taskState.getWorkingState() != WorkUnitState.WorkingState.SUCCESSFUL
&& this.jobContext.getJobCommitPolicy() ==
JobCommitPolicy.COMMIT_ON_FULL_SUCCESS) {
// The dataset state is set to FAILED if any task failed and
COMMIT_ON_FULL_SUCCESS is used
+ log.info("Failed task state for " +
taskState.getWorkunit().getOutputFilePath());
Review Comment:
three thoughts:
a. combine w/ the logging on line 322?
b. do we have a failure reason to include in the log?
c. could we exhaustively list the failures for observability's sake? if
so, I recommend counting the total and including the counter's current
value in each log msg
if perf is a concern when fully listing in the general case, I'd suggest a
config to control whether or not to do so.
also open to other suggestions for ways to obtain an exhaustive listing of
what failed, for debugging purposes.
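A rough sketch of option (c), to make the idea concrete. Everything here is hypothetical: `FailedTaskLogSketch`, `FailedTask`, and the `listAll` flag are stand-ins for illustration and are not existing Gobblin classes or config keys. It assumes the failed tasks have already been gathered, so the total is known before the first message is built.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch, not Gobblin code: numbered log messages for failed tasks. */
final class FailedTaskLogSketch {

  /** Stand-in for the parts of a task state that a failure log line would need. */
  static final class FailedTask {
    final String outputPath;
    final String failureReason;

    FailedTask(String outputPath, String failureReason) {
      this.outputPath = outputPath;
      this.failureReason = failureReason;
    }
  }

  /**
   * Builds the messages to log. With listAll set, every failure gets its own line carrying
   * the running counter and the total; otherwise only a one-line summary is produced.
   */
  static List<String> describeFailures(List<FailedTask> failed, boolean listAll) {
    List<String> messages = new ArrayList<>();
    int total = failed.size();
    if (!listAll) {
      messages.add(String.format("%d task(s) failed; full listing is disabled", total));
      return messages;
    }
    int count = 0;
    for (FailedTask task : failed) {
      count++;
      messages.add(String.format("Failed task %d of %d: output=%s reason=%s",
          count, total, task.outputPath, task.failureReason));
    }
    return messages;
  }

  public static void main(String[] args) {
    List<FailedTask> failed = Arrays.asList(
        new FailedTask("/out/a", "timeout"),
        new FailedTask("/out/b", "oom"));
    for (String message : describeFailures(failed, true)) {
      System.out.println(message);
    }
  }
}
```

In the real method the flag would presumably come from job config, and each message would go to `log.info` or `log.warn` rather than a returned list.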
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java:
##########
@@ -259,6 +227,42 @@ public Void call() throws Exception {
this.eventBus.post(new
NewTaskCompletionEvent(ImmutableList.copyOf(taskStateQueue)));
}
+ public static Queue<TaskState>
deserializeTaskStatesFromFolder(StateStore<TaskState> taskStateStore, Path
outputTaskStateDir,
+ int numDeserializerThreads) throws IOException {
+ List<String> taskStateNames =
taskStateStore.getTableNames(outputTaskStateDir.getName(), new
Predicate<String>() {
+ @Override
+ public boolean apply(String input) {
+ return input != null
+ &&
input.endsWith(AbstractJobLauncher.TASK_STATE_STORE_TABLE_SUFFIX)
+ && !input.startsWith(FsStateStore.TMP_FILE_PREFIX);
+ }});
+
+ if (taskStateNames == null || taskStateNames.isEmpty()) {
+ log.info("No output task state files found in " + outputTaskStateDir);
+ return null;
+ }
+
+ final Queue<TaskState> taskStateQueue = Queues.newConcurrentLinkedQueue();
+ try (ParallelRunner stateSerDeRunner = new
ParallelRunner(numDeserializerThreads, null)) {
+ for (final String taskStateName : taskStateNames) {
+ log.info("Found output task state file " + taskStateName);
+ // Deserialize the TaskState and delete the file
+ stateSerDeRunner.submitCallable(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ TaskState taskState =
taskStateStore.getAll(outputTaskStateDir.getName(), taskStateName).get(0);
+ taskStateQueue.add(taskState);
+ return null;
+ }
+ }, "Deserialize state for " + taskStateName);
+ }
+ } catch (IOException ioe) {
+ log.warn("Could not read all task state files.");
+ }
+ log.info(String.format("Collected task state of %d completed tasks",
taskStateQueue.size()));
Review Comment:
suggest including `outputTaskStateDir` in the message
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java:
##########
@@ -259,6 +227,42 @@ public Void call() throws Exception {
this.eventBus.post(new
NewTaskCompletionEvent(ImmutableList.copyOf(taskStateQueue)));
}
+ public static Queue<TaskState>
deserializeTaskStatesFromFolder(StateStore<TaskState> taskStateStore, Path
outputTaskStateDir,
+ int numDeserializerThreads) throws IOException {
+ List<String> taskStateNames =
taskStateStore.getTableNames(outputTaskStateDir.getName(), new
Predicate<String>() {
+ @Override
+ public boolean apply(String input) {
+ return input != null
+ &&
input.endsWith(AbstractJobLauncher.TASK_STATE_STORE_TABLE_SUFFIX)
+ && !input.startsWith(FsStateStore.TMP_FILE_PREFIX);
+ }});
+
+ if (taskStateNames == null || taskStateNames.isEmpty()) {
+ log.info("No output task state files found in " + outputTaskStateDir);
+ return null;
+ }
+
+ final Queue<TaskState> taskStateQueue = Queues.newConcurrentLinkedQueue();
+ try (ParallelRunner stateSerDeRunner = new
ParallelRunner(numDeserializerThreads, null)) {
+ for (final String taskStateName : taskStateNames) {
+ log.info("Found output task state file " + taskStateName);
Review Comment:
feels like debug...
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java:
##########
@@ -17,6 +17,7 @@
package org.apache.gobblin.temporal.ddm.launcher;
+import io.temporal.client.WorkflowOptions;
Review Comment:
NBD, but I thought `java` imports were supposed to come first, no?
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.workflow;
+
+import io.temporal.workflow.Promise;
+import io.temporal.workflow.WorkflowInterface;
+import io.temporal.workflow.WorkflowMethod;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
+
+
+@WorkflowInterface
+public interface CommitStepWorkflow {
+
+ /**
+ * This is the method that is executed when the Workflow Execution is
started. The Workflow
+ * Execution completes when this method finishes execution.
Review Comment:
recommend instead documenting the meaning of the return value
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/CommitActivity.java:
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity;
+
+import io.temporal.activity.ActivityInterface;
+import io.temporal.activity.ActivityMethod;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
+
+
+/** Activity for processing/executing a {@link
org.apache.gobblin.source.workunit.WorkUnit}, provided by claim-check */
Review Comment:
javadoc needs update (it still describes processing a `WorkUnit`, not committing)
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.typesafe.config.ConfigFactory;
+import io.temporal.failure.ApplicationFailure;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.commit.DeliverySemantics;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.FsStateStore;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.SafeDatasetCommit;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.runtime.TaskStateCollectorService;
+import org.apache.gobblin.source.extractor.JobCommitPolicy;
+import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
+import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.util.Either;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.SerializationUtils;
+import org.apache.gobblin.util.executors.IteratorExecutor;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+@Slf4j
+public class CommitActivityImpl implements CommitActivity {
+
+ @Override
+ public int commit(WUProcessingSpec workSpec) {
+ int numDeserializationThreads = 1;
+ try {
+ FileSystem fs = Help.loadFileSystem(workSpec);
+ JobState jobState = Help.loadJobState(workSpec, fs);
+ SharedResourcesBroker<GobblinScopeTypes> instanceBroker =
createDefaultInstanceBroker(jobState.getProperties());
+ JobContext globalGobblinContext = new
JobContext(jobState.getProperties(), log, instanceBroker, null);
+ // TODO: Task state dir is a stub with the assumption it is always
colocated with the workunits dir (as in the case of MR which generates
workunits)
+ Path jobIdParent = new Path(workSpec.getWorkUnitsDir()).getParent();
+ Path jobOutputPath = new Path(new Path(jobIdParent, "output"),
jobIdParent.getName());
+ log.info("Output path at: " + jobOutputPath + " with fs at " +
fs.getUri());
+ StateStore<TaskState> taskStateStore = Help.openTaskStateStore(workSpec,
fs);
+ Collection<TaskState> taskStateQueue =
+ ImmutableList.copyOf(
+
TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore,
jobOutputPath, numDeserializationThreads));
+ commitTaskStates(jobState, taskStateQueue, globalGobblinContext);
+
+ } catch (Exception e) {
+ //TODO: IMPROVE GRANULARITY OF RETRIES
+ throw ApplicationFailure.newNonRetryableFailureWithCause(
+ "Failed to commit dataset state for some dataset(s) of job
<jobStub>",
+ IOException.class.toString(),
+ new IOException(e),
+ null
+ );
+ }
+
+ return 0;
+ }
+
+ protected FileSystem loadFileSystemForUri(URI fsUri, State stateConfig)
throws IOException {
+ return HadoopUtils.getFileSystem(fsUri, stateConfig);
+ }
+
+ void commitTaskStates(State jobState, Collection<TaskState> taskStates,
JobContext jobContext) throws IOException {
+ Map<String, JobState.DatasetState> datasetStatesByUrns =
createDatasetStatesByUrns(taskStates);
+ final boolean shouldCommitDataInJob = shouldCommitDataInJob(jobState);
+ final DeliverySemantics deliverySemantics =
DeliverySemantics.AT_LEAST_ONCE;
+ final int numCommitThreads = 1;
+
+ if (!shouldCommitDataInJob) {
+ log.info("Job will not commit data since data are committed by tasks.");
+ }
+
+ try {
+ if (!datasetStatesByUrns.isEmpty()) {
+ log.info("Persisting dataset urns.");
+ }
+
+ List<Either<Void, ExecutionException>> result = new
IteratorExecutor<>(Iterables
+ .transform(datasetStatesByUrns.entrySet(),
+ new Function<Map.Entry<String, JobState.DatasetState>,
Callable<Void>>() {
+ @Nullable
+ @Override
+ public Callable<Void> apply(final Map.Entry<String,
JobState.DatasetState> entry) {
+ return new SafeDatasetCommit(shouldCommitDataInJob, false,
deliverySemantics, entry.getKey(),
+ entry.getValue(), false, jobContext);
+ }
+ }).iterator(), numCommitThreads,
+ ExecutorsUtils.newThreadFactory(Optional.of(log),
Optional.of("Commit-thread-%d")))
+ .executeAndGetResults();
+
+ IteratorExecutor.logFailures(result, null, 10);
+
+ if (!IteratorExecutor.verifyAllSuccessful(result)) {
+ // TODO: propagate cause of failure
+ throw new IOException("Failed to commit dataset state for some
dataset(s) of job <jobStub>");
+ }
+ } catch (InterruptedException exc) {
+ throw new IOException(exc);
+ }
+ }
+
+ public Map<String, JobState.DatasetState>
createDatasetStatesByUrns(Collection<TaskState> taskStates) {
+ Map<String, JobState.DatasetState> datasetStatesByUrns = Maps.newHashMap();
+
+ //TODO: handle skipped tasks?
+ for (TaskState taskState : taskStates) {
+ String datasetUrn = createDatasetUrn(datasetStatesByUrns, taskState);
+ datasetStatesByUrns.get(datasetUrn).incrementTaskCount();
+ datasetStatesByUrns.get(datasetUrn).addTaskState(taskState);
+ }
+
+ return datasetStatesByUrns;
+ }
+
+ private String createDatasetUrn(Map<String, JobState.DatasetState>
datasetStatesByUrns, TaskState taskState) {
+ String datasetUrn = taskState.getProp(ConfigurationKeys.DATASET_URN_KEY,
ConfigurationKeys.DEFAULT_DATASET_URN);
+ if (!datasetStatesByUrns.containsKey(datasetUrn)) {
+ JobState.DatasetState datasetState = new JobState.DatasetState();
+ datasetState.setDatasetUrn(datasetUrn);
+ datasetStatesByUrns.put(datasetUrn, datasetState);
+ }
+ return datasetUrn;
+ }
+
+ private static boolean shouldCommitDataInJob(State state) {
+ boolean jobCommitPolicyIsFull =
+ JobCommitPolicy.getCommitPolicy(state.getProperties()) ==
JobCommitPolicy.COMMIT_ON_FULL_SUCCESS;
+ boolean publishDataAtJobLevel =
state.getPropAsBoolean(ConfigurationKeys.PUBLISH_DATA_AT_JOB_LEVEL,
+ ConfigurationKeys.DEFAULT_PUBLISH_DATA_AT_JOB_LEVEL);
+ boolean jobDataPublisherSpecified =
+
!Strings.isNullOrEmpty(state.getProp(ConfigurationKeys.JOB_DATA_PUBLISHER_TYPE));
+ return jobCommitPolicyIsFull || publishDataAtJobLevel ||
jobDataPublisherSpecified;
+ }
+
+ private static SharedResourcesBroker<GobblinScopeTypes>
createDefaultInstanceBroker(Properties jobProps) {
Review Comment:
suggest to:
* define in `JobStateUtils`
* give a name to contrast it to `getSharedResourcesBroker`
* document when one vs. the other is called for
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.typesafe.config.ConfigFactory;
+import io.temporal.failure.ApplicationFailure;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.commit.DeliverySemantics;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.FsStateStore;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.SafeDatasetCommit;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.runtime.TaskStateCollectorService;
+import org.apache.gobblin.source.extractor.JobCommitPolicy;
+import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
+import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.util.Either;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.SerializationUtils;
+import org.apache.gobblin.util.executors.IteratorExecutor;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+@Slf4j
+public class CommitActivityImpl implements CommitActivity {
+
+ @Override
+ public int commit(WUProcessingSpec workSpec) {
+ int numDeserializationThreads = 1;
+ try {
+ FileSystem fs = Help.loadFileSystem(workSpec);
+ JobState jobState = Help.loadJobState(workSpec, fs);
+ SharedResourcesBroker<GobblinScopeTypes> instanceBroker =
createDefaultInstanceBroker(jobState.getProperties());
+ JobContext globalGobblinContext = new
JobContext(jobState.getProperties(), log, instanceBroker, null);
+ // TODO: Task state dir is a stub with the assumption it is always
colocated with the workunits dir (as in the case of MR which generates
workunits)
+ Path jobIdParent = new Path(workSpec.getWorkUnitsDir()).getParent();
+ Path jobOutputPath = new Path(new Path(jobIdParent, "output"),
jobIdParent.getName());
+ log.info("Output path at: " + jobOutputPath + " with fs at " +
fs.getUri());
+ StateStore<TaskState> taskStateStore = Help.openTaskStateStore(workSpec,
fs);
+ Collection<TaskState> taskStateQueue =
+ ImmutableList.copyOf(
+
TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore,
jobOutputPath, numDeserializationThreads));
+ commitTaskStates(jobState, taskStateQueue, globalGobblinContext);
+
+ } catch (Exception e) {
+ //TODO: IMPROVE GRANULARITY OF RETRIES
+ throw ApplicationFailure.newNonRetryableFailureWithCause(
+ "Failed to commit dataset state for some dataset(s) of job
<jobStub>",
+ IOException.class.toString(),
+ new IOException(e),
+ null
+ );
+ }
+
+ return 0;
+ }
+
+ protected FileSystem loadFileSystemForUri(URI fsUri, State stateConfig)
throws IOException {
+ return HadoopUtils.getFileSystem(fsUri, stateConfig);
+ }
+
+ void commitTaskStates(State jobState, Collection<TaskState> taskStates,
JobContext jobContext) throws IOException {
+ Map<String, JobState.DatasetState> datasetStatesByUrns =
createDatasetStatesByUrns(taskStates);
+ final boolean shouldCommitDataInJob = shouldCommitDataInJob(jobState);
+ final DeliverySemantics deliverySemantics =
DeliverySemantics.AT_LEAST_ONCE;
+ final int numCommitThreads = 1;
Review Comment:
same as earlier comment about instance-level and TODO
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.typesafe.config.ConfigFactory;
+import io.temporal.failure.ApplicationFailure;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.commit.DeliverySemantics;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.FsStateStore;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.SafeDatasetCommit;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.runtime.TaskStateCollectorService;
+import org.apache.gobblin.source.extractor.JobCommitPolicy;
+import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
+import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.util.Either;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.SerializationUtils;
+import org.apache.gobblin.util.executors.IteratorExecutor;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+@Slf4j
+public class CommitActivityImpl implements CommitActivity {
+
+ @Override
+ public int commit(WUProcessingSpec workSpec) {
+ int numDeserializationThreads = 1;
+ try {
+ FileSystem fs = Help.loadFileSystem(workSpec);
+ JobState jobState = Help.loadJobState(workSpec, fs);
+ SharedResourcesBroker<GobblinScopeTypes> instanceBroker =
createDefaultInstanceBroker(jobState.getProperties());
+ JobContext globalGobblinContext = new
JobContext(jobState.getProperties(), log, instanceBroker, null);
+ // TODO: Task state dir is a stub with the assumption it is always
colocated with the workunits dir (as in the case of MR which generates
workunits)
+ Path jobIdParent = new Path(workSpec.getWorkUnitsDir()).getParent();
+ Path jobOutputPath = new Path(new Path(jobIdParent, "output"),
jobIdParent.getName());
+ log.info("Output path at: " + jobOutputPath + " with fs at " +
fs.getUri());
+ StateStore<TaskState> taskStateStore = Help.openTaskStateStore(workSpec,
fs);
+ Collection<TaskState> taskStateQueue =
+ ImmutableList.copyOf(
+
TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore,
jobOutputPath, numDeserializationThreads));
+ commitTaskStates(jobState, taskStateQueue, globalGobblinContext);
+
+ } catch (Exception e) {
+ //TODO: IMPROVE GRANULARITY OF RETRIES
+ throw ApplicationFailure.newNonRetryableFailureWithCause(
+ "Failed to commit dataset state for some dataset(s) of job
<jobStub>",
+ IOException.class.toString(),
+ new IOException(e),
+ null
+ );
+ }
+
+ return 0;
+ }
+
+ protected FileSystem loadFileSystemForUri(URI fsUri, State stateConfig)
throws IOException {
+ return HadoopUtils.getFileSystem(fsUri, stateConfig);
+ }
+
+ void commitTaskStates(State jobState, Collection<TaskState> taskStates,
JobContext jobContext) throws IOException {
+ Map<String, JobState.DatasetState> datasetStatesByUrns =
createDatasetStatesByUrns(taskStates);
+ final boolean shouldCommitDataInJob = shouldCommitDataInJob(jobState);
+ final DeliverySemantics deliverySemantics =
DeliverySemantics.AT_LEAST_ONCE;
+ final int numCommitThreads = 1;
+
+ if (!shouldCommitDataInJob) {
+ log.info("Job will not commit data since data are committed by tasks.");
+ }
+
+ try {
+ if (!datasetStatesByUrns.isEmpty()) {
+ log.info("Persisting dataset urns.");
+ }
+
+ List<Either<Void, ExecutionException>> result = new
IteratorExecutor<>(Iterables
+ .transform(datasetStatesByUrns.entrySet(),
+ new Function<Map.Entry<String, JobState.DatasetState>,
Callable<Void>>() {
+ @Nullable
+ @Override
+ public Callable<Void> apply(final Map.Entry<String,
JobState.DatasetState> entry) {
+ return new SafeDatasetCommit(shouldCommitDataInJob, false,
deliverySemantics, entry.getKey(),
+ entry.getValue(), false, jobContext);
+ }
+ }).iterator(), numCommitThreads,
+ ExecutorsUtils.newThreadFactory(Optional.of(log),
Optional.of("Commit-thread-%d")))
+ .executeAndGetResults();
+
+ IteratorExecutor.logFailures(result, null, 10);
+
+ if (!IteratorExecutor.verifyAllSuccessful(result)) {
+ // TODO: propagate cause of failure
+ throw new IOException("Failed to commit dataset state for some
dataset(s) of job <jobStub>");
Review Comment:
suggest including more specific info (e.g. the task state store path)
...basically something to correlate it w/ the prior output from `logFailures`
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java:
##########
@@ -61,4 +70,11 @@ protected NestingExecWorkflow<WorkUnitClaimCheck>
createProcessingWorkflow(FileS
// TODO: to incorporate multiple different concrete `NestingExecWorkflow`
sub-workflows in the same super-workflow... shall we use queues?!?!?
return Workflow.newChildWorkflowStub(NestingExecWorkflow.class, childOpts);
}
+
+ protected CommitStepWorkflow createCommitStepWorkflow() {
+ ChildWorkflowOptions childOpts =
+
ChildWorkflowOptions.newBuilder().setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON).build();
Review Comment:
suggest calling `.setWorkflowId` with something like `Help.qualifyNamePerExec`,
as in `createProcessingWorkflow`
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/NestingExecOfProcessWorkUnitWorkflowImpl.java:
##########
@@ -25,6 +25,7 @@
import java.time.Duration;
import org.apache.gobblin.temporal.ddm.activity.ProcessWorkUnit;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
Review Comment:
is this diff accurate? I see this import, but nowhere is the name used...
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java:
##########
@@ -259,6 +227,42 @@ public Void call() throws Exception {
this.eventBus.post(new
NewTaskCompletionEvent(ImmutableList.copyOf(taskStateQueue)));
}
+ public static Queue<TaskState>
deserializeTaskStatesFromFolder(StateStore<TaskState> taskStateStore, Path
outputTaskStateDir,
+ int numDeserializerThreads) throws IOException {
+ List<String> taskStateNames =
taskStateStore.getTableNames(outputTaskStateDir.getName(), new
Predicate<String>() {
+ @Override
+ public boolean apply(String input) {
+ return input != null
+ &&
input.endsWith(AbstractJobLauncher.TASK_STATE_STORE_TABLE_SUFFIX)
+ && !input.startsWith(FsStateStore.TMP_FILE_PREFIX);
+ }});
+
+ if (taskStateNames == null || taskStateNames.isEmpty()) {
+ log.info("No output task state files found in " + outputTaskStateDir);
+ return null;
+ }
+
+ final Queue<TaskState> taskStateQueue = Queues.newConcurrentLinkedQueue();
+ try (ParallelRunner stateSerDeRunner = new
ParallelRunner(numDeserializerThreads, null)) {
+ for (final String taskStateName : taskStateNames) {
+ log.info("Found output task state file " + taskStateName);
+ // Deserialize the TaskState and delete the file
+ stateSerDeRunner.submitCallable(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ TaskState taskState =
taskStateStore.getAll(outputTaskStateDir.getName(), taskStateName).get(0);
+ taskStateQueue.add(taskState);
+ return null;
+ }
+ }, "Deserialize state for " + taskStateName);
+ }
+ } catch (IOException ioe) {
+ log.warn("Could not read all task state files.");
Review Comment:
let's not just swallow, but at least log particulars of the error
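Something along these lines, as a minimal sketch. It uses `java.util.logging` only so that it runs standalone; the class and method names are hypothetical. With the slf4j logger already in this class, the equivalent would be to pass `ioe` as the last argument to `log.warn`.

```java
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical sketch, not Gobblin code: warn with the directory and the cause attached. */
final class LogCauseSketch {
  private static final Logger LOG = Logger.getLogger(LogCauseSketch.class.getName());

  /** Formats the warning so that both the directory and the underlying error are visible. */
  static String describe(String taskStateDir, IOException ioe) {
    return String.format("Could not read all task state files from %s: %s",
        taskStateDir, ioe.getMessage());
  }

  static void warnWithCause(String taskStateDir, IOException ioe) {
    // Passing the exception itself lets the log handler record the stack trace too.
    LOG.log(Level.WARNING, describe(taskStateDir, ioe), ioe);
  }

  public static void main(String[] args) {
    warnWithCause("/tmp/output/job_123", new IOException("connection reset"));
  }
}
```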
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java:
##########
@@ -193,39 +189,11 @@ protected void shutDown() throws Exception {
* @throws IOException if it fails to collect the output {@link TaskState}s
*/
private void collectOutputTaskStates() throws IOException {
- List<String> taskStateNames =
taskStateStore.getTableNames(outputTaskStateDir.getName(), new
Predicate<String>() {
- @Override
- public boolean apply(String input) {
- return
input.endsWith(AbstractJobLauncher.TASK_STATE_STORE_TABLE_SUFFIX)
- && !input.startsWith(FsStateStore.TMP_FILE_PREFIX);
- }});
- if (taskStateNames == null || taskStateNames.size() == 0) {
- LOGGER.debug("No output task state files found in " +
this.outputTaskStateDir);
+ final Queue<TaskState> taskStateQueue =
deserializeTaskStatesFromFolder(taskStateStore, outputTaskStateDir,
this.stateSerDeRunnerThreads);
+ if (taskStateQueue == null) {
return;
}
-
- final Queue<TaskState> taskStateQueue = Queues.newConcurrentLinkedQueue();
- try (ParallelRunner stateSerDeRunner = new
ParallelRunner(this.stateSerDeRunnerThreads, null)) {
- for (final String taskStateName : taskStateNames) {
- LOGGER.debug("Found output task state file " + taskStateName);
- // Deserialize the TaskState and delete the file
- stateSerDeRunner.submitCallable(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- TaskState taskState =
taskStateStore.getAll(outputTaskStateDir.getName(), taskStateName).get(0);
- taskStateQueue.add(taskState);
- taskStateStore.delete(outputTaskStateDir.getName(), taskStateName);
Review Comment:
is this `delete` no longer happening in the reimpl? if omitted, are we
concerned? method-level javadoc could describe whose responsibility it is to
delete.
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java:
##########
@@ -259,6 +227,42 @@ public Void call() throws Exception {
this.eventBus.post(new
NewTaskCompletionEvent(ImmutableList.copyOf(taskStateQueue)));
}
+ public static Queue<TaskState>
deserializeTaskStatesFromFolder(StateStore<TaskState> taskStateStore, Path
outputTaskStateDir,
+ int numDeserializerThreads) throws IOException {
+ List<String> taskStateNames =
taskStateStore.getTableNames(outputTaskStateDir.getName(), new
Predicate<String>() {
+ @Override
+ public boolean apply(String input) {
+ return input != null
+ &&
input.endsWith(AbstractJobLauncher.TASK_STATE_STORE_TABLE_SUFFIX)
+ && !input.startsWith(FsStateStore.TMP_FILE_PREFIX);
+ }});
+
+ if (taskStateNames == null || taskStateNames.isEmpty()) {
+ log.info("No output task state files found in " + outputTaskStateDir);
Review Comment:
warn?
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java:
##########
@@ -0,0 +1,40 @@
+package org.apache.gobblin.temporal.ddm.workflow.impl;
+
+import io.temporal.activity.ActivityOptions;
+import io.temporal.common.RetryOptions;
+import io.temporal.workflow.Async;
+import io.temporal.workflow.Promise;
+import io.temporal.workflow.Workflow;
+
+import java.time.Duration;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
+import org.apache.gobblin.temporal.ddm.workflow.CommitStepWorkflow;
+
+
+@Slf4j
+public class CommitStepWorkflowImpl implements CommitStepWorkflow {
+
+ private static final RetryOptions ACTIVITY_RETRY_OPTS =
RetryOptions.newBuilder()
+ .setInitialInterval(Duration.ofSeconds(3))
+ .setMaximumInterval(Duration.ofSeconds(100))
+ .setBackoffCoefficient(2)
+ .setMaximumAttempts(4)
+ .build();
+
+ private static final ActivityOptions ACTIVITY_OPTS =
ActivityOptions.newBuilder()
+ .setStartToCloseTimeout(Duration.ofSeconds(999))
+ .setRetryOptions(ACTIVITY_RETRY_OPTS)
+ .build();
+
+ private final CommitActivity activityStub =
Workflow.newActivityStub(CommitActivity.class, ACTIVITY_OPTS);
+
+ @Override
+ public int commit(WUProcessingSpec workSpec) {
+ Promise<Integer> result = Async.function(activityStub::commit, workSpec);
+ return result.get();
Review Comment:
fast-following w/ the `get` makes it look like you want a sync invocation.
if so, let's call it directly (not via `Async.function`)
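
with the stub above that would presumably be just `return activityStub.commit(workSpec);`. to illustrate the general point without pulling in the Temporal SDK, here is a JDK-only analogy that swaps in `CompletableFuture` for the workflow promise (it is not Temporal's API, and `SyncVersusAsync` and its method names are hypothetical):

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.ToIntFunction;

// Hypothetical illustration: CompletableFuture stands in for the workflow promise.
final class SyncVersusAsync {
  /** Starts the call asynchronously, then blocks at once: the pattern questioned above. */
  static int asyncThenBlock(ToIntFunction<String> activity, String workSpec) {
    CompletableFuture<Integer> result =
        CompletableFuture.supplyAsync(() -> activity.applyAsInt(workSpec));
    return result.join(); // blocks until the value is ready
  }

  /** Same result with no intermediate future. */
  static int direct(ToIntFunction<String> activity, String workSpec) {
    return activity.applyAsInt(workSpec);
  }
}
```

both return the same value; the async form only pays off when something else happens between starting the call and waiting on it.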
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.workflow;
+
+import io.temporal.workflow.Promise;
Review Comment:
extraneous? not immediately seeing where it's used
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.workflow;
+
+import io.temporal.workflow.Promise;
+import io.temporal.workflow.WorkflowInterface;
+import io.temporal.workflow.WorkflowMethod;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
+
+
+@WorkflowInterface
+public interface CommitStepWorkflow {
Review Comment:
javadoc
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.typesafe.config.ConfigFactory;
+import io.temporal.failure.ApplicationFailure;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.commit.DeliverySemantics;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.FsStateStore;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.SafeDatasetCommit;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.runtime.TaskStateCollectorService;
+import org.apache.gobblin.source.extractor.JobCommitPolicy;
+import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
+import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.util.Either;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.SerializationUtils;
+import org.apache.gobblin.util.executors.IteratorExecutor;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+@Slf4j
+public class CommitActivityImpl implements CommitActivity {
+
+ @Override
+ public int commit(WUProcessingSpec workSpec) {
+ int numDeserializationThreads = 1;
Review Comment:
at the least should be instance-level, w/ a TODO about adding a config prop
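
roughly what I mean, as a JDK-only sketch. the class name, the property key, and the use of `java.util.Properties` are all assumptions for illustration; the key is not an existing Gobblin configuration key:

```java
import java.util.Properties;

// Hypothetical holder showing an instance-level thread count with a config fallback.
final class CommitThreadsConfig {
  // Hypothetical property key, invented for this sketch.
  static final String NUM_DESERIALIZATION_THREADS_KEY = "example.commit.num.deserialization.threads";
  static final int DEFAULT_NUM_DESERIALIZATION_THREADS = 1;

  // Instance-level: resolved once at construction rather than on every call.
  private final int numDeserializationThreads;

  CommitThreadsConfig(Properties props) {
    // TODO: replace the hypothetical key with a real config prop once one is defined.
    this.numDeserializationThreads = Integer.parseInt(props.getProperty(
        NUM_DESERIALIZATION_THREADS_KEY, String.valueOf(DEFAULT_NUM_DESERIALIZATION_THREADS)));
  }

  int getNumDeserializationThreads() {
    return numDeserializationThreads;
  }
}
```

the default stays at 1, so behavior is unchanged until someone sets the property.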
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/CommitActivity.java:
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity;
+
+import io.temporal.activity.ActivityInterface;
+import io.temporal.activity.ActivityMethod;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
+
+
+/** Activity for processing/executing a {@link
org.apache.gobblin.source.workunit.WorkUnit}, provided by claim-check */
+@ActivityInterface
+public interface CommitActivity {
+ @ActivityMethod
+ // CAUTION: void return type won't work, as apparently it mayn't be the
return type for `io.temporal.workflow.Functions.Func1`!
Review Comment:
please add javadoc to describe the `@return` int. if there's a clear reason
for the int, there would be no need for this note about the `void` type
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.typesafe.config.ConfigFactory;
+import io.temporal.failure.ApplicationFailure;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.commit.DeliverySemantics;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.FsStateStore;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.SafeDatasetCommit;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.runtime.TaskStateCollectorService;
+import org.apache.gobblin.source.extractor.JobCommitPolicy;
+import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
+import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.util.Either;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.SerializationUtils;
+import org.apache.gobblin.util.executors.IteratorExecutor;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+@Slf4j
+public class CommitActivityImpl implements CommitActivity {
+
+ @Override
+ public int commit(WUProcessingSpec workSpec) {
+ int numDeserializationThreads = 1;
+ try {
+ FileSystem fs = Help.loadFileSystem(workSpec);
+ JobState jobState = Help.loadJobState(workSpec, fs);
+ SharedResourcesBroker<GobblinScopeTypes> instanceBroker =
createDefaultInstanceBroker(jobState.getProperties());
+ JobContext globalGobblinContext = new
JobContext(jobState.getProperties(), log, instanceBroker, null);
+ // TODO: Task state dir is a stub with the assumption it is always
colocated with the workunits dir (as in the case of MR which generates
workunits)
+ Path jobIdParent = new Path(workSpec.getWorkUnitsDir()).getParent();
+ Path jobOutputPath = new Path(new Path(jobIdParent, "output"),
jobIdParent.getName());
+ log.info("Output path at: " + jobOutputPath + " with fs at " +
fs.getUri());
+ StateStore<TaskState> taskStateStore = Help.openTaskStateStore(workSpec,
fs);
+ Collection<TaskState> taskStateQueue =
+ ImmutableList.copyOf(
+
TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore,
jobOutputPath, numDeserializationThreads));
+ commitTaskStates(jobState, taskStateQueue, globalGobblinContext);
+
+ } catch (Exception e) {
+ //TODO: IMPROVE GRANULARITY OF RETRIES
+ throw ApplicationFailure.newNonRetryableFailureWithCause(
+ "Failed to commit dataset state for some dataset(s) of job
<jobStub>",
+ IOException.class.toString(),
+ new IOException(e),
+ null
+ );
+ }
+
+ return 0;
+ }
+
+ protected FileSystem loadFileSystemForUri(URI fsUri, State stateConfig)
throws IOException {
Review Comment:
is this used?
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.typesafe.config.ConfigFactory;
+import io.temporal.failure.ApplicationFailure;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.commit.DeliverySemantics;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.FsStateStore;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.SafeDatasetCommit;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.runtime.TaskStateCollectorService;
+import org.apache.gobblin.source.extractor.JobCommitPolicy;
+import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
+import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.util.Either;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.SerializationUtils;
+import org.apache.gobblin.util.executors.IteratorExecutor;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+@Slf4j
+public class CommitActivityImpl implements CommitActivity {
+
+ @Override
+ public int commit(WUProcessingSpec workSpec) {
+ int numDeserializationThreads = 1;
+ try {
+ FileSystem fs = Help.loadFileSystem(workSpec);
+ JobState jobState = Help.loadJobState(workSpec, fs);
+ SharedResourcesBroker<GobblinScopeTypes> instanceBroker =
createDefaultInstanceBroker(jobState.getProperties());
+ JobContext globalGobblinContext = new
JobContext(jobState.getProperties(), log, instanceBroker, null);
+ // TODO: Task state dir is a stub with the assumption it is always
colocated with the workunits dir (as in the case of MR which generates
workunits)
+ Path jobIdParent = new Path(workSpec.getWorkUnitsDir()).getParent();
+ Path jobOutputPath = new Path(new Path(jobIdParent, "output"),
jobIdParent.getName());
+ log.info("Output path at: " + jobOutputPath + " with fs at " +
fs.getUri());
+ StateStore<TaskState> taskStateStore = Help.openTaskStateStore(workSpec,
fs);
+ Collection<TaskState> taskStateQueue =
+ ImmutableList.copyOf(
+
TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore,
jobOutputPath, numDeserializationThreads));
+ commitTaskStates(jobState, taskStateQueue, globalGobblinContext);
+
+ } catch (Exception e) {
+ //TODO: IMPROVE GRANULARITY OF RETRIES
+ throw ApplicationFailure.newNonRetryableFailureWithCause(
+ "Failed to commit dataset state for some dataset(s) of job
<jobStub>",
+ IOException.class.toString(),
+ new IOException(e),
+ null
+ );
+ }
+
+ return 0;
+ }
+
+ protected FileSystem loadFileSystemForUri(URI fsUri, State stateConfig)
throws IOException {
+ return HadoopUtils.getFileSystem(fsUri, stateConfig);
+ }
+
+ void commitTaskStates(State jobState, Collection<TaskState> taskStates,
JobContext jobContext) throws IOException {
+ Map<String, JobState.DatasetState> datasetStatesByUrns =
createDatasetStatesByUrns(taskStates);
+ final boolean shouldCommitDataInJob = shouldCommitDataInJob(jobState);
+ final DeliverySemantics deliverySemantics =
DeliverySemantics.AT_LEAST_ONCE;
+ final int numCommitThreads = 1;
+
+ if (!shouldCommitDataInJob) {
+ log.info("Job will not commit data since data are committed by tasks.");
+ }
+
+ try {
+ if (!datasetStatesByUrns.isEmpty()) {
+ log.info("Persisting dataset urns.");
+ }
+
+ List<Either<Void, ExecutionException>> result = new
IteratorExecutor<>(Iterables
+ .transform(datasetStatesByUrns.entrySet(),
+ new Function<Map.Entry<String, JobState.DatasetState>,
Callable<Void>>() {
+ @Nullable
+ @Override
+ public Callable<Void> apply(final Map.Entry<String,
JobState.DatasetState> entry) {
+ return new SafeDatasetCommit(shouldCommitDataInJob, false,
deliverySemantics, entry.getKey(),
+ entry.getValue(), false, jobContext);
+ }
+ }).iterator(), numCommitThreads,
+ ExecutorsUtils.newThreadFactory(Optional.of(log),
Optional.of("Commit-thread-%d")))
+ .executeAndGetResults();
+
+ IteratorExecutor.logFailures(result, null, 10);
+
+ if (!IteratorExecutor.verifyAllSuccessful(result)) {
+ // TODO: propagate cause of failure
+ throw new IOException("Failed to commit dataset state for some
dataset(s) of job <jobStub>");
+ }
+ } catch (InterruptedException exc) {
+ throw new IOException(exc);
+ }
+ }
+
+ public Map<String, JobState.DatasetState>
createDatasetStatesByUrns(Collection<TaskState> taskStates) {
Review Comment:
couldn't this be `static`? that would be more testable
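
to show why `static` helps, here is a simplified JDK-only sketch of the grouping. `DatasetGrouping`, `Task`, and `DEFAULT_URN` are hypothetical stand-ins (for the activity class, `TaskState`, and the default dataset URN); it groups plain lists rather than building `JobState.DatasetState` objects:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for the grouping done by createDatasetStatesByUrns.
final class DatasetGrouping {
  // Assumed placeholder for the default dataset URN.
  static final String DEFAULT_URN = "";

  /** Minimal stand-in for TaskState: carries only an optional dataset URN. */
  static final class Task {
    final String datasetUrn; // null when the task sets no URN

    Task(String datasetUrn) {
      this.datasetUrn = datasetUrn;
    }
  }

  /** Static and free of instance state, so a test can call it without building an activity. */
  static Map<String, List<Task>> groupByUrn(Collection<Task> tasks) {
    Map<String, List<Task>> byUrn = new HashMap<>();
    for (Task task : tasks) {
      String urn = task.datasetUrn != null ? task.datasetUrn : DEFAULT_URN;
      byUrn.computeIfAbsent(urn, key -> new ArrayList<>()).add(task);
    }
    return byUrn;
  }
}
```

a unit test can then feed it a handful of task states and assert on the resulting map directly.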
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.typesafe.config.ConfigFactory;
+import io.temporal.failure.ApplicationFailure;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.commit.DeliverySemantics;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.FsStateStore;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.SafeDatasetCommit;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.runtime.TaskStateCollectorService;
+import org.apache.gobblin.source.extractor.JobCommitPolicy;
+import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
+import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.util.Either;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.SerializationUtils;
+import org.apache.gobblin.util.executors.IteratorExecutor;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+@Slf4j
+public class CommitActivityImpl implements CommitActivity {
+
+ @Override
+ public int commit(WUProcessingSpec workSpec) {
+ int numDeserializationThreads = 1;
+ try {
+ FileSystem fs = Help.loadFileSystem(workSpec);
+ JobState jobState = Help.loadJobState(workSpec, fs);
+ SharedResourcesBroker<GobblinScopeTypes> instanceBroker =
createDefaultInstanceBroker(jobState.getProperties());
+ JobContext globalGobblinContext = new
JobContext(jobState.getProperties(), log, instanceBroker, null);
+ // TODO: Task state dir is a stub with the assumption it is always
colocated with the workunits dir (as in the case of MR which generates
workunits)
+ Path jobIdParent = new Path(workSpec.getWorkUnitsDir()).getParent();
+ Path jobOutputPath = new Path(new Path(jobIdParent, "output"),
jobIdParent.getName());
Review Comment:
could we use `JobStateUtils.getTaskStateStorePath` or is that different?
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.typesafe.config.ConfigFactory;
+import io.temporal.failure.ApplicationFailure;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.commit.DeliverySemantics;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.FsStateStore;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.SafeDatasetCommit;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.runtime.TaskStateCollectorService;
+import org.apache.gobblin.source.extractor.JobCommitPolicy;
+import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
+import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.util.Either;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.SerializationUtils;
+import org.apache.gobblin.util.executors.IteratorExecutor;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+@Slf4j
+public class CommitActivityImpl implements CommitActivity {
+
+ @Override
+ public int commit(WUProcessingSpec workSpec) {
+ int numDeserializationThreads = 1;
+ try {
+ FileSystem fs = Help.loadFileSystem(workSpec);
+ JobState jobState = Help.loadJobState(workSpec, fs);
+ SharedResourcesBroker<GobblinScopeTypes> instanceBroker =
createDefaultInstanceBroker(jobState.getProperties());
+ JobContext globalGobblinContext = new
JobContext(jobState.getProperties(), log, instanceBroker, null);
+ // TODO: Task state dir is a stub with the assumption it is always
colocated with the workunits dir (as in the case of MR which generates
workunits)
+ Path jobIdParent = new Path(workSpec.getWorkUnitsDir()).getParent();
+ Path jobOutputPath = new Path(new Path(jobIdParent, "output"),
jobIdParent.getName());
+ log.info("Output path at: " + jobOutputPath + " with fs at " +
fs.getUri());
+ StateStore<TaskState> taskStateStore = Help.openTaskStateStore(workSpec,
fs);
+ Collection<TaskState> taskStateQueue =
+ ImmutableList.copyOf(
+
TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore,
jobOutputPath, numDeserializationThreads));
+ commitTaskStates(jobState, taskStateQueue, globalGobblinContext);
+
+ } catch (Exception e) {
+ //TODO: IMPROVE GRANULARITY OF RETRIES
+ throw ApplicationFailure.newNonRetryableFailureWithCause(
+ "Failed to commit dataset state for some dataset(s) of job
<jobStub>",
+ IOException.class.toString(),
+ new IOException(e),
+ null
+ );
+ }
+
+ return 0;
+ }
+
+ protected FileSystem loadFileSystemForUri(URI fsUri, State stateConfig)
throws IOException {
+ return HadoopUtils.getFileSystem(fsUri, stateConfig);
+ }
+
+ void commitTaskStates(State jobState, Collection<TaskState> taskStates,
JobContext jobContext) throws IOException {
+ Map<String, JobState.DatasetState> datasetStatesByUrns =
createDatasetStatesByUrns(taskStates);
+ final boolean shouldCommitDataInJob = shouldCommitDataInJob(jobState);
+ final DeliverySemantics deliverySemantics =
DeliverySemantics.AT_LEAST_ONCE;
+ final int numCommitThreads = 1;
+
+ if (!shouldCommitDataInJob) {
+ log.info("Job will not commit data since data are committed by tasks.");
+ }
+
+ try {
+ if (!datasetStatesByUrns.isEmpty()) {
+ log.info("Persisting dataset urns.");
+ }
+
+ List<Either<Void, ExecutionException>> result = new
IteratorExecutor<>(Iterables
+ .transform(datasetStatesByUrns.entrySet(),
+ new Function<Map.Entry<String, JobState.DatasetState>,
Callable<Void>>() {
+ @Nullable
+ @Override
+ public Callable<Void> apply(final Map.Entry<String,
JobState.DatasetState> entry) {
+ return new SafeDatasetCommit(shouldCommitDataInJob, false,
deliverySemantics, entry.getKey(),
+ entry.getValue(), false, jobContext);
+ }
+ }).iterator(), numCommitThreads,
+ ExecutorsUtils.newThreadFactory(Optional.of(log),
Optional.of("Commit-thread-%d")))
+ .executeAndGetResults();
+
+ IteratorExecutor.logFailures(result, null, 10);
+
+ if (!IteratorExecutor.verifyAllSuccessful(result)) {
+ // TODO: propagate cause of failure
+ throw new IOException("Failed to commit dataset state for some
dataset(s) of job <jobStub>");
+ }
+ } catch (InterruptedException exc) {
+ throw new IOException(exc);
+ }
+ }
+
+ public Map<String, JobState.DatasetState> createDatasetStatesByUrns(Collection<TaskState> taskStates) {
+ Map<String, JobState.DatasetState> datasetStatesByUrns = Maps.newHashMap();
+
+ //TODO: handle skipped tasks?
+ for (TaskState taskState : taskStates) {
+ String datasetUrn = createDatasetUrn(datasetStatesByUrns, taskState);
+ datasetStatesByUrns.get(datasetUrn).incrementTaskCount();
+ datasetStatesByUrns.get(datasetUrn).addTaskState(taskState);
+ }
+
+ return datasetStatesByUrns;
+ }
+
+ private String createDatasetUrn(Map<String, JobState.DatasetState> datasetStatesByUrns, TaskState taskState) {
+ String datasetUrn = taskState.getProp(ConfigurationKeys.DATASET_URN_KEY, ConfigurationKeys.DEFAULT_DATASET_URN);
+ if (!datasetStatesByUrns.containsKey(datasetUrn)) {
+ JobState.DatasetState datasetState = new JobState.DatasetState();
+ datasetState.setDatasetUrn(datasetUrn);
+ datasetStatesByUrns.put(datasetUrn, datasetState);
+ }
+ return datasetUrn;
+ }
+
+ private static boolean shouldCommitDataInJob(State state) {
Review Comment:
should `JobContext.shouldCommitDataInJob` have its accessibility upgraded to
`public` to call it from this class?
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.typesafe.config.ConfigFactory;
+import io.temporal.failure.ApplicationFailure;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.commit.DeliverySemantics;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.FsStateStore;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.SafeDatasetCommit;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.runtime.TaskStateCollectorService;
+import org.apache.gobblin.source.extractor.JobCommitPolicy;
+import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
+import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.util.Either;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.SerializationUtils;
+import org.apache.gobblin.util.executors.IteratorExecutor;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+@Slf4j
+public class CommitActivityImpl implements CommitActivity {
+
+ @Override
+ public int commit(WUProcessingSpec workSpec) {
+ int numDeserializationThreads = 1;
+ try {
+ FileSystem fs = Help.loadFileSystem(workSpec);
+ JobState jobState = Help.loadJobState(workSpec, fs);
+ SharedResourcesBroker<GobblinScopeTypes> instanceBroker = createDefaultInstanceBroker(jobState.getProperties());
+ JobContext globalGobblinContext = new JobContext(jobState.getProperties(), log, instanceBroker, null);
+ // TODO: Task state dir is a stub with the assumption it is always colocated with the workunits dir (as in the case of MR which generates workunits)
+ Path jobIdParent = new Path(workSpec.getWorkUnitsDir()).getParent();
+ Path jobOutputPath = new Path(new Path(jobIdParent, "output"), jobIdParent.getName());
+ log.info("Output path at: " + jobOutputPath + " with fs at " + fs.getUri());
+ StateStore<TaskState> taskStateStore = Help.openTaskStateStore(workSpec, fs);
+ Collection<TaskState> taskStateQueue =
+ ImmutableList.copyOf(
+ TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore, jobOutputPath, numDeserializationThreads));
+ commitTaskStates(jobState, taskStateQueue, globalGobblinContext);
+
+ } catch (Exception e) {
+ //TODO: IMPROVE GRANULARITY OF RETRIES
+ throw ApplicationFailure.newNonRetryableFailureWithCause(
+ "Failed to commit dataset state for some dataset(s) of job <jobStub>",
+ IOException.class.toString(),
+ new IOException(e),
+ null
+ );
+ }
+
+ return 0;
+ }
+
+ protected FileSystem loadFileSystemForUri(URI fsUri, State stateConfig) throws IOException {
+ return HadoopUtils.getFileSystem(fsUri, stateConfig);
+ }
+
+ void commitTaskStates(State jobState, Collection<TaskState> taskStates, JobContext jobContext) throws IOException {
+ Map<String, JobState.DatasetState> datasetStatesByUrns = createDatasetStatesByUrns(taskStates);
+ final boolean shouldCommitDataInJob = shouldCommitDataInJob(jobState);
+ final DeliverySemantics deliverySemantics = DeliverySemantics.AT_LEAST_ONCE;
+ final int numCommitThreads = 1;
+
+ if (!shouldCommitDataInJob) {
+ log.info("Job will not commit data since data are committed by tasks.");
Review Comment:
isn't there also the case where the publisher is not specified? this
message would be more helpful if it indicated when that's been forgotten
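
A minimal sketch of the kind of message I have in mind, assuming the caller can tell whether a publisher was specified. `CommitSkipMessage` and its `publisherSpecified` flag are hypothetical names for illustration only, not existing Gobblin API:

```java
// Hypothetical helper, not part of Gobblin: picks the log message for the
// "job will not commit data" branch based on whether a publisher was specified.
final class CommitSkipMessage {
  private CommitSkipMessage() {}

  static String forSkippedJobCommit(boolean publisherSpecified) {
    if (!publisherSpecified) {
      // Assumed case from this review comment: the publisher config may have been forgotten.
      return "Job will not commit data: no job data publisher is specified (was it forgotten?).";
    }
    // Otherwise keep the existing wording.
    return "Job will not commit data since data are committed by tasks.";
  }
}
```

The call site could then read `log.info(CommitSkipMessage.forSkippedJobCommit(publisherSpecified));`, with `publisherSpecified` derived from the job state in whatever way the real check already does.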
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -0,0 +1,183 @@
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.typesafe.config.ConfigFactory;
+import io.temporal.failure.ApplicationFailure;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.commit.DeliverySemantics;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.FsStateStore;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.SafeDatasetCommit;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.runtime.TaskStateCollectorService;
+import org.apache.gobblin.source.extractor.JobCommitPolicy;
+import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
+import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.util.Either;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.SerializationUtils;
+import org.apache.gobblin.util.executors.IteratorExecutor;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+@Slf4j
+public class CommitActivityImpl implements CommitActivity {
+
+ @Override
+ public int commit(WUProcessingSpec workSpec) {
+ int numDeserializationThreads = 1;
+ try {
+ FileSystem fs = Help.loadFileSystem(workSpec);
+ JobState jobState = Help.loadJobState(workSpec, fs);
+ SharedResourcesBroker<GobblinScopeTypes> instanceBroker = createDefaultInstanceBroker(jobState.getProperties());
+ JobContext globalGobblinContext = new JobContext(jobState.getProperties(), log, instanceBroker, null);
+ // TODO: Task state dir is a stub with the assumption it is always colocated with the workunits dir (as in the case of MR which generates workunits)
+ Path jobIdParent = new Path(workSpec.getWorkUnitsDir()).getParent();
+ Path jobOutputPath = new Path(new Path(jobIdParent, "output"), jobIdParent.getName());
+ log.info("Output path at: " + jobOutputPath + " with fs at " + fs.getUri());
+ StateStore<TaskState> taskStateStore = Help.openTaskStateStore(workSpec, fs);
+ Collection<TaskState> taskStateQueue =
+ ImmutableList.copyOf(
+ TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore, jobOutputPath, numDeserializationThreads));
+ commitTaskStates(jobState, taskStateQueue, globalGobblinContext);
+
+ } catch (Exception e) {
+ //TODO: IMPROVE GRANULARITY OF RETRIES
+ throw ApplicationFailure.newNonRetryableFailureWithCause(
+ "Failed to commit dataset state for some dataset(s) of job <jobStub>",
+ IOException.class.toString(),
+ new IOException(e),
+ null
+ );
+ }
+
+ return 0;
+ }
+
+ protected FileSystem loadFileSystemForUri(URI fsUri, State stateConfig) throws IOException {
+ return HadoopUtils.getFileSystem(fsUri, stateConfig);
+ }
+
+ void commitTaskStates(State jobState, Collection<TaskState> taskStates, JobContext jobContext) throws IOException {
+ Map<String, JobState.DatasetState> datasetStatesByUrns = createDatasetStatesByUrns(taskStates);
+ final boolean shouldCommitDataInJob = shouldCommitDataInJob(jobState);
+ final DeliverySemantics deliverySemantics = DeliverySemantics.AT_LEAST_ONCE;
+ final int numCommitThreads = 1;
+
+ if (!shouldCommitDataInJob) {
+ log.info("Job will not commit data since data are committed by tasks.");
+ }
+
+ try {
+ if (!datasetStatesByUrns.isEmpty()) {
+ log.info("Persisting dataset urns.");
Review Comment:
NBD, but I'd intuitively seek to log a count of the URNs
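
Roughly what I mean, as a sketch only. `UrnCountMessage` is a hypothetical stand-in so the snippet is self-contained; it is not an existing Gobblin class:

```java
import java.util.Map;

// Hypothetical helper, not part of Gobblin: formats the log line so it
// includes how many dataset URNs are about to be persisted.
final class UrnCountMessage {
  private UrnCountMessage() {}

  static String persisting(Map<String, ?> datasetStatesByUrns) {
    // The map is keyed by dataset URN, so its size is the URN count.
    return String.format("Persisting %d dataset urn(s).", datasetStatesByUrns.size());
  }
}
```

In the actual method this would likely collapse to a one-liner with SLF4J's parameterized logging, e.g. `log.info("Persisting {} dataset urns.", datasetStatesByUrns.size());`.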