sidseth commented on a change in pull request #3597:
URL: https://github.com/apache/hadoop/pull/3597#discussion_r740720279
##########
File path:
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
##########
@@ -497,6 +535,302 @@ private void mergePaths(FileSystem fs, final FileStatus
from,
}
}
+ @VisibleForTesting
+ public int getMoveThreads() {
+ return moveThreads;
+ }
+
+ public boolean isParallelMoveEnabled() {
+ // Only available for algo v1
+ return (moveThreads > 1 && algorithmVersion == 1);
+ }
+
+ void validateParallelMove() throws IOException {
+ if (!isParallelMoveEnabled()) {
+ throw new IOException("Parallel file move is not enabled. "
+ + "moveThreads=" + moveThreads
+ + ", algo=" + algorithmVersion);
+ }
+ }
+
+  void validateThreadPool(ExecutorService pool, BlockingQueue<Future<Void>> futures)
+ throws IOException {
+ boolean threadPoolEnabled = isParallelMoveEnabled();
+ if (!threadPoolEnabled || pool == null || futures == null) {
+ String errorMsg = "Thread pool is not configured correctly. "
+ + "threadPoolEnabled: " + threadPoolEnabled
+ + ", pool: " + pool
+ + ", futures: " + futures;
+ LOG.error(errorMsg);
+ throw new IOException(errorMsg);
+ }
+ }
+
+ /**
+ * Get executor service for moving files for v1 algorithm.
+ * @return executor service
+ * @throws IOException on error
+ */
+ private ExecutorService createExecutorService() throws IOException {
+ // intentional validation
+ validateParallelMove();
+
+ ExecutorService pool = new ThreadPoolExecutor(moveThreads, moveThreads,
+ 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(),
+ new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setNameFormat("FileCommitter-v1-move-thread-%d")
+ .build(),
+ new ThreadPoolExecutor.CallerRunsPolicy()
+ );
+ LOG.info("Size of move thread pool: {}, pool: {}", moveThreads, pool);
+ return pool;
+ }
+
+  private void parallelCommitJobInternal(JobContext context) throws IOException {
+ // validate to be on safer side.
+ validateParallelMove();
+
+ if (hasOutputPath()) {
+ Path finalOutput = getOutputPath();
+ FileSystem fs = finalOutput.getFileSystem(context.getConfiguration());
+      // create resilient commit helper bound to the destination FS/path
+ resilientCommitHelper = new ResilientCommitByRenameHelper(fs);
 Review comment:
       This mechanism becomes very FileSystem specific. It is implemented only
by Azure right now.
       Other users of rename will not see the benefits without changing
interfaces, which in turn requires shimming etc.
       Would it be better for AzureFileSystem rename itself to add a config
parameter which can look up the src etag (at the cost of a performance hit for
consistency), so that downstream components / any users of the rename operation
can benefit from this change without having to change interfaces? Also, if the
performance penalty is a big problem, Abfs could create very short-lived caches
for FileStatus objects, and handle errors on discrepancies with the cached
copy.
       Essentially: don't force usage of the new interface to get the benefits.
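       A rough sketch of the shape I have in mind, gating the etag lookup
behind a config flag inside rename itself so callers need no new interface.
Every name here (the config key, `lookupEtag`, `doRename`) is made up for
illustration, not the actual ABFS API:

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch: rename consults a config flag and, when enabled,
 * resolves the source etag itself. Callers keep calling plain rename().
 * Conf is modeled as a String map; none of these names are real ABFS APIs.
 */
public class ConfigGatedRename {
  // Illustrative config key, not a real Hadoop/ABFS property.
  static final String ETAG_CHECK_KEY = "fs.azure.rename.resilient";

  private final Map<String, String> conf = new HashMap<>();
  int etagLookups = 0; // counts the extra lookups, to show the perf cost

  void setConf(String key, String value) {
    conf.put(key, value);
  }

  /** Rename that optionally looks up the source etag first. */
  boolean rename(String src, String dst) {
    if (Boolean.parseBoolean(conf.getOrDefault(ETAG_CHECK_KEY, "false"))) {
      // The extra HEAD-style call: this is the consistency-for-latency trade.
      String etag = lookupEtag(src);
      return doRename(src, dst, etag);
    }
    return doRename(src, dst, null);
  }

  String lookupEtag(String src) {
    etagLookups++;
    return "etag-" + src; // stand-in for a real store lookup
  }

  boolean doRename(String src, String dst, String etag) {
    return true; // stand-in for the actual store rename
  }
}
```

       With the flag off, rename behaves exactly as today; flipping the flag
on is a per-cluster choice rather than an interface change in every caller.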
       Side note: for FileSystems where this new functionality is not
supported, the fs.getStatus call within ResilientCommitByRenameHelper will
impose a performance penalty on those FileSystems (an extra getFileStatus on
src).
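       On the short-lived cache idea, something as small as this would do;
FileStatus is modeled as a String and the class name is invented, purely to
show the TTL mechanics:

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch of a very short-lived status cache: entries expire
 * after a small TTL, so a rename that just listed the source need not
 * re-issue getFileStatus, while stale data ages out quickly.
 * FileStatus is modeled as a String here for self-containment.
 */
public class ShortLivedStatusCache {
  private static final class Entry {
    final String status;
    final long expiresAt;
    Entry(String status, long expiresAt) {
      this.status = status;
      this.expiresAt = expiresAt;
    }
  }

  private final Map<String, Entry> cache = new HashMap<>();
  private final long ttlMillis;

  public ShortLivedStatusCache(long ttlMillis) {
    this.ttlMillis = ttlMillis;
  }

  public void put(String path, String status) {
    cache.put(path, new Entry(status, System.currentTimeMillis() + ttlMillis));
  }

  /** Returns the cached status, or null if absent or expired. */
  public String get(String path) {
    Entry e = cache.get(path);
    if (e == null || System.currentTimeMillis() > e.expiresAt) {
      cache.remove(path); // drop expired entries eagerly
      return null;
    }
    return e.status;
  }
}
```

       Discrepancies between the cached copy and the store would still need
handling at rename time, as noted above; the cache only trims the common-case
lookup.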
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]