shameersss1 commented on code in PR #6468:
URL: https://github.com/apache/hadoop/pull/6468#discussion_r1477795947
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java:
##########
@@ -248,6 +236,80 @@ private PendingSet innerCommitTask(
return pendingSet;
}
+ /**
+ * Loads the pending commits of a task attempt from either memory or from the
+ * remote store (S3), based on the config.
+ * <p>
+ * When in-memory tracking of magic commits is enabled in the task's
+ * configuration, the commits come from {@code loadPendingCommitsFromMemory};
+ * each commit's IOStatistics are aggregated into the returned set and then
+ * cleared on the commit itself. Otherwise the pending commits under the task
+ * attempt path are loaded through {@code CommitOperations}; if any file
+ * fails to load, {@code abortPendingUploads} is invoked on the commits which
+ * did load and the first recorded exception is rethrown.
+ * @param context TaskAttemptContext
+ * @return All pending commit data for the given TaskAttemptContext
+ * @throws IOException
+ * if there is an error trying to read the commit data
+ */
+ protected PendingSet loadPendingCommits(TaskAttemptContext context) throws
IOException {
+ PendingSet pendingSet = new PendingSet();
+ if (isTrackMagicCommitsInMemoryEnabled(context.getConfiguration())) {
+ // load from memory: in-memory tracking is enabled in the task configuration
+ List<SinglePendingCommit> pendingCommits =
loadPendingCommitsFromMemory(context);
+
+ for (SinglePendingCommit singleCommit : pendingCommits) {
+ // aggregate stats: fold this commit's IOStatistics into the pending set's
+ pendingSet.getIOStatistics()
+ .aggregate(singleCommit.getIOStatistics());
+ // then clear the per-commit stats so they aren't marshalled again.
+ singleCommit.getIOStatistics().clear();
+ }
+ pendingSet.setCommits(pendingCommits);
+ } else {
+ // Load from remote store: read the pending commits under the task attempt path
+ CommitOperations actions = getCommitOperations();
+ Path taskAttemptPath = getTaskAttemptPath(context);
+ try (CommitContext commitContext = initiateTaskOperation(context)) {
+ Pair<PendingSet, List<Pair<LocatedFileStatus, IOException>>> loaded =
+ actions.loadSinglePendingCommits(taskAttemptPath, true,
commitContext);
+ pendingSet = loaded.getKey();
+ List<Pair<LocatedFileStatus, IOException>> failures =
loaded.getValue();
+ if (!failures.isEmpty()) {
+ // At least one file failed to load
+ // revert all which did (abort their uploads); report failure with first exception
+ LOG.error("At least one commit file could not be read: failing");
+ abortPendingUploads(commitContext, pendingSet.getCommits(), true);
+ throw failures.get(0).getValue();
+ }
+ }
+ }
+ return pendingSet;
+ }
+
+ private List<SinglePendingCommit>
loadPendingCommitsFromMemory(TaskAttemptContext context)
Review Comment:
ack.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]