Repository: beam Updated Branches: refs/heads/master 6413299a2 -> 1c6e66741
Revert "This closes #1184" This reverts commit c525783704e0cc47845df8cdec1715e1f1c74008, reversing changes made to 979c9376f820577bad43c18cc1a7ee86fab9d942. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fee029f7 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fee029f7 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fee029f7 Branch: refs/heads/master Commit: fee029f7f9963c9de821ff5792d7f45fabe6cb5d Parents: 6413299 Author: Dan Halperin <[email protected]> Authored: Wed Jan 25 15:54:26 2017 -0800 Committer: Dan Halperin <[email protected]> Committed: Wed Jan 25 15:54:26 2017 -0800 ---------------------------------------------------------------------- runners/google-cloud-dataflow-java/pom.xml | 5 - .../beam/runners/dataflow/util/GcsStager.java | 18 +- .../beam/runners/dataflow/util/PackageUtil.java | 349 +++++++------------ .../runners/dataflow/util/PackageUtilTest.java | 42 +-- .../org/apache/beam/sdk/options/GcsOptions.java | 4 +- .../java/org/apache/beam/sdk/util/GcsUtil.java | 12 - 6 files changed, 149 insertions(+), 281 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/fee029f7/runners/google-cloud-dataflow-java/pom.xml ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml index 9858b3d..eea5502 100644 --- a/runners/google-cloud-dataflow-java/pom.xml +++ b/runners/google-cloud-dataflow-java/pom.xml @@ -203,11 +203,6 @@ </dependency> <dependency> - <groupId>com.google.apis</groupId> - <artifactId>google-api-services-storage</artifactId> - </dependency> - - <dependency> <groupId>com.google.auth</groupId> <artifactId>google-auth-library-credentials</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/beam/blob/fee029f7/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java index 53822e3..6ca4c3f 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java @@ -17,19 +17,13 @@ */ package org.apache.beam.runners.dataflow.util; -import static com.google.common.base.MoreObjects.firstNonNull; -import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import com.google.api.services.dataflow.model.DataflowPackage; -import com.google.api.services.storage.Storage; import java.util.List; import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.util.GcsUtil; -import org.apache.beam.sdk.util.GcsUtil.GcsUtilFactory; -import org.apache.beam.sdk.util.Transport; /** * Utility class for staging files to GCS. @@ -41,7 +35,6 @@ public class GcsStager implements Stager { this.options = options; } - @SuppressWarnings("unused") // used via reflection public static GcsStager fromOptions(PipelineOptions options) { return new GcsStager(options.as(DataflowPipelineOptions.class)); } @@ -55,16 +48,7 @@ public class GcsStager implements Stager { if (windmillBinary != null) { filesToStage.add("windmill_main=" + windmillBinary); } - int uploadSizeBytes = firstNonNull(options.getGcsUploadBufferSizeBytes(), 1024 * 1024); - checkArgument(uploadSizeBytes > 0, "gcsUploadBufferSizeBytes must be > 0"); - uploadSizeBytes = Math.min(uploadSizeBytes, 1024 * 1024); - Storage.Builder storageBuilder = Transport.newStorageClient(options); - GcsUtil util = GcsUtilFactory.create( - storageBuilder.build(), - storageBuilder.getHttpRequestInitializer(), - options.getExecutorService(), - uploadSizeBytes); return PackageUtil.stageClasspathElements( - options.getFilesToStage(), options.getStagingLocation(), util); + options.getFilesToStage(), options.getStagingLocation()); } } http://git-wip-us.apache.org/repos/asf/beam/blob/fee029f7/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java index fa8c94d..6d910ba 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java @@ -17,62 +17,53 @@ */ package org.apache.beam.runners.dataflow.util; -import static com.google.common.base.Preconditions.checkArgument; - import com.fasterxml.jackson.core.Base64Variants; import com.google.api.client.util.BackOff; import com.google.api.client.util.Sleeper; import com.google.api.services.dataflow.model.DataflowPackage; import com.google.cloud.hadoop.util.ApiErrorExtractor; -import com.google.common.collect.Lists; import com.google.common.hash.Funnels; import com.google.common.hash.Hasher; import com.google.common.hash.Hashing; import com.google.common.io.CountingOutputStream; import com.google.common.io.Files; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.io.OutputStream; import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; +import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.LinkedList; import java.util.List; import java.util.Objects; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicInteger; -import javax.annotation.Nullable; import org.apache.beam.sdk.util.FluentBackoff; -import org.apache.beam.sdk.util.GcsIOChannelFactory; -import org.apache.beam.sdk.util.GcsUtil; -import org.apache.beam.sdk.util.IOChannelFactory; import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.util.MimeTypes; import org.apache.beam.sdk.util.ZipFiles; -import org.apache.beam.sdk.util.gcsfs.GcsPath; import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** Helper routines for packages. */ -class PackageUtil { +public class PackageUtil { private static final Logger LOG = LoggerFactory.getLogger(PackageUtil.class); /** * A reasonable upper bound on the number of jars required to launch a Dataflow job. */ - private static final int SANE_CLASSPATH_SIZE = 1000; + public static final int SANE_CLASSPATH_SIZE = 1000; + /** + * The initial interval to use between package staging attempts. + */ + private static final Duration INITIAL_BACKOFF_INTERVAL = Duration.standardSeconds(5); + /** + * The maximum number of retries when staging a file. + */ + private static final int MAX_RETRIES = 4; private static final FluentBackoff BACKOFF_FACTORY = - FluentBackoff.DEFAULT.withMaxRetries(4).withInitialBackoff(Duration.standardSeconds(5)); + FluentBackoff.DEFAULT + .withMaxRetries(MAX_RETRIES).withInitialBackoff(INITIAL_BACKOFF_INTERVAL); /** * Translates exceptions from API calls. @@ -80,18 +71,35 @@ class PackageUtil { private static final ApiErrorExtractor ERROR_EXTRACTOR = new ApiErrorExtractor(); /** + * Creates a DataflowPackage containing information about how a classpath element should be + * staged, including the staging destination as well as its size and hash. + * + * @param classpathElement The local path for the classpath element. + * @param stagingPath The base location for staged classpath elements. + * @param overridePackageName If non-null, use the given value as the package name + * instead of generating one automatically. + * @return The package. + */ + @Deprecated + public static DataflowPackage createPackage(File classpathElement, + String stagingPath, String overridePackageName) { + return createPackageAttributes(classpathElement, stagingPath, overridePackageName) + .getDataflowPackage(); + } + + /** * Compute and cache the attributes of a classpath element that we will need to stage it. * - * @param source the file or directory to be staged. + * @param classpathElement the file or directory to be staged. * @param stagingPath The base location for staged classpath elements. * @param overridePackageName If non-null, use the given value as the package name * instead of generating one automatically. * @return a {@link PackageAttributes} that containing metadata about the object to be staged. */ - static PackageAttributes createPackageAttributes(File source, - String stagingPath, @Nullable String overridePackageName) { + static PackageAttributes createPackageAttributes(File classpathElement, + String stagingPath, String overridePackageName) { try { - boolean directory = source.isDirectory(); + boolean directory = classpathElement.isDirectory(); // Compute size and hash in one pass over file or directory. Hasher hasher = Hashing.md5().newHasher(); @@ -100,158 +108,25 @@ class PackageUtil { if (!directory) { // Files are staged as-is. - Files.asByteSource(source).copyTo(countingOutputStream); + Files.asByteSource(classpathElement).copyTo(countingOutputStream); } else { // Directories are recursively zipped. - ZipFiles.zipDirectory(source, countingOutputStream); + ZipFiles.zipDirectory(classpathElement, countingOutputStream); } long size = countingOutputStream.getCount(); String hash = Base64Variants.MODIFIED_FOR_URL.encode(hasher.hash().asBytes()); // Create the DataflowPackage with staging name and location. - String uniqueName = getUniqueContentName(source, hash); + String uniqueName = getUniqueContentName(classpathElement, hash); String resourcePath = IOChannelUtils.resolve(stagingPath, uniqueName); DataflowPackage target = new DataflowPackage(); target.setName(overridePackageName != null ? overridePackageName : uniqueName); target.setLocation(resourcePath); - return new PackageAttributes(size, hash, directory, target, source.getPath()); + return new PackageAttributes(size, hash, directory, target); } catch (IOException e) { - throw new RuntimeException("Package setup failure for " + source, e); - } - } - - /** Utility comparator used in uploading packages efficiently. */ - private static class PackageUploadOrder implements Comparator<PackageAttributes> { - @Override - public int compare(PackageAttributes o1, PackageAttributes o2) { - // Smaller size compares high so that bigger packages are uploaded first. - long sizeDiff = o2.getSize() - o1.getSize(); - if (sizeDiff != 0) { - // returns sign of long - return Long.signum(sizeDiff); - } - - // Otherwise, choose arbitrarily based on hash. - return o1.getHash().compareTo(o2.getHash()); - } - } - - /** - * Utility function that computes sizes and hashes of packages so that we can validate whether - * they have already been correctly staged. - */ - private static List<PackageAttributes> computePackageAttributes( - Collection<String> classpathElements, final String stagingPath, - ListeningExecutorService executorService) { - List<ListenableFuture<PackageAttributes>> futures = new LinkedList<>(); - for (String classpathElement : classpathElements) { - @Nullable String userPackageName = null; - if (classpathElement.contains("=")) { - String[] components = classpathElement.split("=", 2); - userPackageName = components[0]; - classpathElement = components[1]; - } - @Nullable final String packageName = userPackageName; - - final File file = new File(classpathElement); - if (!file.exists()) { - LOG.warn("Skipping non-existent classpath element {} that was specified.", - classpathElement); - continue; - } - - ListenableFuture<PackageAttributes> future = - executorService.submit(new Callable<PackageAttributes>() { - @Override - public PackageAttributes call() throws Exception { - return createPackageAttributes(file, stagingPath, packageName); - } - }); - futures.add(future); - } - - try { - return Futures.allAsList(futures).get(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Interrupted while staging packages", e); - } catch (ExecutionException e) { - throw new RuntimeException("Error while staging packages", e.getCause()); - } - } - - private static WritableByteChannel makeWriter(String target, GcsUtil gcsUtil) - throws IOException { - IOChannelFactory factory = IOChannelUtils.getFactory(target); - if (factory instanceof GcsIOChannelFactory) { - return gcsUtil.create(GcsPath.fromUri(target), MimeTypes.BINARY); - } else { - return factory.create(target, MimeTypes.BINARY); - } - } - - /** - * Utility to verify whether a package has already been staged and, if not, copy it to the - * staging location. - */ - private static void stageOnePackage( - PackageAttributes attributes, AtomicInteger numUploaded, AtomicInteger numCached, - Sleeper retrySleeper, GcsUtil gcsUtil) { - String source = attributes.getSourcePath(); - String target = attributes.getDataflowPackage().getLocation(); - - // TODO: Should we attempt to detect the Mime type rather than - // always using MimeTypes.BINARY? - try { - try { - long remoteLength = IOChannelUtils.getSizeBytes(target); - if (remoteLength == attributes.getSize()) { - LOG.debug("Skipping classpath element already staged: {} at {}", - attributes.getSourcePath(), target); - numCached.incrementAndGet(); - return; - } - } catch (FileNotFoundException expected) { - // If the file doesn't exist, it means we need to upload it. - } - - // Upload file, retrying on failure. - BackOff backoff = BACKOFF_FACTORY.backoff(); - while (true) { - try { - LOG.debug("Uploading classpath element {} to {}", source, target); - try (WritableByteChannel writer = makeWriter(target, gcsUtil)) { - copyContent(source, writer); - } - numUploaded.incrementAndGet(); - break; - } catch (IOException e) { - if (ERROR_EXTRACTOR.accessDenied(e)) { - String errorMessage = String.format( - "Uploaded failed due to permissions error, will NOT retry staging " - + "of classpath %s. Please verify credentials are valid and that you have " - + "write access to %s. Stale credentials can be resolved by executing " - + "'gcloud auth application-default login'.", source, target); - LOG.error(errorMessage); - throw new IOException(errorMessage, e); - } - long sleep = backoff.nextBackOffMillis(); - if (sleep == BackOff.STOP) { - // Rethrow last error, to be included as a cause in the catch below. - LOG.error("Upload failed, will NOT retry staging of classpath: {}", - source, e); - throw e; - } else { - LOG.warn("Upload attempt failed, sleeping before retrying staging of classpath: {}", - source, e); - retrySleeper.sleep(sleep); - } - } - } - } catch (Exception e) { - throw new RuntimeException("Could not stage classpath element: " + source, e); + throw new RuntimeException("Package setup failure for " + classpathElement, e); } } @@ -262,70 +137,113 @@ class PackageUtil { * @param stagingPath The base location to stage the elements to. * @return A list of cloud workflow packages, each representing a classpath element. */ - static List<DataflowPackage> stageClasspathElements( - Collection<String> classpathElements, String stagingPath, GcsUtil gcsUtil) { - ListeningExecutorService executorService = - MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(32)); - try { - return stageClasspathElements( - classpathElements, stagingPath, Sleeper.DEFAULT, executorService, gcsUtil); - } finally { - executorService.shutdown(); - } + public static List<DataflowPackage> stageClasspathElements( + Collection<String> classpathElements, String stagingPath) { + return stageClasspathElements(classpathElements, stagingPath, Sleeper.DEFAULT); } // Visible for testing. static List<DataflowPackage> stageClasspathElements( - Collection<String> classpathElements, final String stagingPath, - final Sleeper retrySleeper, ListeningExecutorService executorService, final GcsUtil gcsUtil) { + Collection<String> classpathElements, String stagingPath, + Sleeper retrySleeper) { LOG.info("Uploading {} files from PipelineOptions.filesToStage to staging location to " + "prepare for execution.", classpathElements.size()); if (classpathElements.size() > SANE_CLASSPATH_SIZE) { LOG.warn("Your classpath contains {} elements, which Google Cloud Dataflow automatically " - + "copies to all workers. Having this many entries on your classpath may be indicative " - + "of an issue in your pipeline. You may want to consider trimming the classpath to " - + "necessary dependencies only, using --filesToStage pipeline option to override " - + "what files are being staged, or bundling several dependencies into one.", + + "copies to all workers. Having this many entries on your classpath may be indicative " + + "of an issue in your pipeline. You may want to consider trimming the classpath to " + + "necessary dependencies only, using --filesToStage pipeline option to override " + + "what files are being staged, or bundling several dependencies into one.", classpathElements.size()); } - checkArgument( - stagingPath != null, - "Can't stage classpath elements because no staging location has been provided"); + ArrayList<DataflowPackage> packages = new ArrayList<>(); - // Inline a copy here because the inner code returns an immutable list and we want to mutate it. - List<PackageAttributes> packageAttributes = - new LinkedList<>(computePackageAttributes(classpathElements, stagingPath, executorService)); - // Order package attributes in descending size order so that we upload the largest files first. - Collections.sort(packageAttributes, new PackageUploadOrder()); + if (stagingPath == null) { + throw new IllegalArgumentException( + "Can't stage classpath elements on because no staging location has been provided"); + } - List<DataflowPackage> packages = Lists.newArrayListWithExpectedSize(packageAttributes.size()); - final AtomicInteger numUploaded = new AtomicInteger(0); - final AtomicInteger numCached = new AtomicInteger(0); + int numUploaded = 0; + int numCached = 0; + for (String classpathElement : classpathElements) { + String packageName = null; + if (classpathElement.contains("=")) { + String[] components = classpathElement.split("=", 2); + packageName = components[0]; + classpathElement = components[1]; + } - List<ListenableFuture<?>> futures = new LinkedList<>(); - for (final PackageAttributes attributes : packageAttributes) { - packages.add(attributes.getDataflowPackage()); - futures.add(executorService.submit(new Runnable() { - @Override - public void run() { - stageOnePackage(attributes, numUploaded, numCached, retrySleeper, gcsUtil); + File file = new File(classpathElement); + if (!file.exists()) { + LOG.warn("Skipping non-existent classpath element {} that was specified.", + classpathElement); + continue; + } + + PackageAttributes attributes = createPackageAttributes(file, stagingPath, packageName); + + DataflowPackage workflowPackage = attributes.getDataflowPackage(); + packages.add(workflowPackage); + String target = workflowPackage.getLocation(); + + // TODO: Should we attempt to detect the Mime type rather than + // always using MimeTypes.BINARY? + try { + try { + long remoteLength = IOChannelUtils.getSizeBytes(target); + if (remoteLength == attributes.getSize()) { + LOG.debug("Skipping classpath element already staged: {} at {}", + classpathElement, target); + numCached++; + continue; + } + } catch (FileNotFoundException expected) { + // If the file doesn't exist, it means we need to upload it. } - })); - } - try { - Futures.allAsList(futures).get(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Interrupted while staging packages", e); - } catch (ExecutionException e) { - throw new RuntimeException("Error while staging packages", e.getCause()); + + // Upload file, retrying on failure. + BackOff backoff = BACKOFF_FACTORY.backoff(); + while (true) { + try { + LOG.debug("Uploading classpath element {} to {}", classpathElement, target); + try (WritableByteChannel writer = IOChannelUtils.create(target, MimeTypes.BINARY)) { + copyContent(classpathElement, writer); + } + numUploaded++; + break; + } catch (IOException e) { + if (ERROR_EXTRACTOR.accessDenied(e)) { + String errorMessage = String.format( + "Uploaded failed due to permissions error, will NOT retry staging " + + "of classpath %s. Please verify credentials are valid and that you have " + + "write access to %s. Stale credentials can be resolved by executing " + + "'gcloud auth login'.", classpathElement, target); + LOG.error(errorMessage); + throw new IOException(errorMessage, e); + } + long sleep = backoff.nextBackOffMillis(); + if (sleep == BackOff.STOP) { + // Rethrow last error, to be included as a cause in the catch below. + LOG.error("Upload failed, will NOT retry staging of classpath: {}", + classpathElement, e); + throw e; + } else { + LOG.warn("Upload attempt failed, sleeping before retrying staging of classpath: {}", + classpathElement, e); + retrySleeper.sleep(sleep); + } + } + } + } catch (Exception e) { + throw new RuntimeException("Could not stage classpath element: " + classpathElement, e); + } } - LOG.info( - "Staging files complete: {} files cached, {} files newly uploaded", - numUploaded.get(), numCached.get()); + LOG.info("Uploading PipelineOptions.filesToStage complete: {} files newly uploaded, " + + "{} files cached", + numUploaded, numCached); return packages; } @@ -375,15 +293,13 @@ class PackageUtil { private final boolean directory; private final long size; private final String hash; - private final String sourcePath; private DataflowPackage dataflowPackage; public PackageAttributes(long size, String hash, boolean directory, - DataflowPackage dataflowPackage, String sourcePath) { + DataflowPackage dataflowPackage) { this.size = size; this.hash = Objects.requireNonNull(hash, "hash"); this.directory = directory; - this.sourcePath = Objects.requireNonNull(sourcePath, "sourcePath"); this.dataflowPackage = Objects.requireNonNull(dataflowPackage, "dataflowPackage"); } @@ -414,12 +330,5 @@ class PackageUtil { public String getHash() { return hash; } - - /** - * @return the file to be uploaded - */ - public String getSourcePath() { - return sourcePath; - } } } http://git-wip-us.apache.org/repos/asf/beam/blob/fee029f7/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java index 3828415..05a87dd 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java @@ -18,12 +18,12 @@ package org.apache.beam.runners.dataflow.util; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; @@ -53,7 +53,6 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.io.Files; import com.google.common.io.LineReader; -import com.google.common.util.concurrent.MoreExecutors; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; @@ -236,7 +235,7 @@ public class PackageUtilTest { classpathElements.add(eltName + '=' + tmpFile.getAbsolutePath()); } - PackageUtil.stageClasspathElements(classpathElements, STAGING_PATH, mockGcsUtil); + PackageUtil.stageClasspathElements(classpathElements, STAGING_PATH); logged.verifyWarn("Your classpath contains 1005 elements, which Google Cloud Dataflow"); } @@ -251,7 +250,7 @@ public class PackageUtilTest { when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink()); List<DataflowPackage> targets = PackageUtil.stageClasspathElements( - ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH, mockGcsUtil); + ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH); DataflowPackage target = Iterables.getOnlyElement(targets); verify(mockGcsUtil).fileSize(any(GcsPath.class)); @@ -278,7 +277,7 @@ public class PackageUtilTest { when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink()); PackageUtil.stageClasspathElements( - ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH, mockGcsUtil); + ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH); verify(mockGcsUtil).fileSize(any(GcsPath.class)); verify(mockGcsUtil).create(any(GcsPath.class), anyString()); @@ -305,7 +304,7 @@ public class PackageUtilTest { when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink()); List<DataflowPackage> targets = PackageUtil.stageClasspathElements( - ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH, mockGcsUtil); + ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH); DataflowPackage target = Iterables.getOnlyElement(targets); verify(mockGcsUtil).fileSize(any(GcsPath.class)); @@ -328,8 +327,7 @@ public class PackageUtilTest { try { PackageUtil.stageClasspathElements( ImmutableList.of(tmpFile.getAbsolutePath()), - STAGING_PATH, fastNanoClockAndSleeper, MoreExecutors.newDirectExecutorService(), - mockGcsUtil); + STAGING_PATH, fastNanoClockAndSleeper); } finally { verify(mockGcsUtil).fileSize(any(GcsPath.class)); verify(mockGcsUtil, times(5)).create(any(GcsPath.class), anyString()); @@ -350,20 +348,16 @@ public class PackageUtilTest { try { PackageUtil.stageClasspathElements( ImmutableList.of(tmpFile.getAbsolutePath()), - STAGING_PATH, fastNanoClockAndSleeper, MoreExecutors.newDirectExecutorService(), - mockGcsUtil); + STAGING_PATH, fastNanoClockAndSleeper); fail("Expected RuntimeException"); } catch (RuntimeException e) { - assertThat("Expected RuntimeException wrapping IOException.", - e.getCause(), instanceOf(RuntimeException.class)); - assertThat("Expected IOException containing detailed message.", - e.getCause().getCause(), instanceOf(IOException.class)); - assertThat(e.getCause().getCause().getMessage(), + assertTrue("Expected IOException containing detailed message.", + e.getCause() instanceof IOException); + assertThat(e.getCause().getMessage(), Matchers.allOf( Matchers.containsString("Uploaded failed due to permissions error"), Matchers.containsString( - "Stale credentials can be resolved by executing 'gcloud auth application-default " - + "login'"))); + "Stale credentials can be resolved by executing 'gcloud auth login'"))); } finally { verify(mockGcsUtil).fileSize(any(GcsPath.class)); verify(mockGcsUtil).create(any(GcsPath.class), anyString()); @@ -383,8 +377,9 @@ public class PackageUtilTest { try { PackageUtil.stageClasspathElements( - ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH, fastNanoClockAndSleeper, - MoreExecutors.newDirectExecutorService(), mockGcsUtil); + ImmutableList.of(tmpFile.getAbsolutePath()), + STAGING_PATH, + fastNanoClockAndSleeper); } finally { verify(mockGcsUtil).fileSize(any(GcsPath.class)); verify(mockGcsUtil, times(2)).create(any(GcsPath.class), anyString()); @@ -398,7 +393,7 @@ public class PackageUtilTest { when(mockGcsUtil.fileSize(any(GcsPath.class))).thenReturn(tmpFile.length()); PackageUtil.stageClasspathElements( - ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH, mockGcsUtil); + ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH); verify(mockGcsUtil).fileSize(any(GcsPath.class)); verifyNoMoreInteractions(mockGcsUtil); @@ -416,7 +411,7 @@ public class PackageUtilTest { when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink()); PackageUtil.stageClasspathElements( - ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH, mockGcsUtil); + ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH); verify(mockGcsUtil).fileSize(any(GcsPath.class)); verify(mockGcsUtil).create(any(GcsPath.class), anyString()); @@ -434,8 +429,7 @@ public class PackageUtilTest { when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink()); List<DataflowPackage> targets = PackageUtil.stageClasspathElements( - ImmutableList.of(overriddenName + "=" + tmpFile.getAbsolutePath()), STAGING_PATH, - mockGcsUtil); + ImmutableList.of(overriddenName + "=" + tmpFile.getAbsolutePath()), STAGING_PATH); DataflowPackage target = Iterables.getOnlyElement(targets); verify(mockGcsUtil).fileSize(any(GcsPath.class)); @@ -452,7 +446,7 @@ public class PackageUtilTest { String nonExistentFile = IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "non-existent-file"); assertEquals(Collections.EMPTY_LIST, PackageUtil.stageClasspathElements( - ImmutableList.of(nonExistentFile), STAGING_PATH, mockGcsUtil)); + ImmutableList.of(nonExistentFile), STAGING_PATH)); } /** http://git-wip-us.apache.org/repos/asf/beam/blob/fee029f7/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java index 72e106d..0553efc 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java @@ -25,7 +25,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import javax.annotation.Nullable; import org.apache.beam.sdk.util.AppEngineEnvironment; import org.apache.beam.sdk.util.GcsPathValidator; import org.apache.beam.sdk.util.GcsUtil; @@ -82,9 +81,8 @@ public interface GcsOptions extends + "information on the restrictions and performance implications of this value.\n\n" + "https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/util/src/main/java/" + "com/google/cloud/hadoop/util/AbstractGoogleAsyncWriteChannel.java") - @Nullable Integer getGcsUploadBufferSizeBytes(); - void setGcsUploadBufferSizeBytes(@Nullable Integer bytes); + void setGcsUploadBufferSizeBytes(Integer bytes); /** * The class of the validator that should be created and used to validate paths. http://git-wip-us.apache.org/repos/asf/beam/blob/fee029f7/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java index 5e83584..a10ea28 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java @@ -101,18 +101,6 @@ public class GcsUtil { gcsOptions.getExecutorService(), gcsOptions.getGcsUploadBufferSizeBytes()); } - - /** - * Returns an instance of {@link GcsUtil} based on the given parameters. - */ - public static GcsUtil create( - Storage storageClient, - HttpRequestInitializer httpRequestInitializer, - ExecutorService executorService, - @Nullable Integer uploadBufferSizeBytes) { - return new GcsUtil( - storageClient, httpRequestInitializer, executorService, uploadBufferSizeBytes); - } } private static final Logger LOG = LoggerFactory.getLogger(GcsUtil.class);
