This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new b57ecbcf4 [GOBBLIN-2046] Generalize gobblin-on-temporal `ProcessWorkUnitImpl` logging not to presume `CopyEntity` (#3925)
b57ecbcf4 is described below

commit b57ecbcf4cf80e8e7b58bee74224cf12da566cf6
Author: Kip Kohn <[email protected]>
AuthorDate: Thu Apr 18 14:45:08 2024 -0700

    [GOBBLIN-2046] Generalize gobblin-on-temporal `ProcessWorkUnitImpl` logging not to presume `CopyEntity` (#3925)
    
    Generalize gobblin-on-temporal `ProcessWorkUnitImpl` logging not to presume `CopyEntity`
---
 .../ddm/activity/impl/ProcessWorkUnitImpl.java     | 80 ++++++++++++----------
 .../gobblin/yarn/GobblinYarnAppLauncher.java       |  7 ++
 2 files changed, 51 insertions(+), 36 deletions(-)

diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
index c093e83f4..aff935dd8 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
@@ -106,16 +106,12 @@ public class ProcessWorkUnitImpl implements 
ProcessWorkUnit {
     GobblinMultiTaskAttempt.CommitPolicy multiTaskAttemptCommitPolicy = 
GobblinMultiTaskAttempt.CommitPolicy.IMMEDIATE; // as no speculative exec
 
     SharedResourcesBroker<GobblinScopeTypes> resourcesBroker = 
JobStateUtils.getSharedResourcesBroker(jobState);
-    List<String> fileSourcePaths = workUnits.stream()
-        .map(workUnit -> getCopyableFileSourcePathDesc(workUnit, 
wu.getWorkUnitPath()))
-        .collect(Collectors.toList());
-    List<String> pathsToLog = getSourcePathsToLog(fileSourcePaths, jobState);
-    log.info("WU [{}] - submitting {} workUnits for copying source files: 
{}{}",
+    Optional<String> optWorkUnitsDesc = getOptWorkUnitsDesc(workUnits, 
wu.getWorkUnitPath(), jobState);
+    log.info("WU [{}] - submitting {} workUnits {}",
         wu.getCorrelator(),
         workUnits.size(),
-        pathsToLog.size() == workUnits.size() ? "" : ("**first " + 
pathsToLog.size() + " only** "),
-        pathsToLog);
-    log.debug("WU [{}] - (first) workUnit: {}", wu.getCorrelator(), 
workUnits.get(0).toJsonString());
+        optWorkUnitsDesc.orElse(""));
+    log.debug("WU [{}] - (first) workUnit: {}", wu.getCorrelator(), 
workUnits.isEmpty() ? "<<absent>>" : workUnits.get(0).toJsonString());
 
     GobblinMultiTaskAttempt taskAttempt = GobblinMultiTaskAttempt.runWorkUnits(
         jobState.getJobId(), containerId, jobState, workUnits,
@@ -154,46 +150,58 @@ public class ProcessWorkUnitImpl implements 
ProcessWorkUnit {
     };
   }
 
-  protected String getCopyableFileSourcePathDesc(WorkUnit workUnit, String 
workUnitPath) {
-    return getOptFirstCopyableFile(Lists.newArrayList(workUnit), workUnitPath)
-        .map(copyableFile -> copyableFile.getOrigin().getPath().toString())
-        .orElse(
-            "<<not a CopyableFile("
-                + getOptCopyEntityClass(workUnit, workUnitPath)
-                .map(Class::getSimpleName)
-                .orElse("<<not a CopyEntity!>>")
-                + "): '" + workUnitPath + "'"
-        );
+  protected static Optional<String> getOptWorkUnitsDesc(List<WorkUnit> 
workUnits, String workUnitsPath, JobState jobState) {
+    List<String> fileSourcePaths = workUnits.stream()
+        .map(workUnit -> getOptCopyableFileSourcePathDesc(workUnit, 
workUnitsPath))
+        .filter(Optional::isPresent)
+        .map(Optional::get)
+        .collect(Collectors.toList());
+    if (fileSourcePaths.isEmpty()) {
+      // TODO - describe WorkUnits other than `CopyableFile`
+      return Optional.empty();
+    } else {
+      return Optional.of(getSourcePathsToLog(fileSourcePaths, 
jobState)).map(pathsToLog ->
+          "for copying source files: "
+              + (pathsToLog.size() == workUnits.size() ? "" : ("**first " + 
pathsToLog.size() + " only** "))
+              + pathsToLog
+      );
+    }
+  }
+
+  protected static Optional<String> getOptCopyableFileSourcePathDesc(WorkUnit 
workUnit, String workUnitPath) {
+    return getOptWorkUnitCopyEntityClass(workUnit, 
workUnitPath).flatMap(copyEntityClass ->
+        getOptCopyableFile(copyEntityClass, workUnit).map(copyableFile ->
+            copyableFile.getOrigin().getPath().toString()));
   }
 
-  protected Optional<CopyableFile> getOptCopyableFile(TaskState taskState) {
-    return getOptCopyableFile(taskState, "taskState '" + taskState.getTaskId() 
+ "'");
+  protected static Optional<CopyableFile> getOptCopyableFile(TaskState 
taskState) {
+    return getOptTaskStateCopyEntityClass(taskState).flatMap(copyEntityClass ->
+        getOptCopyableFile(copyEntityClass, taskState));
   }
 
-  protected Optional<CopyableFile> getOptCopyableFile(State state, String 
logDesc) {
-    return getOptCopyEntityClass(state, logDesc).flatMap(copyEntityClass -> {
-      log.debug("(state) {} got (copyEntity) {}", state.getClass().getName(), 
copyEntityClass.getName());
-      if (CopyableFile.class.isAssignableFrom(copyEntityClass)) {
-        String serialization = 
state.getProp(CopySource.SERIALIZED_COPYABLE_FILE);
-        if (serialization != null) {
-          return Optional.of((CopyableFile) 
CopyEntity.deserialize(serialization));
-        }
+  protected static Optional<CopyableFile> getOptCopyableFile(Class<?> 
copyEntityClass, State state) {
+    log.debug("(state) {} got (copyEntity) {}", state.getClass().getName(), 
copyEntityClass.getName());
+    if (CopyableFile.class.isAssignableFrom(copyEntityClass)) {
+      String serialization = 
state.getProp(CopySource.SERIALIZED_COPYABLE_FILE);
+      if (serialization != null) {
+        return Optional.of((CopyableFile) 
CopyEntity.deserialize(serialization));
       }
-      return Optional.empty();
-    });
+    }
+    return Optional.empty();
+  }
+
+  protected static Optional<Class<?>> getOptWorkUnitCopyEntityClass(WorkUnit 
workUnit, String workUnitPath) {
+    return getOptCopyEntityClass(workUnit, "workUnit '" + workUnitPath + "'");
   }
 
-  protected Optional<CopyableFile> getOptFirstCopyableFile(List<WorkUnit> 
workUnits, String workUnitPath) {
-    return Optional.of(workUnits).filter(wus -> wus.size() > 0).flatMap(wus ->
-      getOptCopyableFile(wus.get(0), "workUnit '" + workUnitPath + "'")
-    );
+  protected static Optional<Class<?>> getOptTaskStateCopyEntityClass(TaskState 
taskState) {
+    return getOptCopyEntityClass(taskState, "taskState '" + 
taskState.getTaskId() + "'");
   }
 
-  protected Optional<Class<?>> getOptCopyEntityClass(State state, String 
logDesc) {
+  protected static Optional<Class<?>> getOptCopyEntityClass(State state, 
String logDesc) {
     try {
       return Optional.of(CopySource.getCopyEntityClass(state));
     } catch (IOException ioe) {
-      log.warn(logDesc + " - failed getting copy entity class:", ioe);
       return Optional.empty();
     }
   }
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
index 7b9a23184..10ad44fea 100644
--- 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
@@ -647,33 +647,40 @@ public class GobblinYarnAppLauncher {
     Map<String, LocalResource> appMasterResources = Maps.newHashMap();
     FileSystem localFs = FileSystem.getLocal(new Configuration());
 
+    // NOTE: log after each step below for insight into what takes bulk of time
     if (this.config.hasPath(GobblinYarnConfigurationKeys.LIB_JARS_DIR_KEY)) {
       Path libJarsDestDir = new Path(appWorkDir, 
GobblinYarnConfigurationKeys.LIB_JARS_DIR_NAME);
       addLibJars(new 
Path(this.config.getString(GobblinYarnConfigurationKeys.LIB_JARS_DIR_KEY)),
           Optional.of(appMasterResources), libJarsDestDir, localFs);
+      LOGGER.info("Added lib jars to directory: {}", 
libJarsDestDir.toString());
     }
     if (this.config.hasPath(GobblinYarnConfigurationKeys.APP_MASTER_JARS_KEY)) 
{
       Path appJarsDestDir = new Path(appMasterWorkDir, 
GobblinYarnConfigurationKeys.APP_JARS_DIR_NAME);
       
addAppJars(this.config.getString(GobblinYarnConfigurationKeys.APP_MASTER_JARS_KEY),
           Optional.of(appMasterResources), appJarsDestDir, localFs);
+      LOGGER.info("Added app jars to directory: {}", 
appJarsDestDir.toString());
     }
     if 
(this.config.hasPath(GobblinYarnConfigurationKeys.APP_MASTER_FILES_LOCAL_KEY)) {
       Path appFilesDestDir = new Path(appMasterWorkDir, 
GobblinYarnConfigurationKeys.APP_FILES_DIR_NAME);
       
addAppLocalFiles(this.config.getString(GobblinYarnConfigurationKeys.APP_MASTER_FILES_LOCAL_KEY),
           Optional.of(appMasterResources), appFilesDestDir, localFs);
+      LOGGER.info("Added app local files to directory: {}", 
appFilesDestDir.toString());
     }
     if 
(this.config.hasPath(GobblinYarnConfigurationKeys.APP_MASTER_FILES_REMOTE_KEY)) 
{
       
YarnHelixUtils.addRemoteFilesToLocalResources(this.config.getString(GobblinYarnConfigurationKeys.APP_MASTER_FILES_REMOTE_KEY),
           appMasterResources, yarnConfiguration);
+      LOGGER.info("Added remote files to local resources");
     }
     if 
(this.config.hasPath(GobblinYarnConfigurationKeys.APP_MASTER_ZIPS_REMOTE_KEY)) {
       
YarnHelixUtils.addRemoteZipsToLocalResources(this.config.getString(GobblinYarnConfigurationKeys.APP_MASTER_ZIPS_REMOTE_KEY),
           appMasterResources, yarnConfiguration);
+      LOGGER.info("Added remote zips to local resources");
     }
     if 
(this.config.hasPath(GobblinClusterConfigurationKeys.JOB_CONF_PATH_KEY)) {
       Path appFilesDestDir = new Path(appMasterWorkDir, 
GobblinYarnConfigurationKeys.APP_FILES_DIR_NAME);
       
addJobConfPackage(this.config.getString(GobblinClusterConfigurationKeys.JOB_CONF_PATH_KEY),
 appFilesDestDir,
           appMasterResources);
+      LOGGER.info("Added job conf package to directory: {}", 
appFilesDestDir.toString());
     }
 
     return appMasterResources;

Reply via email to