Repository: beam Updated Branches: refs/heads/master 979c9376f -> c52578370
PackageUtil: parallelize staging of files Proceeds in stages: 1. In parallel, hash and size all files. 2. Sort files by descending size. 3. In parallel, upload files. Also a little cleanup for Dataflow 2.0: * proper visibility * removing some deprecated code * refactoring into smaller methods. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3ecf7e70 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3ecf7e70 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3ecf7e70 Branch: refs/heads/master Commit: 3ecf7e70bcc4775d804f096de647d13c407a8d52 Parents: 979c937 Author: Dan Halperin <[email protected]> Authored: Mon Oct 24 17:27:23 2016 -0700 Committer: Dan Halperin <[email protected]> Committed: Wed Jan 25 11:03:03 2017 -0800 ---------------------------------------------------------------------- runners/google-cloud-dataflow-java/pom.xml | 5 + .../beam/runners/dataflow/util/GcsStager.java | 18 +- .../beam/runners/dataflow/util/PackageUtil.java | 349 ++++++++++++------- .../runners/dataflow/util/PackageUtilTest.java | 42 ++- .../org/apache/beam/sdk/options/GcsOptions.java | 4 +- .../java/org/apache/beam/sdk/util/GcsUtil.java | 12 + 6 files changed, 281 insertions(+), 149 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/3ecf7e70/runners/google-cloud-dataflow-java/pom.xml ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml index eea5502..9858b3d 100644 --- a/runners/google-cloud-dataflow-java/pom.xml +++ b/runners/google-cloud-dataflow-java/pom.xml @@ -203,6 +203,11 @@ </dependency> <dependency> + <groupId>com.google.apis</groupId> + <artifactId>google-api-services-storage</artifactId> + </dependency> + + <dependency> <groupId>com.google.auth</groupId> <artifactId>google-auth-library-credentials</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/beam/blob/3ecf7e70/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java index 6ca4c3f..53822e3 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java @@ -17,13 +17,19 @@ */ package org.apache.beam.runners.dataflow.util; +import static com.google.common.base.MoreObjects.firstNonNull; +import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import com.google.api.services.dataflow.model.DataflowPackage; +import com.google.api.services.storage.Storage; import java.util.List; import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.util.GcsUtil; +import org.apache.beam.sdk.util.GcsUtil.GcsUtilFactory; +import org.apache.beam.sdk.util.Transport; /** * Utility class for staging files to GCS. @@ -35,6 +41,7 @@ public class GcsStager implements Stager { this.options = options; } + @SuppressWarnings("unused") // used via reflection public static GcsStager fromOptions(PipelineOptions options) { return new GcsStager(options.as(DataflowPipelineOptions.class)); } @@ -48,7 +55,16 @@ public class GcsStager implements Stager { if (windmillBinary != null) { filesToStage.add("windmill_main=" + windmillBinary); } + int uploadSizeBytes = firstNonNull(options.getGcsUploadBufferSizeBytes(), 1024 * 1024); + checkArgument(uploadSizeBytes > 0, "gcsUploadBufferSizeBytes must be > 0"); + uploadSizeBytes = Math.min(uploadSizeBytes, 1024 * 1024); + Storage.Builder storageBuilder = Transport.newStorageClient(options); + GcsUtil util = GcsUtilFactory.create( + storageBuilder.build(), + storageBuilder.getHttpRequestInitializer(), + options.getExecutorService(), + uploadSizeBytes); return PackageUtil.stageClasspathElements( - options.getFilesToStage(), options.getStagingLocation()); + options.getFilesToStage(), options.getStagingLocation(), util); } } http://git-wip-us.apache.org/repos/asf/beam/blob/3ecf7e70/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java index 6d910ba..fa8c94d 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java @@ -17,53 +17,62 @@ */ package org.apache.beam.runners.dataflow.util; +import static com.google.common.base.Preconditions.checkArgument; + import com.fasterxml.jackson.core.Base64Variants; import com.google.api.client.util.BackOff; import com.google.api.client.util.Sleeper; import com.google.api.services.dataflow.model.DataflowPackage; import com.google.cloud.hadoop.util.ApiErrorExtractor; +import com.google.common.collect.Lists; import com.google.common.hash.Funnels; import com.google.common.hash.Hasher; import com.google.common.hash.Hashing; import com.google.common.io.CountingOutputStream; import com.google.common.io.Files; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.io.OutputStream; import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; -import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.LinkedList; import java.util.List; import java.util.Objects; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; +import javax.annotation.Nullable; import org.apache.beam.sdk.util.FluentBackoff; +import org.apache.beam.sdk.util.GcsIOChannelFactory; +import org.apache.beam.sdk.util.GcsUtil; +import org.apache.beam.sdk.util.IOChannelFactory; import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.util.MimeTypes; import org.apache.beam.sdk.util.ZipFiles; +import org.apache.beam.sdk.util.gcsfs.GcsPath; import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** Helper routines for packages. */ -public class PackageUtil { +class PackageUtil { private static final Logger LOG = LoggerFactory.getLogger(PackageUtil.class); /** * A reasonable upper bound on the number of jars required to launch a Dataflow job. */ - public static final int SANE_CLASSPATH_SIZE = 1000; - /** - * The initial interval to use between package staging attempts. - */ - private static final Duration INITIAL_BACKOFF_INTERVAL = Duration.standardSeconds(5); - /** - * The maximum number of retries when staging a file. - */ - private static final int MAX_RETRIES = 4; + private static final int SANE_CLASSPATH_SIZE = 1000; private static final FluentBackoff BACKOFF_FACTORY = - FluentBackoff.DEFAULT - .withMaxRetries(MAX_RETRIES).withInitialBackoff(INITIAL_BACKOFF_INTERVAL); + FluentBackoff.DEFAULT.withMaxRetries(4).withInitialBackoff(Duration.standardSeconds(5)); /** * Translates exceptions from API calls. @@ -71,35 +80,18 @@ public class PackageUtil { private static final ApiErrorExtractor ERROR_EXTRACTOR = new ApiErrorExtractor(); /** - * Creates a DataflowPackage containing information about how a classpath element should be - * staged, including the staging destination as well as its size and hash. - * - * @param classpathElement The local path for the classpath element. - * @param stagingPath The base location for staged classpath elements. - * @param overridePackageName If non-null, use the given value as the package name - * instead of generating one automatically. - * @return The package. - */ - @Deprecated - public static DataflowPackage createPackage(File classpathElement, - String stagingPath, String overridePackageName) { - return createPackageAttributes(classpathElement, stagingPath, overridePackageName) - .getDataflowPackage(); - } - - /** * Compute and cache the attributes of a classpath element that we will need to stage it. * - * @param classpathElement the file or directory to be staged. + * @param source the file or directory to be staged. * @param stagingPath The base location for staged classpath elements. * @param overridePackageName If non-null, use the given value as the package name * instead of generating one automatically. * @return a {@link PackageAttributes} that containing metadata about the object to be staged. */ - static PackageAttributes createPackageAttributes(File classpathElement, - String stagingPath, String overridePackageName) { + static PackageAttributes createPackageAttributes(File source, + String stagingPath, @Nullable String overridePackageName) { try { - boolean directory = classpathElement.isDirectory(); + boolean directory = source.isDirectory(); // Compute size and hash in one pass over file or directory. Hasher hasher = Hashing.md5().newHasher(); @@ -108,142 +100,232 @@ public class PackageUtil { if (!directory) { // Files are staged as-is. - Files.asByteSource(classpathElement).copyTo(countingOutputStream); + Files.asByteSource(source).copyTo(countingOutputStream); } else { // Directories are recursively zipped. - ZipFiles.zipDirectory(classpathElement, countingOutputStream); + ZipFiles.zipDirectory(source, countingOutputStream); } long size = countingOutputStream.getCount(); String hash = Base64Variants.MODIFIED_FOR_URL.encode(hasher.hash().asBytes()); // Create the DataflowPackage with staging name and location. - String uniqueName = getUniqueContentName(classpathElement, hash); + String uniqueName = getUniqueContentName(source, hash); String resourcePath = IOChannelUtils.resolve(stagingPath, uniqueName); DataflowPackage target = new DataflowPackage(); target.setName(overridePackageName != null ? overridePackageName : uniqueName); target.setLocation(resourcePath); - return new PackageAttributes(size, hash, directory, target); + return new PackageAttributes(size, hash, directory, target, source.getPath()); } catch (IOException e) { - throw new RuntimeException("Package setup failure for " + classpathElement, e); + throw new RuntimeException("Package setup failure for " + source, e); } } - /** - * Transfers the classpath elements to the staging location. - * - * @param classpathElements The elements to stage. - * @param stagingPath The base location to stage the elements to. - * @return A list of cloud workflow packages, each representing a classpath element. - */ - public static List<DataflowPackage> stageClasspathElements( - Collection<String> classpathElements, String stagingPath) { - return stageClasspathElements(classpathElements, stagingPath, Sleeper.DEFAULT); - } - - // Visible for testing. - static List<DataflowPackage> stageClasspathElements( - Collection<String> classpathElements, String stagingPath, - Sleeper retrySleeper) { - LOG.info("Uploading {} files from PipelineOptions.filesToStage to staging location to " - + "prepare for execution.", classpathElements.size()); - - if (classpathElements.size() > SANE_CLASSPATH_SIZE) { - LOG.warn("Your classpath contains {} elements, which Google Cloud Dataflow automatically " - + "copies to all workers. Having this many entries on your classpath may be indicative " - + "of an issue in your pipeline. You may want to consider trimming the classpath to " - + "necessary dependencies only, using --filesToStage pipeline option to override " - + "what files are being staged, or bundling several dependencies into one.", - classpathElements.size()); - } - - ArrayList<DataflowPackage> packages = new ArrayList<>(); + /** Utility comparator used in uploading packages efficiently. */ + private static class PackageUploadOrder implements Comparator<PackageAttributes> { + @Override + public int compare(PackageAttributes o1, PackageAttributes o2) { + // Smaller size compares high so that bigger packages are uploaded first. + long sizeDiff = o2.getSize() - o1.getSize(); + if (sizeDiff != 0) { + // returns sign of long + return Long.signum(sizeDiff); + } - if (stagingPath == null) { - throw new IllegalArgumentException( - "Can't stage classpath elements on because no staging location has been provided"); + // Otherwise, choose arbitrarily based on hash. + return o1.getHash().compareTo(o2.getHash()); } + } - int numUploaded = 0; - int numCached = 0; + /** + * Utility function that computes sizes and hashes of packages so that we can validate whether + * they have already been correctly staged. + */ + private static List<PackageAttributes> computePackageAttributes( + Collection<String> classpathElements, final String stagingPath, + ListeningExecutorService executorService) { + List<ListenableFuture<PackageAttributes>> futures = new LinkedList<>(); for (String classpathElement : classpathElements) { - String packageName = null; + @Nullable String userPackageName = null; if (classpathElement.contains("=")) { String[] components = classpathElement.split("=", 2); - packageName = components[0]; + userPackageName = components[0]; classpathElement = components[1]; } + @Nullable final String packageName = userPackageName; - File file = new File(classpathElement); + final File file = new File(classpathElement); if (!file.exists()) { LOG.warn("Skipping non-existent classpath element {} that was specified.", classpathElement); continue; } - PackageAttributes attributes = createPackageAttributes(file, stagingPath, packageName); + ListenableFuture<PackageAttributes> future = + executorService.submit(new Callable<PackageAttributes>() { + @Override + public PackageAttributes call() throws Exception { + return createPackageAttributes(file, stagingPath, packageName); + } + }); + futures.add(future); + } + + try { + return Futures.allAsList(futures).get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while staging packages", e); + } catch (ExecutionException e) { + throw new RuntimeException("Error while staging packages", e.getCause()); + } + } + + private static WritableByteChannel makeWriter(String target, GcsUtil gcsUtil) + throws IOException { + IOChannelFactory factory = IOChannelUtils.getFactory(target); + if (factory instanceof GcsIOChannelFactory) { + return gcsUtil.create(GcsPath.fromUri(target), MimeTypes.BINARY); + } else { + return factory.create(target, MimeTypes.BINARY); + } + } - DataflowPackage workflowPackage = attributes.getDataflowPackage(); - packages.add(workflowPackage); - String target = workflowPackage.getLocation(); + /** + * Utility to verify whether a package has already been staged and, if not, copy it to the + * staging location. + */ + private static void stageOnePackage( + PackageAttributes attributes, AtomicInteger numUploaded, AtomicInteger numCached, + Sleeper retrySleeper, GcsUtil gcsUtil) { + String source = attributes.getSourcePath(); + String target = attributes.getDataflowPackage().getLocation(); - // TODO: Should we attempt to detect the Mime type rather than - // always using MimeTypes.BINARY? + // TODO: Should we attempt to detect the Mime type rather than + // always using MimeTypes.BINARY? + try { try { - try { - long remoteLength = IOChannelUtils.getSizeBytes(target); - if (remoteLength == attributes.getSize()) { - LOG.debug("Skipping classpath element already staged: {} at {}", - classpathElement, target); - numCached++; - continue; - } - } catch (FileNotFoundException expected) { - // If the file doesn't exist, it means we need to upload it. + long remoteLength = IOChannelUtils.getSizeBytes(target); + if (remoteLength == attributes.getSize()) { + LOG.debug("Skipping classpath element already staged: {} at {}", + attributes.getSourcePath(), target); + numCached.incrementAndGet(); + return; } + } catch (FileNotFoundException expected) { + // If the file doesn't exist, it means we need to upload it. + } - // Upload file, retrying on failure. - BackOff backoff = BACKOFF_FACTORY.backoff(); - while (true) { - try { - LOG.debug("Uploading classpath element {} to {}", classpathElement, target); - try (WritableByteChannel writer = IOChannelUtils.create(target, MimeTypes.BINARY)) { - copyContent(classpathElement, writer); - } - numUploaded++; - break; - } catch (IOException e) { - if (ERROR_EXTRACTOR.accessDenied(e)) { - String errorMessage = String.format( - "Uploaded failed due to permissions error, will NOT retry staging " - + "of classpath %s. Please verify credentials are valid and that you have " - + "write access to %s. Stale credentials can be resolved by executing " - + "'gcloud auth login'.", classpathElement, target); - LOG.error(errorMessage); - throw new IOException(errorMessage, e); - } - long sleep = backoff.nextBackOffMillis(); - if (sleep == BackOff.STOP) { - // Rethrow last error, to be included as a cause in the catch below. - LOG.error("Upload failed, will NOT retry staging of classpath: {}", - classpathElement, e); - throw e; - } else { - LOG.warn("Upload attempt failed, sleeping before retrying staging of classpath: {}", - classpathElement, e); - retrySleeper.sleep(sleep); - } + // Upload file, retrying on failure. + BackOff backoff = BACKOFF_FACTORY.backoff(); + while (true) { + try { + LOG.debug("Uploading classpath element {} to {}", source, target); + try (WritableByteChannel writer = makeWriter(target, gcsUtil)) { + copyContent(source, writer); + } + numUploaded.incrementAndGet(); + break; + } catch (IOException e) { + if (ERROR_EXTRACTOR.accessDenied(e)) { + String errorMessage = String.format( + "Uploaded failed due to permissions error, will NOT retry staging " + + "of classpath %s. Please verify credentials are valid and that you have " + + "write access to %s. Stale credentials can be resolved by executing " + + "'gcloud auth application-default login'.", source, target); + LOG.error(errorMessage); + throw new IOException(errorMessage, e); + } + long sleep = backoff.nextBackOffMillis(); + if (sleep == BackOff.STOP) { + // Rethrow last error, to be included as a cause in the catch below. + LOG.error("Upload failed, will NOT retry staging of classpath: {}", + source, e); + throw e; + } else { + LOG.warn("Upload attempt failed, sleeping before retrying staging of classpath: {}", + source, e); + retrySleeper.sleep(sleep); } } - } catch (Exception e) { - throw new RuntimeException("Could not stage classpath element: " + classpathElement, e); } + } catch (Exception e) { + throw new RuntimeException("Could not stage classpath element: " + source, e); } + } - LOG.info("Uploading PipelineOptions.filesToStage complete: {} files newly uploaded, " - + "{} files cached", - numUploaded, numCached); + /** + * Transfers the classpath elements to the staging location. + * + * @param classpathElements The elements to stage. + * @param stagingPath The base location to stage the elements to. + * @return A list of cloud workflow packages, each representing a classpath element. + */ + static List<DataflowPackage> stageClasspathElements( + Collection<String> classpathElements, String stagingPath, GcsUtil gcsUtil) { + ListeningExecutorService executorService = + MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(32)); + try { + return stageClasspathElements( + classpathElements, stagingPath, Sleeper.DEFAULT, executorService, gcsUtil); + } finally { + executorService.shutdown(); + } + } + + // Visible for testing. + static List<DataflowPackage> stageClasspathElements( + Collection<String> classpathElements, final String stagingPath, + final Sleeper retrySleeper, ListeningExecutorService executorService, final GcsUtil gcsUtil) { + LOG.info("Uploading {} files from PipelineOptions.filesToStage to staging location to " + + "prepare for execution.", classpathElements.size()); + + if (classpathElements.size() > SANE_CLASSPATH_SIZE) { + LOG.warn("Your classpath contains {} elements, which Google Cloud Dataflow automatically " + + "copies to all workers. Having this many entries on your classpath may be indicative " + + "of an issue in your pipeline. You may want to consider trimming the classpath to " + + "necessary dependencies only, using --filesToStage pipeline option to override " + + "what files are being staged, or bundling several dependencies into one.", + classpathElements.size()); + } + + checkArgument( + stagingPath != null, + "Can't stage classpath elements because no staging location has been provided"); + + // Inline a copy here because the inner code returns an immutable list and we want to mutate it. + List<PackageAttributes> packageAttributes = + new LinkedList<>(computePackageAttributes(classpathElements, stagingPath, executorService)); + // Order package attributes in descending size order so that we upload the largest files first. + Collections.sort(packageAttributes, new PackageUploadOrder()); + + List<DataflowPackage> packages = Lists.newArrayListWithExpectedSize(packageAttributes.size()); + final AtomicInteger numUploaded = new AtomicInteger(0); + final AtomicInteger numCached = new AtomicInteger(0); + + List<ListenableFuture<?>> futures = new LinkedList<>(); + for (final PackageAttributes attributes : packageAttributes) { + packages.add(attributes.getDataflowPackage()); + futures.add(executorService.submit(new Runnable() { + @Override + public void run() { + stageOnePackage(attributes, numUploaded, numCached, retrySleeper, gcsUtil); + } + })); + } + try { + Futures.allAsList(futures).get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while staging packages", e); + } catch (ExecutionException e) { + throw new RuntimeException("Error while staging packages", e.getCause()); + } + + LOG.info( + "Staging files complete: {} files cached, {} files newly uploaded", + numUploaded.get(), numCached.get()); return packages; } @@ -293,13 +375,15 @@ public class PackageUtil { private final boolean directory; private final long size; private final String hash; + private final String sourcePath; private DataflowPackage dataflowPackage; public PackageAttributes(long size, String hash, boolean directory, - DataflowPackage dataflowPackage) { + DataflowPackage dataflowPackage, String sourcePath) { this.size = size; this.hash = Objects.requireNonNull(hash, "hash"); this.directory = directory; + this.sourcePath = Objects.requireNonNull(sourcePath, "sourcePath"); this.dataflowPackage = Objects.requireNonNull(dataflowPackage, "dataflowPackage"); } @@ -330,5 +414,12 @@ public class PackageUtil { public String getHash() { return hash; } + + /** + * @return the file to be uploaded + */ + public String getSourcePath() { + return sourcePath; + } } } http://git-wip-us.apache.org/repos/asf/beam/blob/3ecf7e70/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java index 05a87dd..3828415 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java @@ -18,12 +18,12 @@ package org.apache.beam.runners.dataflow.util; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; @@ -53,6 +53,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.io.Files; import com.google.common.io.LineReader; +import com.google.common.util.concurrent.MoreExecutors; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; @@ -235,7 +236,7 @@ public class PackageUtilTest { classpathElements.add(eltName + '=' + tmpFile.getAbsolutePath()); } - PackageUtil.stageClasspathElements(classpathElements, STAGING_PATH); + PackageUtil.stageClasspathElements(classpathElements, STAGING_PATH, mockGcsUtil); logged.verifyWarn("Your classpath contains 1005 elements, which Google Cloud Dataflow"); } @@ -250,7 +251,7 @@ public class PackageUtilTest { when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink()); List<DataflowPackage> targets = PackageUtil.stageClasspathElements( - ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH); + ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH, mockGcsUtil); DataflowPackage target = Iterables.getOnlyElement(targets); verify(mockGcsUtil).fileSize(any(GcsPath.class)); @@ -277,7 +278,7 @@ public class PackageUtilTest { when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink()); PackageUtil.stageClasspathElements( - ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH); + ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH, mockGcsUtil); verify(mockGcsUtil).fileSize(any(GcsPath.class)); verify(mockGcsUtil).create(any(GcsPath.class), anyString()); @@ -304,7 +305,7 @@ public class PackageUtilTest { when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink()); List<DataflowPackage> targets = PackageUtil.stageClasspathElements( - ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH); + ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH, mockGcsUtil); DataflowPackage target = Iterables.getOnlyElement(targets); verify(mockGcsUtil).fileSize(any(GcsPath.class)); @@ -327,7 +328,8 @@ public class PackageUtilTest { try { PackageUtil.stageClasspathElements( ImmutableList.of(tmpFile.getAbsolutePath()), - STAGING_PATH, fastNanoClockAndSleeper); + STAGING_PATH, fastNanoClockAndSleeper, MoreExecutors.newDirectExecutorService(), + mockGcsUtil); } finally { verify(mockGcsUtil).fileSize(any(GcsPath.class)); verify(mockGcsUtil, times(5)).create(any(GcsPath.class), anyString()); @@ -348,16 +350,20 @@ public class PackageUtilTest { try { PackageUtil.stageClasspathElements( ImmutableList.of(tmpFile.getAbsolutePath()), - STAGING_PATH, fastNanoClockAndSleeper); + STAGING_PATH, fastNanoClockAndSleeper, MoreExecutors.newDirectExecutorService(), + mockGcsUtil); fail("Expected RuntimeException"); } catch (RuntimeException e) { - assertTrue("Expected IOException containing detailed message.", - e.getCause() instanceof IOException); - assertThat(e.getCause().getMessage(), + assertThat("Expected RuntimeException wrapping IOException.", + e.getCause(), instanceOf(RuntimeException.class)); + assertThat("Expected IOException containing detailed message.", + e.getCause().getCause(), instanceOf(IOException.class)); + assertThat(e.getCause().getCause().getMessage(), Matchers.allOf( Matchers.containsString("Uploaded failed due to permissions error"), Matchers.containsString( - "Stale credentials can be resolved by executing 'gcloud auth login'"))); + "Stale credentials can be resolved by executing 'gcloud auth application-default " + + "login'"))); } finally { verify(mockGcsUtil).fileSize(any(GcsPath.class)); verify(mockGcsUtil).create(any(GcsPath.class), anyString()); @@ -377,9 +383,8 @@ public class PackageUtilTest { try { PackageUtil.stageClasspathElements( - ImmutableList.of(tmpFile.getAbsolutePath()), - STAGING_PATH, - fastNanoClockAndSleeper); + ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH, fastNanoClockAndSleeper, + MoreExecutors.newDirectExecutorService(), mockGcsUtil); } finally { verify(mockGcsUtil).fileSize(any(GcsPath.class)); verify(mockGcsUtil, times(2)).create(any(GcsPath.class), anyString()); @@ -393,7 +398,7 @@ public class PackageUtilTest { when(mockGcsUtil.fileSize(any(GcsPath.class))).thenReturn(tmpFile.length()); PackageUtil.stageClasspathElements( - ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH); + ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH, mockGcsUtil); verify(mockGcsUtil).fileSize(any(GcsPath.class)); verifyNoMoreInteractions(mockGcsUtil); @@ -411,7 +416,7 @@ public class PackageUtilTest { when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink()); PackageUtil.stageClasspathElements( - ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH); + ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH, mockGcsUtil); verify(mockGcsUtil).fileSize(any(GcsPath.class)); verify(mockGcsUtil).create(any(GcsPath.class), anyString()); @@ -429,7 +434,8 @@ public class PackageUtilTest { when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink()); List<DataflowPackage> targets = PackageUtil.stageClasspathElements( - ImmutableList.of(overriddenName + "=" + tmpFile.getAbsolutePath()), STAGING_PATH); + ImmutableList.of(overriddenName + "=" + tmpFile.getAbsolutePath()), STAGING_PATH, + mockGcsUtil); DataflowPackage target = Iterables.getOnlyElement(targets); verify(mockGcsUtil).fileSize(any(GcsPath.class)); @@ -446,7 +452,7 @@ public class PackageUtilTest { String nonExistentFile = IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "non-existent-file"); assertEquals(Collections.EMPTY_LIST, PackageUtil.stageClasspathElements( - ImmutableList.of(nonExistentFile), STAGING_PATH)); + ImmutableList.of(nonExistentFile), STAGING_PATH, mockGcsUtil)); } /** http://git-wip-us.apache.org/repos/asf/beam/blob/3ecf7e70/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java index 0553efc..72e106d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java @@ -25,6 +25,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; import org.apache.beam.sdk.util.AppEngineEnvironment; import org.apache.beam.sdk.util.GcsPathValidator; import org.apache.beam.sdk.util.GcsUtil; @@ -81,8 +82,9 @@ public interface GcsOptions extends + "information on the restrictions and performance implications of this value.\n\n" + "https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/util/src/main/java/" + "com/google/cloud/hadoop/util/AbstractGoogleAsyncWriteChannel.java") + @Nullable Integer getGcsUploadBufferSizeBytes(); - void setGcsUploadBufferSizeBytes(Integer bytes); + void setGcsUploadBufferSizeBytes(@Nullable Integer bytes); /** * The class of the validator that should be created and used to validate paths. http://git-wip-us.apache.org/repos/asf/beam/blob/3ecf7e70/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java index a10ea28..5e83584 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java @@ -101,6 +101,18 @@ public class GcsUtil { gcsOptions.getExecutorService(), gcsOptions.getGcsUploadBufferSizeBytes()); } + + /** + * Returns an instance of {@link GcsUtil} based on the given parameters. + */ + public static GcsUtil create( + Storage storageClient, + HttpRequestInitializer httpRequestInitializer, + ExecutorService executorService, + @Nullable Integer uploadBufferSizeBytes) { + return new GcsUtil( + storageClient, httpRequestInitializer, executorService, uploadBufferSizeBytes); + } } private static final Logger LOG = LoggerFactory.getLogger(GcsUtil.class);
