This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new b57ecbcf4 [GOBBLIN-2046] Generalize gobblin-on-temporal
`ProcessWorkUnitImpl` logging not to presume `CopyEntity` (#3925)
b57ecbcf4 is described below
commit b57ecbcf4cf80e8e7b58bee74224cf12da566cf6
Author: Kip Kohn <[email protected]>
AuthorDate: Thu Apr 18 14:45:08 2024 -0700
[GOBBLIN-2046] Generalize gobblin-on-temporal `ProcessWorkUnitImpl` logging
not to presume `CopyEntity` (#3925)
Generalize gobblin-on-temporal `ProcessWorkUnitImpl` logging not to presume
`CopyEntity`
---
.../ddm/activity/impl/ProcessWorkUnitImpl.java | 80 ++++++++++++----------
.../gobblin/yarn/GobblinYarnAppLauncher.java | 7 ++
2 files changed, 51 insertions(+), 36 deletions(-)
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
index c093e83f4..aff935dd8 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
@@ -106,16 +106,12 @@ public class ProcessWorkUnitImpl implements
ProcessWorkUnit {
GobblinMultiTaskAttempt.CommitPolicy multiTaskAttemptCommitPolicy =
GobblinMultiTaskAttempt.CommitPolicy.IMMEDIATE; // as no speculative exec
SharedResourcesBroker<GobblinScopeTypes> resourcesBroker =
JobStateUtils.getSharedResourcesBroker(jobState);
- List<String> fileSourcePaths = workUnits.stream()
- .map(workUnit -> getCopyableFileSourcePathDesc(workUnit,
wu.getWorkUnitPath()))
- .collect(Collectors.toList());
- List<String> pathsToLog = getSourcePathsToLog(fileSourcePaths, jobState);
- log.info("WU [{}] - submitting {} workUnits for copying source files:
{}{}",
+ Optional<String> optWorkUnitsDesc = getOptWorkUnitsDesc(workUnits,
wu.getWorkUnitPath(), jobState);
+ log.info("WU [{}] - submitting {} workUnits {}",
wu.getCorrelator(),
workUnits.size(),
- pathsToLog.size() == workUnits.size() ? "" : ("**first " +
pathsToLog.size() + " only** "),
- pathsToLog);
- log.debug("WU [{}] - (first) workUnit: {}", wu.getCorrelator(),
workUnits.get(0).toJsonString());
+ optWorkUnitsDesc.orElse(""));
+ log.debug("WU [{}] - (first) workUnit: {}", wu.getCorrelator(),
workUnits.isEmpty() ? "<<absent>>" : workUnits.get(0).toJsonString());
GobblinMultiTaskAttempt taskAttempt = GobblinMultiTaskAttempt.runWorkUnits(
jobState.getJobId(), containerId, jobState, workUnits,
@@ -154,46 +150,58 @@ public class ProcessWorkUnitImpl implements
ProcessWorkUnit {
};
}
- protected String getCopyableFileSourcePathDesc(WorkUnit workUnit, String
workUnitPath) {
- return getOptFirstCopyableFile(Lists.newArrayList(workUnit), workUnitPath)
- .map(copyableFile -> copyableFile.getOrigin().getPath().toString())
- .orElse(
- "<<not a CopyableFile("
- + getOptCopyEntityClass(workUnit, workUnitPath)
- .map(Class::getSimpleName)
- .orElse("<<not a CopyEntity!>>")
- + "): '" + workUnitPath + "'"
- );
+ protected static Optional<String> getOptWorkUnitsDesc(List<WorkUnit>
workUnits, String workUnitsPath, JobState jobState) {
+ List<String> fileSourcePaths = workUnits.stream()
+ .map(workUnit -> getOptCopyableFileSourcePathDesc(workUnit,
workUnitsPath))
+ .filter(Optional::isPresent)
+ .map(Optional::get)
+ .collect(Collectors.toList());
+ if (fileSourcePaths.isEmpty()) {
+ // TODO - describe WorkUnits other than `CopyableFile`
+ return Optional.empty();
+ } else {
+ return Optional.of(getSourcePathsToLog(fileSourcePaths,
jobState)).map(pathsToLog ->
+ "for copying source files: "
+ + (pathsToLog.size() == workUnits.size() ? "" : ("**first " +
pathsToLog.size() + " only** "))
+ + pathsToLog
+ );
+ }
+ }
+
+ protected static Optional<String> getOptCopyableFileSourcePathDesc(WorkUnit
workUnit, String workUnitPath) {
+ return getOptWorkUnitCopyEntityClass(workUnit,
workUnitPath).flatMap(copyEntityClass ->
+ getOptCopyableFile(copyEntityClass, workUnit).map(copyableFile ->
+ copyableFile.getOrigin().getPath().toString()));
}
- protected Optional<CopyableFile> getOptCopyableFile(TaskState taskState) {
- return getOptCopyableFile(taskState, "taskState '" + taskState.getTaskId()
+ "'");
+ protected static Optional<CopyableFile> getOptCopyableFile(TaskState
taskState) {
+ return getOptTaskStateCopyEntityClass(taskState).flatMap(copyEntityClass ->
+ getOptCopyableFile(copyEntityClass, taskState));
}
- protected Optional<CopyableFile> getOptCopyableFile(State state, String
logDesc) {
- return getOptCopyEntityClass(state, logDesc).flatMap(copyEntityClass -> {
- log.debug("(state) {} got (copyEntity) {}", state.getClass().getName(),
copyEntityClass.getName());
- if (CopyableFile.class.isAssignableFrom(copyEntityClass)) {
- String serialization =
state.getProp(CopySource.SERIALIZED_COPYABLE_FILE);
- if (serialization != null) {
- return Optional.of((CopyableFile)
CopyEntity.deserialize(serialization));
- }
+ protected static Optional<CopyableFile> getOptCopyableFile(Class<?>
copyEntityClass, State state) {
+ log.debug("(state) {} got (copyEntity) {}", state.getClass().getName(),
copyEntityClass.getName());
+ if (CopyableFile.class.isAssignableFrom(copyEntityClass)) {
+ String serialization =
state.getProp(CopySource.SERIALIZED_COPYABLE_FILE);
+ if (serialization != null) {
+ return Optional.of((CopyableFile)
CopyEntity.deserialize(serialization));
}
- return Optional.empty();
- });
+ }
+ return Optional.empty();
+ }
+
+ protected static Optional<Class<?>> getOptWorkUnitCopyEntityClass(WorkUnit
workUnit, String workUnitPath) {
+ return getOptCopyEntityClass(workUnit, "workUnit '" + workUnitPath + "'");
}
- protected Optional<CopyableFile> getOptFirstCopyableFile(List<WorkUnit>
workUnits, String workUnitPath) {
- return Optional.of(workUnits).filter(wus -> wus.size() > 0).flatMap(wus ->
- getOptCopyableFile(wus.get(0), "workUnit '" + workUnitPath + "'")
- );
+ protected static Optional<Class<?>> getOptTaskStateCopyEntityClass(TaskState
taskState) {
+ return getOptCopyEntityClass(taskState, "taskState '" +
taskState.getTaskId() + "'");
}
- protected Optional<Class<?>> getOptCopyEntityClass(State state, String
logDesc) {
+ protected static Optional<Class<?>> getOptCopyEntityClass(State state,
String logDesc) {
try {
return Optional.of(CopySource.getCopyEntityClass(state));
} catch (IOException ioe) {
- log.warn(logDesc + " - failed getting copy entity class:", ioe);
return Optional.empty();
}
}
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
index 7b9a23184..10ad44fea 100644
---
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
@@ -647,33 +647,40 @@ public class GobblinYarnAppLauncher {
Map<String, LocalResource> appMasterResources = Maps.newHashMap();
FileSystem localFs = FileSystem.getLocal(new Configuration());
+ // NOTE: log after each step below for insight into what takes bulk of time
if (this.config.hasPath(GobblinYarnConfigurationKeys.LIB_JARS_DIR_KEY)) {
Path libJarsDestDir = new Path(appWorkDir,
GobblinYarnConfigurationKeys.LIB_JARS_DIR_NAME);
addLibJars(new
Path(this.config.getString(GobblinYarnConfigurationKeys.LIB_JARS_DIR_KEY)),
Optional.of(appMasterResources), libJarsDestDir, localFs);
+ LOGGER.info("Added lib jars to directory: {}",
libJarsDestDir.toString());
}
if (this.config.hasPath(GobblinYarnConfigurationKeys.APP_MASTER_JARS_KEY))
{
Path appJarsDestDir = new Path(appMasterWorkDir,
GobblinYarnConfigurationKeys.APP_JARS_DIR_NAME);
addAppJars(this.config.getString(GobblinYarnConfigurationKeys.APP_MASTER_JARS_KEY),
Optional.of(appMasterResources), appJarsDestDir, localFs);
+ LOGGER.info("Added app jars to directory: {}",
appJarsDestDir.toString());
}
if
(this.config.hasPath(GobblinYarnConfigurationKeys.APP_MASTER_FILES_LOCAL_KEY)) {
Path appFilesDestDir = new Path(appMasterWorkDir,
GobblinYarnConfigurationKeys.APP_FILES_DIR_NAME);
addAppLocalFiles(this.config.getString(GobblinYarnConfigurationKeys.APP_MASTER_FILES_LOCAL_KEY),
Optional.of(appMasterResources), appFilesDestDir, localFs);
+ LOGGER.info("Added app local files to directory: {}",
appFilesDestDir.toString());
}
if
(this.config.hasPath(GobblinYarnConfigurationKeys.APP_MASTER_FILES_REMOTE_KEY))
{
YarnHelixUtils.addRemoteFilesToLocalResources(this.config.getString(GobblinYarnConfigurationKeys.APP_MASTER_FILES_REMOTE_KEY),
appMasterResources, yarnConfiguration);
+ LOGGER.info("Added remote files to local resources");
}
if
(this.config.hasPath(GobblinYarnConfigurationKeys.APP_MASTER_ZIPS_REMOTE_KEY)) {
YarnHelixUtils.addRemoteZipsToLocalResources(this.config.getString(GobblinYarnConfigurationKeys.APP_MASTER_ZIPS_REMOTE_KEY),
appMasterResources, yarnConfiguration);
+ LOGGER.info("Added remote zips to local resources");
}
if
(this.config.hasPath(GobblinClusterConfigurationKeys.JOB_CONF_PATH_KEY)) {
Path appFilesDestDir = new Path(appMasterWorkDir,
GobblinYarnConfigurationKeys.APP_FILES_DIR_NAME);
addJobConfPackage(this.config.getString(GobblinClusterConfigurationKeys.JOB_CONF_PATH_KEY),
appFilesDestDir,
appMasterResources);
+ LOGGER.info("Added job conf package to directory: {}",
appFilesDestDir.toString());
}
return appMasterResources;