http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java index a6be4fb..9468893 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java @@ -42,9 +42,7 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import com.google.common.base.Function; -import com.google.common.base.Functions; import com.google.common.base.Predicate; -import com.google.common.base.Predicates; import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; @@ -71,31 +69,22 @@ import java.util.zip.GZIPOutputStream; import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; import javax.annotation.Nullable; -import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.DefaultCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.BoundedSource.BoundedReader; -import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params; -import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations; -import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory; import org.apache.beam.sdk.io.TextIO.CompressionType; import org.apache.beam.sdk.io.fs.MatchResult; import org.apache.beam.sdk.io.fs.MatchResult.Metadata; -import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; -import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.ValueProvider; -import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.SourceTestUtils; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.ValidatesRunner; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; import org.apache.beam.sdk.util.CoderUtils; @@ -120,10 +109,10 @@ import org.junit.runners.JUnit4; public class TextIOTest { private static final String MY_HEADER = "myHeader"; private static final String MY_FOOTER = "myFooter"; - private static final List<String> EMPTY = Collections.emptyList(); - private static final List<String> TINY = - Arrays.asList("Irritable eagle", "Optimistic jay", "Fanciful hawk"); - private static final List<String> LARGE = makeLines(1000); + private static final String[] EMPTY = new String[] {}; + private static final String[] TINY = + new String[] {"Irritable eagle", "Optimistic jay", "Fanciful hawk"}; + private static final String[] LARGE = makeLines(1000); private static Path tempFolder; private static File emptyTxt; @@ -148,7 +137,7 @@ public class TextIOTest { @Rule public ExpectedException expectedException = ExpectedException.none(); - private static File writeToFile(List<String> lines, String filename, CompressionType compression) + private static File writeToFile(String[] lines, String filename, CompressionType compression) throws IOException { File file = tempFolder.resolve(filename).toFile(); OutputStream output = new FileOutputStream(file); @@ -216,7 +205,7 @@ public class TextIOTest { }); } - private void runTestRead(String[] expected) throws Exception { + private <T> void runTestRead(String[] expected) throws Exception { File tmpFile = Files.createTempFile(tempFolder, "file", "txt").toFile(); String filename = tmpFile.getPath(); @@ -285,213 +274,6 @@ public class TextIOTest { displayData, hasItem(hasDisplayItem(hasValue(startsWith("foobar"))))); } - static class TestDynamicDestinations extends DynamicDestinations<String, String> { - ResourceId baseDir; - - TestDynamicDestinations(ResourceId baseDir) { - this.baseDir = baseDir; - } - - @Override - public String getDestination(String element) { - // Destination is based on first character of string. - return element.substring(0, 1); - } - - @Override - public String getDefaultDestination() { - return ""; - } - - @Nullable - @Override - public Coder<String> getDestinationCoder() { - return StringUtf8Coder.of(); - } - - @Override - public FilenamePolicy getFilenamePolicy(String destination) { - return DefaultFilenamePolicy.fromStandardParameters( - StaticValueProvider.of( - baseDir.resolve("file_" + destination + ".txt", StandardResolveOptions.RESOLVE_FILE)), - null, - null, - false); - } - } - - class StartsWith implements Predicate<String> { - String prefix; - - StartsWith(String prefix) { - this.prefix = prefix; - } - - @Override - public boolean apply(@Nullable String input) { - return input.startsWith(prefix); - } - } - - @Test - @Category(NeedsRunner.class) - public void testDynamicDestinations() throws Exception { - ResourceId baseDir = - FileSystems.matchNewResource( - Files.createTempDirectory(tempFolder, "testDynamicDestinations").toString(), true); - - List<String> elements = Lists.newArrayList("aaaa", "aaab", "baaa", "baab", "caaa", "caab"); - PCollection<String> input = p.apply(Create.of(elements).withCoder(StringUtf8Coder.of())); - input.apply( - TextIO.write() - .to(new TestDynamicDestinations(baseDir)) - .withTempDirectory(FileSystems.matchNewResource(baseDir.toString(), true))); - p.run(); - - assertOutputFiles( - Iterables.toArray(Iterables.filter(elements, new StartsWith("a")), String.class), - null, - null, - 0, - baseDir.resolve("file_a.txt", StandardResolveOptions.RESOLVE_FILE), - DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE); - assertOutputFiles( - Iterables.toArray(Iterables.filter(elements, new StartsWith("b")), String.class), - null, - null, - 0, - baseDir.resolve("file_b.txt", StandardResolveOptions.RESOLVE_FILE), - DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE); - assertOutputFiles( - Iterables.toArray(Iterables.filter(elements, new StartsWith("c")), String.class), - null, - null, - 0, - baseDir.resolve("file_c.txt", StandardResolveOptions.RESOLVE_FILE), - DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE); - } - - @DefaultCoder(AvroCoder.class) - private static class UserWriteType { - String destination; - String metadata; - - UserWriteType() { - this.destination = ""; - this.metadata = ""; - } - - UserWriteType(String destination, String metadata) { - this.destination = destination; - this.metadata = metadata; - } - - @Override - public String toString() { - return String.format("destination: %s metadata : %s", destination, metadata); - } - } - - private static class SerializeUserWrite implements SerializableFunction<UserWriteType, String> { - @Override - public String apply(UserWriteType input) { - return input.toString(); - } - } - - private static class UserWriteDestination implements SerializableFunction<UserWriteType, Params> { - private ResourceId baseDir; - - UserWriteDestination(ResourceId baseDir) { - this.baseDir = baseDir; - } - - @Override - public Params apply(UserWriteType input) { - return new Params() - .withBaseFilename( - baseDir.resolve( - "file_" + input.destination.substring(0, 1) + ".txt", - StandardResolveOptions.RESOLVE_FILE)); - } - } - - private static class ExtractWriteDestination implements Function<UserWriteType, String> { - @Override - public String apply(@Nullable UserWriteType input) { - return input.destination; - } - } - - @Test - @Category(NeedsRunner.class) - public void testDynamicDefaultFilenamePolicy() throws Exception { - ResourceId baseDir = - FileSystems.matchNewResource( - Files.createTempDirectory(tempFolder, "testDynamicDestinations").toString(), true); - - List<UserWriteType> elements = - Lists.newArrayList( - new UserWriteType("aaaa", "first"), - new UserWriteType("aaab", "second"), - new UserWriteType("baaa", "third"), - new UserWriteType("baab", "fourth"), - new UserWriteType("caaa", "fifth"), - new UserWriteType("caab", "sixth")); - PCollection<UserWriteType> input = p.apply(Create.of(elements)); - input.apply( - TextIO.writeCustomType(new SerializeUserWrite()) - .to(new UserWriteDestination(baseDir), new Params()) - .withTempDirectory(FileSystems.matchNewResource(baseDir.toString(), true))); - p.run(); - - String[] aElements = - Iterables.toArray( - Iterables.transform( - Iterables.filter( - elements, - Predicates.compose(new StartsWith("a"), new ExtractWriteDestination())), - Functions.toStringFunction()), - String.class); - String[] bElements = - Iterables.toArray( - Iterables.transform( - Iterables.filter( - elements, - Predicates.compose(new StartsWith("b"), new ExtractWriteDestination())), - Functions.toStringFunction()), - String.class); - String[] cElements = - Iterables.toArray( - Iterables.transform( - Iterables.filter( - elements, - Predicates.compose(new StartsWith("c"), new ExtractWriteDestination())), - Functions.toStringFunction()), - String.class); - assertOutputFiles( - aElements, - null, - null, - 0, - baseDir.resolve("file_a.txt", StandardResolveOptions.RESOLVE_FILE), - DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE); - assertOutputFiles( - bElements, - null, - null, - 0, - baseDir.resolve("file_b.txt", StandardResolveOptions.RESOLVE_FILE), - DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE); - assertOutputFiles( - cElements, - null, - null, - 0, - baseDir.resolve("file_c.txt", StandardResolveOptions.RESOLVE_FILE), - DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE); - } - private void runTestWrite(String[] elems) throws Exception { runTestWrite(elems, null, null, 1); } @@ -509,8 +291,7 @@ public class TextIOTest { String[] elems, String header, String footer, int numShards) throws Exception { String outputName = "file.txt"; Path baseDir = Files.createTempDirectory(tempFolder, "testwrite"); - ResourceId baseFilename = - FileBasedSink.convertToFileResourceIfPossible(baseDir.resolve(outputName).toString()); + String baseFilename = baseDir.resolve(outputName).toString(); PCollection<String> input = p.apply(Create.of(Arrays.asList(elems)).withCoder(StringUtf8Coder.of())); @@ -530,14 +311,8 @@ public class TextIOTest { p.run(); - assertOutputFiles( - elems, - header, - footer, - numShards, - baseFilename, - firstNonNull( - write.inner.getShardTemplate(), + assertOutputFiles(elems, header, footer, numShards, baseDir, outputName, + firstNonNull(write.getShardTemplate(), DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE)); } @@ -546,12 +321,13 @@ public class TextIOTest { final String header, final String footer, int numShards, - ResourceId outputPrefix, + Path rootLocation, + String outputName, String shardNameTemplate) throws Exception { List<File> expectedFiles = new ArrayList<>(); if (numShards == 0) { - String pattern = outputPrefix.toString() + "*"; + String pattern = rootLocation.toAbsolutePath().resolve(outputName + "*").toString(); List<MatchResult> matches = FileSystems.match(Collections.singletonList(pattern)); for (Metadata expectedFile : Iterables.getOnlyElement(matches).metadata()) { expectedFiles.add(new File(expectedFile.resourceId().toString())); @@ -560,9 +336,9 @@ public class TextIOTest { for (int i = 0; i < numShards; i++) { expectedFiles.add( new File( + rootLocation.toString(), DefaultFilenamePolicy.constructName( - outputPrefix, shardNameTemplate, "", i, numShards, null, null) - .toString())); + outputName, shardNameTemplate, "", i, numShards, null, null))); } } @@ -680,19 +456,14 @@ public class TextIOTest { public void testWriteWithWritableByteChannelFactory() throws Exception { Coder<String> coder = StringUtf8Coder.of(); String outputName = "file.txt"; - ResourceId baseDir = - FileSystems.matchNewResource( - Files.createTempDirectory(tempFolder, "testwrite").toString(), true); + Path baseDir = Files.createTempDirectory(tempFolder, "testwrite"); PCollection<String> input = p.apply(Create.of(Arrays.asList(LINES2_ARRAY)).withCoder(coder)); final WritableByteChannelFactory writableByteChannelFactory = new DrunkWritableByteChannelFactory(); - TextIO.Write write = - TextIO.write() - .to(baseDir.resolve(outputName, StandardResolveOptions.RESOLVE_FILE).toString()) - .withoutSharding() - .withWritableByteChannelFactory(writableByteChannelFactory); + TextIO.Write write = TextIO.write().to(baseDir.resolve(outputName).toString()) + .withoutSharding().withWritableByteChannelFactory(writableByteChannelFactory); DisplayData displayData = DisplayData.from(write); assertThat(displayData, hasDisplayItem("writableByteChannelFactory", "DRUNK")); @@ -705,15 +476,8 @@ public class TextIOTest { drunkElems.add(elem); drunkElems.add(elem); } - assertOutputFiles( - drunkElems.toArray(new String[0]), - null, - null, - 1, - baseDir.resolve( - outputName + writableByteChannelFactory.getSuggestedFilenameSuffix(), - StandardResolveOptions.RESOLVE_FILE), - write.inner.getShardTemplate()); + assertOutputFiles(drunkElems.toArray(new String[0]), null, null, 1, baseDir, + outputName + writableByteChannelFactory.getFilenameSuffix(), write.getShardTemplate()); } @Test @@ -791,7 +555,7 @@ public class TextIOTest { * Helper that writes the given lines (adding a newline in between) to a stream, then closes the * stream. */ - private static void writeToStreamAndClose(List<String> lines, OutputStream outputStream) { + private static void writeToStreamAndClose(String[] lines, OutputStream outputStream) { try (PrintStream writer = new PrintStream(outputStream)) { for (String line : lines) { writer.println(line); @@ -800,33 +564,27 @@ public class TextIOTest { } /** - * Helper method that runs TextIO.read().from(filename).withCompressionType(compressionType) and - * TextIO.readAll().withCompressionType(compressionType) applied to the single filename, + * Helper method that runs TextIO.read().from(filename).withCompressionType(compressionType) * and asserts that the results match the given expected output. */ private void assertReadingCompressedFileMatchesExpected( - File file, CompressionType compressionType, List<String> expected) { - - TextIO.Read read = TextIO.read().from(file.getPath()).withCompressionType(compressionType); - PAssert.that(p.apply("Read_" + file + "_" + compressionType.toString(), read)) - .containsInAnyOrder(expected); - - TextIO.ReadAll readAll = - TextIO.readAll().withCompressionType(compressionType).withDesiredBundleSizeBytes(10); - PAssert.that( - p.apply("Create_" + file, Create.of(file.getPath())) - .apply("Read_" + compressionType.toString(), readAll)) - .containsInAnyOrder(expected); + File file, CompressionType compressionType, String[] expected) { + + TextIO.Read read = + TextIO.read().from(file.getPath()).withCompressionType(compressionType); + PCollection<String> output = p.apply("Read_" + file + "_" + compressionType.toString(), read); + + PAssert.that(output).containsInAnyOrder(expected); p.run(); } /** * Helper to make an array of compressible strings. Returns ["word"i] for i in range(0,n). */ - private static List<String> makeLines(int n) { - List<String> ret = new ArrayList<>(); + private static String[] makeLines(int n) { + String[] ret = new String[n]; for (int i = 0; i < n; ++i) { - ret.add("word" + i); + ret[i] = "word" + i; } return ret; } @@ -1010,7 +768,7 @@ public class TextIOTest { String filename = createZipFile(expected, "multiple entries", entry0, entry1, entry2); assertReadingCompressedFileMatchesExpected( - new File(filename), CompressionType.ZIP, expected); + new File(filename), CompressionType.ZIP, expected.toArray(new String[]{})); } /** @@ -1029,7 +787,7 @@ public class TextIOTest { new String[]{"dog"}); assertReadingCompressedFileMatchesExpected( - new File(filename), CompressionType.ZIP, Arrays.asList("cat", "dog")); + new File(filename), CompressionType.ZIP, new String[] {"cat", "dog"}); } @Test @@ -1346,21 +1104,5 @@ public class TextIOTest { SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options); } - - @Test - @Category(NeedsRunner.class) - public void testReadAll() throws IOException { - writeToFile(TINY, "readAllTiny1.zip", ZIP); - writeToFile(TINY, "readAllTiny2.zip", ZIP); - writeToFile(LARGE, "readAllLarge1.zip", ZIP); - writeToFile(LARGE, "readAllLarge2.zip", ZIP); - PCollection<String> lines = - p.apply( - Create.of( - tempFolder.resolve("readAllTiny*").toString(), - tempFolder.resolve("readAllLarge*").toString())) - .apply(TextIO.readAll().withCompressionType(AUTO)); - PAssert.that(lines).containsInAnyOrder(Iterables.concat(TINY, TINY, LARGE, LARGE)); - p.run(); - } } +
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java index 55f2a87..a5dacd1 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java @@ -17,7 +17,6 @@ */ package org.apache.beam.sdk.io; -import static com.google.common.base.MoreObjects.firstNonNull; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFor; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -42,11 +41,6 @@ import java.util.List; import java.util.concurrent.ThreadLocalRandom; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params; -import org.apache.beam.sdk.io.FileBasedSink.CompressionType; -import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations; -import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; -import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints; import org.apache.beam.sdk.io.SimpleSink.SimpleWriter; import org.apache.beam.sdk.io.fs.MatchResult.Metadata; import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; @@ -63,24 +57,17 @@ import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.transforms.SerializableFunctions; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.transforms.Top; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.transforms.display.DisplayData.Builder; import org.apache.beam.sdk.transforms.windowing.FixedWindows; -import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.Sessions; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollection.IsBounded; import org.apache.beam.sdk.values.PCollectionView; import org.joda.time.Duration; -import org.joda.time.format.DateTimeFormatter; -import org.joda.time.format.ISODateTimeFormat; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -173,11 +160,7 @@ public class WriteFilesTest { public void testWrite() throws IOException { List<String> inputs = Arrays.asList("Critical canary", "Apprehensive eagle", "Intimidating pigeon", "Pedantic gull", "Frisky finch"); - runWrite( - inputs, - IDENTITY_MAP, - getBaseOutputFilename(), - WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity())); + runWrite(inputs, IDENTITY_MAP, getBaseOutputFilename()); } /** @@ -186,11 +169,7 @@ public class WriteFilesTest { @Test @Category(NeedsRunner.class) public void testEmptyWrite() throws IOException { - runWrite( - Collections.<String>emptyList(), - IDENTITY_MAP, - getBaseOutputFilename(), - WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity())); + runWrite(Collections.<String>emptyList(), IDENTITY_MAP, getBaseOutputFilename()); checkFileContents(getBaseOutputFilename(), Collections.<String>emptyList(), Optional.of(1)); } @@ -206,7 +185,7 @@ public class WriteFilesTest { Arrays.asList("one", "two", "three", "four", "five", "six"), IDENTITY_MAP, getBaseOutputFilename(), - WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity()).withNumShards(1)); + Optional.of(1)); } private ResourceId getBaseOutputDirectory() { @@ -214,13 +193,8 @@ public class WriteFilesTest { .resolve("output", StandardResolveOptions.RESOLVE_DIRECTORY); } - - private SimpleSink<Void> makeSimpleSink() { - FilenamePolicy filenamePolicy = - new PerWindowFiles( - getBaseOutputDirectory().resolve("file", StandardResolveOptions.RESOLVE_FILE), - "simple"); - return SimpleSink.makeSimpleSink(getBaseOutputDirectory(), filenamePolicy); + private SimpleSink makeSimpleSink() { + return new SimpleSink(getBaseOutputDirectory(), "file", "-SS-of-NN", "simple"); } @Test @@ -239,10 +213,8 @@ public class WriteFilesTest { timestamps.add(i + 1); } - SimpleSink<Void> sink = makeSimpleSink(); - WriteFiles<String, ?, String> write = - WriteFiles.to(sink, SerializableFunctions.<String>identity()) - .withSharding(new LargestInt()); + SimpleSink sink = makeSimpleSink(); + WriteFiles<String> write = WriteFiles.to(sink).withSharding(new LargestInt()); p.apply(Create.timestamped(inputs, timestamps).withCoder(StringUtf8Coder.of())) .apply(IDENTITY_MAP) .apply(write); @@ -263,8 +235,7 @@ public class WriteFilesTest { Arrays.asList("one", "two", "three", "four", "five", "six"), IDENTITY_MAP, getBaseOutputFilename(), - WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity()) - .withNumShards(20)); + Optional.of(20)); } /** @@ -274,11 +245,7 @@ public class WriteFilesTest { @Category(NeedsRunner.class) public void testWriteWithEmptyPCollection() throws IOException { List<String> inputs = new ArrayList<>(); - runWrite( - inputs, - IDENTITY_MAP, - getBaseOutputFilename(), - WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity())); + runWrite(inputs, IDENTITY_MAP, getBaseOutputFilename()); } /** @@ -290,10 +257,8 @@ public class WriteFilesTest { List<String> inputs = Arrays.asList("Critical canary", "Apprehensive eagle", "Intimidating pigeon", "Pedantic gull", "Frisky finch"); runWrite( - inputs, - new WindowAndReshuffle<>(Window.<String>into(FixedWindows.of(Duration.millis(2)))), - getBaseOutputFilename(), - WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity())); + inputs, new WindowAndReshuffle<>(Window.<String>into(FixedWindows.of(Duration.millis(2)))), + getBaseOutputFilename()); } /** @@ -307,32 +272,16 @@ public class WriteFilesTest { runWrite( inputs, - new WindowAndReshuffle<>(Window.<String>into(Sessions.withGapDuration(Duration.millis(1)))), - getBaseOutputFilename(), - WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity())); + new WindowAndReshuffle<>( + Window.<String>into(Sessions.withGapDuration(Duration.millis(1)))), + getBaseOutputFilename()); } @Test - @Category(NeedsRunner.class) - public void testWriteSpilling() throws IOException { - List<String> inputs = Lists.newArrayList(); - for (int i = 0; i < 100; ++i) { - inputs.add("mambo_number_" + i); - } - runWrite( - inputs, - Window.<String>into(FixedWindows.of(Duration.millis(2))), - getBaseOutputFilename(), - WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity()) - .withMaxNumWritersPerBundle(2) - .withWindowedWrites()); - } - public void testBuildWrite() { - SimpleSink<Void> sink = makeSimpleSink(); - WriteFiles<String, ?, String> write = - WriteFiles.to(sink, SerializableFunctions.<String>identity()).withNumShards(3); - assertThat((SimpleSink<Void>) write.getSink(), is(sink)); + SimpleSink sink = makeSimpleSink(); + WriteFiles<String> write = WriteFiles.to(sink).withNumShards(3); + assertThat((SimpleSink) write.getSink(), is(sink)); PTransform<PCollection<String>, PCollectionView<Integer>> originalSharding = write.getSharding(); @@ -341,37 +290,25 @@ public class WriteFilesTest { assertThat(write.getNumShards().get(), equalTo(3)); assertThat(write.getSharding(), equalTo(originalSharding)); - WriteFiles<String, ?, ?> write2 = write.withSharding(SHARDING_TRANSFORM); - assertThat((SimpleSink<Void>) write2.getSink(), is(sink)); + WriteFiles<String> write2 = write.withSharding(SHARDING_TRANSFORM); + assertThat((SimpleSink) write2.getSink(), is(sink)); assertThat(write2.getSharding(), equalTo(SHARDING_TRANSFORM)); // original unchanged - WriteFiles<String, ?, ?> writeUnsharded = write2.withRunnerDeterminedSharding(); + WriteFiles<String> writeUnsharded = write2.withRunnerDeterminedSharding(); assertThat(writeUnsharded.getSharding(), nullValue()); assertThat(write.getSharding(), equalTo(originalSharding)); } @Test public void testDisplayData() { - DynamicDestinations<String, Void> dynamicDestinations = - DynamicFileDestinations.constant( - DefaultFilenamePolicy.fromParams( - new Params() - .withBaseFilename( - getBaseOutputDirectory() - .resolve("file", StandardResolveOptions.RESOLVE_FILE)) - .withShardTemplate("-SS-of-NN"))); - SimpleSink<Void> sink = - new SimpleSink<Void>( - getBaseOutputDirectory(), dynamicDestinations, CompressionType.UNCOMPRESSED) { - @Override - public void populateDisplayData(DisplayData.Builder builder) { - builder.add(DisplayData.item("foo", "bar")); - } - }; - WriteFiles<String, ?, String> write = - WriteFiles.to(sink, SerializableFunctions.<String>identity()); - + SimpleSink sink = new SimpleSink(getBaseOutputDirectory(), "file", "-SS-of-NN", "") { + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add(DisplayData.item("foo", "bar")); + } + }; + WriteFiles<String> write = WriteFiles.to(sink); DisplayData displayData = DisplayData.from(write); assertThat(displayData, hasDisplayItem("sink", sink.getClass())); @@ -379,145 +316,14 @@ public class WriteFilesTest { } @Test - @Category(NeedsRunner.class) - public void testUnboundedNeedsWindowed() { - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage( - "Must use windowed writes when applying WriteFiles to an unbounded PCollection"); - - SimpleSink<Void> sink = makeSimpleSink(); - p.apply(Create.of("foo")) - .setIsBoundedInternal(IsBounded.UNBOUNDED) - .apply(WriteFiles.to(sink, SerializableFunctions.<String>identity())); - p.run(); - } - - @Test - @Category(NeedsRunner.class) - public void testUnboundedNeedsSharding() { - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage( - "When applying WriteFiles to an unbounded PCollection, " - + "must specify number of output shards explicitly"); - - SimpleSink<Void> sink = makeSimpleSink(); - p.apply(Create.of("foo")) - .setIsBoundedInternal(IsBounded.UNBOUNDED) - .apply(WriteFiles.to(sink, SerializableFunctions.<String>identity()).withWindowedWrites()); - p.run(); - } - - // Test DynamicDestinations class. Expects user values to be string-encoded integers. - // Stores the integer mod 5 as the destination, and uses that in the file prefix. - static class TestDestinations extends DynamicDestinations<String, Integer> { - private ResourceId baseOutputDirectory; - - TestDestinations(ResourceId baseOutputDirectory) { - this.baseOutputDirectory = baseOutputDirectory; - } - - @Override - public Integer getDestination(String element) { - return Integer.valueOf(element) % 5; - } - - @Override - public Integer getDefaultDestination() { - return 0; - } - - @Override - public FilenamePolicy getFilenamePolicy(Integer destination) { - return new PerWindowFiles( - baseOutputDirectory.resolve("file_" + destination, StandardResolveOptions.RESOLVE_FILE), - "simple"); - } - - @Override - public void populateDisplayData(Builder builder) { - super.populateDisplayData(builder); - } - } - - // Test format function. Prepend a string to each record before writing. - static class TestDynamicFormatFunction implements SerializableFunction<String, String> { - @Override - public String apply(String input) { - return "record_" + input; - } - } - - @Test - @Category(NeedsRunner.class) - public void testDynamicDestinationsBounded() throws Exception { - testDynamicDestinationsHelper(true); - } - - @Test - @Category(NeedsRunner.class) - public void testDynamicDestinationsUnbounded() throws Exception { - testDynamicDestinationsHelper(false); - } - - private void testDynamicDestinationsHelper(boolean bounded) throws IOException { - TestDestinations dynamicDestinations = new TestDestinations(getBaseOutputDirectory()); - SimpleSink<Integer> sink = - new SimpleSink<>( - getBaseOutputDirectory(), dynamicDestinations, CompressionType.UNCOMPRESSED); - - // Flag to validate that the pipeline options are passed to the Sink. - WriteOptions options = TestPipeline.testingPipelineOptions().as(WriteOptions.class); - options.setTestFlag("test_value"); - Pipeline p = TestPipeline.create(options); - - List<String> inputs = Lists.newArrayList("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"); - // Prepare timestamps for the elements. - List<Long> timestamps = new ArrayList<>(); - for (long i = 0; i < inputs.size(); i++) { - timestamps.add(i + 1); - } - - WriteFiles<String, Integer, String> writeFiles = - WriteFiles.to(sink, new TestDynamicFormatFunction()).withNumShards(1); - - PCollection<String> input = p.apply(Create.timestamped(inputs, timestamps)); - if (!bounded) { - input.setIsBoundedInternal(IsBounded.UNBOUNDED); - input = input.apply(Window.<String>into(FixedWindows.of(Duration.standardDays(1)))); - input.apply(writeFiles.withWindowedWrites()); - } else { - input.apply(writeFiles); - } - p.run(); - - for (int i = 0; i < 5; ++i) { - ResourceId base = - getBaseOutputDirectory().resolve("file_" + i, StandardResolveOptions.RESOLVE_FILE); - List<String> expected = Lists.newArrayList("record_" + i, "record_" + (i + 5)); - checkFileContents(base.toString(), expected, Optional.of(1)); - } - } - - @Test public void testShardedDisplayData() { - DynamicDestinations<String, Void> dynamicDestinations = - DynamicFileDestinations.constant( - DefaultFilenamePolicy.fromParams( - new Params() - .withBaseFilename( - getBaseOutputDirectory() - .resolve("file", StandardResolveOptions.RESOLVE_FILE)) - .withShardTemplate("-SS-of-NN"))); - SimpleSink<Void> sink = - new SimpleSink<Void>( - getBaseOutputDirectory(), dynamicDestinations, CompressionType.UNCOMPRESSED) { - @Override - public void populateDisplayData(DisplayData.Builder builder) { - builder.add(DisplayData.item("foo", "bar")); - } - }; - WriteFiles<String, ?, String> write = - WriteFiles.to(sink, SerializableFunctions.<String>identity()).withNumShards(1); + SimpleSink sink = new SimpleSink(getBaseOutputDirectory(), "file", "-SS-of-NN", "") { + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add(DisplayData.item("foo", "bar")); + } + }; + WriteFiles<String> write = WriteFiles.to(sink).withNumShards(1); DisplayData displayData = DisplayData.from(write); assertThat(displayData, hasDisplayItem("sink", sink.getClass())); assertThat(displayData, includesDisplayDataFor("sink", sink)); @@ -526,24 +332,14 @@ public class WriteFilesTest { @Test public void testCustomShardStrategyDisplayData() { - DynamicDestinations<String, Void> dynamicDestinations = - DynamicFileDestinations.constant( - DefaultFilenamePolicy.fromParams( - new Params() - .withBaseFilename( - getBaseOutputDirectory() - .resolve("file", StandardResolveOptions.RESOLVE_FILE)) - .withShardTemplate("-SS-of-NN"))); - SimpleSink<Void> sink = - new SimpleSink<Void>( - getBaseOutputDirectory(), dynamicDestinations, CompressionType.UNCOMPRESSED) { - @Override - public void populateDisplayData(DisplayData.Builder builder) { - builder.add(DisplayData.item("foo", "bar")); - } - }; - WriteFiles<String, ?, String> write = - WriteFiles.to(sink, SerializableFunctions.<String>identity()) + SimpleSink sink = new SimpleSink(getBaseOutputDirectory(), "file", "-SS-of-NN", "") { + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add(DisplayData.item("foo", "bar")); + } + }; + WriteFiles<String> write = + WriteFiles.to(sink) .withSharding( new PTransform<PCollection<String>, PCollectionView<Integer>>() { @Override @@ -568,77 +364,22 @@ public class WriteFilesTest { * PCollection are written to the sink. */ private void runWrite( - List<String> inputs, - PTransform<PCollection<String>, PCollection<String>> transform, - String baseName, - WriteFiles<String, ?, String> write) - throws IOException { - runShardedWrite(inputs, transform, baseName, write); - } - - private static class PerWindowFiles extends FilenamePolicy { - private static final DateTimeFormatter FORMATTER = ISODateTimeFormat.hourMinuteSecondMillis(); - private final ResourceId baseFilename; - private final String suffix; - - public PerWindowFiles(ResourceId baseFilename, String suffix) { - this.baseFilename = baseFilename; - this.suffix = suffix; - } - - public String filenamePrefixForWindow(IntervalWindow window) { - String prefix = - baseFilename.isDirectory() ? "" : firstNonNull(baseFilename.getFilename(), ""); - return String.format("%s%s-%s", - prefix, FORMATTER.print(window.start()), FORMATTER.print(window.end())); - } - - @Override - public ResourceId windowedFilename(WindowedContext context, OutputFileHints outputFileHints) { - IntervalWindow window = (IntervalWindow) context.getWindow(); - String filename = - String.format( - "%s-%s-of-%s%s%s", - filenamePrefixForWindow(window), - context.getShardNumber(), - context.getNumShards(), - outputFileHints.getSuggestedFilenameSuffix(), - suffix); - return baseFilename - .getCurrentDirectory() - .resolve(filename, StandardResolveOptions.RESOLVE_FILE); - } - - @Override - public ResourceId unwindowedFilename(Context context, OutputFileHints outputFileHints) { - String prefix = - baseFilename.isDirectory() ? "" : firstNonNull(baseFilename.getFilename(), ""); - String filename = - String.format( - "%s-%s-of-%s%s%s", - prefix, - context.getShardNumber(), - context.getNumShards(), - outputFileHints.getSuggestedFilenameSuffix(), - suffix); - return baseFilename - .getCurrentDirectory() - .resolve(filename, StandardResolveOptions.RESOLVE_FILE); - } + List<String> inputs, PTransform<PCollection<String>, PCollection<String>> transform, + String baseName) throws IOException { + runShardedWrite(inputs, transform, baseName, Optional.<Integer>absent()); } /** * Performs a WriteFiles transform with the desired number of shards. Verifies the WriteFiles * transform calls the appropriate methods on a test sink in the correct order, as well as - * verifies that the elements of a PCollection are written to the sink. If numConfiguredShards is - * not null, also verifies that the output number of shards is correct. + * verifies that the elements of a PCollection are written to the sink. If numConfiguredShards + * is not null, also verifies that the output number of shards is correct. */ private void runShardedWrite( List<String> inputs, PTransform<PCollection<String>, PCollection<String>> transform, String baseName, - WriteFiles<String, ?, String> write) - throws IOException { + Optional<Integer> numConfiguredShards) throws IOException { // Flag to validate that the pipeline options are passed to the Sink WriteOptions options = TestPipeline.testingPipelineOptions().as(WriteOptions.class); options.setTestFlag("test_value"); @@ -649,15 +390,18 @@ public class WriteFilesTest { for (long i = 0; i < inputs.size(); i++) { timestamps.add(i + 1); } + + SimpleSink sink = makeSimpleSink(); + WriteFiles<String> write = WriteFiles.to(sink); + if (numConfiguredShards.isPresent()) { + write = write.withNumShards(numConfiguredShards.get()); + } p.apply(Create.timestamped(inputs, timestamps).withCoder(StringUtf8Coder.of())) .apply(transform) .apply(write); p.run(); - Optional<Integer> numShards = - (write.getNumShards() != null) - ? Optional.of(write.getNumShards().get()) : Optional.<Integer>absent(); - checkFileContents(baseName, inputs, numShards); + checkFileContents(baseName, inputs, numConfiguredShards); } static void checkFileContents(String baseName, List<String> inputs, http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsValidatorTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsValidatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsValidatorTest.java index f8cd00f..120d5ed 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsValidatorTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsValidatorTest.java @@ -60,18 +60,6 @@ public class PipelineOptionsValidatorTest { } @Test - public void testWhenRequiredOptionIsSetAndClearedCli() { - expectedException.expect(IllegalArgumentException.class); - expectedException.expectMessage("Missing required value for " - + "[--object, \"Fake Description\"]."); - - Required required = PipelineOptionsFactory.fromArgs(new String[]{"--object=blah"}) - .as(Required.class); - required.setObject(null); - PipelineOptionsValidator.validateCli(Required.class, required); - } - - @Test public void testWhenRequiredOptionIsNeverSet() { expectedException.expect(IllegalArgumentException.class); expectedException.expectMessage("Missing required value for " @@ -82,17 +70,6 @@ public class PipelineOptionsValidatorTest { PipelineOptionsValidator.validate(Required.class, required); } - - @Test - public void testWhenRequiredOptionIsNeverSetCli() { - expectedException.expect(IllegalArgumentException.class); - expectedException.expectMessage("Missing required value for " - + "[--object, \"Fake Description\"]."); - - Required required = PipelineOptionsFactory.fromArgs(new String[]{}).as(Required.class); - PipelineOptionsValidator.validateCli(Required.class, required); - } - @Test public void testWhenRequiredOptionIsNeverSetOnSuperInterface() { expectedException.expect(IllegalArgumentException.class); @@ -104,16 +81,6 @@ public class PipelineOptionsValidatorTest { PipelineOptionsValidator.validate(Required.class, options); } - @Test - public void testWhenRequiredOptionIsNeverSetOnSuperInterfaceCli() { - expectedException.expect(IllegalArgumentException.class); - expectedException.expectMessage("Missing required value for " - + "[--object, \"Fake Description\"]."); - - PipelineOptions options = PipelineOptionsFactory.fromArgs(new String[]{}).create(); - PipelineOptionsValidator.validateCli(Required.class, options); - } - /** A test interface that overrides the parent's method. */ public interface SubClassValidation extends Required { @Override @@ -133,17 +100,6 @@ public class PipelineOptionsValidatorTest { PipelineOptionsValidator.validate(Required.class, required); } - @Test - public void testValidationOnOverriddenMethodsCli() throws Exception { - expectedException.expect(IllegalArgumentException.class); - expectedException.expectMessage("Missing required value for " - + "[--object, \"Fake Description\"]."); - - SubClassValidation required = PipelineOptionsFactory.fromArgs(new String[]{}) - .as(SubClassValidation.class); - PipelineOptionsValidator.validateCli(Required.class, required); - } - /** A test interface with a required group. */ public interface GroupRequired extends PipelineOptions { @Validation.Required(groups = {"ham"}) http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java index fb0a0d7..2c43f57 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java @@ -44,7 +44,6 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; import com.google.common.testing.EqualsTester; import java.io.IOException; -import java.io.NotSerializableException; import java.io.Serializable; import java.util.HashSet; import java.util.List; @@ -55,7 +54,6 @@ import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.common.ReflectHelpers; import org.hamcrest.Matchers; import org.joda.time.Instant; @@ -1021,21 +1019,4 @@ public class ProxyInvocationHandlerTest { DisplayData data = DisplayData.from(options); assertThat(data, not(hasDisplayItem("value"))); } - - private static class CapturesOptions implements Serializable { - PipelineOptions options = PipelineOptionsFactory.create(); - } - - @Test - public void testOptionsAreNotSerializable() { - expectedException.expectCause(Matchers.<Throwable>instanceOf(NotSerializableException.class)); - SerializableUtils.clone(new CapturesOptions()); - } - - @Test - public void testGetOptionNameFromMethod() throws NoSuchMethodException { - ProxyInvocationHandler handler = new ProxyInvocationHandler(Maps.<String, Object>newHashMap()); - handler.as(BaseOptions.class); - assertEquals("foo", handler.getOptionName(BaseOptions.class.getMethod("getFoo"))); - } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java index 93650dd..1197d1b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java @@ -19,7 +19,6 @@ package org.apache.beam.sdk.runners; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertThat; @@ -33,8 +32,6 @@ import java.util.Map.Entry; import java.util.Set; import org.apache.beam.sdk.Pipeline.PipelineVisitor; import org.apache.beam.sdk.Pipeline.PipelineVisitor.Defaults; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.io.CountingSource; import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.io.Read; @@ -495,198 +492,4 @@ public class TransformHierarchyTest implements Serializable { assertThat(visitedPrimitiveNodes, containsInAnyOrder(upstreamNode, replacementParNode)); assertThat(visitedValues, Matchers.<PValue>containsInAnyOrder(upstream, output)); } - - @Test - public void visitIsTopologicallyOrdered() { - PCollection<String> one = - PCollection.<String>createPrimitiveOutputInternal( - pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED) - .setCoder(StringUtf8Coder.of()); - final PCollection<Integer> two = - PCollection.<Integer>createPrimitiveOutputInternal( - pipeline, WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED) - .setCoder(VarIntCoder.of()); - final PDone done = PDone.in(pipeline); - final TupleTag<String> oneTag = new TupleTag<String>() {}; - final TupleTag<Integer> twoTag = new TupleTag<Integer>() {}; - final PCollectionTuple oneAndTwo = PCollectionTuple.of(oneTag, one).and(twoTag, two); - - PTransform<PCollection<String>, PDone> multiConsumer = - new PTransform<PCollection<String>, PDone>() { - @Override - public PDone expand(PCollection<String> input) { - return done; - } - - @Override - public Map<TupleTag<?>, PValue> getAdditionalInputs() { - return Collections.<TupleTag<?>, PValue>singletonMap(twoTag, two); - } - }; - hierarchy.pushNode("consumes_both", one, multiConsumer); - hierarchy.setOutput(done); - hierarchy.popNode(); - - final PTransform<PBegin, PCollectionTuple> producer = - new PTransform<PBegin, PCollectionTuple>() { - @Override - public PCollectionTuple expand(PBegin input) { - return oneAndTwo; - } - }; - hierarchy.pushNode( - "encloses_producer", - PBegin.in(pipeline), - new PTransform<PBegin, PCollectionTuple>() { - @Override - public PCollectionTuple expand(PBegin input) { - return input.apply(producer); - } - }); - hierarchy.pushNode( - "creates_one_and_two", - PBegin.in(pipeline), producer); - hierarchy.setOutput(oneAndTwo); - hierarchy.popNode(); - hierarchy.setOutput(oneAndTwo); - hierarchy.popNode(); - - hierarchy.pushNode("second_copy_of_consumes_both", one, multiConsumer); - hierarchy.setOutput(done); - hierarchy.popNode(); - - final Set<Node> visitedNodes = new HashSet<>(); - final Set<Node> exitedNodes = new HashSet<>(); - final Set<PValue> visitedValues = new HashSet<>(); - hierarchy.visit( - new PipelineVisitor.Defaults() { - - @Override - public CompositeBehavior enterCompositeTransform(Node node) { - for (PValue input : node.getInputs().values()) { - assertThat(visitedValues, hasItem(input)); - } - assertThat( - "Nodes should not be visited more than once", visitedNodes, not(hasItem(node))); - if (!node.isRootNode()) { - assertThat( - "Nodes should always be visited after their enclosing nodes", - visitedNodes, - hasItem(node.getEnclosingNode())); - } - visitedNodes.add(node); - return CompositeBehavior.ENTER_TRANSFORM; - } - - @Override - public void leaveCompositeTransform(Node node) { - assertThat(visitedNodes, hasItem(node)); - if (!node.isRootNode()) { - assertThat( - "Nodes should always be left before their enclosing nodes are left", - exitedNodes, - not(hasItem(node.getEnclosingNode()))); - } - assertThat(exitedNodes, not(hasItem(node))); - exitedNodes.add(node); - } - - @Override - public void visitPrimitiveTransform(Node node) { - assertThat(visitedNodes, hasItem(node.getEnclosingNode())); - assertThat(exitedNodes, not(hasItem(node.getEnclosingNode()))); - assertThat( - "Nodes should not be visited more than once", visitedNodes, not(hasItem(node))); - for (PValue input : node.getInputs().values()) { - assertThat(visitedValues, hasItem(input)); - } - visitedNodes.add(node); - } - - @Override - public void visitValue(PValue value, Node producer) { - assertThat(visitedNodes, hasItem(producer)); - assertThat(visitedValues, not(hasItem(value))); - visitedValues.add(value); - } - }); - assertThat("Should have visited all the nodes", visitedNodes.size(), equalTo(5)); - assertThat("Should have left all of the visited composites", exitedNodes.size(), equalTo(2)); - } - - @Test - public void visitDoesNotVisitSkippedNodes() { - PCollection<String> one = - PCollection.<String>createPrimitiveOutputInternal( - pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED) - .setCoder(StringUtf8Coder.of()); - final PCollection<Integer> two = - PCollection.<Integer>createPrimitiveOutputInternal( - pipeline, WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED) - .setCoder(VarIntCoder.of()); - final PDone done = PDone.in(pipeline); - final TupleTag<String> oneTag = new TupleTag<String>() {}; - final TupleTag<Integer> twoTag = new TupleTag<Integer>() {}; - final PCollectionTuple oneAndTwo = PCollectionTuple.of(oneTag, one).and(twoTag, two); - - hierarchy.pushNode( - "consumes_both", - one, - new PTransform<PCollection<String>, PDone>() { - @Override - public PDone expand(PCollection<String> input) { - return done; - } - - @Override - public Map<TupleTag<?>, PValue> getAdditionalInputs() { - return Collections.<TupleTag<?>, PValue>singletonMap(twoTag, two); - } - }); - hierarchy.setOutput(done); - hierarchy.popNode(); - - final PTransform<PBegin, PCollectionTuple> producer = - new PTransform<PBegin, PCollectionTuple>() { - @Override - public PCollectionTuple expand(PBegin input) { - return oneAndTwo; - } - }; - final Node enclosing = - hierarchy.pushNode( - "encloses_producer", - PBegin.in(pipeline), - new PTransform<PBegin, PCollectionTuple>() { - @Override - public PCollectionTuple expand(PBegin input) { - return input.apply(producer); - } - }); - Node enclosed = hierarchy.pushNode("creates_one_and_two", PBegin.in(pipeline), producer); - hierarchy.setOutput(oneAndTwo); - hierarchy.popNode(); - hierarchy.setOutput(oneAndTwo); - hierarchy.popNode(); - - final Set<Node> visitedNodes = new HashSet<>(); - hierarchy.visit( - new PipelineVisitor.Defaults() { - @Override - public CompositeBehavior enterCompositeTransform(Node node) { - visitedNodes.add(node); - return node.equals(enclosing) - ? CompositeBehavior.DO_NOT_ENTER_TRANSFORM - : CompositeBehavior.ENTER_TRANSFORM; - } - - @Override - public void visitPrimitiveTransform(Node node) { - visitedNodes.add(node); - } - }); - - assertThat(visitedNodes, hasItem(enclosing)); - assertThat(visitedNodes, not(hasItem(enclosed))); - } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java index aaf8b91..adf27f8 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java @@ -22,9 +22,7 @@ import com.google.common.base.Function; import com.google.common.base.MoreObjects; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; -import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.Objects; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.IterableCoder; @@ -39,7 +37,6 @@ import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.PValueBase; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowingStrategy; @@ -352,10 +349,5 @@ public final class PCollectionViewTesting { .add("viewFn", viewFn) .toString(); } - - @Override - public Map<TupleTag<?>, PValue> expand() { - return Collections.<TupleTag<?>, PValue>singletonMap(tag, pCollection); - } } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java index b24d82d..dc9788f 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java @@ -17,7 +17,7 @@ */ package org.apache.beam.sdk.transforms; -import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static org.apache.beam.sdk.TestUtils.checkCombineFn; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; @@ -29,13 +29,11 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import com.google.common.base.MoreObjects; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; @@ -47,6 +45,7 @@ import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.BigEndianIntegerCoder; import org.apache.beam.sdk.coders.BigEndianLongCoder; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.coders.DoubleCoder; import org.apache.beam.sdk.coders.KvCoder; @@ -86,6 +85,7 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.mockito.Mock; /** * Tests for Combine transforms. @@ -95,8 +95,18 @@ public class CombineTest implements Serializable { // This test is Serializable, just so that it's easy to have // anonymous inner classes inside the non-static test methods. + static final List<KV<String, Integer>> TABLE = Arrays.asList( + KV.of("a", 1), + KV.of("a", 1), + KV.of("a", 4), + KV.of("b", 1), + KV.of("b", 13) + ); + static final List<KV<String, Integer>> EMPTY_TABLE = Collections.emptyList(); + @Mock private DoFn<?, ?>.ProcessContext processContext; + @Rule public final transient TestPipeline pipeline = TestPipeline.create(); @@ -140,12 +150,12 @@ public class CombineTest implements Serializable { PCollection<KV<String, String>> combinePerKey = perKeyInput.apply( Combine.<String, Integer, String>perKey(new TestCombineFnWithContext(globallySumView)) - .withSideInputs(globallySumView)); + .withSideInputs(Arrays.asList(globallySumView))); PCollection<String> combineGlobally = globallyInput .apply(Combine.globally(new TestCombineFnWithContext(globallySumView)) .withoutDefaults() - .withSideInputs(globallySumView)); + .withSideInputs(Arrays.asList(globallySumView))); PAssert.that(sum).containsInAnyOrder(globalSum); PAssert.that(combinePerKey).containsInAnyOrder(perKeyCombines); @@ -158,28 +168,16 @@ public class CombineTest implements Serializable { @Category(ValidatesRunner.class) @SuppressWarnings({"rawtypes", "unchecked"}) public void testSimpleCombine() { - runTestSimpleCombine(Arrays.asList( - KV.of("a", 1), - KV.of("a", 1), - KV.of("a", 4), - KV.of("b", 1), - KV.of("b", 13) - ), 20, Arrays.asList(KV.of("a", "114"), KV.of("b", "113"))); + runTestSimpleCombine(TABLE, 20, Arrays.asList(KV.of("a", "114"), KV.of("b", "113"))); } @Test @Category(ValidatesRunner.class) @SuppressWarnings({"rawtypes", "unchecked"}) public void testSimpleCombineWithContext() { - runTestSimpleCombineWithContext(Arrays.asList( - KV.of("a", 1), - KV.of("a", 1), - KV.of("a", 4), - KV.of("b", 1), - KV.of("b", 13) - ), 20, - Arrays.asList(KV.of("a", "20:114"), KV.of("b", "20:113")), - new String[] {"20:111134"}); + runTestSimpleCombineWithContext(TABLE, 20, + Arrays.asList(KV.of("a", "01124"), KV.of("b", "01123")), + new String[] {"01111234"}); } @Test @@ -218,13 +216,7 @@ public class CombineTest implements Serializable { @Test @Category(ValidatesRunner.class) public void testBasicCombine() { - runTestBasicCombine(Arrays.asList( - KV.of("a", 1), - KV.of("a", 1), - KV.of("a", 4), - KV.of("b", 1), - KV.of("b", 13) - ), ImmutableSet.of(1, 13, 4), Arrays.asList( + runTestBasicCombine(TABLE, ImmutableSet.of(1, 13, 4), Arrays.asList( KV.of("a", (Set<Integer>) ImmutableSet.of(1, 4)), KV.of("b", (Set<Integer>) ImmutableSet.of(1, 13)))); } @@ -259,16 +251,9 @@ public class CombineTest implements Serializable { @Category(ValidatesRunner.class) public void testFixedWindowsCombine() { PCollection<KV<String, Integer>> input = - pipeline - .apply( - Create.timestamped( - TimestampedValue.of(KV.of("a", 1), new Instant(0L)), - TimestampedValue.of(KV.of("a", 1), new Instant(1L)), - TimestampedValue.of(KV.of("a", 4), new Instant(6L)), - TimestampedValue.of(KV.of("b", 1), new Instant(7L)), - TimestampedValue.of(KV.of("b", 13), new Instant(8L))) - .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))) - .apply(Window.<KV<String, Integer>>into(FixedWindows.of(Duration.millis(2)))); + pipeline.apply(Create.timestamped(TABLE, Arrays.asList(0L, 1L, 6L, 7L, 8L)) + .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))) + .apply(Window.<KV<String, Integer>>into(FixedWindows.of(Duration.millis(2)))); PCollection<Integer> sum = input .apply(Values.<Integer>create()) @@ -278,9 +263,11 @@ public class CombineTest implements Serializable { .apply(Combine.<String, Integer, String>perKey(new TestCombineFn())); PAssert.that(sum).containsInAnyOrder(2, 5, 13); - PAssert.that(sumPerKey) - .containsInAnyOrder( - Arrays.asList(KV.of("a", "11"), KV.of("a", "4"), KV.of("b", "1"), KV.of("b", "13"))); + PAssert.that(sumPerKey).containsInAnyOrder( + KV.of("a", "11"), + KV.of("a", "4"), + KV.of("b", "1"), + KV.of("b", "13")); pipeline.run(); } @@ -288,16 +275,9 @@ public class CombineTest implements Serializable { @Category(ValidatesRunner.class) public void testFixedWindowsCombineWithContext() { PCollection<KV<String, Integer>> perKeyInput = - pipeline - .apply( - Create.timestamped( - TimestampedValue.of(KV.of("a", 1), new Instant(0L)), - TimestampedValue.of(KV.of("a", 1), new Instant(1L)), - TimestampedValue.of(KV.of("a", 4), new Instant(6L)), - TimestampedValue.of(KV.of("b", 1), new Instant(7L)), - TimestampedValue.of(KV.of("b", 13), new Instant(8L))) - .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))) - .apply(Window.<KV<String, Integer>>into(FixedWindows.of(Duration.millis(2)))); + pipeline.apply(Create.timestamped(TABLE, Arrays.asList(0L, 1L, 6L, 7L, 8L)) + .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))) + .apply(Window.<KV<String, Integer>>into(FixedWindows.of(Duration.millis(2)))); PCollection<Integer> globallyInput = perKeyInput.apply(Values.<Integer>create()); @@ -309,129 +289,60 @@ public class CombineTest implements Serializable { PCollection<KV<String, String>> combinePerKeyWithContext = perKeyInput.apply( Combine.<String, Integer, String>perKey(new TestCombineFnWithContext(globallySumView)) - .withSideInputs(globallySumView)); + .withSideInputs(Arrays.asList(globallySumView))); PCollection<String> combineGloballyWithContext = globallyInput .apply(Combine.globally(new TestCombineFnWithContext(globallySumView)) .withoutDefaults() - .withSideInputs(globallySumView)); + .withSideInputs(Arrays.asList(globallySumView))); PAssert.that(sum).containsInAnyOrder(2, 5, 13); - PAssert.that(combinePerKeyWithContext) - .containsInAnyOrder( - Arrays.asList( - KV.of("a", "2:11"), KV.of("a", "5:4"), KV.of("b", "5:1"), KV.of("b", "13:13"))); - PAssert.that(combineGloballyWithContext).containsInAnyOrder("2:11", "5:14", "13:13"); - pipeline.run(); - } - - @Test - @Category(ValidatesRunner.class) - public void testSlidingWindowsCombine() { - PCollection<String> input = - pipeline - .apply( - Create.timestamped( - TimestampedValue.of("a", new Instant(1L)), - TimestampedValue.of("b", new Instant(2L)), - TimestampedValue.of("c", new Instant(3L)))) - .apply( - Window.<String>into( - SlidingWindows.of(Duration.millis(3)).every(Duration.millis(1L)))); - PCollection<List<String>> combined = - input.apply( - Combine.globally( - new CombineFn<String, List<String>, List<String>>() { - @Override - public List<String> createAccumulator() { - return new ArrayList<>(); - } - - @Override - public List<String> addInput(List<String> accumulator, String input) { - accumulator.add(input); - return accumulator; - } - - @Override - public List<String> mergeAccumulators(Iterable<List<String>> accumulators) { - // Mutate all of the accumulators. Instances should be used in only one - // place, and not - // reused after merging. - List<String> cur = createAccumulator(); - for (List<String> accumulator : accumulators) { - accumulator.addAll(cur); - cur = accumulator; - } - return cur; - } - - @Override - public List<String> extractOutput(List<String> accumulator) { - List<String> result = new ArrayList<>(accumulator); - Collections.sort(result); - return result; - } - }) - .withoutDefaults()); - - PAssert.that(combined) - .containsInAnyOrder( - ImmutableList.of("a"), - ImmutableList.of("a", "b"), - ImmutableList.of("a", "b", "c"), - ImmutableList.of("b", "c"), - ImmutableList.of("c")); - + PAssert.that(combinePerKeyWithContext).containsInAnyOrder( + KV.of("a", "112"), + KV.of("a", "45"), + KV.of("b", "15"), + KV.of("b", "1133")); + PAssert.that(combineGloballyWithContext).containsInAnyOrder("112", "145", "1133"); pipeline.run(); } @Test @Category(ValidatesRunner.class) public void testSlidingWindowsCombineWithContext() { - // [a: 1, 1], [a: 4; b: 1], [b: 13] PCollection<KV<String, Integer>> perKeyInput = - pipeline - .apply( - Create.timestamped( - TimestampedValue.of(KV.of("a", 1), new Instant(2L)), - TimestampedValue.of(KV.of("a", 1), new Instant(3L)), - TimestampedValue.of(KV.of("a", 4), new Instant(8L)), - TimestampedValue.of(KV.of("b", 1), new Instant(9L)), - TimestampedValue.of(KV.of("b", 13), new Instant(10L))) - .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))) - .apply(Window.<KV<String, Integer>>into(SlidingWindows.of(Duration.millis(2)))); + pipeline.apply(Create.timestamped(TABLE, Arrays.asList(2L, 3L, 8L, 9L, 10L)) + .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))) + .apply(Window.<KV<String, Integer>>into(SlidingWindows.of(Duration.millis(2)))); PCollection<Integer> globallyInput = perKeyInput.apply(Values.<Integer>create()); - PCollection<Integer> sum = globallyInput.apply("Sum", Sum.integersGlobally().withoutDefaults()); + PCollection<Integer> sum = globallyInput + .apply("Sum", Combine.globally(new SumInts()).withoutDefaults()); PCollectionView<Integer> globallySumView = sum.apply(View.<Integer>asSingleton()); PCollection<KV<String, String>> combinePerKeyWithContext = perKeyInput.apply( Combine.<String, Integer, String>perKey(new TestCombineFnWithContext(globallySumView)) - .withSideInputs(globallySumView)); + .withSideInputs(Arrays.asList(globallySumView))); PCollection<String> combineGloballyWithContext = globallyInput .apply(Combine.globally(new TestCombineFnWithContext(globallySumView)) .withoutDefaults() - .withSideInputs(globallySumView)); + .withSideInputs(Arrays.asList(globallySumView))); PAssert.that(sum).containsInAnyOrder(1, 2, 1, 4, 5, 14, 13); - PAssert.that(combinePerKeyWithContext) - .containsInAnyOrder( - Arrays.asList( - KV.of("a", "1:1"), - KV.of("a", "2:11"), - KV.of("a", "1:1"), - KV.of("a", "4:4"), - KV.of("a", "5:4"), - KV.of("b", "5:1"), - KV.of("b", "14:113"), - KV.of("b", "13:13"))); + PAssert.that(combinePerKeyWithContext).containsInAnyOrder( + KV.of("a", "11"), + KV.of("a", "112"), + KV.of("a", "11"), + KV.of("a", "44"), + KV.of("a", "45"), + KV.of("b", "15"), + KV.of("b", "11134"), + KV.of("b", "1133")); PAssert.that(combineGloballyWithContext).containsInAnyOrder( - "1:1", "2:11", "1:1", "4:4", "5:14", "14:113", "13:13"); + "11", "112", "11", "44", "145", "11134", "1133"); pipeline.run(); } @@ -472,16 +383,9 @@ public class CombineTest implements Serializable { @Category(ValidatesRunner.class) public void testSessionsCombine() { PCollection<KV<String, Integer>> input = - pipeline - .apply( - Create.timestamped( - TimestampedValue.of(KV.of("a", 1), new Instant(0L)), - TimestampedValue.of(KV.of("a", 1), new Instant(4L)), - TimestampedValue.of(KV.of("a", 4), new Instant(7L)), - TimestampedValue.of(KV.of("b", 1), new Instant(10L)), - TimestampedValue.of(KV.of("b", 13), new Instant(16L))) - .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))) - .apply(Window.<KV<String, Integer>>into(Sessions.withGapDuration(Duration.millis(5)))); + pipeline.apply(Create.timestamped(TABLE, Arrays.asList(0L, 4L, 7L, 10L, 16L)) + .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))) + .apply(Window.<KV<String, Integer>>into(Sessions.withGapDuration(Duration.millis(5)))); PCollection<Integer> sum = input .apply(Values.<Integer>create()) @@ -491,8 +395,10 @@ public class CombineTest implements Serializable { .apply(Combine.<String, Integer, String>perKey(new TestCombineFn())); PAssert.that(sum).containsInAnyOrder(7, 13); - PAssert.that(sumPerKey) - .containsInAnyOrder(Arrays.asList(KV.of("a", "114"), KV.of("b", "1"), KV.of("b", "13"))); + PAssert.that(sumPerKey).containsInAnyOrder( + KV.of("a", "114"), + KV.of("b", "1"), + KV.of("b", "13")); pipeline.run(); } @@ -500,13 +406,7 @@ public class CombineTest implements Serializable { @Category(ValidatesRunner.class) public void testSessionsCombineWithContext() { PCollection<KV<String, Integer>> perKeyInput = - pipeline.apply( - Create.timestamped( - TimestampedValue.of(KV.of("a", 1), new Instant(0L)), - TimestampedValue.of(KV.of("a", 1), new Instant(4L)), - TimestampedValue.of(KV.of("a", 4), new Instant(7L)), - TimestampedValue.of(KV.of("b", 1), new Instant(10L)), - TimestampedValue.of(KV.of("b", 13), new Instant(16L))) + pipeline.apply(Create.timestamped(TABLE, Arrays.asList(0L, 4L, 7L, 10L, 16L)) .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))); PCollection<Integer> globallyInput = perKeyInput.apply(Values.<Integer>create()); @@ -527,23 +427,21 @@ public class CombineTest implements Serializable { .apply( Combine.<String, Integer, String>perKey( new TestCombineFnWithContext(globallyFixedWindowsView)) - .withSideInputs(globallyFixedWindowsView)); + .withSideInputs(Arrays.asList(globallyFixedWindowsView))); - PCollection<String> sessionsCombineGlobally = - globallyInput - .apply( - "Globally Input Sessions", - Window.<Integer>into(Sessions.withGapDuration(Duration.millis(5)))) - .apply( - Combine.globally(new TestCombineFnWithContext(globallyFixedWindowsView)) - .withoutDefaults() - .withSideInputs(globallyFixedWindowsView)); + PCollection<String> sessionsCombineGlobally = globallyInput + .apply("Globally Input Sessions", + Window.<Integer>into(Sessions.withGapDuration(Duration.millis(5)))) + .apply(Combine.globally(new TestCombineFnWithContext(globallyFixedWindowsView)) + .withoutDefaults() + .withSideInputs(Arrays.asList(globallyFixedWindowsView))); PAssert.that(fixedWindowsSum).containsInAnyOrder(2, 4, 1, 13); - PAssert.that(sessionsCombinePerKey) - .containsInAnyOrder( - Arrays.asList(KV.of("a", "1:114"), KV.of("b", "1:1"), KV.of("b", "0:13"))); - PAssert.that(sessionsCombineGlobally).containsInAnyOrder("1:1114", "0:13"); + PAssert.that(sessionsCombinePerKey).containsInAnyOrder( + KV.of("a", "1114"), + KV.of("b", "11"), + KV.of("b", "013")); + PAssert.that(sessionsCombineGlobally).containsInAnyOrder("11114", "013"); pipeline.run(); } @@ -563,13 +461,7 @@ public class CombineTest implements Serializable { @Test @Category(ValidatesRunner.class) public void testAccumulatingCombine() { - runTestAccumulatingCombine(Arrays.asList( - KV.of("a", 1), - KV.of("a", 1), - KV.of("a", 4), - KV.of("b", 1), - KV.of("b", 13) - ), 4.0, Arrays.asList(KV.of("a", 2.0), KV.of("b", 7.0))); + runTestAccumulatingCombine(TABLE, 4.0, Arrays.asList(KV.of("a", 2.0), KV.of("b", 7.0))); } @Test @@ -611,13 +503,7 @@ public class CombineTest implements Serializable { @Test @Category(ValidatesRunner.class) public void testHotKeyCombining() { - PCollection<KV<String, Integer>> input = copy(createInput(pipeline, Arrays.asList( - KV.of("a", 1), - KV.of("a", 1), - KV.of("a", 4), - KV.of("b", 1), - KV.of("b", 13) - )), 10); + PCollection<KV<String, Integer>> input = copy(createInput(pipeline, TABLE), 10); CombineFn<Integer, ?, Double> mean = new MeanInts(); PCollection<KV<String, Double>> coldMean = input.apply("ColdMean", @@ -674,13 +560,7 @@ public class CombineTest implements Serializable { @Test @Category(NeedsRunner.class) public void testBinaryCombineFn() { - PCollection<KV<String, Integer>> input = copy(createInput(pipeline, Arrays.asList( - KV.of("a", 1), - KV.of("a", 1), - KV.of("a", 4), - KV.of("b", 1), - KV.of("b", 13) - )), 2); + PCollection<KV<String, Integer>> input = copy(createInput(pipeline, TABLE), 2); PCollection<KV<String, Integer>> intProduct = input .apply("IntProduct", Combine.<String, Integer, Integer>perKey(new TestProdInt())); PCollection<KV<String, Integer>> objProduct = input @@ -771,7 +651,7 @@ public class CombineTest implements Serializable { pipeline .apply( "CreateMainInput", - Create.timestamped(nonEmptyElement, emptyElement).withCoder(VoidCoder.of())) + Create.<Void>timestamped(nonEmptyElement, emptyElement).withCoder(VoidCoder.of())) .apply("WindowMainInput", Window.<Void>into(windowFn)) .apply( "OutputSideInput", @@ -996,13 +876,15 @@ public class CombineTest implements Serializable { */ private class CountSumCoder extends AtomicCoder<CountSum> { @Override - public void encode(CountSum value, OutputStream outStream) throws IOException { + public void encode(CountSum value, OutputStream outStream) + throws CoderException, IOException { LONG_CODER.encode(value.count, outStream); DOUBLE_CODER.encode(value.sum, outStream); } @Override - public CountSum decode(InputStream inStream) throws IOException { + public CountSum decode(InputStream inStream) + throws CoderException, IOException { long count = LONG_CODER.decode(inStream); double sum = DOUBLE_CODER.decode(inStream); return new CountSum(count, sum); @@ -1035,26 +917,34 @@ public class CombineTest implements Serializable { // Not serializable. static class Accumulator { - final String seed; String value; - public Accumulator(String seed, String value) { - this.seed = seed; + public Accumulator(String value) { this.value = value; } public static Coder<Accumulator> getCoder() { return new AtomicCoder<Accumulator>() { @Override - public void encode(Accumulator accumulator, OutputStream outStream) throws IOException { - StringUtf8Coder.of().encode(accumulator.seed, outStream); - StringUtf8Coder.of().encode(accumulator.value, outStream); + public void encode(Accumulator accumulator, OutputStream outStream) + throws CoderException, IOException { + encode(accumulator, outStream, Coder.Context.NESTED); } @Override - public Accumulator decode(InputStream inStream) throws IOException { - String seed = StringUtf8Coder.of().decode(inStream); - String value = StringUtf8Coder.of().decode(inStream); - return new Accumulator(seed, value); + public void encode(Accumulator accumulator, OutputStream outStream, Coder.Context context) + throws CoderException, IOException { + StringUtf8Coder.of().encode(accumulator.value, outStream, context); + } + + @Override + public Accumulator decode(InputStream inStream) throws CoderException, IOException { + return decode(inStream, Coder.Context.NESTED); + } + + @Override + public Accumulator decode(InputStream inStream, Coder.Context context) + throws CoderException, IOException { + return new Accumulator(StringUtf8Coder.of().decode(inStream, context)); } }; } @@ -1068,13 +958,13 @@ public class CombineTest implements Serializable { @Override public Accumulator createAccumulator() { - return new Accumulator("", ""); + return new Accumulator(""); } @Override public Accumulator addInput(Accumulator accumulator, Integer value) { try { - return new Accumulator(accumulator.seed, accumulator.value + String.valueOf(value)); + return new Accumulator(accumulator.value + String.valueOf(value)); } finally { accumulator.value = "cleared in addInput"; } @@ -1082,22 +972,12 @@ public class CombineTest implements Serializable { @Override public Accumulator mergeAccumulators(Iterable<Accumulator> accumulators) { - Accumulator seedAccumulator = null; - StringBuilder all = new StringBuilder(); + String all = ""; for (Accumulator accumulator : accumulators) { - if (seedAccumulator == null) { - seedAccumulator = accumulator; - } else { - assertEquals( - String.format( - "Different seed values in accumulator: %s vs. %s", seedAccumulator, accumulator), - seedAccumulator.seed, - accumulator.seed); - } - all.append(accumulator.value); + all += accumulator.value; accumulator.value = "cleared in mergeAccumulators"; } - return new Accumulator(checkNotNull(seedAccumulator).seed, all.toString()); + return new Accumulator(all); } @Override @@ -1127,47 +1007,40 @@ public class CombineTest implements Serializable { @Override public TestCombineFn.Accumulator createAccumulator(Context c) { - Integer sideInputValue = c.sideInput(view); - return new TestCombineFn.Accumulator(sideInputValue.toString(), ""); + return new TestCombineFn.Accumulator(c.sideInput(view).toString()); } @Override public TestCombineFn.Accumulator addInput( TestCombineFn.Accumulator accumulator, Integer value, Context c) { try { - assertThat( - "Not expecting view contents to change", - accumulator.seed, - Matchers.equalTo(Integer.toString(c.sideInput(view)))); - return new TestCombineFn.Accumulator( - accumulator.seed, accumulator.value + String.valueOf(value)); + assertThat(accumulator.value, Matchers.startsWith(c.sideInput(view).toString())); + return new TestCombineFn.Accumulator(accumulator.value + String.valueOf(value)); } finally { accumulator.value = "cleared in addInput"; } + } @Override public TestCombineFn.Accumulator mergeAccumulators( Iterable<TestCombineFn.Accumulator> accumulators, Context c) { - String sideInputValue = c.sideInput(view).toString(); - StringBuilder all = new StringBuilder(); + String prefix = c.sideInput(view).toString(); + String all = prefix; for (TestCombineFn.Accumulator accumulator : accumulators) { - assertThat( - "Accumulators should all have the same Side Input Value", - accumulator.seed, - Matchers.equalTo(sideInputValue)); - all.append(accumulator.value); + assertThat(accumulator.value, Matchers.startsWith(prefix)); + all += accumulator.value.substring(prefix.length()); accumulator.value = "cleared in mergeAccumulators"; } - return new TestCombineFn.Accumulator(sideInputValue, all.toString()); + return new TestCombineFn.Accumulator(all); } @Override public String extractOutput(TestCombineFn.Accumulator accumulator, Context c) { - assertThat(accumulator.seed, Matchers.startsWith(c.sideInput(view).toString())); + assertThat(accumulator.value, Matchers.startsWith(c.sideInput(view).toString())); char[] chars = accumulator.value.toCharArray(); Arrays.sort(chars); - return accumulator.seed + ":" + new String(chars); + return new String(chars); } } @@ -1205,7 +1078,7 @@ public class CombineTest implements Serializable { @Override public void mergeAccumulator(Counter accumulator) { checkState(outputs == 0); - assertEquals(0, accumulator.outputs); + checkArgument(accumulator.outputs == 0); merges += accumulator.merges + 1; inputs += accumulator.inputs; http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java index 5cb9e18..1bb71bb 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java @@ -360,38 +360,6 @@ public class DoFnTesterTest { } } - @Test - public void testSupportsFinishBundleOutput() throws Exception { - for (DoFnTester.CloningBehavior cloning : DoFnTester.CloningBehavior.values()) { - try (DoFnTester<Integer, Integer> tester = DoFnTester.of(new BundleCounterDoFn())) { - tester.setCloningBehavior(cloning); - - assertThat(tester.processBundle(1, 2, 3, 4), contains(4)); - assertThat(tester.processBundle(5, 6, 7), contains(3)); - assertThat(tester.processBundle(8, 9), contains(2)); - } - } - } - - private static class BundleCounterDoFn extends DoFn<Integer, Integer> { - private int elements; - - @StartBundle - public void startBundle() { - elements = 0; - } - - @ProcessElement - public void processElement(ProcessContext c) { - elements++; - } - - @FinishBundle - public void finishBundle(FinishBundleContext c) { - c.output(elements, Instant.now(), GlobalWindow.INSTANCE); - } - } - private static class SideInputDoFn extends DoFn<Integer, Integer> { private final PCollectionView<Integer> value;
