phet commented on code in PR #3925:
URL: https://github.com/apache/gobblin/pull/3925#discussion_r1571406415


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java:
##########
@@ -164,46 +160,58 @@ public void onTaskCommitCompletion(Task task) {
     };
   }
 
-  protected String getCopyableFileSourcePathDesc(WorkUnit workUnit, String 
workUnitPath) {
-    return getOptFirstCopyableFile(Lists.newArrayList(workUnit), workUnitPath)
-        .map(copyableFile -> copyableFile.getOrigin().getPath().toString())
-        .orElse(
-            "<<not a CopyableFile("
-                + getOptCopyEntityClass(workUnit, workUnitPath)
-                .map(Class::getSimpleName)
-                .orElse("<<not a CopyEntity!>>")
-                + "): '" + workUnitPath + "'"
-        );
+  protected static Optional<String> getOptWorkUnitsDesc(List<WorkUnit> 
workUnits, String workUnitsPath, JobState jobState) {
+    List<String> fileSourcePaths = workUnits.stream()
+        .map(workUnit -> getOptCopyableFileSourcePathDesc(workUnit, 
workUnitsPath))
+        .filter(Optional::isPresent)
+        .map(Optional::get)
+        .collect(Collectors.toList());
+    if (fileSourcePaths.isEmpty()) {
+      // TODO - describe WorkUnits other than `CopyableFile`
+      return Optional.empty();
+    } else {
+      return Optional.of(getSourcePathsToLog(fileSourcePaths, 
jobState)).map(pathsToLog ->
+          "for copying source files: "
+              + (pathsToLog.size() == workUnits.size() ? "" : ("**first " + 
pathsToLog.size() + " only** "))
+              + pathsToLog
+      );
+    }
   }
 
-  protected Optional<CopyableFile> getOptCopyableFile(TaskState taskState) {
-    return getOptCopyableFile(taskState, "taskState '" + taskState.getTaskId() 
+ "'");
+  protected static Optional<String> getOptCopyableFileSourcePathDesc(WorkUnit 
workUnit, String workUnitPath) {
+    return getOptWorkUnitCopyEntityClass(workUnit, 
workUnitPath).flatMap(copyEntityClass ->
+        getOptCopyableFile(copyEntityClass, workUnit).map(copyableFile ->
+            copyableFile.getOrigin().getPath().toString()));
   }
 
-  protected Optional<CopyableFile> getOptCopyableFile(State state, String 
logDesc) {
-    return getOptCopyEntityClass(state, logDesc).flatMap(copyEntityClass -> {
-      log.debug("(state) {} got (copyEntity) {}", state.getClass().getName(), 
copyEntityClass.getName());
-      if (CopyableFile.class.isAssignableFrom(copyEntityClass)) {
-        String serialization = 
state.getProp(CopySource.SERIALIZED_COPYABLE_FILE);
-        if (serialization != null) {
-          return Optional.of((CopyableFile) 
CopyEntity.deserialize(serialization));
-        }
+  protected static Optional<CopyableFile> getOptCopyableFile(TaskState 
taskState) {
+    return getOptTaskStateCopyEntityClass(taskState).flatMap(copyEntityClass ->
+        getOptCopyableFile(copyEntityClass, taskState));
+  }
+
+  protected static Optional<CopyableFile> getOptCopyableFile(Class<?> 
copyEntityClass, State state) {
+    log.debug("(state) {} got (copyEntity) {}", state.getClass().getName(), 
copyEntityClass.getName());
+    if (CopyableFile.class.isAssignableFrom(copyEntityClass)) {
+      String serialization = 
state.getProp(CopySource.SERIALIZED_COPYABLE_FILE);
+      if (serialization != null) {
+        return Optional.of((CopyableFile) 
CopyEntity.deserialize(serialization));
       }
-      return Optional.empty();
-    });
+    }
+    return Optional.empty();
+  }
+
+  protected static Optional<Class<?>> getOptWorkUnitCopyEntityClass(WorkUnit 
workUnit, String workUnitPath) {
+    return getOptCopyEntityClass(workUnit, "workUnit '" + workUnitPath + "'");
   }
 
-  protected Optional<CopyableFile> getOptFirstCopyableFile(List<WorkUnit> 
workUnits, String workUnitPath) {
-    return Optional.of(workUnits).filter(wus -> wus.size() > 0).flatMap(wus ->
-      getOptCopyableFile(wus.get(0), "workUnit '" + workUnitPath + "'")
-    );
+  protected static Optional<Class<?>> getOptTaskStateCopyEntityClass(TaskState 
taskState) {
+    return getOptCopyEntityClass(taskState, "taskState '" + 
taskState.getTaskId() + "'");
   }

Review Comment:
   I definitely agree--great system design philosophy!  basically define core 
requirements that translate into predictable system invariants, against which 
we reduce the conceptual surface area for maintainers (forever and after!).  
clearly a big potential win!  the trick is to identify what restrictions we'll 
be comfortable accepting.
   
   in this case, I DO expect we'd all accept the "single source, homogenous 
WUs" restriction.
   
   the other element is enforcement: to preclude deviations, impl code 
generally needs to actively look for and reject violations.  it's still 
defensive coding (never let that go!), but just situating it earlier in the 
system flow (gating entry, if you will, rather than when manipulating data 
that's already entered the system).
   
   since I'm not aware of where such enforcement lives in this case, I still 
prefer this defensive coding at point of use, but if you feel strongly, I'm 
open to reconsidering.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to