phet commented on code in PR #3934:
URL: https://github.com/apache/gobblin/pull/3934#discussion_r1578488823


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java:
##########
@@ -544,8 +544,7 @@ public static Optional<Class<? extends DataPublisher>> getJobDataPublisherClass(
    * or {@link ConfigurationKeys#PUBLISH_DATA_AT_JOB_LEVEL} is set to true.
    */
   public static boolean shouldCommitDataInJob(State state) {
-    boolean jobCommitPolicyIsFull =
-        JobCommitPolicy.getCommitPolicy(state.getProperties()) == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS;
+    boolean jobCommitPolicyIsFull = JobCommitPolicy.getCommitPolicy(state) == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS;

Review Comment:
   Passing the `State` directly should be much cheaper than copying all of the state's props merely to read a single one!
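
To illustrate the cost difference, here is a minimal sketch. `MiniState` is a hypothetical stand-in, not Gobblin's actual `State` class, and the `job.commit.policy` key is used purely for illustration. It assumes only that `getProperties()` returns a fresh copy while `getProp()` reads the backing store directly.

```java
import java.util.Properties;

public class StatePropAccess {
  /** Hypothetical stand-in for a state object; NOT Gobblin's actual `State`. */
  static class MiniState {
    private final Properties props = new Properties();
    int copies = 0; // counts how many full copies have been made

    void setProp(String key, String value) {
      props.setProperty(key, value);
    }

    /** Defensive copy: the cost grows with the total number of props. */
    Properties getProperties() {
      copies++;
      Properties copy = new Properties();
      copy.putAll(props);
      return copy;
    }

    /** Direct lookup: no copy, however many props exist. */
    String getProp(String key, String defaultValue) {
      return props.getProperty(key, defaultValue);
    }
  }

  public static void main(String[] args) {
    MiniState state = new MiniState();
    for (int i = 0; i < 10_000; i++) {
      state.setProp("key." + i, "v" + i);
    }
    state.setProp("job.commit.policy", "full");

    // Copies all 10,001 entries just to read one of them.
    String viaCopy = state.getProperties().getProperty("job.commit.policy", "partial");
    // Reads the same value without copying anything.
    String direct = state.getProp("job.commit.policy", "partial");

    System.out.println(viaCopy.equals(direct) + " copies=" + state.copies);
  }
}
```

Both lookups return the same value; only the first one pays for a full copy.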



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java:
##########
@@ -104,8 +104,7 @@ public int execute(Properties jobProps, EventSubmitterContext eventSubmitterCont
       throw ApplicationFailure.newNonRetryableFailureWithCause(
           String.format("Failed Gobblin job %s", jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY)),
           e.getClass().toString(),
-          e,
-          null
+          e

Review Comment:
   This resolves the following compiler warning:
   ```
   gobblin/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java:108: warning: non-varargs call of varargs method with inexact argument type for last parameter;
             null
             ^
   ```
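
For context on why a bare `null` draws that warning, here is a small sketch using a hypothetical `describe` method (not the Temporal API). A bare `null` in the varargs position is ambiguous to the reader, and javac treats it as a null `Object[]`. Either casting explicitly or omitting the argument, as this change does, removes the ambiguity.

```java
public class VarargsNullDemo {
  /** Hypothetical varargs method; reports what the callee actually received. */
  static String describe(String message, Object... details) {
    return details == null ? "null array" : "array of length " + details.length;
  }

  public static void main(String[] args) {
    // A bare `describe("msg", null)` compiles, but javac warns about a
    // "non-varargs call of varargs method" and passes a null Object[].
    // The calls below spell out each possible intent explicitly.

    // Non-varargs call: the array itself is null.
    System.out.println(describe("msg", (Object[]) null));

    // Varargs call: a one-element array whose single element is null.
    System.out.println(describe("msg", (Object) null));

    // No trailing argument at all: an empty array, and no warning.
    System.out.println(describe("msg"));
  }
}
```

Dropping the trailing argument is the simplest fix whenever the callee copes with an empty array.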
   



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -166,44 +163,39 @@ public Callable<Void> apply(final Map.Entry<String, JobState.DatasetState> entry
       }
       if (!IteratorExecutor.verifyAllSuccessful(result)) {
         // TODO: propagate cause of failure and determine whether or not this is retryable to throw a non-retryable failure exception
-        String jobName = jobState.getProperties().getProperty(ConfigurationKeys.JOB_NAME_KEY, UNDEFINED_JOB_NAME);
+        String jobName = jobState.getProp(ConfigurationKeys.JOB_NAME_KEY, UNDEFINED_JOB_NAME);
         throw new IOException("Failed to commit dataset state for some dataset(s) of job " + jobName);
       }
     } catch (InterruptedException exc) {
       throw new IOException(exc);
     }
   }
 
+  /** @return {@link TaskState}s loaded from the {@link StateStore<TaskState>} indicated by the {@link WUProcessingSpec} and {@link FileSystem} */
+  private List<TaskState> loadTaskStates(WUProcessingSpec workSpec, FileSystem fs, JobState jobState, int numThreads) throws IOException {
+    // TODO - decide whether to replace this method by adapting TaskStateCollectorService::collectOutputTaskStates (whence much of this code was drawn)
+    StateStore<TaskState> taskStateStore = Help.openTaskStateStore(workSpec, fs);
+    // NOTE: TaskState dir is assumed to be a sibling to the workunits dir (following conventions of `MRJobLauncher`)
+    String jobIdPathName = new Path(workSpec.getWorkUnitsDir()).getParent().getName();
+    log.info("TaskStateStore path (name component): '{}' (fs: '{}')", jobIdPathName, fs.getUri());
+    Optional<Queue<TaskState>> taskStateQueueOpt = TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore, jobIdPathName, numThreads);
+    return taskStateQueueOpt.map(taskStateQueue ->
+        taskStateQueue.stream().peek(taskState ->
+                // CRITICAL: although some `WorkUnit`s, like those created by `CopySource::FileSetWorkUnitGenerator` for each `CopyEntity`
+                // already themselves contain every prop of their `JobState`, not all do.
+                // `TaskState extends WorkUnit` serialization will include its constituent `WorkUnit`, but not the constituent `JobState`.
+                // given some `JobState` props may be essential for commit/publish, deserialization must re-associate each `TaskState` w/ `JobState`
+                taskState.setJobState(jobState)

Review Comment:
   This mirrors [gobblin-on-MR](https://github.com/apache/gobblin/blob/a74d17a0123218ac4c867caeefaee2f472b438e7/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java#L204). I added a TODO to consider whether we should just call that method outright.
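
The re-association pattern can be sketched in isolation as follows. `MiniJobState` and `MiniTaskState` are hypothetical simplifications, not the Gobblin classes. The sketch assumes only what the code comment states: a deserialized task state arrives without its job state, so the loader must attach it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.stream.Collectors;

public class ReattachJobState {
  /** Hypothetical job-level state; NOT Gobblin's `JobState`. */
  static class MiniJobState {
    final String jobName;
    MiniJobState(String jobName) { this.jobName = jobName; }
  }

  /** Hypothetical task-level state whose job state is absent after loading. */
  static class MiniTaskState {
    final String taskId;
    MiniJobState jobState; // null until re-associated by the loader
    MiniTaskState(String taskId) { this.taskId = taskId; }
  }

  /** Attaches `jobState` to every loaded task state; empty list if none were found. */
  static List<MiniTaskState> attach(Optional<Queue<MiniTaskState>> loaded, MiniJobState jobState) {
    return loaded.map(queue ->
        queue.stream()
            .peek(taskState -> taskState.jobState = jobState)
            .collect(Collectors.toList())
    ).orElseGet(ArrayList::new);
  }

  public static void main(String[] args) {
    Queue<MiniTaskState> queue = new ArrayDeque<>();
    queue.add(new MiniTaskState("task_0"));
    queue.add(new MiniTaskState("task_1"));

    List<MiniTaskState> attached = attach(Optional.of(queue), new MiniJobState("demo-job"));
    for (MiniTaskState taskState : attached) {
      System.out.println(taskState.taskId + " -> " + taskState.jobState.jobName);
    }
    System.out.println("none found: " + attach(Optional.empty(), new MiniJobState("demo-job")).size());
  }
}
```

`peek` is safe here because `collect` visits every element; a `forEach` over the collected list would express the same side effect more explicitly.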



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -166,44 +163,39 @@ public Callable<Void> apply(final Map.Entry<String, JobState.DatasetState> entry
       }
       if (!IteratorExecutor.verifyAllSuccessful(result)) {
         // TODO: propagate cause of failure and determine whether or not this is retryable to throw a non-retryable failure exception
-        String jobName = jobState.getProperties().getProperty(ConfigurationKeys.JOB_NAME_KEY, UNDEFINED_JOB_NAME);
+        String jobName = jobState.getProp(ConfigurationKeys.JOB_NAME_KEY, UNDEFINED_JOB_NAME);
         throw new IOException("Failed to commit dataset state for some dataset(s) of job " + jobName);
       }
     } catch (InterruptedException exc) {
       throw new IOException(exc);
     }
   }
 
+  /** @return {@link TaskState}s loaded from the {@link StateStore<TaskState>} indicated by the {@link WUProcessingSpec} and {@link FileSystem} */
+  private List<TaskState> loadTaskStates(WUProcessingSpec workSpec, FileSystem fs, JobState jobState, int numThreads) throws IOException {
+    // TODO - decide whether to replace this method by adapting TaskStateCollectorService::collectOutputTaskStates (whence much of this code was drawn)
+    StateStore<TaskState> taskStateStore = Help.openTaskStateStore(workSpec, fs);
+    // NOTE: TaskState dir is assumed to be a sibling to the workunits dir (following conventions of `MRJobLauncher`)
+    String jobIdPathName = new Path(workSpec.getWorkUnitsDir()).getParent().getName();
+    log.info("TaskStateStore path (name component): '{}' (fs: '{}')", jobIdPathName, fs.getUri());
+    Optional<Queue<TaskState>> taskStateQueueOpt = TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore, jobIdPathName, numThreads);
+    return taskStateQueueOpt.map(taskStateQueue ->
+        taskStateQueue.stream().peek(taskState ->
+                // CRITICAL: although some `WorkUnit`s, like those created by `CopySource::FileSetWorkUnitGenerator` for each `CopyEntity`
+                // already themselves contain every prop of their `JobState`, not all do.
+                // `TaskState extends WorkUnit` serialization will include its constituent `WorkUnit`, but not the constituent `JobState`.
+                // given some `JobState` props may be essential for commit/publish, deserialization must re-associate each `TaskState` w/ `JobState`
+                taskState.setJobState(jobState)
+                // TODO - decide whether something akin necessary to streamline cumulative in-memory size of all issues: consumeTaskIssues(taskState);
+            ).collect(Collectors.toList())
+    ).orElseGet(() -> {
+      log.error("TaskStateStore successfully opened, but no task states found under (name) '{}'", jobIdPathName);
+      return Lists.newArrayList();
+    });
+  }
+
   /** @return id/correlator for this particular commit activity */
   private static String calcCommitId(WUProcessingSpec workSpec) {
     return new Path(workSpec.getWorkUnitsDir()).getParent().getName();
   }
-
-  /**
-   * Organize task states by dataset urns.
-   * @param taskStates
-   * @return A map of dataset urns to dataset task states.
-   */
-  public static Map<String, JobState.DatasetState> createDatasetStatesByUrns(Collection<TaskState> taskStates) {
-    Map<String, JobState.DatasetState> datasetStatesByUrns = Maps.newHashMap();
-
-    //TODO: handle skipped tasks?
-    for (TaskState taskState : taskStates) {
-      String datasetUrn = createDatasetUrn(datasetStatesByUrns, taskState);
-      datasetStatesByUrns.get(datasetUrn).incrementTaskCount();
-      datasetStatesByUrns.get(datasetUrn).addTaskState(taskState);
-    }
-
-    return datasetStatesByUrns;
-  }
-
-  private static String createDatasetUrn(Map<String, JobState.DatasetState> datasetStatesByUrns, TaskState taskState) {
-    String datasetUrn = taskState.getProp(ConfigurationKeys.DATASET_URN_KEY, ConfigurationKeys.DEFAULT_DATASET_URN);
-    if (!datasetStatesByUrns.containsKey(datasetUrn)) {
-      JobState.DatasetState datasetState = new JobState.DatasetState();

Review Comment:
   This no-arg ctor should possibly only be used for serialization.
   
   Without a `jobName`, it leads to the following failure later, while persisting task states at the end of commit:
   ```
   Caused by: java.lang.IllegalArgumentException: Can not create a Path from a null string
        at org.apache.hadoop.fs.Path.checkPathArg(Path.java:159)
        at org.apache.hadoop.fs.Path.<init>(Path.java:175)
        at org.apache.hadoop.fs.Path.<init>(Path.java:110)
        at org.apache.gobblin.runtime.FsDatasetStateStore.sanitizeDatasetStatestoreNameFromDatasetURN(FsDatasetStateStore.java:175)
        at org.apache.gobblin.runtime.FsDatasetStateStore.persistDatasetState(FsDatasetStateStore.java:386)
        at org.apache.gobblin.runtime.FsDatasetStateStore.persistDatasetState(FsDatasetStateStore.java:90)
        at org.apache.gobblin.runtime.SafeDatasetCommit.persistDatasetState(SafeDatasetCommit.java:418)
        at org.apache.gobblin.runtime.SafeDatasetCommit.call(SafeDatasetCommit.java:191)
        ... 8 more
   ```
   iceberg-distcp succeeded because it happened to have this set:
   ```
   state.store.enabled=false
   ```
   
   (We now leverage the [gobblin-on-MR](https://github.com/apache/gobblin/blob/a74d17a0123218ac4c867caeefaee2f472b438e7/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java#L725) solution instead.)
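
As a rough sketch of the idea, using the hypothetical `MiniDatasetState` rather than Gobblin's `JobState.DatasetState`: each per-URN state is constructed with its job name up front, so a missing name fails at creation time rather than much later during persistence. The fail-fast `requireNonNull` check is this sketch's own choice, not something the linked code is known to do.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class DatasetStatesByUrn {
  /** Hypothetical per-dataset state; NOT Gobblin's `JobState.DatasetState`. */
  static class MiniDatasetState {
    final String jobName;
    final String datasetUrn;
    int taskCount = 0;

    MiniDatasetState(String jobName, String datasetUrn) {
      // Fail fast here, instead of when a path is later built from the name.
      this.jobName = Objects.requireNonNull(jobName, "jobName");
      this.datasetUrn = Objects.requireNonNull(datasetUrn, "datasetUrn");
    }
  }

  /** Groups task dataset URNs into one named state per URN, counting tasks. */
  static Map<String, MiniDatasetState> group(String jobName, List<String> taskDatasetUrns) {
    Map<String, MiniDatasetState> byUrn = new LinkedHashMap<>();
    for (String urn : taskDatasetUrns) {
      byUrn.computeIfAbsent(urn, u -> new MiniDatasetState(jobName, u)).taskCount++;
    }
    return byUrn;
  }

  public static void main(String[] args) {
    Map<String, MiniDatasetState> states =
        group("demo-job", Arrays.asList("db.tableA", "db.tableB", "db.tableA"));
    for (MiniDatasetState state : states.values()) {
      System.out.println(state.jobName + " / " + state.datasetUrn + " tasks=" + state.taskCount);
    }
  }
}
```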



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
