This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 49a4290426d Add options to specify read and write HTTP timeouts for GCS, 
as well as a lower batching limit for rewrite operations that copy data. 
(#31410)
49a4290426d is described below

commit 49a4290426d5cfad56b5d9977f26517de5036885
Author: Sam Whittle <[email protected]>
AuthorDate: Wed May 29 15:37:34 2024 +0200

    Add options to specify read and write HTTP timeouts for GCS, as well as a lower 
batching limit for rewrite operations that copy data. (#31410)
---
 .../sdk/extensions/gcp/options/GcsOptions.java     |  18 +++
 .../beam/sdk/extensions/gcp/util/GcsUtil.java      | 142 +++++++++++++--------
 .../gcp/util/RetryHttpRequestInitializer.java      |  15 ++-
 .../beam/sdk/extensions/gcp/util/Transport.java    |  41 +++---
 .../beam/sdk/extensions/gcp/util/GcsUtilTest.java  |  83 +++++++++---
 5 files changed, 211 insertions(+), 88 deletions(-)

diff --git 
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java
 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java
index 3b2461dcb0e..175d8f58de4 100644
--- 
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java
+++ 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java
@@ -124,6 +124,24 @@ public interface GcsOptions extends 
ApplicationNameOptions, GcpOptions, Pipeline
 
   void setGcsPerformanceMetrics(Boolean reportPerformanceMetrics);
 
+  @Description("Read timeout for gcs http requests")
+  @Nullable
+  Integer getGcsHttpRequestReadTimeout();
+
+  void setGcsHttpRequestReadTimeout(@Nullable Integer timeoutMs);
+
+  @Description("Write timeout for gcs http requests.")
+  @Nullable
+  Integer getGcsHttpRequestWriteTimeout();
+
+  void setGcsHttpRequestWriteTimeout(@Nullable Integer timeoutMs);
+
+  @Description("Batching limit for rewrite ops which will copy data.")
+  @Nullable
+  Integer getGcsRewriteDataOpBatchLimit();
+
+  void setGcsRewriteDataOpBatchLimit(@Nullable Integer timeoutMs);
+
   /**
    * Returns the default {@link ExecutorService} to use within the Apache Beam 
SDK. The {@link
    * ExecutorService} is compatible with AppEngine.
diff --git 
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
index 60e8443d264..9e790002ecd 100644
--- 
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
+++ 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
@@ -86,6 +86,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.FluentBackoff;
 import org.apache.beam.sdk.util.MoreFutures;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
@@ -123,7 +124,8 @@ public class GcsUtil {
           gcsOptions.getExecutorService(),
           hasExperiment(options, "use_grpc_for_gcs"),
           gcsOptions.getGcpCredential(),
-          gcsOptions.getGcsUploadBufferSizeBytes());
+          gcsOptions.getGcsUploadBufferSizeBytes(),
+          gcsOptions.getGcsRewriteDataOpBatchLimit());
     }
 
     /** Returns an instance of {@link GcsUtil} based on the given parameters. 
*/
@@ -140,7 +142,8 @@ public class GcsUtil {
           executorService,
           hasExperiment(options, "use_grpc_for_gcs"),
           credentials,
-          uploadBufferSizeBytes);
+          uploadBufferSizeBytes,
+          null);
     }
   }
 
