[
https://issues.apache.org/jira/browse/BEAM-5036?focusedWorklogId=151588&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-151588
]
ASF GitHub Bot logged work on BEAM-5036:
----------------------------------------
Author: ASF GitHub Bot
Created on: 05/Oct/18 11:51
Start Date: 05/Oct/18 11:51
Worklog Time Spent: 10m
Work Description: timrobertson100 commented on a change in pull request
#6289: [BEAM-5036] Optimize the FileBasedSink WriteOperation.moveToOutput()
URL: https://github.com/apache/beam/pull/6289#discussion_r222978923
##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
##########
@@ -322,8 +330,71 @@ public static void rename(
if (srcToRename.isEmpty()) {
return;
}
- getFileSystemInternal(srcToRename.iterator().next().getScheme())
- .rename(srcToRename, destToRename);
+
+ boolean replaceExisting =
+ options.contains(MoveOptions.StandardMoveOptions.REPLACE_EXISTING) ? true : false;
+ rename(
+ getFileSystemInternal(srcToRename.iterator().next().getScheme()),
+ srcToRename,
+ destToRename,
+ replaceExisting);
+ }
+
+ /**
+ * Executes a rename of the src which all must exist using the provided filesystem.
+ *
+ * <p>If replaceExisting is enabled and filesystem throws {code FileAlreadyExistsException} then
+ * an attempt to delete the destination is made and the rename is retried. Some filesystem
+ * implementations may apply this automatically without throwing.
+ *
+ * @param fileSystem The filesystem in use
+ * @param srcResourceIds The source resources to move
+ * @param destResourceIds The destinations for the sources to move to (must be same length as
+ * srcResourceIds)
+ * @param replaceExisting If existing files in destination should be overwritten
+ * @throws IOException If the rename could not be completed
+ */
+ @VisibleForTesting
+ static void rename(
+ FileSystem fileSystem,
+ List<ResourceId> srcResourceIds,
+ List<ResourceId> destResourceIds,
+ boolean replaceExisting)
+ throws IOException {
+ try {
+ fileSystem.rename(srcResourceIds, destResourceIds);
+ } catch (FileAlreadyExistsException e) {
Review comment:
Thanks @chamikaramj
> According to FileSystem.rename() we should be catching
> java.io.FileNotFoundException instead of
> java.nio.file.FileAlreadyExistsException.
I'm afraid this PR is all about the destination paths
(FileAlreadyExistsException) and nothing to do with the source
(FileNotFoundException).
FileSystem.rename() was inadequately documented IMO: it only took into
account files no longer existing in the sources (e.g. to accommodate
failure/retry) and overlooked that implementations may also throw when the
targets already exist. To my knowledge HDFS is the only one that does (see my
recent commit on it, where it now throws instead of swallowing the failure and
leaving corrupt data).
I designed this so that any FS that threw a FileAlreadyExistsException could
be handled and the caller could opt to delete and retry or not.
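For readers following the thread, the delete-and-retry behaviour described above can be sketched roughly as follows. This is only an illustration under stated assumptions: it uses java.nio.file on the local filesystem rather than Beam's FileSystem, and the class name RenameWithRetrySketch and its single-file rename() helper are hypothetical, not the code in this PR.

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical illustration only; not the Beam implementation.
public class RenameWithRetrySketch {

  /**
   * Moves src to dest. If dest already exists and replaceExisting is set,
   * deletes dest and retries once; otherwise the exception propagates.
   */
  static void rename(Path src, Path dest, boolean replaceExisting) throws IOException {
    try {
      // Without StandardCopyOption.REPLACE_EXISTING this throws if dest exists.
      Files.move(src, dest);
    } catch (FileAlreadyExistsException e) {
      if (!replaceExisting) {
        throw e; // the caller opted out of overwriting
      }
      Files.delete(dest);
      Files.move(src, dest);
    }
  }

  public static void main(String[] args) throws IOException {
    Path dir = Files.createTempDirectory("rename-sketch");
    Path src = Files.write(dir.resolve("temp-shard"), "new".getBytes());
    Path dest = Files.write(dir.resolve("final-shard"), "old".getBytes());
    rename(src, dest, true);
    System.out.println(new String(Files.readAllBytes(dest)));
  }
}
```

With replaceExisting set to false the same call would surface the FileAlreadyExistsException to the caller, which is the behaviour Hadoop developers tend to expect.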
Given the confusion my implementation has caused, I now believe it sensible
to close this PR and make HDFSFileSystem handle this internally. It will then
always overwrite existing files (like other FS do). I didn't want to do that
originally because it is not intuitive - Hadoop devs are used to jobs failing
when targets exist. The reality, though, is that we need to force that anyway
for the FileBasedSink.
@iemejia @chamikaramj - would you be ok with that approach please? Thank
you both for taking the time to be involved in this, and I am sorry that I have
not been able to provide an intuitive solution - the questions it's raised
convince me that it is a confusing patch so shouldn't be merged.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 151588)
Time Spent: 10h 40m (was: 10.5h)
> Optimize FileBasedSink's WriteOperation.moveToOutput()
> ------------------------------------------------------
>
> Key: BEAM-5036
> URL: https://issues.apache.org/jira/browse/BEAM-5036
> Project: Beam
> Issue Type: Improvement
> Components: io-java-files
> Affects Versions: 2.5.0
> Reporter: Jozef Vilcek
> Assignee: Tim Robertson
> Priority: Major
> Fix For: 2.8.0
>
> Time Spent: 10h 40m
> Remaining Estimate: 0h
>
> The moveToOutput() method in FileBasedSink.WriteOperation implements move by
> copy+delete. It would be better to use rename(), which can be much more
> efficient on some filesystems.
> The filesystem must support cross-directory rename. BEAM-4861 is related to
> this for the case of the HDFS filesystem.
> Feature was discussed here:
> http://mail-archives.apache.org/mod_mbox/beam-dev/201807.mbox/%3CCAF9t7_4Mp54pQ+vRrJrBh9Vx0=uaknupzd_qdh_qdm9vxll...@mail.gmail.com%3E
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)