pratapaditya04 commented on code in PR #4078:
URL: https://github.com/apache/gobblin/pull/4078#discussion_r1880729655


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -205,7 +215,7 @@ private Map<String, DatasetStats> 
summarizeDatasetOutcomes(Map<String, JobState.
     // Only process successful datasets unless configuration to process failed 
datasets is set
     for (JobState.DatasetState datasetState : datasetStatesByUrns.values()) {
       if (datasetState.getState() == JobState.RunningState.COMMITTED || 
(datasetState.getState() == JobState.RunningState.FAILED
-          && commitPolicy == JobCommitPolicy.COMMIT_SUCCESSFUL_TASKS)) {
+          && (commitPolicy == JobCommitPolicy.COMMIT_SUCCESSFUL_TASKS || 
commitPolicy == JobCommitPolicy.COMMIT_ON_PARTIAL_SUCCESS))) {

Review Comment:
   Good suggestion; I added a field, `allowPartialCommit`.



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -97,12 +99,20 @@ public CommitStats commit(WUProcessingSpec workSpec) {
       Map<String, JobState.DatasetState> datasetStatesByUrns = 
jobState.calculateDatasetStatesByUrns(ImmutableList.copyOf(taskStates), 
Lists.newArrayList());
       TaskState firstTaskState = taskStates.get(0);
       log.info("TaskState (commit) [{}] (**first of {}**): {}", 
firstTaskState.getTaskId(), taskStates.size(), 
firstTaskState.toJsonString(true));
-      commitTaskStates(jobState, datasetStatesByUrns, jobContext);
+      CommitStats commitStats = CommitStats.createEmpty();
+      try {
+        commitTaskStates(jobState, datasetStatesByUrns, jobContext);
+      } catch (FailedDatasetUrnsException exception) {
+        log.info("Some datasets failed to be committed, proceeding with 
publishing commit step");
+        commitStats.setOptFailure(Optional.of(exception));
+      }
 
       boolean shouldIncludeFailedTasks = 
PropertiesUtils.getPropAsBoolean(jobState.getProperties(), 
ConfigurationKeys.WRITER_COUNT_METRICS_FROM_FAILED_TASKS, "false");
 
       Map<String, DatasetStats> datasetTaskSummaries = 
summarizeDatasetOutcomes(datasetStatesByUrns, jobContext.getJobCommitPolicy(), 
shouldIncludeFailedTasks);
-      return new CommitStats(datasetTaskSummaries, 
datasetTaskSummaries.values().stream().mapToInt(DatasetStats::getNumCommittedWorkunits).sum());
+      return commitStats.setDatasetStats(datasetTaskSummaries)
+          .setNumCommittedWorkUnits(
+              
datasetTaskSummaries.values().stream().mapToInt(DatasetStats::getNumCommittedWorkunits).sum());

Review Comment:
   Addressed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to