@@ -154,6 +157,8 @@ public class GcsUtil {
 
   /** Maximum number of requests permitted in a GCS batch request. */
   private static final int MAX_REQUESTS_PER_BATCH = 100;
+  /** Default maximum number of requests permitted in a GCS batch request 
where data is copied. */
+  private static final int MAX_REQUESTS_PER_COPY_BATCH = 10;
   /** Maximum number of concurrent batches of requests executing on GCS. */
   private static final int MAX_CONCURRENT_BATCHES = 256;
 
@@ -179,11 +184,13 @@ public class GcsUtil {
   // Exposed for testing.
   final ExecutorService executorService;
 
-  private Credentials credentials;
+  private final Credentials credentials;
 
   private GoogleCloudStorage googleCloudStorage;
   private GoogleCloudStorageOptions googleCloudStorageOptions;
 
+  private final int rewriteDataOpBatchLimit;
+
   /** Rewrite operation setting. For testing purposes only. */
   @VisibleForTesting @Nullable Long maxBytesRewrittenPerCall;
 
@@ -208,7 +215,8 @@ public class GcsUtil {
       ExecutorService executorService,
       Boolean shouldUseGrpc,
       Credentials credentials,
-      @Nullable Integer uploadBufferSizeBytes) {
+      @Nullable Integer uploadBufferSizeBytes,
+      @Nullable Integer rewriteDataOpBatchLimit) {
     this.storageClient = storageClient;
     this.httpRequestInitializer = httpRequestInitializer;
     this.uploadBufferSizeBytes = uploadBufferSizeBytes;
@@ -249,6 +257,8 @@ public class GcsUtil {
             }
           };
         };
+    this.rewriteDataOpBatchLimit =
+        rewriteDataOpBatchLimit == null ? MAX_REQUESTS_PER_COPY_BATCH : 
rewriteDataOpBatchLimit;
   }
 
   // Use this only for testing purposes.
