[ 
https://issues.apache.org/jira/browse/GOBBLIN-2046?focusedWorklogId=915399&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-915399
 ]

ASF GitHub Bot logged work on GOBBLIN-2046:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 18/Apr/24 21:07
            Start Date: 18/Apr/24 21:07
    Worklog Time Spent: 10m 
      Work Description: Will-Lo commented on code in PR #3925:
URL: https://github.com/apache/gobblin/pull/3925#discussion_r1571384641


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java:
##########
@@ -164,46 +160,58 @@ public void onTaskCommitCompletion(Task task) {
     };
   }
 
-  protected String getCopyableFileSourcePathDesc(WorkUnit workUnit, String workUnitPath) {
-    return getOptFirstCopyableFile(Lists.newArrayList(workUnit), workUnitPath)
-        .map(copyableFile -> copyableFile.getOrigin().getPath().toString())
-        .orElse(
-            "<<not a CopyableFile("
-                + getOptCopyEntityClass(workUnit, workUnitPath)
-                .map(Class::getSimpleName)
-                .orElse("<<not a CopyEntity!>>")
-                + "): '" + workUnitPath + "'"
-        );
+  protected static Optional<String> getOptWorkUnitsDesc(List<WorkUnit> workUnits, String workUnitsPath, JobState jobState) {
+    List<String> fileSourcePaths = workUnits.stream()
+        .map(workUnit -> getOptCopyableFileSourcePathDesc(workUnit, workUnitsPath))
+        .filter(Optional::isPresent)
+        .map(Optional::get)
+        .collect(Collectors.toList());
+    if (fileSourcePaths.isEmpty()) {
+      // TODO - describe WorkUnits other than `CopyableFile`
+      return Optional.empty();
+    } else {
+      return Optional.of(getSourcePathsToLog(fileSourcePaths, jobState)).map(pathsToLog ->
+          "for copying source files: "
+              + (pathsToLog.size() == workUnits.size() ? "" : ("**first " + pathsToLog.size() + " only** "))
+              + pathsToLog
+      );
+    }
   }
 
-  protected Optional<CopyableFile> getOptCopyableFile(TaskState taskState) {
-    return getOptCopyableFile(taskState, "taskState '" + taskState.getTaskId() + "'");
+  protected static Optional<String> getOptCopyableFileSourcePathDesc(WorkUnit workUnit, String workUnitPath) {
+    return getOptWorkUnitCopyEntityClass(workUnit, workUnitPath).flatMap(copyEntityClass ->
+        getOptCopyableFile(copyEntityClass, workUnit).map(copyableFile ->
+            copyableFile.getOrigin().getPath().toString()));
   }
 
-  protected Optional<CopyableFile> getOptCopyableFile(State state, String logDesc) {
-    return getOptCopyEntityClass(state, logDesc).flatMap(copyEntityClass -> {
-      log.debug("(state) {} got (copyEntity) {}", state.getClass().getName(), copyEntityClass.getName());
-      if (CopyableFile.class.isAssignableFrom(copyEntityClass)) {
-        String serialization = state.getProp(CopySource.SERIALIZED_COPYABLE_FILE);
-        if (serialization != null) {
-          return Optional.of((CopyableFile) CopyEntity.deserialize(serialization));
-        }
+  protected static Optional<CopyableFile> getOptCopyableFile(TaskState taskState) {
+    return getOptTaskStateCopyEntityClass(taskState).flatMap(copyEntityClass ->
+        getOptCopyableFile(copyEntityClass, taskState));
+  }
+
+  protected static Optional<CopyableFile> getOptCopyableFile(Class<?> copyEntityClass, State state) {
+    log.debug("(state) {} got (copyEntity) {}", state.getClass().getName(), copyEntityClass.getName());
+    if (CopyableFile.class.isAssignableFrom(copyEntityClass)) {
+      String serialization = state.getProp(CopySource.SERIALIZED_COPYABLE_FILE);
+      if (serialization != null) {
+        return Optional.of((CopyableFile) CopyEntity.deserialize(serialization));
       }
-      return Optional.empty();
-    });
+    }
+    return Optional.empty();
+  }
+
+  protected static Optional<Class<?>> getOptWorkUnitCopyEntityClass(WorkUnit workUnit, String workUnitPath) {
+    return getOptCopyEntityClass(workUnit, "workUnit '" + workUnitPath + "'");
   }
 
-  protected Optional<CopyableFile> getOptFirstCopyableFile(List<WorkUnit> workUnits, String workUnitPath) {
-    return Optional.of(workUnits).filter(wus -> wus.size() > 0).flatMap(wus ->
-      getOptCopyableFile(wus.get(0), "workUnit '" + workUnitPath + "'")
-    );
+  protected static Optional<Class<?>> getOptTaskStateCopyEntityClass(TaskState taskState) {
+    return getOptCopyEntityClass(taskState, "taskState '" + taskState.getTaskId() + "'");
   }

Review Comment:
   I understand it from a defensive standpoint; yes, that was the idea I had in
mind.
   The reason I suggested it is that (in my limited opinion) the framework
should be opinionated about certain ideas, and right now a general concept in
Gobblin is that work units are generally homogeneous at a WU level. (That is
why it may make sense not to conflate job state with task state/WU properties,
but so far that hasn't been strongly enforced, which led to this situation.)
If we are explicit about stronger presumptions, the code becomes easier to
reason about, and we avoid learning about mismatches in behavior only after
the implementation (which is what we're sort of seeing in the CommitActivity
and Task states, though that is unrelated to this PR).
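
To make the trade-off in this comment concrete, here is a minimal sketch contrasting the two stances. It is not the PR's code and does not use Gobblin classes: `Unit`, `describeDefensively`, and `describeAssumingHomogeneous` are hypothetical names invented for illustration, with `Unit` standing in for a `WorkUnit` that may or may not carry a copyable file's source path.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class WorkUnitDescSketch {

  /** Hypothetical stand-in for a WorkUnit; sourcePath is null unless it wraps a copyable file. */
  static final class Unit {
    final String sourcePath;
    Unit(String sourcePath) { this.sourcePath = sourcePath; }
    Optional<String> optSourcePath() { return Optional.ofNullable(sourcePath); }
  }

  /** Defensive stance: inspect every unit and silently skip those without a source path. */
  static Optional<String> describeDefensively(List<Unit> units) {
    List<String> paths = units.stream()
        .map(Unit::optSourcePath)
        .filter(Optional::isPresent)
        .map(Optional::get)
        .collect(Collectors.toList());
    return paths.isEmpty() ? Optional.empty() : Optional.of("for copying source files: " + paths);
  }

  /** Opinionated stance: presume homogeneity, so the first unit decides and a mismatch fails loudly. */
  static Optional<String> describeAssumingHomogeneous(List<Unit> units) {
    if (units.isEmpty() || !units.get(0).optSourcePath().isPresent()) {
      return Optional.empty();
    }
    List<String> paths = units.stream()
        .map(unit -> unit.optSourcePath().orElseThrow(
            () -> new IllegalStateException("mixed work unit kinds in one batch")))
        .collect(Collectors.toList());
    return Optional.of("for copying source files: " + paths);
  }

  public static void main(String[] args) {
    List<Unit> copyUnits = Arrays.asList(new Unit("/data/a"), new Unit("/data/b"));
    List<Unit> otherUnits = Arrays.asList(new Unit(null));
    System.out.println(describeDefensively(copyUnits));
    System.out.println(describeAssumingHomogeneous(copyUnits));
    System.out.println(describeDefensively(otherUnits).isPresent());
  }
}
```

On a homogeneous batch both methods produce the same description. They differ only on a mixed batch: the defensive variant quietly describes a subset, while the opinionated variant surfaces the broken presumption immediately.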





Issue Time Tracking
-------------------

    Worklog Id:     (was: 915399)
    Time Spent: 1h 10m  (was: 1h)

> Generalize gobblin-on-temporal `ProcessWorkUnitImpl` logging not to presume 
> `CopyEntity`
> ----------------------------------------------------------------------------------------
>
>                 Key: GOBBLIN-2046
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2046
>             Project: Apache Gobblin
>          Issue Type: New Feature
>          Components: gobblin-core
>            Reporter: Kip Kohn
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> There are many kinds of `WorkUnit` beyond the `CopyEntity` produced by 
> `CopySource`.
> Because it remains helpful, continue to log `CopyEntity`-specific info, but do 
> so without suggesting something is missing when the WU is not a `CopyEntity`.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
