Will-Lo commented on code in PR #3925:
URL: https://github.com/apache/gobblin/pull/3925#discussion_r1571249282
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java:
##########
@@ -164,46 +160,58 @@ public void onTaskCommitCompletion(Task task) {
};
}
- protected String getCopyableFileSourcePathDesc(WorkUnit workUnit, String
workUnitPath) {
- return getOptFirstCopyableFile(Lists.newArrayList(workUnit), workUnitPath)
- .map(copyableFile -> copyableFile.getOrigin().getPath().toString())
- .orElse(
- "<<not a CopyableFile("
- + getOptCopyEntityClass(workUnit, workUnitPath)
- .map(Class::getSimpleName)
- .orElse("<<not a CopyEntity!>>")
- + "): '" + workUnitPath + "'"
- );
+ protected static Optional<String> getOptWorkUnitsDesc(List<WorkUnit>
workUnits, String workUnitsPath, JobState jobState) {
+ List<String> fileSourcePaths = workUnits.stream()
+ .map(workUnit -> getOptCopyableFileSourcePathDesc(workUnit,
workUnitsPath))
+ .filter(Optional::isPresent)
+ .map(Optional::get)
+ .collect(Collectors.toList());
+ if (fileSourcePaths.isEmpty()) {
+ // TODO - describe WorkUnits other than `CopyableFile`
+ return Optional.empty();
+ } else {
+ return Optional.of(getSourcePathsToLog(fileSourcePaths,
jobState)).map(pathsToLog ->
+ "for copying source files: "
+ + (pathsToLog.size() == workUnits.size() ? "" : ("**first " +
pathsToLog.size() + " only** "))
+ + pathsToLog
+ );
+ }
}
- protected Optional<CopyableFile> getOptCopyableFile(TaskState taskState) {
- return getOptCopyableFile(taskState, "taskState '" + taskState.getTaskId()
+ "'");
+ protected static Optional<String> getOptCopyableFileSourcePathDesc(WorkUnit
workUnit, String workUnitPath) {
+ return getOptWorkUnitCopyEntityClass(workUnit,
workUnitPath).flatMap(copyEntityClass ->
+ getOptCopyableFile(copyEntityClass, workUnit).map(copyableFile ->
+ copyableFile.getOrigin().getPath().toString()));
}
- protected Optional<CopyableFile> getOptCopyableFile(State state, String
logDesc) {
- return getOptCopyEntityClass(state, logDesc).flatMap(copyEntityClass -> {
- log.debug("(state) {} got (copyEntity) {}", state.getClass().getName(),
copyEntityClass.getName());
- if (CopyableFile.class.isAssignableFrom(copyEntityClass)) {
- String serialization =
state.getProp(CopySource.SERIALIZED_COPYABLE_FILE);
- if (serialization != null) {
- return Optional.of((CopyableFile)
CopyEntity.deserialize(serialization));
- }
+ protected static Optional<CopyableFile> getOptCopyableFile(TaskState
taskState) {
+ return getOptTaskStateCopyEntityClass(taskState).flatMap(copyEntityClass ->
+ getOptCopyableFile(copyEntityClass, taskState));
+ }
+
+ protected static Optional<CopyableFile> getOptCopyableFile(Class<?>
copyEntityClass, State state) {
+ log.debug("(state) {} got (copyEntity) {}", state.getClass().getName(),
copyEntityClass.getName());
+ if (CopyableFile.class.isAssignableFrom(copyEntityClass)) {
+ String serialization =
state.getProp(CopySource.SERIALIZED_COPYABLE_FILE);
+ if (serialization != null) {
+ return Optional.of((CopyableFile)
CopyEntity.deserialize(serialization));
}
- return Optional.empty();
- });
+ }
+ return Optional.empty();
+ }
+
+ protected static Optional<Class<?>> getOptWorkUnitCopyEntityClass(WorkUnit
workUnit, String workUnitPath) {
+ return getOptCopyEntityClass(workUnit, "workUnit '" + workUnitPath + "'");
}
- protected Optional<CopyableFile> getOptFirstCopyableFile(List<WorkUnit>
workUnits, String workUnitPath) {
- return Optional.of(workUnits).filter(wus -> wus.size() > 0).flatMap(wus ->
- getOptCopyableFile(wus.get(0), "workUnit '" + workUnitPath + "'")
- );
+ protected static Optional<Class<?>> getOptTaskStateCopyEntityClass(TaskState
taskState) {
+ return getOptCopyEntityClass(taskState, "taskState '" +
taskState.getTaskId() + "'");
}
Review Comment:
Wondering whether we need to get the copyEntityClass explicitly at the workunit
level here. These functions imply that there can be multiple copy entity classes
at the taskstate and workunit level, but AFAIK they are tied to the source
class, and there can only be one source class per Gobblin job.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]