reuvenlax commented on a change in pull request #13558:
URL: https://github.com/apache/beam/pull/13558#discussion_r577357032
##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
##########
@@ -764,7 +764,11 @@ final void moveToOutputFiles(
}
// During a failure case, files may have been deleted in an earlier step. Thus
// we ignore missing files here.
- FileSystems.rename(srcFiles, dstFiles, StandardMoveOptions.IGNORE_MISSING_FILES);
+ FileSystems.rename(
+ srcFiles,
+ dstFiles,
+ StandardMoveOptions.IGNORE_MISSING_FILES,
+ StandardMoveOptions.SKIP_IF_DESTINATION_EXISTS);
Review comment:
We've verified that this is the correct behavior for GCS. What about
other file systems?
##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
##########
@@ -401,16 +412,40 @@ public ResourceId apply(@Nonnull Metadata input) {
List<ResourceId> srcToHandle = new ArrayList<>();
List<ResourceId> destToHandle = new ArrayList<>();
- List<MatchResult> matchResults = matchResources(srcResourceIds);
- for (int i = 0; i < matchResults.size(); ++i) {
- if (!matchResults.get(i).status().equals(Status.NOT_FOUND)) {
- srcToHandle.add(srcResourceIds.get(i));
- destToHandle.add(destResourceIds.get(i));
+ List<MatchResult> matchSrcResults = matchResources(srcResourceIds);
+ List<MatchResult> matchDestResults = new ArrayList<>();
+ if (skipExistingDest) {
+ matchDestResults = matchResources(destResourceIds);
+ }
+
+ for (int i = 0; i < matchSrcResults.size(); ++i) {
+ if (matchSrcResults.get(i).status().equals(Status.NOT_FOUND) && ignoreMissingSrc) {
+ // If the source is not found, and we are ignoring found source files, then we skip it.
+ continue;
}
+ if (skipExistingDest
+ && matchDestResults.get(i).status().equals(Status.OK)
+ && filesMatch(
+ matchDestResults.get(i).metadata().get(0),
+ matchSrcResults.get(i).metadata().get(0))) {
+ // If the destination exists, and we are skipping when destinations exist, then we skip.
+ continue;
+ }
+ srcToHandle.add(srcResourceIds.get(i));
+ destToHandle.add(destResourceIds.get(i));
}
return KV.of(srcToHandle, destToHandle);
}
+ private static boolean filesMatch(MatchResult.Metadata first, MatchResult.Metadata second) {
+ if (!first.checksum().isPresent() && !second.checksum().isPresent()) {
Review comment:
Should this be ||?
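To make the question concrete, here is a minimal sketch of how the two guards differ when exactly one side reports a checksum. The rest of `filesMatch` is not shown in the diff, so the size-based fallback, the `Meta` class, and the method names `filesMatchAnd` and `filesMatchOr` are assumptions made for illustration only; they are not Beam's API or the PR's actual code.

```java
import java.util.Optional;

public class ChecksumGuardSketch {

  /** Hypothetical stand-in for MatchResult.Metadata, reduced to two fields. */
  public static final class Meta {
    final long sizeBytes;
    final Optional<String> checksum;

    public Meta(long sizeBytes, Optional<String> checksum) {
      this.sizeBytes = sizeBytes;
      this.checksum = checksum;
    }
  }

  /** Guard as in the diff: take the fallback only when BOTH checksums are absent. */
  public static boolean filesMatchAnd(Meta first, Meta second) {
    if (!first.checksum.isPresent() && !second.checksum.isPresent()) {
      return first.sizeBytes == second.sizeBytes; // assumed fallback, not from the PR
    }
    // Also reached when exactly one checksum is absent; Optional.equals then returns false.
    return first.checksum.equals(second.checksum);
  }

  /** Guard as the review suggests: take the fallback when EITHER checksum is absent. */
  public static boolean filesMatchOr(Meta first, Meta second) {
    if (!first.checksum.isPresent() || !second.checksum.isPresent()) {
      return first.sizeBytes == second.sizeBytes; // assumed fallback, not from the PR
    }
    // Both checksums are present here, so get() is safe.
    return first.checksum.get().equals(second.checksum.get());
  }

  public static void main(String[] args) {
    Meta withChecksum = new Meta(1024L, Optional.of("abc123"));
    Meta withoutChecksum = new Meta(1024L, Optional.empty());
    System.out.println("&& guard: " + filesMatchAnd(withChecksum, withoutChecksum)); // false
    System.out.println("|| guard: " + filesMatchOr(withChecksum, withoutChecksum)); // true
  }
}
```

Under these assumptions, the `&&` form treats a one-sided checksum as a mismatch, so the rename is not skipped, while the `||` form falls back to the weaker comparison. If the code after the guard called `get()` on both checksums, the `&&` form would instead throw on a one-sided checksum. Which behavior is wanted depends on how each file system populates the checksum.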