Will-Lo commented on code in PR #3816:
URL: https://github.com/apache/gobblin/pull/3816#discussion_r1378328057


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.launcher;
+
+import java.net.URI;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+import lombok.extern.slf4j.Slf4j;
+
+import com.typesafe.config.ConfigFactory;
+import io.temporal.client.WorkflowOptions;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.runtime.JobLauncher;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.temporal.cluster.GobblinTemporalTaskRunner;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.temporal.ddm.workflow.ProcessWorkUnitsWorkflow;
+import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobLauncher;
+import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobScheduler;
+import org.apache.gobblin.util.PropertiesUtils;
+
+import static org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_PREFIX;
+
+
+/**
+ * A {@link JobLauncher} for the initial triggering of a Temporal workflow that executes {@link WorkUnit}s to fulfill
+ * the work they specify.  see: {@link ProcessWorkUnitsWorkflow}
+ *
+ * <p>
+ *   This class is instantiated by the {@link GobblinTemporalJobScheduler#buildJobLauncher(Properties)} on every job submission to launch the Gobblin job.
+ *   The actual task execution happens in the {@link GobblinTemporalTaskRunner}, usually in a different process.
+ * </p>
+ */
+@Slf4j
+public class ProcessWorkUnitsJobLauncher extends GobblinTemporalJobLauncher {
+  public static final String GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_NAME_NODE_URI = GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_PREFIX + "name.node.uri";
+  public static final String GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_UNITS_DIR = GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_PREFIX + "work.units.dir";
+
+  public static final String GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE = GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_PREFIX + "work.max.branches.per.tree";
+  public static final String GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE = GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_PREFIX + "work.max.sub.trees.per.tree";

Review Comment:
   Should we generalize these configs into a shared, static TemporalConfig? The load generator also defines similar configs for the same purpose of capping max branches/sub-trees per tree.
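   For illustration, a minimal sketch of the kind of shared holder I mean (class and constant names here are hypothetical, not from this PR):

   ```java
   package org.apache.gobblin.temporal;

   /** Hypothetical shared home for work-tree fan-out configs used by both the load generator and DDM launchers */
   public final class TemporalWorkTreeConfigurationKeys {
     public static final String PREFIX = "gobblin.temporal.work.";

     public static final String MAX_BRANCHES_PER_TREE_KEY = PREFIX + "max.branches.per.tree";
     public static final String MAX_SUB_TREES_PER_TREE_KEY = PREFIX + "max.sub.trees.per.tree";

     private TemporalWorkTreeConfigurationKeys() {}
   }
   ```

   Both this launcher and the load generator could then read the same keys instead of each defining its own prefix-qualified variants.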



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java:
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.work.assistance;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ExecutionException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+import com.typesafe.config.Config;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.work.styles.FileSystemJobStateful;
+import org.apache.gobblin.temporal.ddm.work.styles.FileSystemApt;
+import org.apache.gobblin.temporal.ddm.work.styles.JobStateful;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.SerializationUtils;
+
+
+/** Various capabilities useful in implementing Distributed Data Movement (DDM) */
+@Slf4j
+public class Help {
+  public static final int MAX_DESERIALIZATION_FS_LOAD_ATTEMPTS = 5;
+  public static final int LOG_CACHE_STATS_EVERY_N_ACCESSES = 1000;
+  public static final String AZKABAN_FLOW_EXEC_ID_KEY = "azkaban.flow.execid";
+  public static final String USER_TO_PROXY_KEY = "user.to.proxy";
+
+  // treat `JobState` as immutable and cache, for reuse among activities executed by the same worker
+  private static final transient Cache<Path, JobState> jobStateByPath = CacheBuilder.newBuilder().recordStats().build();
+  private static final transient AtomicInteger jobStateAccessCount = new AtomicInteger(0);
+
+  private Help() {}
+
+  public static String qualifyNamePerExec(String name, FileSystemJobStateful f, Config workerConfig) {
+    return name + "_" + calcPerExecQualifier(f, workerConfig);
+  }
+
+  public static String qualifyNamePerExec(String name, Config workerConfig) {
+    return name + "_" + calcPerExecQualifier(workerConfig);
+  }
+
+  public static String calcPerExecQualifier(FileSystemJobStateful f, Config workerConfig) {
+    Optional<String> optFlowExecId = Optional.empty();
+    try {
+      // `ofNullable`, since the flow execution ID prop may be absent (`Optional.of(null)` would NPE)
+      optFlowExecId = Optional.ofNullable(loadJobState(f).getProp(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, null));
+    } catch (IOException e) {
+      log.warn("unable to loadJobState", e);
+    }
+    return optFlowExecId.map(x -> x + "_").orElse("") + calcPerExecQualifier(workerConfig);
+  }
+
+  public static String calcPerExecQualifier(Config workerConfig) {
+    String userToProxy = workerConfig.hasPath(USER_TO_PROXY_KEY)
+        ? workerConfig.getString(USER_TO_PROXY_KEY) : "";
+    String azFlowExecId = workerConfig.hasPath(AZKABAN_FLOW_EXEC_ID_KEY)
+        ? workerConfig.getString(AZKABAN_FLOW_EXEC_ID_KEY) : UUID.randomUUID().toString();
+    return userToProxy + "_" + azFlowExecId;
+  }
+
+  public static FileSystem loadFileSystem(FileSystemApt a) throws IOException {
+    // NOTE: `FileSystem.get` appears to implement caching, which should facilitate sharing among activities executing on the same worker
+    return loadFileSystemForUri(a.getFileSystemUri(), a.getFileSystemConfig());
+  }
+
+  public static FileSystem loadFileSystemForUri(URI fsUri, State fsConfig) throws IOException {
+    // TODO - determine whether this works... unclear whether it led to "FS closed", or that had another cause...
+    // return HadoopUtils.getFileSystem(fsUri, fsConfig);
+    Configuration conf = HadoopUtils.getConfFromState(fsConfig);
+    return FileSystem.get(fsUri, conf);
+  }
+
+  public static FileSystem loadFileSystemForce(FileSystemApt a) throws IOException {
+    return loadFileSystemForUriForce(a.getFileSystemUri(), a.getFileSystemConfig());
+  }
+
+  public static FileSystem loadFileSystemForUriForce(URI fsUri, State fsConfig) throws IOException {
+    // TODO - determine whether this works... unclear whether it led to "FS closed", or that had another cause...
+    // return HadoopUtils.getFileSystem(fsUri, fsConfig);
+    Configuration conf = HadoopUtils.getConfFromState(fsConfig);
+    conf.setBoolean("fs.hdfs.impl.disable.cache", true);

Review Comment:
   Do we really want to disable the cache here rather than utilize Hadoop's caching of filesystems in `FileSystem.get`? The comments are a bit unclear on the motivation for this.
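   For what it's worth, one plausible motivation (my assumption; the PR's comments don't confirm it): `processWorkUnit` closes its `FileSystem` in a try-with-resources, and since `FileSystem.get` hands every caller the same cached instance, closing it would close it for all other activities in the JVM, which is a classic source of "Filesystem closed" errors. A minimal sketch of that hazard, with a hypothetical namenode URI:

   ```java
   import java.net.URI;
   import org.apache.hadoop.conf.Configuration;
   import org.apache.hadoop.fs.FileSystem;
   import org.apache.hadoop.fs.Path;

   public class FsCacheHazardSketch {
     public static void main(String[] args) throws Exception {
       URI uri = URI.create("hdfs://namenode.example.com:8020"); // hypothetical cluster
       Configuration conf = new Configuration();

       FileSystem cachedA = FileSystem.get(uri, conf); // JVM-wide cached instance
       FileSystem cachedB = FileSystem.get(uri, conf); // same object: cachedA == cachedB
       cachedA.close(); // closes it for *every* holder
       // cachedB.exists(new Path("/")) would now throw IOException("Filesystem closed")

       conf.setBoolean("fs.hdfs.impl.disable.cache", true);
       try (FileSystem privateFs = FileSystem.get(uri, conf)) { // fresh, uncached instance
         privateFs.exists(new Path("/")); // safe: closing it cannot affect other activities
       }
     }
   }
   ```

   If that is indeed the reason, a code comment saying so would help.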



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import lombok.extern.slf4j.Slf4j;
+
+import com.google.common.collect.Lists;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.CopySource;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.AbstractTaskStateTracker;
+import org.apache.gobblin.runtime.GobblinMultiTaskAttempt;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.Task;
+import org.apache.gobblin.runtime.TaskCreationException;
+import org.apache.gobblin.runtime.TaskExecutor;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.runtime.TaskStateTracker;
+import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooter;
+import org.apache.gobblin.runtime.troubleshooter.NoopAutomaticTroubleshooter;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.temporal.ddm.activity.ProcessWorkUnit;
+import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
+import org.apache.gobblin.util.JobLauncherUtils;
+
+
+@Slf4j
+public class ProcessWorkUnitImpl implements ProcessWorkUnit {
+  private static final int LOG_EXTENDED_PROPS_EVERY_WORK_UNITS_STRIDE = 100;
+
+  @Override
+  public int processWorkUnit(WorkUnitClaimCheck wu) {
+    try (FileSystem fs = Help.loadFileSystemForce(wu)) {
+      List<WorkUnit> workUnits = loadFlattenedWorkUnits(wu, fs);
+      log.info("WU [{}] - loaded {} workUnits", wu.getCorrelator(), 
workUnits.size());
+      JobState jobState = Help.loadJobState(wu, fs);
+      return execute(workUnits, wu, jobState, fs);
+    } catch (IOException | InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  protected List<WorkUnit> loadFlattenedWorkUnits(WorkUnitClaimCheck wu, FileSystem fs) throws IOException {
+    Path wuPath = new Path(wu.getWorkUnitPath());
+    WorkUnit workUnit = JobLauncherUtils.createEmptyWorkUnitPerExtension(wuPath);
+    Help.deserializeStateWithRetries(fs, wuPath, workUnit, wu);
+    return JobLauncherUtils.flattenWorkUnits(Lists.newArrayList(workUnit));
+  }
+
+  /**
+   * NOTE: adapted from {@link org.apache.gobblin.runtime.mapreduce.MRJobLauncher.TaskRunner#run(org.apache.hadoop.mapreduce.Mapper.Context)}
+   * @return count of how many tasks executed (0 if execution ultimately failed, but we *believe* TaskState should already have been recorded beforehand)
+   */
+  protected int execute(List<WorkUnit> workUnits, WorkUnitClaimCheck wu, JobState jobState, FileSystem fs) throws IOException, InterruptedException {
+    String containerId = "container-id-for-wu-" + wu.getCorrelator();
+    StateStore<TaskState> taskStateStore = Help.openTaskStateStore(wu, fs);
+
+    TaskStateTracker taskStateTracker = createEssentializedTaskStateTracker(wu);
+    TaskExecutor taskExecutor = new TaskExecutor(new Properties());
+    GobblinMultiTaskAttempt.CommitPolicy multiTaskAttemptCommitPolicy = GobblinMultiTaskAttempt.CommitPolicy.IMMEDIATE; // as no speculative exec
+
+    SharedResourcesBroker<GobblinScopeTypes> resourcesBroker = JobStateUtils.getSharedResourcesBroker(jobState);
+    AutomaticTroubleshooter troubleshooter = new NoopAutomaticTroubleshooter();
+    // AutomaticTroubleshooterFactory.createForJob(ConfigUtils.propertiesToConfig(wu.getStateConfig().getProperties()));
+    troubleshooter.start();
+
+    List<String> fileSourcePaths = workUnits.stream()
+        .map(workUnit -> describeAsCopyableFile(workUnit, wu.getWorkUnitPath()))
+        .collect(Collectors.toList());
+    log.info("WU [{}] - submitting {} workUnits for copying files: {}", wu.getCorrelator(),
+        workUnits.size(), fileSourcePaths);
+    log.debug("WU [{}] - (first) workUnit: {}", wu.getCorrelator(), workUnits.get(0).toJsonString());
+
+    try {
+      GobblinMultiTaskAttempt taskAttempt = GobblinMultiTaskAttempt.runWorkUnits(
+          jobState.getJobId(), containerId, jobState, workUnits,
+          taskStateTracker, taskExecutor, taskStateStore, multiTaskAttemptCommitPolicy,
+          resourcesBroker, troubleshooter.getIssueRepository(), createInterruptionPredicate(fs, jobState));
+      return taskAttempt.getNumTasksCreated();
+    } catch (TaskCreationException tce) { // derived type of `IOException` that ought not be caught!
+      throw tce;
+    } catch (IOException ioe) {
+      // presume execution already occurred, with `TaskState` written to reflect outcome
+      log.warn("WU [" + wu.getCorrelator() + "] - continuing on despite IOException:", ioe);
+      return 0;
+    }
+  }
+
+  /** Demonstration processing, to isolate debugging of WU loading and deserialization */
+  protected int countSumProperties(List<WorkUnit> workUnits, WorkUnitClaimCheck wu) {
+    int totalNumProps = workUnits.stream().mapToInt(workUnit -> workUnit.getPropertyNames().size()).sum();
+    log.info("opened WU [{}] to find {} properties total at '{}'", wu.getCorrelator(), totalNumProps, wu.getWorkUnitPath());
+    return totalNumProps;
+  }
+
+  protected TaskStateTracker createEssentializedTaskStateTracker(WorkUnitClaimCheck wu) {
+    return new AbstractTaskStateTracker(new Properties(), log) {
+      @Override
+      public void registerNewTask(Task task) {
+        // TODO: shall we schedule metrics update based on config?
+      }
+
+      @Override
+      public void onTaskRunCompletion(Task task) {
+        task.markTaskCompletion();
+      }
+
+      @Override
+      public void onTaskCommitCompletion(Task task) {
+        TaskState taskState = task.getTaskState();
+        // TODO: if metrics configured, report them now
+        log.info("WU [{} = {}] - finished commit after {}ms with state {}{}", 
wu.getCorrelator(), task.getTaskId(),
+            taskState.getTaskDuration(), taskState.getWorkingState(),
+            
taskState.getWorkingState().equals(WorkUnitState.WorkingState.SUCCESSFUL)
+                ? (" to: " + 
taskState.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR)) : "");
+        log.debug("WU [{} = {}] - task state: {}", wu.getCorrelator(), 
task.getTaskId(),
+            taskState.toJsonString(shouldUseExtendedLogging(wu)));
+        getOptCopyableFile(taskState).ifPresent(copyableFile -> {
+          log.info("WU [{} = {}] - completed copyableFile: {}", 
wu.getCorrelator(), task.getTaskId(),
+              copyableFile.toJsonString(shouldUseExtendedLogging(wu)));
+        });
+      }
+    };
+  }
+
+  protected String describeAsCopyableFile(WorkUnit workUnit, String workUnitPath) {
+    return getOptFirstCopyableFile(Lists.newArrayList(workUnit), workUnitPath)
+        .map(copyableFile -> copyableFile.getOrigin().getPath().toString())
+        .orElse(
+            "<<not a CopyableFile("
+                + getOptCopyEntityClass(workUnit, workUnitPath)
+                .map(Class::getSimpleName)
+                .orElse("<<not a CopyEntity!>>")
+                + "): '" + workUnitPath + "'"
+        );
+  }
+
+  protected Optional<CopyableFile> getOptCopyableFile(TaskState taskState) {
+    return getOptCopyableFile(taskState, "taskState '" + taskState.getTaskId() 
+ "'");
+  }
+
+  protected Optional<CopyableFile> getOptFirstCopyableFile(List<WorkUnit> workUnits, String workUnitPath) {
+    return Optional.of(workUnits).filter(wus -> wus.size() > 0).flatMap(wus ->
+      getOptCopyableFile(wus.get(0), "workUnit '" + workUnitPath + "'")
+    );
+  }
+
+  protected Optional<CopyableFile> getOptCopyableFile(State state, String logDesc) {
+    return getOptCopyEntityClass(state, logDesc).flatMap(copyEntityClass -> {
+      log.debug("(state) {} got (copyEntity) {}", state.getClass().getName(), 
copyEntityClass.getName());
+      if (CopyableFile.class.isAssignableFrom(copyEntityClass)) {
+        String serialization = state.getProp(CopySource.SERIALIZED_COPYABLE_FILE);
+        if (serialization != null) {
+          return Optional.of((CopyableFile) CopyEntity.deserialize(serialization));
+        }
+      }
+      return Optional.empty();
+    });
+  }
+
+  protected Optional<Class<?>> getOptCopyEntityClass(State state, String logDesc) {
+    try {
+      return Optional.of(CopySource.getCopyEntityClass(state));
+    } catch (IOException ioe) {
+      log.warn(logDesc + " - failed getting copy entity class:", ioe);
+      return Optional.empty();
+    }
+  }
+
+  protected Predicate<GobblinMultiTaskAttempt> createInterruptionPredicate(FileSystem fs, JobState jobState) {
+    // TODO - decide whether to support... and if so, employ a useful path; otherwise, just evaluate predicate to always false
+    Path interruptionPath = new Path("/not/a/real/path/that/should/ever/exist!");
+    return createInterruptionPredicate(fs, interruptionPath);
+  }
+
+  protected Predicate<GobblinMultiTaskAttempt> createInterruptionPredicate(FileSystem fs, Path interruptionPath) {
+    return (gmta) -> {
+      try {
+        return fs.exists(interruptionPath);
+      } catch (IOException ioe) {
+        return false;
+      }
+    };
+  }
+
+  protected boolean shouldUseExtendedLogging(WorkUnitClaimCheck wu) {
+    try {

Review Comment:
   Is this function used just to enable extended logging for a subset of the workers, so as not to overwhelm the logs?
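   If that's the intent, a deterministic sampling gate keyed on the correlator is the usual pattern; a sketch under that assumption (the method body is cut off above, and these names are illustrative):

   ```java
   import java.util.Optional;

   public final class ExtendedLoggingSampler {
     private static final int STRIDE = 100; // e.g. mirror LOG_EXTENDED_PROPS_EVERY_WORK_UNITS_STRIDE

     private ExtendedLoggingSampler() {}

     /** Extended logging for roughly 1-in-STRIDE work units, stable across retries of the same WU */
     public static boolean shouldUseExtendedLogging(String correlator) {
       // Math.floorMod guards against negative hashCode values
       return Math.floorMod(Optional.ofNullable(correlator).orElse("").hashCode(), STRIDE) == 0;
     }
   }
   ```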



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
