[ https://issues.apache.org/jira/browse/BEAM-11494?focusedWorklogId=553408&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-553408 ]
ASF GitHub Bot logged work on BEAM-11494:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 17/Feb/21 06:34
Start Date: 17/Feb/21 06:34
Worklog Time Spent: 10m
Work Description: reuvenlax commented on a change in pull request #13558:
URL: https://github.com/apache/beam/pull/13558#discussion_r577357032
##########
File path:
sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
##########
@@ -764,7 +764,11 @@ final void moveToOutputFiles(
}
// During a failure case, files may have been deleted in an earlier step. Thus
// we ignore missing files here.
- FileSystems.rename(srcFiles, dstFiles, StandardMoveOptions.IGNORE_MISSING_FILES);
+ FileSystems.rename(
+ srcFiles,
+ dstFiles,
+ StandardMoveOptions.IGNORE_MISSING_FILES,
+ StandardMoveOptions.SKIP_IF_DESTINATION_EXISTS);
Review comment:
We've verified that this is the correct behavior for GCS. What about other file systems?
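The reviewer's question about other file systems is easier to reason about with a concrete model. Below is a minimal sketch that assumes a local filesystem and uses only `java.nio.file`. It shows the behavior the two options appear to request in this PR. A pair is skipped when its source is missing (`IGNORE_MISSING_FILES`), and also when the destination already exists with identical content (`SKIP_IF_DESTINATION_EXISTS` together with the `filesMatch` check in the `FileSystems` hunk). `LocalRenameSketch` and `renameAll` are hypothetical names, not Beam APIs. Comparing full file bytes is an assumption that stands in for whatever metadata comparison a real file system would use.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;
import java.util.List;

// Hypothetical local-filesystem sketch; this is not Beam's FileSystems.rename.
final class LocalRenameSketch {

  // Renames srcs.get(i) to dsts.get(i) and returns how many files were actually moved.
  static int renameAll(
      List<Path> srcs, List<Path> dsts, boolean ignoreMissingSrc, boolean skipIfDestMatches)
      throws IOException {
    if (srcs.size() != dsts.size()) {
      throw new IllegalArgumentException("source and destination lists differ in size");
    }
    int moved = 0;
    for (int i = 0; i < srcs.size(); i++) {
      Path src = srcs.get(i);
      Path dst = dsts.get(i);
      if (Files.notExists(src)) {
        if (ignoreMissingSrc) {
          continue; // like IGNORE_MISSING_FILES: an earlier attempt may have moved it already
        }
        throw new NoSuchFileException(src.toString());
      }
      // Assumed SKIP_IF_DESTINATION_EXISTS semantics: skip only when the
      // destination already holds byte-identical content.
      if (skipIfDestMatches
          && Files.exists(dst)
          && Arrays.equals(Files.readAllBytes(src), Files.readAllBytes(dst))) {
        continue;
      }
      // Otherwise overwrite, which a retention-locked bucket would refuse.
      Files.move(src, dst, StandardCopyOption.REPLACE_EXISTING);
      moved++;
    }
    return moved;
  }

  public static void main(String[] args) throws IOException {
    Path dir = Files.createTempDirectory("rename-sketch");
    Path src = Files.writeString(dir.resolve("tmp-0"), "hello");
    Path dst = Files.writeString(dir.resolve("out-0"), "hello"); // left by a previous attempt
    Path missing = dir.resolve("tmp-1"); // never created
    int moved =
        renameAll(List.of(src, missing), List.of(dst, dir.resolve("out-1")), true, true);
    System.out.println(moved); // prints 0: one pair skipped as identical, one as missing
  }
}
```

When the destination differs from the source, the sketch still overwrites it. That is the case where a store with a retention policy would keep failing on retries, so any per-file-system answer has to decide what "matching" means for that store.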
##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
##########
@@ -401,16 +412,40 @@ public ResourceId apply(@Nonnull Metadata input) {
List<ResourceId> srcToHandle = new ArrayList<>();
List<ResourceId> destToHandle = new ArrayList<>();
- List<MatchResult> matchResults = matchResources(srcResourceIds);
- for (int i = 0; i < matchResults.size(); ++i) {
- if (!matchResults.get(i).status().equals(Status.NOT_FOUND)) {
- srcToHandle.add(srcResourceIds.get(i));
- destToHandle.add(destResourceIds.get(i));
+ List<MatchResult> matchSrcResults = matchResources(srcResourceIds);
+ List<MatchResult> matchDestResults = new ArrayList<>();
+ if (skipExistingDest) {
+ matchDestResults = matchResources(destResourceIds);
+ }
+
+ for (int i = 0; i < matchSrcResults.size(); ++i) {
+ if (matchSrcResults.get(i).status().equals(Status.NOT_FOUND) && ignoreMissingSrc) {
+ // If the source is not found and we are ignoring missing source files, skip it.
+ continue;
}
+ if (skipExistingDest
+ && matchDestResults.get(i).status().equals(Status.OK)
+ && filesMatch(
+ matchDestResults.get(i).metadata().get(0),
+ matchSrcResults.get(i).metadata().get(0))) {
+ // If the destination exists and matches the source, and we are skipping existing destinations, skip it.
+ continue;
+ }
+ srcToHandle.add(srcResourceIds.get(i));
+ destToHandle.add(destResourceIds.get(i));
}
return KV.of(srcToHandle, destToHandle);
}
+ private static boolean filesMatch(MatchResult.Metadata first, MatchResult.Metadata second) {
+ if (!first.checksum().isPresent() && !second.checksum().isPresent()) {
Review comment:
Should this be ||?
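Below is a minimal sketch of the filtering loop from the `FileSystems` hunk, written with the `||` form the reviewer asks about. It assumes a hypothetical `Meta` type, holding a size and an optional checksum, in place of `MatchResult.Metadata`. A `null` entry stands in for a `NOT_FOUND` match. `RenameFilterSketch`, `filesMatch`, and `pairsToHandle` are illustrative names, not Beam code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch of the source/destination filtering, using the `||` reading
// of filesMatch. Meta is a made-up stand-in, not Beam's MatchResult.Metadata.
final class RenameFilterSketch {

  // Reduced file metadata; a null Meta in the lists below means NOT_FOUND.
  static final class Meta {
    final long sizeBytes;
    final Optional<String> checksum;

    Meta(long sizeBytes, Optional<String> checksum) {
      this.sizeBytes = sizeBytes;
      this.checksum = checksum;
    }
  }

  // With `||`, a checksum missing on either side means equality cannot be shown,
  // so the pair is treated as different and the rename still happens.
  static boolean filesMatch(Meta first, Meta second) {
    if (!first.checksum.isPresent() || !second.checksum.isPresent()) {
      return false;
    }
    return first.sizeBytes == second.sizeBytes
        && first.checksum.get().equals(second.checksum.get());
  }

  // Returns the indices of (src, dest) pairs that still need to be renamed.
  // dests is only read when skipExistingDest is true, so it may be empty otherwise.
  static List<Integer> pairsToHandle(
      List<Meta> srcs, List<Meta> dests, boolean ignoreMissingSrc, boolean skipExistingDest) {
    List<Integer> toHandle = new ArrayList<>();
    for (int i = 0; i < srcs.size(); i++) {
      Meta src = srcs.get(i);
      if (src == null && ignoreMissingSrc) {
        continue; // missing source: an earlier attempt may already have moved it
      }
      Meta dest = skipExistingDest ? dests.get(i) : null;
      if (src != null && dest != null && filesMatch(dest, src)) {
        continue; // destination already exists with matching content
      }
      toHandle.add(i);
    }
    return toHandle;
  }

  public static void main(String[] args) {
    List<Meta> srcs =
        Arrays.asList(new Meta(5, Optional.of("abc")), null, new Meta(5, Optional.empty()));
    List<Meta> dests =
        Arrays.asList(new Meta(5, Optional.of("abc")), null, new Meta(5, Optional.empty()));
    System.out.println(pairsToHandle(srcs, dests, true, true)); // prints [2]
  }
}
```

This version errs toward attempting the rename. Skipping a destination whose contents were never verified could leave stale data in place. An unneeded rename, at worst, repeats the overwrite behavior the PR started from.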
Issue Time Tracking
-------------------
Worklog Id: (was: 553408)
Time Spent: 1.5h (was: 1h 20m)
> FileIO.Write overwrites destination files on retries
> ----------------------------------------------------
>
> Key: BEAM-11494
> URL: https://issues.apache.org/jira/browse/BEAM-11494
> Project: Beam
> Issue Type: Improvement
> Components: io-java-files
> Reporter: Pablo Estrada
> Assignee: Pablo Estrada
> Priority: P2
> Time Spent: 1.5h
> Remaining Estimate: 0h
>
> Users have reported cases of FileIO.Write becoming stuck or failing because it
> overwrites destination files on retries.
> The failures and hangs occur because some file system buckets have strict
> retention policies that do not allow files to be deleted (or overwritten,
> since an overwrite replaces the existing object).
--
This message was sent by Atlassian Jira
(v8.3.4#803005)