shunping commented on code in PR #37592:
URL: https://github.com/apache/beam/pull/37592#discussion_r2823233666
##########
sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java:
##########
@@ -259,6 +273,144 @@ public List<GcsPath> expand(GcsPath gcsPattern) throws
IOException {
return results;
}
/** Policy for an object that turns out not to exist (or is reported as not deleted). */
public enum MissingStrategy {
  /** Treat the condition as an error; {@code remove} throws {@code FileNotFoundException}. */
  FAIL_IF_MISSING,
  /** Tolerate the condition; {@code remove} only logs a warning and moves on. */
  SKIP_IF_MISSING
}
+
+ public void remove(Iterable<GcsPath> paths, MissingStrategy strategy) throws
IOException {
+ for (List<GcsPath> pathPartition :
+ Lists.partition(Lists.newArrayList(paths), MAX_REQUESTS_PER_BATCH)) {
+
+ // Create a new empty batch every time
+ StorageBatch batch = storage.batch();
+ List<StorageBatchResult<Boolean>> batchResultFutures = new ArrayList<>();
+
+ for (GcsPath path : pathPartition) {
+ batchResultFutures.add(batch.delete(path.getBucket(),
path.getObject()));
+ }
+ batch.submit();
+
+ for (int i = 0; i < batchResultFutures.size(); i++) {
+ StorageBatchResult<Boolean> future = batchResultFutures.get(i);
+ try {
+ Boolean deleted = future.get();
+ if (!deleted) {
+ if (strategy == MissingStrategy.FAIL_IF_MISSING) {
+ throw new FileNotFoundException(
+ String.format(
+ "The specified file does not exist: %s",
pathPartition.get(i).toString()));
+ } else {
+ LOG.warn("Ignoring failed deletion on file {}.",
pathPartition.get(i).toString());
+ }
+ }
+ } catch (StorageException e) {
+ throw translateStorageException(pathPartition.get(i), e);
+ }
+ }
+ }
+ }
+
/** Policy for a copy or rewrite whose destination object may already exist. */
public enum OverwriteStrategy {
  /** Fail if the target already exists. */
  FAIL_IF_EXISTS,
  /** Leave an existing target untouched and skip the operation. */
  SKIP_IF_EXISTS,
  /** Overwrite only if the target's generation still matches the one observed (atomic). */
  SAFE_OVERWRITE,
  /** Overwrite regardless of the target's current state. */
  ALWAYS_OVERWRITE
}
+
+ private void rewriteHelper(
+ Iterable<GcsPath> srcPaths,
+ Iterable<GcsPath> dstPaths,
+ boolean deleteSrc,
+ MissingStrategy srcMissing,
+ OverwriteStrategy dstOverwrite)
+ throws IOException {
+ List<GcsPath> srcList = Lists.newArrayList(srcPaths);
+ List<GcsPath> dstList = Lists.newArrayList(dstPaths);
+ checkArgument(
+ srcList.size() == dstList.size(),
+ "Number of source files %s must equal number of destination files %s",
+ srcList.size(),
+ dstList.size());
+
+ for (int i = 0; i < srcList.size(); i++) {
+ GcsPath srcPath = srcList.get(i);
+ GcsPath dstPath = dstList.get(i);
+ BlobId srcId = BlobId.of(srcPath.getBucket(), srcPath.getObject());
+ BlobId dstId = BlobId.of(dstPath.getBucket(), dstPath.getObject());
+
+ CopyRequest.Builder copyRequestBuilder =
+ CopyRequest.newBuilder()
+ .setSource(srcId)
+ .setMegabytesCopiedPerChunk(MEGABYTES_COPIED_PER_CHUNK);
+
+ if (dstOverwrite == OverwriteStrategy.ALWAYS_OVERWRITE) {
+ copyRequestBuilder.setTarget(dstId);
+ } else {
+ // FAIL_IF_EXISTS, SKIP_IF_EXISTS and SAFE_OVERWRITE require checking
the target blob
+ BlobInfo existingTarget;
+ try {
+ existingTarget = storage.get(dstId);
+ } catch (StorageException e) {
+ throw translateStorageException(dstPath, e);
+ }
+
+ if (existingTarget == null) {
+ copyRequestBuilder.setTarget(dstId,
Storage.BlobTargetOption.doesNotExist());
+ } else {
+ switch (dstOverwrite) {
+ case SKIP_IF_EXISTS:
+ LOG.warn("Ignoring rewriting from {} to {} because target
exists.", srcPath, dstPath);
+ continue; // Skip to next file in for-loop
+
+ case SAFE_OVERWRITE:
+ copyRequestBuilder.setTarget(
+ dstId,
Storage.BlobTargetOption.generationMatch(existingTarget.getGeneration()));
+ break;
+
+ case FAIL_IF_EXISTS:
+ default:
+ throw new FileAlreadyExistsException(
+ srcPath.toString(),
+ dstPath.toString(),
+ "Target object already exists and strategy is
FAIL_IF_EXISTS");
+ }
+ }
+ }
+
+ try {
+ CopyWriter copyWriter = storage.copy(copyRequestBuilder.build());
+ copyWriter.getResult();
+
+ if (deleteSrc) {
+ storage.get(srcId).delete();
Review Comment:
Fixed.
##########
sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java:
##########
@@ -433,12 +438,69 @@ public void copy(Iterable<String> srcFilenames,
Iterable<String> destFilenames)
delegate.copy(srcFilenames, destFilenames);
}
+ /** experimental api. */
+ public void copyV2(Iterable<GcsPath> srcPaths, Iterable<GcsPath> dstPaths)
throws IOException {
+ copy(srcPaths, dstPaths, OverwriteStrategy.SAFE_OVERWRITE);
+ }
+
+ /** experimental api. */
+ public void copy(
+ Iterable<GcsPath> srcPaths, Iterable<GcsPath> dstPaths,
OverwriteStrategy strategy)
+ throws IOException {
+ if (delegateV2 != null) {
+ delegateV2.copy(srcPaths, dstPaths, strategy);
+ } else {
+ throw new IOException("GcsUtil V2 not initialized.");
+ }
+ }
+
/**
 * Renames each source file to its destination by forwarding to {@code delegate.rename}.
 *
 * @param srcFilenames source file names
 * @param destFilenames destination file names
 * @param moveOptions options passed through unchanged to the delegate
 * @throws IOException propagated from the delegate
 */
public void rename(
    Iterable<String> srcFilenames, Iterable<String> destFilenames, MoveOptions... moveOptions)
    throws IOException {
  this.delegate.rename(srcFilenames, destFilenames, moveOptions);
}
+ /** experimental api. */
+ public void renameV2(
+ Iterable<GcsPath> srcPaths, Iterable<GcsPath> dstPaths, MoveOptions...
moveOptions)
+ throws IOException {
+ Set<MoveOptions> moveOptionSet = Sets.newHashSet(moveOptions);
+ final MissingStrategy srcMissing;
+ final OverwriteStrategy dstOverwrite;
+
+ if (moveOptionSet.contains(StandardMoveOptions.IGNORE_MISSING_FILES)) {
+ srcMissing = MissingStrategy.SKIP_IF_MISSING;
+ } else {
+ srcMissing = MissingStrategy.FAIL_IF_MISSING;
+ }
+
+ if
(moveOptionSet.contains(StandardMoveOptions.SKIP_IF_DESTINATION_EXISTS)) {
+ dstOverwrite = OverwriteStrategy.SKIP_IF_EXISTS;
+ } else {
+ dstOverwrite = OverwriteStrategy.SAFE_OVERWRITE;
+ }
+
+ if (delegateV2 != null) {
+ delegateV2.move(srcPaths, dstPaths, srcMissing, dstOverwrite);
+ } else {
+ throw new IOException("GcsUtil V2 not initialized.");
+ }
Review Comment:
Fixed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]