This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 49a4290426d Add options to specify read and write HTTP timeouts for GCS,
as well as a lower batching limit for rewrite operations that copy data.
(#31410)
49a4290426d is described below
commit 49a4290426d5cfad56b5d9977f26517de5036885
Author: Sam Whittle <[email protected]>
AuthorDate: Wed May 29 15:37:34 2024 +0200
Add options to specify read and write HTTP timeouts for GCS, as well as a lower
batching limit for rewrite operations that copy data. (#31410)
---
.../sdk/extensions/gcp/options/GcsOptions.java | 18 +++
.../beam/sdk/extensions/gcp/util/GcsUtil.java | 142 +++++++++++++--------
.../gcp/util/RetryHttpRequestInitializer.java | 15 ++-
.../beam/sdk/extensions/gcp/util/Transport.java | 41 +++---
.../beam/sdk/extensions/gcp/util/GcsUtilTest.java | 83 +++++++++---
5 files changed, 211 insertions(+), 88 deletions(-)
diff --git
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java
index 3b2461dcb0e..175d8f58de4 100644
---
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java
+++
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java
@@ -124,6 +124,24 @@ public interface GcsOptions extends
ApplicationNameOptions, GcpOptions, Pipeline
void setGcsPerformanceMetrics(Boolean reportPerformanceMetrics);
+ @Description("Read timeout for gcs http requests")
+ @Nullable
+ Integer getGcsHttpRequestReadTimeout();
+
+ void setGcsHttpRequestReadTimeout(@Nullable Integer timeoutMs);
+
+ @Description("Write timeout for gcs http requests.")
+ @Nullable
+ Integer getGcsHttpRequestWriteTimeout();
+
+ void setGcsHttpRequestWriteTimeout(@Nullable Integer timeoutMs);
+
+ @Description("Batching limit for rewrite ops which will copy data.")
+ @Nullable
+ Integer getGcsRewriteDataOpBatchLimit();
+
+ void setGcsRewriteDataOpBatchLimit(@Nullable Integer timeoutMs);
+
/**
* Returns the default {@link ExecutorService} to use within the Apache Beam
SDK. The {@link
* ExecutorService} is compatible with AppEngine.
diff --git
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
index 60e8443d264..9e790002ecd 100644
---
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
+++
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
@@ -86,6 +86,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.MoreFutures;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
@@ -123,7 +124,8 @@ public class GcsUtil {
gcsOptions.getExecutorService(),
hasExperiment(options, "use_grpc_for_gcs"),
gcsOptions.getGcpCredential(),
- gcsOptions.getGcsUploadBufferSizeBytes());
+ gcsOptions.getGcsUploadBufferSizeBytes(),
+ gcsOptions.getGcsRewriteDataOpBatchLimit());
}
/** Returns an instance of {@link GcsUtil} based on the given parameters.
*/
@@ -140,7 +142,8 @@ public class GcsUtil {
executorService,
hasExperiment(options, "use_grpc_for_gcs"),
credentials,
- uploadBufferSizeBytes);
+ uploadBufferSizeBytes,
+ null);
}
}
@@ -154,6 +157,8 @@ public class GcsUtil {
/** Maximum number of requests permitted in a GCS batch request. */
private static final int MAX_REQUESTS_PER_BATCH = 100;
+ /** Default maximum number of requests permitted in a GCS batch request
where data is copied. */
+ private static final int MAX_REQUESTS_PER_COPY_BATCH = 10;
/** Maximum number of concurrent batches of requests executing on GCS. */
private static final int MAX_CONCURRENT_BATCHES = 256;
@@ -179,11 +184,13 @@ public class GcsUtil {
// Exposed for testing.
final ExecutorService executorService;
- private Credentials credentials;
+ private final Credentials credentials;
private GoogleCloudStorage googleCloudStorage;
private GoogleCloudStorageOptions googleCloudStorageOptions;
+ private final int rewriteDataOpBatchLimit;
+
/** Rewrite operation setting. For testing purposes only. */
@VisibleForTesting @Nullable Long maxBytesRewrittenPerCall;
@@ -208,7 +215,8 @@ public class GcsUtil {
ExecutorService executorService,
Boolean shouldUseGrpc,
Credentials credentials,
- @Nullable Integer uploadBufferSizeBytes) {
+ @Nullable Integer uploadBufferSizeBytes,
+ @Nullable Integer rewriteDataOpBatchLimit) {
this.storageClient = storageClient;
this.httpRequestInitializer = httpRequestInitializer;
this.uploadBufferSizeBytes = uploadBufferSizeBytes;
@@ -249,6 +257,8 @@ public class GcsUtil {
}
};
};
+ this.rewriteDataOpBatchLimit =
+ rewriteDataOpBatchLimit == null ? MAX_REQUESTS_PER_COPY_BATCH :
rewriteDataOpBatchLimit;
}
// Use this only for testing purposes.
@@ -814,17 +824,27 @@ public class GcsUtil {
}
try {
- MoreFutures.get(MoreFutures.allAsList(futures));
+ try {
+ MoreFutures.get(MoreFutures.allAsList(futures));
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof FileNotFoundException) {
+ throw (FileNotFoundException) e.getCause();
+ }
+ throw new IOException("Error executing batch GCS request", e);
+ } finally {
+ // Give the other batches a chance to complete in error cases.
+ executor.shutdown();
+ if (!executor.awaitTermination(5, TimeUnit.MINUTES)) {
+ LOG.warn("Taking over 5 minutes to flush gcs op batches after
error");
+ executor.shutdownNow();
+ if (!executor.awaitTermination(5, TimeUnit.MINUTES)) {
+ LOG.warn("Took over 10 minutes to flush gcs op batches after error
and interruption.");
+ }
+ }
+ }
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted while executing batch GCS request",
e);
- } catch (ExecutionException e) {
- if (e.getCause() instanceof FileNotFoundException) {
- throw (FileNotFoundException) e.getCause();
- }
- throw new IOException("Error executing batch GCS request", e);
- } finally {
- executor.shutdown();
}
}
@@ -865,14 +885,14 @@ public class GcsUtil {
private final boolean ignoreMissingSource;
private boolean readyToEnqueue;
private boolean performDelete;
- private GoogleJsonError lastError;
+ private @Nullable GoogleJsonError lastError;
@VisibleForTesting Storage.Objects.Rewrite rewriteRequest;
public boolean getReadyToEnqueue() {
return readyToEnqueue;
}
- public GoogleJsonError getLastError() {
+ public @Nullable GoogleJsonError getLastError() {
return lastError;
}
@@ -884,6 +904,10 @@ public class GcsUtil {
return to;
}
+ public boolean isMetadataOperation() {
+ return performDelete || from.getBucket().equals(to.getBucket());
+ }
+
public void enqueue(BatchInterface batch) throws IOException {
if (!readyToEnqueue) {
throw new IOException(
@@ -891,38 +915,38 @@ public class GcsUtil {
"Invalid state for Rewrite, from=%s, to=%s, readyToEnqueue=%s",
from, to, readyToEnqueue));
}
- if (performDelete) {
- Storage.Objects.Delete deleteRequest =
- storageClient.objects().delete(from.getBucket(), from.getObject());
- batch.queue(
- deleteRequest,
- new JsonBatchCallback<Void>() {
- @Override
- public void onSuccess(Void obj, HttpHeaders responseHeaders) {
- LOG.debug("Successfully deleted {} after moving to {}", from,
to);
+ if (!performDelete) {
+ batch.queue(rewriteRequest, this);
+ return;
+ }
+ Storage.Objects.Delete deleteRequest =
+ storageClient.objects().delete(from.getBucket(), from.getObject());
+ batch.queue(
+ deleteRequest,
+ new JsonBatchCallback<Void>() {
+ @Override
+ public void onSuccess(Void obj, HttpHeaders responseHeaders) {
+ LOG.debug("Successfully deleted {} after moving to {}", from,
to);
+ readyToEnqueue = false;
+ lastError = null;
+ }
+
+ @Override
+ public void onFailure(GoogleJsonError e, HttpHeaders
responseHeaders)
+ throws IOException {
+ if (e.getCode() == 404) {
+ LOG.info(
+ "Ignoring failed deletion of moved file {} which already
does not exist: {}",
+ from,
+ e);
readyToEnqueue = false;
lastError = null;
+ } else {
+ readyToEnqueue = true;
+ lastError = e;
}
-
- @Override
- public void onFailure(GoogleJsonError e, HttpHeaders
responseHeaders)
- throws IOException {
- if (e.getCode() == 404) {
- LOG.info(
- "Ignoring failed deletion of moved file {} which already
does not exist: {}",
- from,
- e);
- readyToEnqueue = false;
- lastError = null;
- } else {
- readyToEnqueue = true;
- lastError = e;
- }
- }
- });
- } else {
- batch.queue(rewriteRequest, this);
- }
+ }
+ });
}
public RewriteOp(GcsPath from, GcsPath to, boolean deleteSource, boolean
ignoreMissingSource)
@@ -1055,6 +1079,7 @@ public class GcsUtil {
if (batches.isEmpty()) {
break;
}
+ Preconditions.checkState(!rewrites.isEmpty());
RewriteOp sampleErrorOp =
rewrites.stream().filter(op -> op.getLastError() !=
null).findFirst().orElse(null);
if (sampleErrorOp != null) {
@@ -1117,23 +1142,38 @@ public class GcsUtil {
List<BatchInterface> makeRewriteBatches(LinkedList<RewriteOp> rewrites)
throws IOException {
List<BatchInterface> batches = new ArrayList<>();
- BatchInterface batch = batchRequestSupplier.get();
+ @Nullable BatchInterface opBatch = null;
+ boolean useSeparateRewriteDataBatch = this.rewriteDataOpBatchLimit !=
MAX_REQUESTS_PER_BATCH;
Iterator<RewriteOp> it = rewrites.iterator();
+ List<RewriteOp> deferredRewriteDataOps = new ArrayList<>();
while (it.hasNext()) {
RewriteOp rewrite = it.next();
if (!rewrite.getReadyToEnqueue()) {
it.remove();
continue;
}
- rewrite.enqueue(batch);
-
- if (batch.size() >= MAX_REQUESTS_PER_BATCH) {
- batches.add(batch);
- batch = batchRequestSupplier.get();
+ if (useSeparateRewriteDataBatch && !rewrite.isMetadataOperation()) {
+ deferredRewriteDataOps.add(rewrite);
+ } else {
+ if (opBatch != null && opBatch.size() >= MAX_REQUESTS_PER_BATCH) {
+ opBatch = null;
+ }
+ if (opBatch == null) {
+ opBatch = batchRequestSupplier.get();
+ batches.add(opBatch);
+ }
+ rewrite.enqueue(opBatch);
}
}
- if (batch.size() > 0) {
- batches.add(batch);
+ for (RewriteOp rewrite : deferredRewriteDataOps) {
+ if (opBatch != null && opBatch.size() >= this.rewriteDataOpBatchLimit) {
+ opBatch = null;
+ }
+ if (opBatch == null) {
+ opBatch = batchRequestSupplier.get();
+ batches.add(opBatch);
+ }
+ rewrite.enqueue(opBatch);
}
return batches;
}
diff --git
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/RetryHttpRequestInitializer.java
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/RetryHttpRequestInitializer.java
index cfc925a515e..d053a5f4bf8 100644
---
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/RetryHttpRequestInitializer.java
+++
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/RetryHttpRequestInitializer.java
@@ -59,10 +59,11 @@ public class RetryHttpRequestInitializer implements
HttpRequestInitializer {
307 /* Redirect, handled by the client library */,
308 /* Resume Incomplete, handled by the client library */));
- /** Http response timeout to use for hanging gets. */
+ /** Default http response timeout to use for hanging gets. */
private static final int HANGING_GET_TIMEOUT_SEC = 80;
private int writeTimeout;
+ private int readTimeout;
/** Handlers used to provide additional logging information on unsuccessful
HTTP requests. */
private static class LoggingHttpBackOffHandler
@@ -249,13 +250,14 @@ public class RetryHttpRequestInitializer implements
HttpRequestInitializer {
this.ignoredResponseCodes.addAll(additionalIgnoredResponseCodes);
this.responseInterceptor = responseInterceptor;
this.writeTimeout = 0;
+ // Set a timeout for hanging-gets.
+ // TODO: Do this exclusively for work requests.
+ this.readTimeout = HANGING_GET_TIMEOUT_SEC * 1000;
}
@Override
public void initialize(HttpRequest request) throws IOException {
- // Set a timeout for hanging-gets.
- // TODO: Do this exclusively for work requests.
- request.setReadTimeout(HANGING_GET_TIMEOUT_SEC * 1000);
+ request.setReadTimeout(this.readTimeout);
request.setWriteTimeout(this.writeTimeout);
LoggingHttpBackOffHandler loggingHttpBackOffHandler =
@@ -295,4 +297,9 @@ public class RetryHttpRequestInitializer implements
HttpRequestInitializer {
public void setHttpHeaders(Map<String, String> httpHeaders) {
this.httpHeaders = httpHeaders;
}
+
+ /** @param readTimeout in milliseconds. */
+ public void setReadTimeout(int readTimeout) {
+ this.readTimeout = readTimeout;
+ }
}
diff --git
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/Transport.java
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/Transport.java
index f7ecbfeda77..ac6d825e125 100644
---
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/Transport.java
+++
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/Transport.java
@@ -34,6 +34,7 @@ import java.net.URL;
import java.nio.file.Paths;
import java.security.GeneralSecurityException;
import java.util.Optional;
+import javax.annotation.Nullable;
import org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitializer;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.util.ReleaseInfo;
@@ -94,8 +95,6 @@ public class Transport {
/** Returns a Cloud Storage client builder using the specified {@link
GcsOptions}. */
public static Storage.Builder newStorageClient(GcsOptions options) {
- String jobName =
Optional.ofNullable(options.getJobName()).orElse("UNKNOWN");
-
String applicationName =
String.format(
"%sapache-beam/%s (GPN:Beam)",
@@ -104,20 +103,9 @@ public class Transport {
String servicePath = options.getGcsEndpoint();
- // Do not log the code 404. Code up the stack will deal with 404's if
needed,
- // and logging it by default clutters the output during file staging.
- RetryHttpRequestInitializer retryHttpRequestInitializer =
- new RetryHttpRequestInitializer(ImmutableList.of(404), new
UploadIdResponseInterceptor());
-
- // Set custom audit info in request headers
-
retryHttpRequestInitializer.setHttpHeaders(ImmutableMap.of("x-goog-custom-audit-job",
jobName));
-
Storage.Builder storageBuilder =
new Storage.Builder(
- getTransport(),
- getJsonFactory(),
- chainHttpRequestInitializer(
- options.getGcpCredential(), retryHttpRequestInitializer))
+ getTransport(), getJsonFactory(),
httpRequestInitializerFromOptions(options))
.setApplicationName(applicationName)
.setGoogleClientRequestInitializer(options.getGoogleApiTrace());
if (servicePath != null) {
@@ -129,14 +117,31 @@ public class Transport {
return storageBuilder;
}
- private static HttpRequestInitializer chainHttpRequestInitializer(
- Credentials credential, HttpRequestInitializer httpRequestInitializer) {
+ private static HttpRequestInitializer
httpRequestInitializerFromOptions(GcsOptions options) {
+ // Do not log the code 404. Code up the stack will deal with 404's if
needed,
+ // and logging it by default clutters the output during file staging.
+ RetryHttpRequestInitializer retryHttpRequestInitializer =
+ new RetryHttpRequestInitializer(ImmutableList.of(404), new
UploadIdResponseInterceptor());
+
+ // Set custom audit info in request headers
+ String jobName =
Optional.ofNullable(options.getJobName()).orElse("UNKNOWN");
+
retryHttpRequestInitializer.setHttpHeaders(ImmutableMap.of("x-goog-custom-audit-job",
jobName));
+
+ @Nullable Integer readTimeout = options.getGcsHttpRequestReadTimeout();
+ if (readTimeout != null) {
+ retryHttpRequestInitializer.setReadTimeout(readTimeout);
+ }
+ @Nullable Integer writeTimeout = options.getGcsHttpRequestWriteTimeout();
+ if (writeTimeout != null) {
+ retryHttpRequestInitializer.setWriteTimeout(writeTimeout);
+ }
+ Credentials credential = options.getGcpCredential();
if (credential == null) {
return new ChainingHttpRequestInitializer(
- new NullCredentialInitializer(), httpRequestInitializer);
+ new NullCredentialInitializer(), retryHttpRequestInitializer);
} else {
return new ChainingHttpRequestInitializer(
- new HttpCredentialsAdapter(credential), httpRequestInitializer);
+ new HttpCredentialsAdapter(credential), retryHttpRequestInitializer);
}
}
}
diff --git
a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java
b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java
index 8b311abcb51..d22496fca58 100644
---
a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java
+++
b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java
@@ -70,6 +70,7 @@ import java.io.ByteArrayInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
+import java.io.InvalidObjectException;
import java.math.BigInteger;
import java.net.SocketTimeoutException;
import java.nio.channels.SeekableByteChannel;
@@ -1069,9 +1070,13 @@ public class GcsUtilTest {
}
private static List<String> makeStrings(String s, int n) {
+ return makeStrings("bucket", s, n);
+ }
+
+ private static List<String> makeStrings(String bucket, String s, int n) {
ImmutableList.Builder<String> ret = ImmutableList.builder();
for (int i = 0; i < n; ++i) {
- ret.add(String.format("gs://bucket/%s%d", s, i));
+ ret.add(String.format("gs://%s/%s%d", bucket, s, i));
}
return ret.build();
}
@@ -1156,6 +1161,48 @@ public class GcsUtilTest {
assertThat(sumBatchSizes(batches), equalTo(501));
}
+ @Test
+ public void testMakeRewriteBatchesWithLowerDataOpLimit() throws IOException {
+ GcsOptions options = gcsOptionsWithTestCredential();
+ options.setGcsRewriteDataOpBatchLimit(2);
+ GcsUtil gcsUtil = options.getGcsUtil();
+
+ // Small number of files in same bucket fits in 1 batch
+ List<BatchInterface> batches =
+ gcsUtil.makeRewriteBatches(
+ gcsUtil.makeRewriteOps(makeStrings("s", 5), makeStrings("d", 5),
false, false, false));
+ assertThat(batches.size(), equalTo(1));
+ assertThat(sumBatchSizes(batches), equalTo(5));
+
+ // Files copying between buckets use smaller batch size
+ batches =
+ gcsUtil.makeRewriteBatches(
+ gcsUtil.makeRewriteOps(
+ makeStrings("bucket1", "s", 5),
+ makeStrings("bucket2", "d", 5),
+ false,
+ false,
+ false));
+ assertThat(batches.size(), equalTo(3));
+ assertThat(sumBatchSizes(batches), equalTo(5));
+
+ // A mix of same bucket and different buckets uses large batches when
possible.
+ List<String> fromFiles = new ArrayList<>(makeStrings("bucket1", "s", 3));
+ List<String> toFiles = new ArrayList<>(makeStrings("bucket2", "d", 3));
+ fromFiles.addAll(makeStrings("t", 90));
+ toFiles.addAll(makeStrings("e", 90));
+ fromFiles.addAll(makeStrings("bucket3", "u", 3));
+ toFiles.addAll(makeStrings("bucket4", "f", 3));
+ fromFiles.addAll(makeStrings("bucket5", "v", 1));
+ toFiles.addAll(makeStrings("bucket5", "g", 1));
+
+ batches =
+ gcsUtil.makeRewriteBatches(gcsUtil.makeRewriteOps(fromFiles, toFiles,
false, false, false));
+ assertThat(batches.size(), equalTo(4));
+ assertThat(batches.get(0).size(), equalTo(91));
+ assertThat(sumBatchSizes(batches), equalTo(97));
+ }
+
@Test
public void testMakeRewriteOpsInvalid() throws IOException {
GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
@@ -1184,6 +1231,9 @@ public class GcsUtilTest {
cb.onFailure(error, null);
} catch (GoogleJsonResponseException e) {
cb.onFailure(e.getDetails(), null);
+ } catch (SocketTimeoutException e) {
+ System.out.println("Propagating socket exception as batch
processing error");
+ throw e;
} catch (Exception e) {
System.out.println("Propagating exception as server error " +
e);
e.printStackTrace();
@@ -1226,7 +1276,7 @@ public class GcsUtilTest {
Storage mockStorage = Mockito.mock(Storage.class);
gcsUtil.setStorageClient(mockStorage);
- gcsUtil.setBatchRequestSupplier(() -> new FakeBatcher());
+ gcsUtil.setBatchRequestSupplier(FakeBatcher::new);
Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
Storage.Objects.Rewrite mockStorageRewrite =
Mockito.mock(Storage.Objects.Rewrite.class);
@@ -1237,13 +1287,13 @@ public class GcsUtilTest {
when(mockStorageObjects.rewrite("bucket", "s0", "bucket", "d0", null))
.thenReturn(mockStorageRewrite);
when(mockStorageRewrite.execute())
- .thenThrow(new SocketTimeoutException("SocketException"))
+ .thenThrow(new InvalidObjectException("Test exception"))
.thenReturn(new RewriteResponse().setDone(true));
when(mockStorageObjects.delete("bucket", "s0"))
.thenReturn(mockStorageDelete1)
.thenReturn(mockStorageDelete2);
- when(mockStorageDelete1.execute()).thenThrow(new
SocketTimeoutException("SocketException"));
+ when(mockStorageDelete1.execute()).thenThrow(new
InvalidObjectException("Test exception"));
gcsUtil.rename(makeStrings("s", 1), makeStrings("d", 1));
verify(mockStorageRewrite, times(2)).execute();
@@ -1258,7 +1308,7 @@ public class GcsUtilTest {
Storage mockStorage = Mockito.mock(Storage.class);
gcsUtil.setStorageClient(mockStorage);
- gcsUtil.setBatchRequestSupplier(() -> new FakeBatcher());
+ gcsUtil.setBatchRequestSupplier(FakeBatcher::new);
Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
Storage.Objects.Rewrite mockStorageRewrite1 =
Mockito.mock(Storage.Objects.Rewrite.class);
@@ -1288,7 +1338,7 @@ public class GcsUtilTest {
Storage mockStorage = Mockito.mock(Storage.class);
gcsUtil.setStorageClient(mockStorage);
- gcsUtil.setBatchRequestSupplier(() -> new FakeBatcher());
+ gcsUtil.setBatchRequestSupplier(FakeBatcher::new);
Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
Storage.Objects.Rewrite mockStorageRewrite =
Mockito.mock(Storage.Objects.Rewrite.class);
@@ -1309,7 +1359,7 @@ public class GcsUtilTest {
Storage mockStorage = Mockito.mock(Storage.class);
gcsUtil.setStorageClient(mockStorage);
- gcsUtil.setBatchRequestSupplier(() -> new FakeBatcher());
+ gcsUtil.setBatchRequestSupplier(FakeBatcher::new);
Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
Storage.Objects.Rewrite mockStorageRewrite =
Mockito.mock(Storage.Objects.Rewrite.class);
@@ -1352,7 +1402,7 @@ public class GcsUtilTest {
Storage mockStorage = Mockito.mock(Storage.class);
gcsUtil.setStorageClient(mockStorage);
- gcsUtil.setBatchRequestSupplier(() -> new FakeBatcher());
+ gcsUtil.setBatchRequestSupplier(FakeBatcher::new);
Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
Storage.Objects.Get mockGetRequest1 =
Mockito.mock(Storage.Objects.Get.class);
@@ -1384,7 +1434,7 @@ public class GcsUtilTest {
Storage mockStorage = Mockito.mock(Storage.class);
gcsUtil.setStorageClient(mockStorage);
- gcsUtil.setBatchRequestSupplier(() -> new FakeBatcher());
+ gcsUtil.setBatchRequestSupplier(FakeBatcher::new);
Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
Storage.Objects.Get mockGetRequest =
Mockito.mock(Storage.Objects.Get.class);
@@ -1472,7 +1522,7 @@ public class GcsUtilTest {
Storage mockStorage = Mockito.mock(Storage.class);
gcsUtil.setStorageClient(mockStorage);
- gcsUtil.setBatchRequestSupplier(() -> new FakeBatcher());
+ gcsUtil.setBatchRequestSupplier(FakeBatcher::new);
Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
Storage.Objects.Get mockGetRequest =
Mockito.mock(Storage.Objects.Get.class);
@@ -1492,7 +1542,7 @@ public class GcsUtilTest {
Storage mockStorage = Mockito.mock(Storage.class);
gcsUtil.setStorageClient(mockStorage);
- gcsUtil.setBatchRequestSupplier(() -> new FakeBatcher());
+ gcsUtil.setBatchRequestSupplier(FakeBatcher::new);
Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
Storage.Objects.Get mockGetRequest =
Mockito.mock(Storage.Objects.Get.class);
@@ -1518,7 +1568,7 @@ public class GcsUtilTest {
Storage mockStorage = Mockito.mock(Storage.class);
gcsUtil.setStorageClient(mockStorage);
- gcsUtil.setBatchRequestSupplier(() -> new FakeBatcher());
+ gcsUtil.setBatchRequestSupplier(FakeBatcher::new);
Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
when(mockStorage.objects()).thenReturn(mockStorageObjects);
@@ -1545,7 +1595,8 @@ public class GcsUtilTest {
gcsOptions.getExecutorService(),
hasExperiment(options, "use_grpc_for_gcs"),
gcsOptions.getGcpCredential(),
- gcsOptions.getGcsUploadBufferSizeBytes());
+ gcsOptions.getGcsUploadBufferSizeBytes(),
+ gcsOptions.getGcsRewriteDataOpBatchLimit());
}
private GcsUtilMock(
@@ -1554,14 +1605,16 @@ public class GcsUtilTest {
ExecutorService executorService,
Boolean shouldUseGrpc,
Credentials credentials,
- @Nullable Integer uploadBufferSizeBytes) {
+ @Nullable Integer uploadBufferSizeBytes,
+ @Nullable Integer rewriteDataOpBatchLimit) {
super(
storageClient,
httpRequestInitializer,
executorService,
shouldUseGrpc,
credentials,
- uploadBufferSizeBytes);
+ uploadBufferSizeBytes,
+ rewriteDataOpBatchLimit);
}
@Override