[ 
https://issues.apache.org/jira/browse/GOBBLIN-2046?focusedWorklogId=915392&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-915392
 ]

ASF GitHub Bot logged work on GOBBLIN-2046:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 18/Apr/24 20:56
            Start Date: 18/Apr/24 20:56
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3925:
URL: https://github.com/apache/gobblin/pull/3925#discussion_r1571370125


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java:
##########
@@ -164,46 +160,58 @@ public void onTaskCommitCompletion(Task task) {
     };
   }
 
-  protected String getCopyableFileSourcePathDesc(WorkUnit workUnit, String workUnitPath) {
-    return getOptFirstCopyableFile(Lists.newArrayList(workUnit), workUnitPath)
-        .map(copyableFile -> copyableFile.getOrigin().getPath().toString())
-        .orElse(
-            "<<not a CopyableFile("
-                + getOptCopyEntityClass(workUnit, workUnitPath)
-                .map(Class::getSimpleName)
-                .orElse("<<not a CopyEntity!>>")
-                + "): '" + workUnitPath + "'"
-        );
+  protected static Optional<String> getOptWorkUnitsDesc(List<WorkUnit> workUnits, String workUnitsPath, JobState jobState) {
+    List<String> fileSourcePaths = workUnits.stream()
+        .map(workUnit -> getOptCopyableFileSourcePathDesc(workUnit, workUnitsPath))
+        .filter(Optional::isPresent)
+        .map(Optional::get)
+        .collect(Collectors.toList());
+    if (fileSourcePaths.isEmpty()) {
+      // TODO - describe WorkUnits other than `CopyableFile`
+      return Optional.empty();
+    } else {
+      return Optional.of(getSourcePathsToLog(fileSourcePaths, jobState)).map(pathsToLog ->
+          "for copying source files: "
+              + (pathsToLog.size() == workUnits.size() ? "" : ("**first " + pathsToLog.size() + " only** "))
+              + pathsToLog
+      );
+    }
   }
 
-  protected Optional<CopyableFile> getOptCopyableFile(TaskState taskState) {
-    return getOptCopyableFile(taskState, "taskState '" + taskState.getTaskId() + "'");
+  protected static Optional<String> getOptCopyableFileSourcePathDesc(WorkUnit workUnit, String workUnitPath) {
+    return getOptWorkUnitCopyEntityClass(workUnit, workUnitPath).flatMap(copyEntityClass ->
+        getOptCopyableFile(copyEntityClass, workUnit).map(copyableFile ->
+            copyableFile.getOrigin().getPath().toString()));
   }
 
-  protected Optional<CopyableFile> getOptCopyableFile(State state, String logDesc) {
-    return getOptCopyEntityClass(state, logDesc).flatMap(copyEntityClass -> {
-      log.debug("(state) {} got (copyEntity) {}", state.getClass().getName(), copyEntityClass.getName());
-      if (CopyableFile.class.isAssignableFrom(copyEntityClass)) {
-        String serialization = state.getProp(CopySource.SERIALIZED_COPYABLE_FILE);
-        if (serialization != null) {
-          return Optional.of((CopyableFile) CopyEntity.deserialize(serialization));
-        }
+  protected static Optional<CopyableFile> getOptCopyableFile(TaskState taskState) {
+    return getOptTaskStateCopyEntityClass(taskState).flatMap(copyEntityClass ->
+        getOptCopyableFile(copyEntityClass, taskState));
+  }
+
+  protected static Optional<CopyableFile> getOptCopyableFile(Class<?> copyEntityClass, State state) {
+    log.debug("(state) {} got (copyEntity) {}", state.getClass().getName(), copyEntityClass.getName());
+    if (CopyableFile.class.isAssignableFrom(copyEntityClass)) {
+      String serialization = state.getProp(CopySource.SERIALIZED_COPYABLE_FILE);
+      if (serialization != null) {
+        return Optional.of((CopyableFile) CopyEntity.deserialize(serialization));
       }
-      return Optional.empty();
-    });
+    }
+    return Optional.empty();
+  }
+
+  protected static Optional<Class<?>> getOptWorkUnitCopyEntityClass(WorkUnit workUnit, String workUnitPath) {
+    return getOptCopyEntityClass(workUnit, "workUnit '" + workUnitPath + "'");
   }
 
-  protected Optional<CopyableFile> getOptFirstCopyableFile(List<WorkUnit> workUnits, String workUnitPath) {
-    return Optional.of(workUnits).filter(wus -> wus.size() > 0).flatMap(wus ->
-      getOptCopyableFile(wus.get(0), "workUnit '" + workUnitPath + "'")
-    );
+  protected static Optional<Class<?>> getOptTaskStateCopyEntityClass(TaskState taskState) {
+    return getOptCopyEntityClass(taskState, "taskState '" + taskState.getTaskId() + "'");
   }

Review Comment:
   is your suggestion to not consult each individual WU, but instead just 
sample one and acquire the info from that, presuming all WUs to be homogeneous?
   
   semantically you're probably right that current practice is for a single 
source to produce homogeneous WUs (not 100% certain, but I suspect it's true).  
I also have no reason to believe that would change any time soon.
   
   more as a general rule, my pref is to avoid spreading presumptions around 
various unrelated parts of code, but instead to code defensively.  e.g. I can 
*imagine* blending WUs of different types (even if we don't do that today).  if 
that were to ever come about, it would break such a presumption here.  hence 
the defensive approach of not using prior, outside knowledge to build 
presumptions.
   
   so that's my pref, but what do you think?
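
   to make the trade-off concrete, here is a minimal, self-contained sketch. it 
does not use the Gobblin API: `Unit`, `optSourcePath`, `describeBySampling`, and 
`describeDefensively` are hypothetical stand-ins, with `Unit` playing the role 
of a `WorkUnit` that may or may not carry a copyable file's source path.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class WorkUnitDescSketch {
  /** Hypothetical stand-in for a WorkUnit; sourcePath is null when the unit is not a copyable file. */
  static final class Unit {
    final String sourcePath;
    Unit(String sourcePath) { this.sourcePath = sourcePath; }
  }

  /** Empty when the unit carries no source path (i.e. it is some other kind of unit). */
  static Optional<String> optSourcePath(Unit unit) {
    return Optional.ofNullable(unit.sourcePath);
  }

  /** Sampling: inspects only the first unit and presumes the rest are of the same kind. */
  static Optional<String> describeBySampling(List<Unit> units) {
    return units.stream().findFirst()
        .flatMap(WorkUnitDescSketch::optSourcePath)
        .map(path -> "for copying source files: [" + path + ", ...]");
  }

  /** Defensive: consults every unit and describes only those that carry a source path. */
  static Optional<String> describeDefensively(List<Unit> units) {
    List<String> paths = units.stream()
        .map(WorkUnitDescSketch::optSourcePath)
        .filter(Optional::isPresent)
        .map(Optional::get)
        .collect(Collectors.toList());
    return paths.isEmpty()
        ? Optional.empty()
        : Optional.of("for copying source files: " + paths);
  }

  public static void main(String[] args) {
    // A blended list: the first unit is not a copyable file, the other two are.
    List<Unit> mixed = Arrays.asList(new Unit(null), new Unit("/data/a"), new Unit("/data/b"));
    System.out.println(describeBySampling(mixed));
    System.out.println(describeDefensively(mixed));
  }
}
```

   with a blended list whose first element is not a copyable file, the sampling 
variant yields nothing at all, while the defensive variant still reports the two 
paths it finds.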
   
   





Issue Time Tracking
-------------------

    Worklog Id:     (was: 915392)
    Time Spent: 0.5h  (was: 20m)

> Generalize gobblin-on-temporal `ProcessWorkUnitImpl` logging not to presume 
> `CopyEntity`
> ----------------------------------------------------------------------------------------
>
>                 Key: GOBBLIN-2046
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2046
>             Project: Apache Gobblin
>          Issue Type: New Feature
>          Components: gobblin-core
>            Reporter: Kip Kohn
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> there are many kinds of `WorkUnit`, beyond the `CopyEntity` produced by 
> `CopySource`.
> as it remains helpful, continue to log `CopyEntity`-specific info, but do so 
> w/o suggesting something is missing when the WU is not a `CopyEntity`



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
