scwhittle commented on a change in pull request #15301:
URL: https://github.com/apache/beam/pull/15301#discussion_r686811469
##########
File path:
sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
##########
@@ -625,29 +625,76 @@ private static void executeBatches(List<BatchRequest>
batches) throws IOExceptio
* round of enqueue() and execute is required. Repeat until
getReadyToEnqueue() returns false.
*/
class RewriteOp extends JsonBatchCallback<RewriteResponse> {
- private GcsPath from;
- private GcsPath to;
+ private final GcsPath from;
+ private final GcsPath to;
+ private final boolean move;
private boolean readyToEnqueue;
+ private boolean performDelete;
+ private GoogleJsonError lastError;
@VisibleForTesting Storage.Objects.Rewrite rewriteRequest;
public boolean getReadyToEnqueue() {
return readyToEnqueue;
}
+ public GoogleJsonError getLastError() {
+ return lastError;
+ }
+
+ public GcsPath getFrom() {
+ return from;
+ }
+
+ public GcsPath getTo() {
+ return to;
+ }
+
public void enqueue(BatchRequest batch) throws IOException {
if (!readyToEnqueue) {
throw new IOException(
String.format(
"Invalid state for Rewrite, from=%s, to=%s, readyToEnqueue=%s",
from, to, readyToEnqueue));
}
- rewriteRequest.queue(batch, this);
- readyToEnqueue = false;
+ LOG.info("XXX enqueue op");
+ if (performDelete) {
+ Storage.Objects.Delete deleteRequest =
+ storageClient.objects().delete(from.getBucket(), from.getObject());
+ deleteRequest.queue(
+ batch,
+ new JsonBatchCallback<Void>() {
+ @Override
+ public void onSuccess(Void obj, HttpHeaders responseHeaders) {
+ LOG.debug("Successfully deleted {} after moving to {}", from,
to);
+ readyToEnqueue = false;
+ lastError = null;
+ }
+
+ @Override
+ public void onFailure(GoogleJsonError e, HttpHeaders
responseHeaders)
+ throws IOException {
+ if (e.getCode() == 404) {
+ LOG.info(
+ "Ignoring failed deletion of moved file {} which already
does not exist: {}",
+ from,
+ e);
+ readyToEnqueue = false;
+ lastError = null;
+ } else {
+ readyToEnqueue = true;
+ lastError = e;
+ }
+ }
+ });
+ } else {
+ rewriteRequest.queue(batch, this);
+ }
}
- public RewriteOp(GcsPath from, GcsPath to) throws IOException {
+ public RewriteOp(GcsPath from, GcsPath to, boolean move) throws
IOException {
Review comment:
Renamed the option to make what it does clearer. It is convenient to
have a single op shared by copy and rename. Let me know if you have other
naming ideas.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]