pratapaditya04 commented on code in PR #4078: URL: https://github.com/apache/gobblin/pull/4078#discussion_r1880729655
########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java: ########## @@ -205,7 +215,7 @@ private Map<String, DatasetStats> summarizeDatasetOutcomes(Map<String, JobState. // Only process successful datasets unless configuration to process failed datasets is set for (JobState.DatasetState datasetState : datasetStatesByUrns.values()) { if (datasetState.getState() == JobState.RunningState.COMMITTED || (datasetState.getState() == JobState.RunningState.FAILED - && commitPolicy == JobCommitPolicy.COMMIT_SUCCESSFUL_TASKS)) { + && (commitPolicy == JobCommitPolicy.COMMIT_SUCCESSFUL_TASKS || commitPolicy == JobCommitPolicy.COMMIT_ON_PARTIAL_SUCCESS))) { Review Comment: good suggestion, added a field allowPartialCommit ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java: ########## @@ -97,12 +99,20 @@ public CommitStats commit(WUProcessingSpec workSpec) { Map<String, JobState.DatasetState> datasetStatesByUrns = jobState.calculateDatasetStatesByUrns(ImmutableList.copyOf(taskStates), Lists.newArrayList()); TaskState firstTaskState = taskStates.get(0); log.info("TaskState (commit) [{}] (**first of {}**): {}", firstTaskState.getTaskId(), taskStates.size(), firstTaskState.toJsonString(true)); - commitTaskStates(jobState, datasetStatesByUrns, jobContext); + CommitStats commitStats = CommitStats.createEmpty(); + try { + commitTaskStates(jobState, datasetStatesByUrns, jobContext); + } catch (FailedDatasetUrnsException exception) { + log.info("Some datasets failed to be committed, proceeding with publishing commit step"); + commitStats.setOptFailure(Optional.of(exception)); + } boolean shouldIncludeFailedTasks = PropertiesUtils.getPropAsBoolean(jobState.getProperties(), ConfigurationKeys.WRITER_COUNT_METRICS_FROM_FAILED_TASKS, "false"); Map<String, DatasetStats> datasetTaskSummaries = summarizeDatasetOutcomes(datasetStatesByUrns, jobContext.getJobCommitPolicy(), 
shouldIncludeFailedTasks); - return new CommitStats(datasetTaskSummaries, datasetTaskSummaries.values().stream().mapToInt(DatasetStats::getNumCommittedWorkunits).sum()); + return commitStats.setDatasetStats(datasetTaskSummaries) + .setNumCommittedWorkUnits( + datasetTaskSummaries.values().stream().mapToInt(DatasetStats::getNumCommittedWorkunits).sum()); Review Comment: Addressed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org