PackageUtil: preserve classpath ordering when uploading. Also add a test.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b0b91c84 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b0b91c84 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b0b91c84 Branch: refs/heads/python-sdk Commit: b0b91c842e09aa7fdb5c1dc216574daa43b437ea Parents: 23e2b91 Author: Dan Halperin <[email protected]> Authored: Wed Jan 25 22:15:59 2017 -0800 Committer: Dan Halperin <[email protected]> Committed: Wed Jan 25 22:16:22 2017 -0800 ---------------------------------------------------------------------- .../beam/runners/dataflow/util/PackageUtil.java | 11 +++++--- .../runners/dataflow/util/PackageUtilTest.java | 27 ++++++++++++++++++++ 2 files changed, 35 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/b0b91c84/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java index fa8c94d..685d48c 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java @@ -297,16 +297,21 @@ class PackageUtil { // Inline a copy here because the inner code returns an immutable list and we want to mutate it. List<PackageAttributes> packageAttributes = new LinkedList<>(computePackageAttributes(classpathElements, stagingPath, executorService)); - // Order package attributes in descending size order so that we upload the largest files first. 
- Collections.sort(packageAttributes, new PackageUploadOrder()); + // Compute the returned list of DataflowPackage objects here so that they are returned in the + // same order as on the classpath. List<DataflowPackage> packages = Lists.newArrayListWithExpectedSize(packageAttributes.size()); + for (final PackageAttributes attributes : packageAttributes) { + packages.add(attributes.getDataflowPackage()); + } + + // Order package attributes in descending size order so that we upload the largest files first. + Collections.sort(packageAttributes, new PackageUploadOrder()); final AtomicInteger numUploaded = new AtomicInteger(0); final AtomicInteger numCached = new AtomicInteger(0); List<ListenableFuture<?>> futures = new LinkedList<>(); for (final PackageAttributes attributes : packageAttributes) { - packages.add(attributes.getDataflowPackage()); futures.add(executorService.submit(new Runnable() { @Override public void run() { http://git-wip-us.apache.org/repos/asf/beam/blob/b0b91c84/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java index 3828415..800c5a9 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java @@ -19,6 +19,7 @@ package org.apache.beam.runners.dataflow.util; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.startsWith; import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; import static 
org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; @@ -59,6 +60,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.nio.channels.Channels; import java.nio.channels.Pipe; +import java.nio.channels.Pipe.SinkChannel; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; @@ -86,6 +88,8 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.mockito.Mock; import org.mockito.MockitoAnnotations; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; /** Tests for PackageUtil. */ @RunWith(JUnit4.class) @@ -265,6 +269,29 @@ public class PackageUtilTest { } @Test + public void testStagingPreservesClasspath() throws Exception { + File smallFile = makeFileWithContents("small.txt", "small"); + File largeFile = makeFileWithContents("large.txt", "large contents"); + when(mockGcsUtil.fileSize(any(GcsPath.class))) + .thenThrow(new FileNotFoundException("some/path")); + when(mockGcsUtil.create(any(GcsPath.class), anyString())) + .thenAnswer(new Answer<SinkChannel>() { + @Override + public SinkChannel answer(InvocationOnMock invocation) throws Throwable { + return Pipe.open().sink(); + } + }); + + List<DataflowPackage> targets = PackageUtil.stageClasspathElements( + ImmutableList.of(smallFile.getAbsolutePath(), largeFile.getAbsolutePath()), + STAGING_PATH, mockGcsUtil); + // Verify that the packages are returned small, then large, matching input order even though + // the large file would be uploaded first. + assertThat(targets.get(0).getName(), startsWith("small")); + assertThat(targets.get(1).getName(), startsWith("large")); + } + + @Test public void testPackageUploadWithDirectorySucceeds() throws Exception { Pipe pipe = Pipe.open(); File tmpDirectory = tmpFolder.newFolder("folder");
