Repository: incubator-beam Updated Branches: refs/heads/BEAM-357_windows-build-fails [created] 460d21cb7
fixing build on windows Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/41883300 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/41883300 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/41883300 Branch: refs/heads/BEAM-357_windows-build-fails Commit: 418833001fe6dd581f42f7fcc3c35ef36f292007 Parents: 0e4d0a9 Author: Romain manni-Bucau <[email protected]> Authored: Sun Jun 19 21:19:57 2016 +0200 Committer: Romain manni-Bucau <[email protected]> Committed: Sun Jun 19 21:19:57 2016 +0200 ---------------------------------------------------------------------- .../beam/runners/flink/WriteSinkITCase.java | 13 + .../beam/runners/spark/SimpleWordCountTest.java | 8 + .../beam/runners/spark/io/AvroPipelineTest.java | 7 + .../beam/runners/spark/io/NumShardsTest.java | 7 + .../io/hadoop/HadoopFileFormatPipelineTest.java | 7 + .../translation/TransformTranslatorTest.java | 7 + .../src/main/resources/beam/checkstyle.xml | 4 +- .../org/apache/beam/sdk/io/FileBasedSink.java | 7 +- .../beam/sdk/testing/HadoopWorkarounds.java | 129 +++++++++ sdks/java/io/hdfs/pom.xml | 9 + .../beam/sdk/io/hdfs/HDFSFileSourceTest.java | 264 ++++++++++--------- sdks/java/maven-archetypes/starter/pom.xml | 3 + 12 files changed, 334 insertions(+), 131 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41883300/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java index 36d3aef..1a56350 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java @@ -35,6 +35,7 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.test.util.JavaProgramTestBase; import java.io.File; +import java.io.IOException; import java.io.PrintWriter; import java.net.URI; @@ -75,6 +76,18 @@ public class WriteSinkITCase extends JavaProgramTestBase { p.run(); } + + @Override + public void stopCluster() throws Exception { + try { + super.stopCluster(); + } catch (final IOException ioe) { + if (ioe.getMessage().startsWith("Unable to delete file")) { + // that's ok for the test itself, just the OS playing with us on cleanup phase + } + } + } + /** * Simple custom sink which writes to a file. */ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41883300/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java index 2b4464d..4980995 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java @@ -25,6 +25,7 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.HadoopWorkarounds; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Count; @@ -40,11 +41,13 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; import org.apache.commons.io.FileUtils; +import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; import java.io.File; +import java.io.IOException; import java.util.Arrays; import java.util.List; import java.util.Set; @@ -61,6 +64,11 @@ public class SimpleWordCountTest { private static final Set<String> EXPECTED_COUNT_SET = ImmutableSet.of("hi: 5", "there: 1", "sue: 2", "bob: 2"); + @BeforeClass + public static void initWin() throws IOException { + HadoopWorkarounds.winTests(); + } + @Test public void testInMem() throws Exception { SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41883300/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java index f358878..f6d0d55 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java @@ -25,6 +25,7 @@ import org.apache.beam.runners.spark.SparkPipelineRunner; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.AvroIO; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.HadoopWorkarounds; import org.apache.beam.sdk.values.PCollection; import com.google.common.collect.Lists; @@ -38,6 +39,7 @@ import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -58,6 +60,11 @@ public class AvroPipelineTest { @Rule public final TemporaryFolder tmpDir = new TemporaryFolder(); + @BeforeClass + public static void initWin() throws IOException { + HadoopWorkarounds.winTests(); + } + @Before public void setUp() throws IOException { inputFile = tmpDir.newFile("test.avro"); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41883300/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java index 23d4592..8a864c4 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java @@ -29,6 +29,7 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.HadoopWorkarounds; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.values.PCollection; @@ -38,6 +39,7 @@ import com.google.common.collect.Sets; import com.google.common.io.Files; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -64,6 +66,11 @@ public class NumShardsTest { @Rule public final TemporaryFolder tmpDir = new TemporaryFolder(); + @BeforeClass + public static void initWin() throws IOException { + HadoopWorkarounds.winTests(); + } + @Before public void setUp() throws IOException { outputDir = tmpDir.newFolder("out"); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41883300/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java index eaa508c..767682e 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java @@ -26,6 +26,7 @@ import org.apache.beam.runners.spark.coders.WritableCoder; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.HadoopWorkarounds; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -40,6 +41,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -58,6 +60,11 @@ public class HadoopFileFormatPipelineTest { @Rule public final TemporaryFolder tmpDir = new TemporaryFolder(); + @BeforeClass + public static void initWin() throws IOException { + HadoopWorkarounds.winTests(); + } + @Before public void setUp() throws IOException { inputFile = tmpDir.newFile("test.seq"); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41883300/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java index b593316..fec0dc9 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java @@ -28,10 +28,12 @@ import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.runners.PipelineRunner; +import org.apache.beam.sdk.testing.HadoopWorkarounds; import org.apache.beam.sdk.values.PCollection; import com.google.common.base.Charsets; +import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -51,6 +53,11 @@ import java.util.List; public class TransformTranslatorTest { @Rule public TemporaryFolder tmp = new TemporaryFolder(); + @BeforeClass + public static void initWin() throws IOException { + HadoopWorkarounds.winTests(); + } + /** * Builds a simple pipeline with TextIO.Read and TextIO.Write, runs the pipeline * in DirectRunner and on SparkPipelineRunner, with the mapped dataflow-to-spark http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41883300/sdks/java/build-tools/src/main/resources/beam/checkstyle.xml ---------------------------------------------------------------------- diff --git a/sdks/java/build-tools/src/main/resources/beam/checkstyle.xml b/sdks/java/build-tools/src/main/resources/beam/checkstyle.xml index 311f599..457675a 100644 --- a/sdks/java/build-tools/src/main/resources/beam/checkstyle.xml +++ b/sdks/java/build-tools/src/main/resources/beam/checkstyle.xml @@ -29,7 +29,9 @@ page at http://checkstyle.sourceforge.net/config.html --> <!-- Checks that there are no tab characters in the file. --> </module> - <module name="NewlineAtEndOfFile"/> + <module name="NewlineAtEndOfFile"> + <property name="lineSeparator" value="lf" /> + </module> <module name="RegexpSingleline"> <!-- Checks that TODOs don't have stuff in parenthesis, e.g., username. --> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41883300/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java index 521f54b..045d6ad 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java @@ -35,6 +35,7 @@ import com.google.common.collect.Ordering; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; import java.io.IOException; import java.io.Serializable; import java.nio.channels.WritableByteChannel; @@ -645,7 +646,11 @@ public abstract class FileBasedSink<T> extends Sink<T> { private void copyOne(String source, String destination) throws IOException { try { // Copy the source file, replacing the existing destination. - Files.copy(Paths.get(source), Paths.get(destination), StandardCopyOption.REPLACE_EXISTING); + // Paths.get(x) will not work on win cause of the ":" after the drive letter + Files.copy( + new File(source).toPath(), + new File(destination).toPath(), + StandardCopyOption.REPLACE_EXISTING); } catch (NoSuchFileException e) { LOG.debug("{} does not exist.", source); // Suppress exception if file does not exist. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41883300/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/HadoopWorkarounds.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/HadoopWorkarounds.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/HadoopWorkarounds.java new file mode 100644 index 0000000..ee2e135 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/HadoopWorkarounds.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.testing; + +import org.apache.commons.compress.utils.IOUtils; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.MalformedURLException; +import java.net.URL; + +/** + * A simple class ensure winutils.exe can be found in the JVM. + */ +public class HadoopWorkarounds { + /** + * In practise this method only needs to be called once by JVM + * since hadoop uses static variables to store it. + * + * Note: ensure invocation is done before hadoop reads it + * and ensure this folder survives tests + * (avoid temporary folder usage since tests can share it). + * + * @param hadoopHome where to fake hadoop home. + */ + public static void win(final File hadoopHome) { + // if (Shell.osType != Shell.OSType.OS_TYPE_WIN) { // don't do that to not load Shell yet + if (!System.getProperty("os.name", "").startsWith("Windows") + || System.getProperty("hadoop.home.dir") != null) { + return; + } + + // hadoop doesn't have winutils.exe :(: https://issues.apache.org/jira/browse/HADOOP-10051 + // so use this github repo temporarly then just use the main tar.gz + /* + String hadoopVersion = VersionInfo.getVersion(); + final URL url = new URL("https://archive.apache.org/dist/hadoop/common/ + hadoop-" + hadoopVersion + "/hadoop-" + hadoopVersion + ".tar.gz"); + final File hadoopTar = tmpFolder.newFile(); + try (final InputStream is = new GZIPInputStream(url.openStream()); + final OutputStream os = new FileOutputStream(hadoopTar)) { + System.out.println("Downloading Hadoop in " + hadoopTar + ", " + + "this can take a while, if you have it locally " + + "maybe set \"hadoop.home.dir\" system property"); + IOUtils.copyLarge(is, os, new byte[1024 * 1024]); + } + + final File hadoopHome = tmpFolder.newFolder(); + try (final ArchiveInputStream stream = new TarArchiveInputStream( + new FileInputStream(hadoopTar))) { + ArchiveEntry entry; + while ((entry = stream.getNextEntry()) != null) { + if (entry.isDirectory()) { + FileUtils.forceMkdir(new File(hadoopHome, entry.getName())); + continue; + } + final File out = new File(hadoopHome, entry.getName()); + FileUtils.forceMkdir(out.getParentFile()); + try (final OutputStream os = new FileOutputStream(out)) { + IOUtils.copy(stream, os); + } + } + } + + final String hadoopRoot = "hadoop-" + hadoopVersion; + final File[] files = hadoopHome.listFiles(new FileFilter() { + @Override + public boolean accept(final File pathname) { + return pathname.isDirectory() && pathname.getName().equals(hadoopRoot); + } + }); + if (files == null || files.length != 1) { + throw new IllegalStateException("Didn't find hadoop in " + hadoopHome); + } + System.setProperty("hadoop.home.dir", files[0].getAbsolutePath()); + */ + + System.out.println("You are on windows (sorry) and you don't set " + + "-Dhadoop.home.dir so we'll download winutils.exe"); + + new File(hadoopHome, "bin").mkdirs(); + final URL url; + try { + url = new URL("https://github.com/steveloughran/winutils/" + + "raw/master/hadoop-2.7.1/bin/winutils.exe"); + } catch (final MalformedURLException e) { // unlikely + throw new IllegalArgumentException(e); + } + try { + try (final InputStream is = url.openStream(); + final OutputStream os = new FileOutputStream( + new File(hadoopHome, "bin/winutils.exe"))) { + try { + IOUtils.copy(is, os, 1024 * 1024); + } catch (final IOException e) { + throw new IllegalStateException(e); + } + } + } catch (final IOException e) { + throw new IllegalStateException(e); + } + System.setProperty("hadoop.home.dir", hadoopHome.getAbsolutePath()); + } + + /** + * Just a convenient win(File) invocation for tests. + */ + public static void winTests() { + win(new File("target/hadoop-win")); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41883300/sdks/java/io/hdfs/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/pom.xml b/sdks/java/io/hdfs/pom.xml index 9c30792..f8e3c14 100644 --- a/sdks/java/io/hdfs/pom.xml +++ b/sdks/java/io/hdfs/pom.xml @@ -83,5 +83,14 @@ <artifactId>junit</artifactId> <scope>test</scope> </dependency> + + <!-- see HDFSFileSourceTest commented block + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-compress</artifactId> + <version>1.9</version> + <scope>test</scope> + </dependency> + --> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41883300/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java index 67df7bc..2ce1af7 100644 --- a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java +++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java @@ -28,9 +28,9 @@ import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.Source; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.HadoopWorkarounds; import org.apache.beam.sdk.testing.SourceTestUtils; import org.apache.beam.sdk.values.KV; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; @@ -38,6 +38,7 @@ import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.SequenceFile.Writer; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -53,138 +54,143 @@ import java.util.Random; */ public class HDFSFileSourceTest { - Random random = new Random(0L); - - @Rule - public TemporaryFolder tmpFolder = new TemporaryFolder(); - - @Test - public void testFullyReadSingleFile() throws Exception { - PipelineOptions options = PipelineOptionsFactory.create(); - List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10, 0); - File file = createFileWithData("tmp.seq", expectedResults); - - HDFSFileSource<IntWritable, Text> source = - HDFSFileSource.from(file.toString(), SequenceFileInputFormat.class, - IntWritable.class, Text.class); - - assertEquals(file.length(), source.getEstimatedSizeBytes(null)); - - assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray())); - } - - @Test - public void testFullyReadFilePattern() throws IOException { - PipelineOptions options = PipelineOptionsFactory.create(); - List<KV<IntWritable, Text>> data1 = createRandomRecords(3, 10, 0); - File file1 = createFileWithData("file1", data1); - - List<KV<IntWritable, Text>> data2 = createRandomRecords(3, 10, 10); - createFileWithData("file2", data2); - - List<KV<IntWritable, Text>> data3 = createRandomRecords(3, 10, 20); - createFileWithData("file3", data3); - - List<KV<IntWritable, Text>> data4 = createRandomRecords(3, 10, 30); - createFileWithData("otherfile", data4); - - HDFSFileSource<IntWritable, Text> source = - HDFSFileSource.from(new File(file1.getParent(), "file*").toString(), - SequenceFileInputFormat.class, IntWritable.class, Text.class); - List<KV<IntWritable, Text>> expectedResults = new ArrayList<>(); - expectedResults.addAll(data1); - expectedResults.addAll(data2); - expectedResults.addAll(data3); - assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray())); - } - - @Test - public void testCloseUnstartedFilePatternReader() throws IOException { - PipelineOptions options = PipelineOptionsFactory.create(); - List<KV<IntWritable, Text>> data1 = createRandomRecords(3, 10, 0); - File file1 = createFileWithData("file1", data1); - - List<KV<IntWritable, Text>> data2 = createRandomRecords(3, 10, 10); - createFileWithData("file2", data2); - - List<KV<IntWritable, Text>> data3 = createRandomRecords(3, 10, 20); - createFileWithData("file3", data3); - - List<KV<IntWritable, Text>> data4 = createRandomRecords(3, 10, 30); - createFileWithData("otherfile", data4); - - HDFSFileSource<IntWritable, Text> source = - HDFSFileSource.from(new File(file1.getParent(), "file*").toString(), - SequenceFileInputFormat.class, IntWritable.class, Text.class); - Source.Reader<KV<IntWritable, Text>> reader = source.createReader(options); - // Closing an unstarted FilePatternReader should not throw an exception. - try { - reader.close(); - } catch (Exception e) { - fail("Closing an unstarted FilePatternReader should not throw an exception"); + Random random = new Random(0L); + + @Rule + public final TemporaryFolder tmpFolder = new TemporaryFolder(); + + @BeforeClass + public static void setUpOnWinWithMissingHadoopHome() throws IOException { + HadoopWorkarounds.winTests(); + } + + @Test + public void testFullyReadSingleFile() throws Exception { + PipelineOptions options = PipelineOptionsFactory.create(); + List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10, 0); + File file = createFileWithData("tmp.seq", expectedResults); + + HDFSFileSource<IntWritable, Text> source = + HDFSFileSource.from(file.toString(), SequenceFileInputFormat.class, + IntWritable.class, Text.class); + + assertEquals(file.length(), source.getEstimatedSizeBytes(null)); + + assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray())); } - } - - @Test - public void testSplits() throws Exception { - PipelineOptions options = PipelineOptionsFactory.create(); - - List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10000, 0); - File file = createFileWithData("tmp.avro", expectedResults); - - HDFSFileSource<IntWritable, Text> source = - HDFSFileSource.from(file.toString(), SequenceFileInputFormat.class, - IntWritable.class, Text.class); - - // Assert that the source produces the expected records - assertEquals(expectedResults, readFromSource(source, options)); - - // Split with a small bundle size (has to be at least size of sync interval) - List<? extends BoundedSource<KV<IntWritable, Text>>> splits = source - .splitIntoBundles(SequenceFile.SYNC_INTERVAL, options); - assertTrue(splits.size() > 2); - SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options); - int nonEmptySplits = 0; - for (BoundedSource<KV<IntWritable, Text>> subSource : splits) { - if (readFromSource(subSource, options).size() > 0) { - nonEmptySplits += 1; - } + + @Test + public void testFullyReadFilePattern() throws IOException { + PipelineOptions options = PipelineOptionsFactory.create(); + List<KV<IntWritable, Text>> data1 = createRandomRecords(3, 10, 0); + File file1 = createFileWithData("file1", data1); + + List<KV<IntWritable, Text>> data2 = createRandomRecords(3, 10, 10); + createFileWithData("file2", data2); + + List<KV<IntWritable, Text>> data3 = createRandomRecords(3, 10, 20); + createFileWithData("file3", data3); + + List<KV<IntWritable, Text>> data4 = createRandomRecords(3, 10, 30); + createFileWithData("otherfile", data4); + + HDFSFileSource<IntWritable, Text> source = + HDFSFileSource.from(new File(file1.getParent(), "file*").toString(), + SequenceFileInputFormat.class, IntWritable.class, Text.class); + List<KV<IntWritable, Text>> expectedResults = new ArrayList<>(); + expectedResults.addAll(data1); + expectedResults.addAll(data2); + expectedResults.addAll(data3); + assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray())); + } + + @Test + public void testCloseUnstartedFilePatternReader() throws IOException { + PipelineOptions options = PipelineOptionsFactory.create(); + List<KV<IntWritable, Text>> data1 = createRandomRecords(3, 10, 0); + File file1 = createFileWithData("file1", data1); + + List<KV<IntWritable, Text>> data2 = createRandomRecords(3, 10, 10); + createFileWithData("file2", data2); + + List<KV<IntWritable, Text>> data3 = createRandomRecords(3, 10, 20); + createFileWithData("file3", data3); + + List<KV<IntWritable, Text>> data4 = createRandomRecords(3, 10, 30); + createFileWithData("otherfile", data4); + + HDFSFileSource<IntWritable, Text> source = + HDFSFileSource.from(new File(file1.getParent(), "file*").toString(), + SequenceFileInputFormat.class, IntWritable.class, Text.class); + Source.Reader<KV<IntWritable, Text>> reader = source.createReader(options); + // Closing an unstarted FilePatternReader should not throw an exception. + try { + reader.close(); + } catch (Exception e) { + fail("Closing an unstarted FilePatternReader should not throw an exception"); + } + } + + @Test + public void testSplits() throws Exception { + PipelineOptions options = PipelineOptionsFactory.create(); + + List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10000, 0); + File file = createFileWithData("tmp.avro", expectedResults); + + HDFSFileSource<IntWritable, Text> source = + HDFSFileSource.from(file.toString(), SequenceFileInputFormat.class, + IntWritable.class, Text.class); + + // Assert that the source produces the expected records + assertEquals(expectedResults, readFromSource(source, options)); + + // Split with a small bundle size (has to be at least size of sync interval) + List<? extends BoundedSource<KV<IntWritable, Text>>> splits = source + .splitIntoBundles(SequenceFile.SYNC_INTERVAL, options); + assertTrue(splits.size() > 2); + SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options); + int nonEmptySplits = 0; + for (BoundedSource<KV<IntWritable, Text>> subSource : splits) { + if (readFromSource(subSource, options).size() > 0) { + nonEmptySplits += 1; + } + } + assertTrue(nonEmptySplits > 2); } - assertTrue(nonEmptySplits > 2); - } - - private File createFileWithData(String filename, List<KV<IntWritable, Text>> records) - throws IOException { - File tmpFile = tmpFolder.newFile(filename); - try (Writer writer = SequenceFile.createWriter(new Configuration(), - Writer.keyClass(IntWritable.class), Writer.valueClass(Text.class), - Writer.file(new Path(tmpFile.toURI())))) { - - for (KV<IntWritable, Text> record : records) { - writer.append(record.getKey(), record.getValue()); - } + + private File createFileWithData(String filename, List<KV<IntWritable, Text>> records) + throws IOException { + File tmpFile = tmpFolder.newFile(filename); + try (Writer writer = SequenceFile.createWriter(new Configuration(), + Writer.keyClass(IntWritable.class), Writer.valueClass(Text.class), + Writer.file(new Path(tmpFile.toURI())))) { + + for (KV<IntWritable, Text> record : records) { + writer.append(record.getKey(), record.getValue()); + } + } + return tmpFile; } - return tmpFile; - } - - private List<KV<IntWritable, Text>> createRandomRecords(int dataItemLength, - int numItems, int offset) { - List<KV<IntWritable, Text>> records = new ArrayList<>(); - for (int i = 0; i < numItems; i++) { - IntWritable key = new IntWritable(i + offset); - Text value = new Text(createRandomString(dataItemLength)); - records.add(KV.of(key, value)); + + private List<KV<IntWritable, Text>> createRandomRecords(int dataItemLength, + int numItems, int offset) { + List<KV<IntWritable, Text>> records = new ArrayList<>(); + for (int i = 0; i < numItems; i++) { + IntWritable key = new IntWritable(i + offset); + Text value = new Text(createRandomString(dataItemLength)); + records.add(KV.of(key, value)); + } + return records; } - return records; - } - - private String createRandomString(int length) { - char[] chars = "abcdefghijklmnopqrstuvwxyz".toCharArray(); - StringBuilder builder = new StringBuilder(); - for (int i = 0; i < length; i++) { - builder.append(chars[random.nextInt(chars.length)]); + + private String createRandomString(int length) { + char[] chars = "abcdefghijklmnopqrstuvwxyz".toCharArray(); + StringBuilder builder = new StringBuilder(); + for (int i = 0; i < length; i++) { + builder.append(chars[random.nextInt(chars.length)]); + } + return builder.toString(); } - return builder.toString(); - } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41883300/sdks/java/maven-archetypes/starter/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/maven-archetypes/starter/pom.xml b/sdks/java/maven-archetypes/starter/pom.xml index 5b6cb2a..9fb21e9 100644 --- a/sdks/java/maven-archetypes/starter/pom.xml +++ b/sdks/java/maven-archetypes/starter/pom.xml @@ -60,6 +60,9 @@ <goals> <goal>integration-test</goal> </goals> + <configuration> + <ignoreEOLStyle>true</ignoreEOLStyle> <!-- for win --> + </configuration> </execution> </executions> </plugin>