@@ -814,17 +824,27 @@ public class GcsUtil {
     }
 
     try {
-      MoreFutures.get(MoreFutures.allAsList(futures));
+      try {
+        MoreFutures.get(MoreFutures.allAsList(futures));
+      } catch (ExecutionException e) {
+        if (e.getCause() instanceof FileNotFoundException) {
+          throw (FileNotFoundException) e.getCause();
+        }
+        throw new IOException("Error executing batch GCS request", e);
+      } finally {
+        // Give the other batches a chance to complete in error cases.
+        executor.shutdown();
+        if (!executor.awaitTermination(5, TimeUnit.MINUTES)) {
+          LOG.warn("Taking over 5 minutes to flush gcs op batches after 
error");
+          executor.shutdownNow();
+          if (!executor.awaitTermination(5, TimeUnit.MINUTES)) {
+            LOG.warn("Took over 10 minutes to flush gcs op batches after error 
and interruption.");
+          }
+        }
+      }
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       throw new IOException("Interrupted while executing batch GCS request", 
e);
-    } catch (ExecutionException e) {
-      if (e.getCause() instanceof FileNotFoundException) {
-        throw (FileNotFoundException) e.getCause();
-      }
-      throw new IOException("Error executing batch GCS request", e);
-    } finally {
-      executor.shutdown();
     }
   }
 
@@ -865,14 +885,14 @@ public class GcsUtil {
     private final boolean ignoreMissingSource;
     private boolean readyToEnqueue;
     private boolean performDelete;
-    private GoogleJsonError lastError;
+    private @Nullable GoogleJsonError lastError;
     @VisibleForTesting Storage.Objects.Rewrite rewriteRequest;
 
     public boolean getReadyToEnqueue() {
       return readyToEnqueue;
     }
 
-    public GoogleJsonError getLastError() {
+    public @Nullable GoogleJsonError getLastError() {
       return lastError;
     }
 
@@ -884,6 +904,10 @@ public class GcsUtil {
       return to;
     }
 
+    public boolean isMetadataOperation() {
+      return performDelete || from.getBucket().equals(to.getBucket());
+    }
+
     public void enqueue(BatchInterface batch) throws IOException {
       if (!readyToEnqueue) {
         throw new IOException(
@@ -891,38 +915,38 @@ public class GcsUtil {
                 "Invalid state for Rewrite, from=%s, to=%s, readyToEnqueue=%s",
                 from, to, readyToEnqueue));
       }
-      if (performDelete) {
-        Storage.Objects.Delete deleteRequest =
-            storageClient.objects().delete(from.getBucket(), from.getObject());
-        batch.queue(
-            deleteRequest,
-            new JsonBatchCallback<Void>() {
-              @Override
-              public void onSuccess(Void obj, HttpHeaders responseHeaders) {
-                LOG.debug("Successfully deleted {} after moving to {}", from, 
to);
+      if (!performDelete) {
+        batch.queue(rewriteRequest, this);
+        return;
+      }
+      Storage.Objects.Delete deleteRequest =
+          storageClient.objects().delete(from.getBucket(), from.getObject());
+      batch.queue(
+          deleteRequest,
+          new JsonBatchCallback<Void>() {
+            @Override
+            public void onSuccess(Void obj, HttpHeaders responseHeaders) {
+              LOG.debug("Successfully deleted {} after moving to {}", from, 
to);
+              readyToEnqueue = false;
+              lastError = null;
+            }
+
+            @Override
+            public void onFailure(GoogleJsonError e, HttpHeaders 
responseHeaders)
+                throws IOException {
+              if (e.getCode() == 404) {
+                LOG.info(
+                    "Ignoring failed deletion of moved file {} which already 
does not exist: {}",
+                    from,
+                    e);
                 readyToEnqueue = false;
                 lastError = null;
+              } else {
+                readyToEnqueue = true;
+                lastError = e;
               }
-
-              @Override
-              public void onFailure(GoogleJsonError e, HttpHeaders 
responseHeaders)
-                  throws IOException {
-                if (e.getCode() == 404) {
-                  LOG.info(
-                      "Ignoring failed deletion of moved file {} which already 
does not exist: {}",
-                      from,
-                      e);
-                  readyToEnqueue = false;
-                  lastError = null;
-                } else {
-                  readyToEnqueue = true;
-                  lastError = e;
-                }
-              }
-            });
-      } else {
-        batch.queue(rewriteRequest, this);
-      }
+            }
+          });
     }
 
     public RewriteOp(GcsPath from, GcsPath to, boolean deleteSource, boolean 
ignoreMissingSource)
@@ -1055,6 +1079,7 @@ public class GcsUtil {
       if (batches.isEmpty()) {
         break;
       }
+      Preconditions.checkState(!rewrites.isEmpty());
       RewriteOp sampleErrorOp =
           rewrites.stream().filter(op -> op.getLastError() != 
null).findFirst().orElse(null);
       if (sampleErrorOp != null) {
@@ -1117,23 +1142,38 @@ public class GcsUtil {
 
   List<BatchInterface> makeRewriteBatches(LinkedList<RewriteOp> rewrites) 
throws IOException {
     List<BatchInterface> batches = new ArrayList<>();
-    BatchInterface batch = batchRequestSupplier.get();
+    @Nullable BatchInterface opBatch = null;
+    boolean useSeparateRewriteDataBatch = this.rewriteDataOpBatchLimit != 
MAX_REQUESTS_PER_BATCH;
     Iterator<RewriteOp> it = rewrites.iterator();
+    List<RewriteOp> deferredRewriteDataOps = new ArrayList<>();
     while (it.hasNext()) {
       RewriteOp rewrite = it.next();
       if (!rewrite.getReadyToEnqueue()) {
         it.remove();
         continue;
       }
-      rewrite.enqueue(batch);
-
-      if (batch.size() >= MAX_REQUESTS_PER_BATCH) {
-        batches.add(batch);
-        batch = batchRequestSupplier.get();
+      if (useSeparateRewriteDataBatch && !rewrite.isMetadataOperation()) {
+        deferredRewriteDataOps.add(rewrite);
+      } else {
+        if (opBatch != null && opBatch.size() >= MAX_REQUESTS_PER_BATCH) {
+          opBatch = null;
+        }
+        if (opBatch == null) {
+          opBatch = batchRequestSupplier.get();
+          batches.add(opBatch);
+        }
+        rewrite.enqueue(opBatch);
       }
     }
-    if (batch.size() > 0) {
-      batches.add(batch);
+    for (RewriteOp rewrite : deferredRewriteDataOps) {
+      if (opBatch != null && opBatch.size() >= this.rewriteDataOpBatchLimit) {
+        opBatch = null;
+      }
+      if (opBatch == null) {
+        opBatch = batchRequestSupplier.get();
+        batches.add(opBatch);
+      }
+      rewrite.enqueue(opBatch);
     }
     return batches;
   }
diff --git 
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/RetryHttpRequestInitializer.java
 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/RetryHttpRequestInitializer.java
index cfc925a515e..d053a5f4bf8 100644
--- 
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/RetryHttpRequestInitializer.java
+++ 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/RetryHttpRequestInitializer.java
@@ -59,10 +59,11 @@ public class RetryHttpRequestInitializer implements 
HttpRequestInitializer {
               307 /* Redirect, handled by the client library */,
               308 /* Resume Incomplete, handled by the client library */));
 
-  /** Http response timeout to use for hanging gets. */
+  /** Default http response timeout to use for hanging gets. */
   private static final int HANGING_GET_TIMEOUT_SEC = 80;
 
   private int writeTimeout;
+  private int readTimeout;
 
   /** Handlers used to provide additional logging information on unsuccessful 
HTTP requests. */
   private static class LoggingHttpBackOffHandler
@@ -249,13 +250,14 @@ public class RetryHttpRequestInitializer implements 
HttpRequestInitializer {
     this.ignoredResponseCodes.addAll(additionalIgnoredResponseCodes);
     this.responseInterceptor = responseInterceptor;
     this.writeTimeout = 0;
+    // Set a timeout for hanging-gets.
+    // TODO: Do this exclusively for work requests.
+    this.readTimeout = HANGING_GET_TIMEOUT_SEC * 1000;
   }
 
   @Override
   public void initialize(HttpRequest request) throws IOException {
-    // Set a timeout for hanging-gets.
-    // TODO: Do this exclusively for work requests.
-    request.setReadTimeout(HANGING_GET_TIMEOUT_SEC * 1000);
+    request.setReadTimeout(this.readTimeout);
     request.setWriteTimeout(this.writeTimeout);
 
     LoggingHttpBackOffHandler loggingHttpBackOffHandler =
@@ -295,4 +297,9 @@ public class RetryHttpRequestInitializer implements 
HttpRequestInitializer {
   public void setHttpHeaders(Map<String, String> httpHeaders) {
     this.httpHeaders = httpHeaders;
   }
+
+  /** @param readTimeout in milliseconds. */
+  public void setReadTimeout(int readTimeout) {
+    this.readTimeout = readTimeout;
+  }
 }
diff --git 
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/Transport.java
 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/Transport.java
index f7ecbfeda77..ac6d825e125 100644
--- 
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/Transport.java
+++ 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/Transport.java
@@ -34,6 +34,7 @@ import java.net.URL;
 import java.nio.file.Paths;
 import java.security.GeneralSecurityException;
 import java.util.Optional;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitializer;
 import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
 import org.apache.beam.sdk.util.ReleaseInfo;
@@ -94,8 +95,6 @@ public class Transport {
 
   /** Returns a Cloud Storage client builder using the specified {@link 
GcsOptions}. */
   public static Storage.Builder newStorageClient(GcsOptions options) {
-    String jobName = 
Optional.ofNullable(options.getJobName()).orElse("UNKNOWN");
-
     String applicationName =
         String.format(
             "%sapache-beam/%s (GPN:Beam)",
@@ -104,20 +103,9 @@ public class Transport {
 
     String servicePath = options.getGcsEndpoint();
 
-    // Do not log the code 404. Code up the stack will deal with 404's if 
needed,
-    // and logging it by default clutters the output during file staging.
-    RetryHttpRequestInitializer retryHttpRequestInitializer =
-        new RetryHttpRequestInitializer(ImmutableList.of(404), new 
UploadIdResponseInterceptor());
-
-    // Set custom audit info in request headers
-    
retryHttpRequestInitializer.setHttpHeaders(ImmutableMap.of("x-goog-custom-audit-job",
 jobName));
-
     Storage.Builder storageBuilder =
         new Storage.Builder(
-                getTransport(),
-                getJsonFactory(),
-                chainHttpRequestInitializer(
-                    options.getGcpCredential(), retryHttpRequestInitializer))
+                getTransport(), getJsonFactory(), 
httpRequestInitializerFromOptions(options))
             .setApplicationName(applicationName)
             .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
     if (servicePath != null) {
@@ -129,14 +117,31 @@ public class Transport {
     return storageBuilder;
   }
 
-  private static HttpRequestInitializer chainHttpRequestInitializer(
-      Credentials credential, HttpRequestInitializer httpRequestInitializer) {
+  private static HttpRequestInitializer 
httpRequestInitializerFromOptions(GcsOptions options) {
+    // Do not log the code 404. Code up the stack will deal with 404's if 
needed,
+    // and logging it by default clutters the output during file staging.
+    RetryHttpRequestInitializer retryHttpRequestInitializer =
+        new RetryHttpRequestInitializer(ImmutableList.of(404), new 
UploadIdResponseInterceptor());
+
+    // Set custom audit info in request headers
+    String jobName = 
Optional.ofNullable(options.getJobName()).orElse("UNKNOWN");
+    
retryHttpRequestInitializer.setHttpHeaders(ImmutableMap.of("x-goog-custom-audit-job",
 jobName));
+
+    @Nullable Integer readTimeout = options.getGcsHttpRequestReadTimeout();
+    if (readTimeout != null) {
+      retryHttpRequestInitializer.setReadTimeout(readTimeout);
+    }
+    @Nullable Integer writeTimeout = options.getGcsHttpRequestWriteTimeout();
+    if (writeTimeout != null) {
+      retryHttpRequestInitializer.setWriteTimeout(writeTimeout);
+    }
+    Credentials credential = options.getGcpCredential();
     if (credential == null) {
       return new ChainingHttpRequestInitializer(
-          new NullCredentialInitializer(), httpRequestInitializer);
+          new NullCredentialInitializer(), retryHttpRequestInitializer);
     } else {
       return new ChainingHttpRequestInitializer(
-          new HttpCredentialsAdapter(credential), httpRequestInitializer);
+          new HttpCredentialsAdapter(credential), retryHttpRequestInitializer);
     }
   }
 }
diff --git 
a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java
 
b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java
index 8b311abcb51..d22496fca58 100644
--- 
a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java
+++ 
b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java
@@ -70,6 +70,7 @@ import java.io.ByteArrayInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InvalidObjectException;
 import java.math.BigInteger;
 import java.net.SocketTimeoutException;
 import java.nio.channels.SeekableByteChannel;
@@ -1069,9 +1070,13 @@ public class GcsUtilTest {
   }
 
   private static List<String> makeStrings(String s, int n) {
+    return makeStrings("bucket", s, n);
+  }
+
+  private static List<String> makeStrings(String bucket, String s, int n) {
     ImmutableList.Builder<String> ret = ImmutableList.builder();
     for (int i = 0; i < n; ++i) {
-      ret.add(String.format("gs://bucket/%s%d", s, i));
+      ret.add(String.format("gs://%s/%s%d", bucket, s, i));
     }
     return ret.build();
   }
@@ -1156,6 +1161,48 @@ public class GcsUtilTest {
     assertThat(sumBatchSizes(batches), equalTo(501));
   }
 
+  @Test
+  public void testMakeRewriteBatchesWithLowerDataOpLimit() throws IOException {
+    GcsOptions options = gcsOptionsWithTestCredential();
+    options.setGcsRewriteDataOpBatchLimit(2);
+    GcsUtil gcsUtil = options.getGcsUtil();
+
+    // Small number of files in same bucket fits in 1 batch
+    List<BatchInterface> batches =
+        gcsUtil.makeRewriteBatches(
+            gcsUtil.makeRewriteOps(makeStrings("s", 5), makeStrings("d", 5), 
false, false, false));
+    assertThat(batches.size(), equalTo(1));
+    assertThat(sumBatchSizes(batches), equalTo(5));
+
+    // Files copying between buckets use smaller batch size
+    batches =
+        gcsUtil.makeRewriteBatches(
+            gcsUtil.makeRewriteOps(
+                makeStrings("bucket1", "s", 5),
+                makeStrings("bucket2", "d", 5),
+                false,
+                false,
+                false));
+    assertThat(batches.size(), equalTo(3));
+    assertThat(sumBatchSizes(batches), equalTo(5));
+
+    // A mix of same bucket and different buckets uses large batches when 
possible.
+    List<String> fromFiles = new ArrayList<>(makeStrings("bucket1", "s", 3));
+    List<String> toFiles = new ArrayList<>(makeStrings("bucket2", "d", 3));
+    fromFiles.addAll(makeStrings("t", 90));
+    toFiles.addAll(makeStrings("e", 90));
+    fromFiles.addAll(makeStrings("bucket3", "u", 3));
+    toFiles.addAll(makeStrings("bucket4", "f", 3));
+    fromFiles.addAll(makeStrings("bucket5", "v", 1));
+    toFiles.addAll(makeStrings("bucket5", "g", 1));
+
+    batches =
+        gcsUtil.makeRewriteBatches(gcsUtil.makeRewriteOps(fromFiles, toFiles, 
false, false, false));
+    assertThat(batches.size(), equalTo(4));
+    assertThat(batches.get(0).size(), equalTo(91));
+    assertThat(sumBatchSizes(batches), equalTo(97));
+  }
+
   @Test
   public void testMakeRewriteOpsInvalid() throws IOException {
     GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
@@ -1184,6 +1231,9 @@ public class GcsUtilTest {
                 cb.onFailure(error, null);
               } catch (GoogleJsonResponseException e) {
                 cb.onFailure(e.getDetails(), null);
+              } catch (SocketTimeoutException e) {
+                System.out.println("Propagating socket exception as batch 
processing error");
+                throw e;
               } catch (Exception e) {
                 System.out.println("Propagating exception as server error " + 
e);
                 e.printStackTrace();
@@ -1226,7 +1276,7 @@ public class GcsUtilTest {
 
     Storage mockStorage = Mockito.mock(Storage.class);
     gcsUtil.setStorageClient(mockStorage);
-    gcsUtil.setBatchRequestSupplier(() -> new FakeBatcher());
+    gcsUtil.setBatchRequestSupplier(FakeBatcher::new);
 
     Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
     Storage.Objects.Rewrite mockStorageRewrite = 
Mockito.mock(Storage.Objects.Rewrite.class);
@@ -1237,13 +1287,13 @@ public class GcsUtilTest {
     when(mockStorageObjects.rewrite("bucket", "s0", "bucket", "d0", null))
         .thenReturn(mockStorageRewrite);
     when(mockStorageRewrite.execute())
-        .thenThrow(new SocketTimeoutException("SocketException"))
+        .thenThrow(new InvalidObjectException("Test exception"))
         .thenReturn(new RewriteResponse().setDone(true));
     when(mockStorageObjects.delete("bucket", "s0"))
         .thenReturn(mockStorageDelete1)
         .thenReturn(mockStorageDelete2);
 
-    when(mockStorageDelete1.execute()).thenThrow(new 
SocketTimeoutException("SocketException"));
+    when(mockStorageDelete1.execute()).thenThrow(new 
InvalidObjectException("Test exception"));
 
     gcsUtil.rename(makeStrings("s", 1), makeStrings("d", 1));
     verify(mockStorageRewrite, times(2)).execute();
@@ -1258,7 +1308,7 @@ public class GcsUtilTest {
 
     Storage mockStorage = Mockito.mock(Storage.class);
     gcsUtil.setStorageClient(mockStorage);
-    gcsUtil.setBatchRequestSupplier(() -> new FakeBatcher());
+    gcsUtil.setBatchRequestSupplier(FakeBatcher::new);
 
     Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
     Storage.Objects.Rewrite mockStorageRewrite1 = 
Mockito.mock(Storage.Objects.Rewrite.class);
@@ -1288,7 +1338,7 @@ public class GcsUtilTest {
 
     Storage mockStorage = Mockito.mock(Storage.class);
     gcsUtil.setStorageClient(mockStorage);
-    gcsUtil.setBatchRequestSupplier(() -> new FakeBatcher());
+    gcsUtil.setBatchRequestSupplier(FakeBatcher::new);
 
     Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
     Storage.Objects.Rewrite mockStorageRewrite = 
Mockito.mock(Storage.Objects.Rewrite.class);
@@ -1309,7 +1359,7 @@ public class GcsUtilTest {
 
     Storage mockStorage = Mockito.mock(Storage.class);
     gcsUtil.setStorageClient(mockStorage);
-    gcsUtil.setBatchRequestSupplier(() -> new FakeBatcher());
+    gcsUtil.setBatchRequestSupplier(FakeBatcher::new);
 
     Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
     Storage.Objects.Rewrite mockStorageRewrite = 
Mockito.mock(Storage.Objects.Rewrite.class);
@@ -1352,7 +1402,7 @@ public class GcsUtilTest {
 
     Storage mockStorage = Mockito.mock(Storage.class);
     gcsUtil.setStorageClient(mockStorage);
-    gcsUtil.setBatchRequestSupplier(() -> new FakeBatcher());
+    gcsUtil.setBatchRequestSupplier(FakeBatcher::new);
 
     Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
     Storage.Objects.Get mockGetRequest1 = 
Mockito.mock(Storage.Objects.Get.class);
@@ -1384,7 +1434,7 @@ public class GcsUtilTest {
 
     Storage mockStorage = Mockito.mock(Storage.class);
     gcsUtil.setStorageClient(mockStorage);
-    gcsUtil.setBatchRequestSupplier(() -> new FakeBatcher());
+    gcsUtil.setBatchRequestSupplier(FakeBatcher::new);
 
     Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
     Storage.Objects.Get mockGetRequest = 
Mockito.mock(Storage.Objects.Get.class);
@@ -1472,7 +1522,7 @@ public class GcsUtilTest {
 
     Storage mockStorage = Mockito.mock(Storage.class);
     gcsUtil.setStorageClient(mockStorage);
-    gcsUtil.setBatchRequestSupplier(() -> new FakeBatcher());
+    gcsUtil.setBatchRequestSupplier(FakeBatcher::new);
 
     Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
     Storage.Objects.Get mockGetRequest = 
Mockito.mock(Storage.Objects.Get.class);
@@ -1492,7 +1542,7 @@ public class GcsUtilTest {
 
     Storage mockStorage = Mockito.mock(Storage.class);
     gcsUtil.setStorageClient(mockStorage);
-    gcsUtil.setBatchRequestSupplier(() -> new FakeBatcher());
+    gcsUtil.setBatchRequestSupplier(FakeBatcher::new);
 
     Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
     Storage.Objects.Get mockGetRequest = 
Mockito.mock(Storage.Objects.Get.class);
@@ -1518,7 +1568,7 @@ public class GcsUtilTest {
 
     Storage mockStorage = Mockito.mock(Storage.class);
     gcsUtil.setStorageClient(mockStorage);
-    gcsUtil.setBatchRequestSupplier(() -> new FakeBatcher());
+    gcsUtil.setBatchRequestSupplier(FakeBatcher::new);
 
     Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
     when(mockStorage.objects()).thenReturn(mockStorageObjects);
@@ -1545,7 +1595,8 @@ public class GcsUtilTest {
           gcsOptions.getExecutorService(),
           hasExperiment(options, "use_grpc_for_gcs"),
           gcsOptions.getGcpCredential(),
-          gcsOptions.getGcsUploadBufferSizeBytes());
+          gcsOptions.getGcsUploadBufferSizeBytes(),
+          gcsOptions.getGcsRewriteDataOpBatchLimit());
     }
 
     private GcsUtilMock(
@@ -1554,14 +1605,16 @@ public class GcsUtilTest {
         ExecutorService executorService,
         Boolean shouldUseGrpc,
         Credentials credentials,
-        @Nullable Integer uploadBufferSizeBytes) {
+        @Nullable Integer uploadBufferSizeBytes,
+        @Nullable Integer rewriteDataOpBatchLimit) {
       super(
           storageClient,
           httpRequestInitializer,
           executorService,
           shouldUseGrpc,
           credentials,
-          uploadBufferSizeBytes);
+          uploadBufferSizeBytes,
+          rewriteDataOpBatchLimit);
     }
 
     @Override

Reply via email to