[
https://issues.apache.org/jira/browse/GOBBLIN-2046?focusedWorklogId=915406&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-915406
]
ASF GitHub Bot logged work on GOBBLIN-2046:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 18/Apr/24 21:31
Start Date: 18/Apr/24 21:31
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #3925:
URL: https://github.com/apache/gobblin/pull/3925#discussion_r1571406415
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java:
##########
@@ -164,46 +160,58 @@ public void onTaskCommitCompletion(Task task) {
};
}
- protected String getCopyableFileSourcePathDesc(WorkUnit workUnit, String workUnitPath) {
- return getOptFirstCopyableFile(Lists.newArrayList(workUnit), workUnitPath)
- .map(copyableFile -> copyableFile.getOrigin().getPath().toString())
- .orElse(
- "<<not a CopyableFile("
- + getOptCopyEntityClass(workUnit, workUnitPath)
- .map(Class::getSimpleName)
- .orElse("<<not a CopyEntity!>>")
- + "): '" + workUnitPath + "'"
- );
+ protected static Optional<String> getOptWorkUnitsDesc(List<WorkUnit> workUnits, String workUnitsPath, JobState jobState) {
+ List<String> fileSourcePaths = workUnits.stream()
+ .map(workUnit -> getOptCopyableFileSourcePathDesc(workUnit, workUnitsPath))
+ .filter(Optional::isPresent)
+ .map(Optional::get)
+ .collect(Collectors.toList());
+ if (fileSourcePaths.isEmpty()) {
+ // TODO - describe WorkUnits other than `CopyableFile`
+ return Optional.empty();
+ } else {
+ return Optional.of(getSourcePathsToLog(fileSourcePaths, jobState)).map(pathsToLog ->
+ "for copying source files: "
+ + (pathsToLog.size() == workUnits.size() ? "" : ("**first " + pathsToLog.size() + " only** "))
+ + pathsToLog
+ );
+ }
}
- protected Optional<CopyableFile> getOptCopyableFile(TaskState taskState) {
- return getOptCopyableFile(taskState, "taskState '" + taskState.getTaskId() + "'");
+ protected static Optional<String> getOptCopyableFileSourcePathDesc(WorkUnit workUnit, String workUnitPath) {
+ return getOptWorkUnitCopyEntityClass(workUnit, workUnitPath).flatMap(copyEntityClass ->
+ getOptCopyableFile(copyEntityClass, workUnit).map(copyableFile ->
+ copyableFile.getOrigin().getPath().toString()));
}
- protected Optional<CopyableFile> getOptCopyableFile(State state, String logDesc) {
- return getOptCopyEntityClass(state, logDesc).flatMap(copyEntityClass -> {
- log.debug("(state) {} got (copyEntity) {}", state.getClass().getName(), copyEntityClass.getName());
- if (CopyableFile.class.isAssignableFrom(copyEntityClass)) {
- String serialization = state.getProp(CopySource.SERIALIZED_COPYABLE_FILE);
- if (serialization != null) {
- return Optional.of((CopyableFile) CopyEntity.deserialize(serialization));
- }
+ protected static Optional<CopyableFile> getOptCopyableFile(TaskState taskState) {
+ return getOptTaskStateCopyEntityClass(taskState).flatMap(copyEntityClass ->
+ getOptCopyableFile(copyEntityClass, taskState));
+ }
+
+ protected static Optional<CopyableFile> getOptCopyableFile(Class<?> copyEntityClass, State state) {
+ log.debug("(state) {} got (copyEntity) {}", state.getClass().getName(), copyEntityClass.getName());
+ if (CopyableFile.class.isAssignableFrom(copyEntityClass)) {
+ String serialization = state.getProp(CopySource.SERIALIZED_COPYABLE_FILE);
+ if (serialization != null) {
+ return Optional.of((CopyableFile) CopyEntity.deserialize(serialization));
}
- return Optional.empty();
- });
+ }
+ return Optional.empty();
+ }
+
+ protected static Optional<Class<?>> getOptWorkUnitCopyEntityClass(WorkUnit workUnit, String workUnitPath) {
+ return getOptCopyEntityClass(workUnit, "workUnit '" + workUnitPath + "'");
}
- protected Optional<CopyableFile> getOptFirstCopyableFile(List<WorkUnit> workUnits, String workUnitPath) {
- return Optional.of(workUnits).filter(wus -> wus.size() > 0).flatMap(wus ->
- getOptCopyableFile(wus.get(0), "workUnit '" + workUnitPath + "'")
- );
+ protected static Optional<Class<?>> getOptTaskStateCopyEntityClass(TaskState taskState) {
+ return getOptCopyEntityClass(taskState, "taskState '" + taskState.getTaskId() + "'");
}
Review Comment:
I definitely agree: great system design philosophy! Basically, define core requirements that translate into predictable system invariants, which shrink the conceptual surface area for maintainers (forever after!). Clearly a big potential win! The trick is to identify which restrictions we'll be comfortable accepting.
In this case, I DO expect we'd all accept the "single source, homogeneous WUs" restriction.
The other element is enforcement: to preclude deviations, impl code generally needs to actively look for and reject violations. It's still defensive coding (never let that go!), just situated earlier in the system flow (gating entry, if you will, rather than checking when manipulating data that has already entered the system).
Since I'm not aware of where such enforcement lives in this case, I still prefer this defensive coding at the point of use, but if you feel strongly, I'm open to reconsidering.
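A minimal sketch of the two enforcement placements weighed in the comment above. It assumes hypothetical stand-in types (`WorkUnitLike`, `CopyableWu`, `OtherWu`) rather than Gobblin's real `WorkUnit` classes: `requireHomogeneous` gates entry by rejecting a mixed batch up front, while `describeCopyablePaths` defends at the point of use by describing only the WorkUnits it recognizes.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class HomogeneityDemo {

  // HYPOTHETICAL stand-ins for Gobblin WorkUnit kinds; not the real Gobblin API.
  interface WorkUnitLike {
    String kind();
  }

  static final class CopyableWu implements WorkUnitLike {
    final String sourcePath;
    CopyableWu(String sourcePath) { this.sourcePath = sourcePath; }
    public String kind() { return "CopyableFile"; }
  }

  static final class OtherWu implements WorkUnitLike {
    final String id;
    OtherWu(String id) { this.id = id; }
    public String kind() { return "Other"; }
  }

  // Entry gating: reject a mixed batch up front, so later code may assume homogeneity.
  static void requireHomogeneous(List<? extends WorkUnitLike> wus) {
    long kinds = wus.stream().map(WorkUnitLike::kind).distinct().count();
    if (kinds > 1) {
      throw new IllegalArgumentException("expected homogeneous WorkUnits, found " + kinds + " kinds");
    }
  }

  // Point-of-use defense: accept any mix, describing only the WorkUnits it recognizes
  // and returning empty (rather than a "missing" marker) when none are recognized.
  static Optional<String> describeCopyablePaths(List<? extends WorkUnitLike> wus) {
    List<String> paths = wus.stream()
        .filter(wu -> wu instanceof CopyableWu)
        .map(wu -> ((CopyableWu) wu).sourcePath)
        .collect(Collectors.toList());
    if (paths.isEmpty()) {
      return Optional.empty();
    }
    return Optional.of("for copying source files: " + paths);
  }

  public static void main(String[] args) {
    List<WorkUnitLike> mixed = Arrays.asList(new CopyableWu("/a"), new OtherWu("x"));
    // Point-of-use: tolerates the mix; prints "for copying source files: [/a]"
    System.out.println(describeCopyablePaths(mixed).orElse("(nothing to describe)"));
    try {
      // Entry gating: the same batch is rejected outright.
      requireHomogeneous(mixed);
    } catch (IllegalArgumentException e) {
      // prints "rejected: expected homogeneous WorkUnits, found 2 kinds"
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

The trade-off mirrors the comment: gating lets downstream code drop per-WorkUnit checks, but only if every entry path actually calls the gate; the point-of-use variant stays correct even when no such gate exists.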
Issue Time Tracking
-------------------
Worklog Id: (was: 915406)
Time Spent: 1h 20m (was: 1h 10m)
> Generalize gobblin-on-temporal `ProcessWorkUnitImpl` logging not to presume `CopyEntity`
> ----------------------------------------------------------------------------------------
>
> Key: GOBBLIN-2046
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2046
> Project: Apache Gobblin
> Issue Type: New Feature
> Components: gobblin-core
> Reporter: Kip Kohn
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 1h 20m
> Remaining Estimate: 0h
>
> There are many kinds of `WorkUnit` beyond the `CopyEntity` produced by
> `CopySource`.
> Since it remains helpful, continue to log `CopyEntity`-specific info, but do so
> without suggesting something is missing when the WU is not a `CopyEntity`.
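A minimal sketch of the logging behavior the issue asks for, modeled on the shape of `getOptWorkUnitsDesc` in the diff: each WorkUnit contributes an optional source path, nothing is described when no path is known, and a list longer than a cap is truncated with a "**first N only**" qualifier. The `maxPathsToLog` parameter and `firstN` helper are hypothetical, standing in for `getSourcePathsToLog(fileSourcePaths, jobState)`, whose body is not shown in the diff.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class WorkUnitsDescDemo {

  // HYPOTHETICAL cap: stands in for however `getSourcePathsToLog` bounds the list;
  // its real logic and configuration are not shown in the diff.
  static List<String> firstN(List<String> paths, int maxPathsToLog) {
    return paths.subList(0, Math.min(paths.size(), maxPathsToLog));
  }

  // Follows the shape of `getOptWorkUnitsDesc`: one Optional source path per WorkUnit
  // (empty when the WU is not a CopyableFile); returns empty if no path is known.
  static Optional<String> describe(List<Optional<String>> perWorkUnitPaths, int maxPathsToLog) {
    List<String> known = perWorkUnitPaths.stream()
        .filter(Optional::isPresent)
        .map(Optional::get)
        .collect(Collectors.toList());
    if (known.isEmpty()) {
      return Optional.empty(); // log nothing, rather than a "<<not a CopyableFile>>" marker
    }
    List<String> toLog = firstN(known, maxPathsToLog);
    // As in the diff, the qualifier compares against the total WorkUnit count.
    String qualifier = toLog.size() == perWorkUnitPaths.size()
        ? ""
        : "**first " + toLog.size() + " only** ";
    return Optional.of("for copying source files: " + qualifier + toLog);
  }

  public static void main(String[] args) {
    List<Optional<String>> wus = Arrays.asList(
        Optional.of("/src/a"), Optional.of("/src/b"), Optional.of("/src/c"));
    // prints "for copying source files: **first 2 only** [/src/a, /src/b]"
    System.out.println(describe(wus, 2).orElse("(nothing logged)"));
    // prints "(nothing logged)"
    System.out.println(describe(Arrays.asList(Optional.<String>empty()), 2).orElse("(nothing logged)"));
  }
}
```

Because the qualifier is computed against the total WorkUnit count (as the diff does with `workUnits.size()`), it also appears when every known path is logged but some WorkUnits are not `CopyableFile`s.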
--
This message was sent by Atlassian Jira
(v8.20.10#820010)