Repository: beam Updated Branches: refs/heads/master 1f2634d23 -> 540fa9b42
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java index 60088de..1d4ce08 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java @@ -68,8 +68,6 @@ import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.transforms.SerializableFunctions; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.transforms.Top; import org.apache.beam.sdk.transforms.View; @@ -178,11 +176,7 @@ public class WriteFilesTest { "Intimidating pigeon", "Pedantic gull", "Frisky finch"); - runWrite( - inputs, - IDENTITY_MAP, - getBaseOutputFilename(), - WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity())); + runWrite(inputs, IDENTITY_MAP, getBaseOutputFilename(), WriteFiles.to(makeSimpleSink())); } /** Test that WriteFiles with an empty input still produces one shard. */ @@ -193,7 +187,7 @@ public class WriteFilesTest { Collections.<String>emptyList(), IDENTITY_MAP, getBaseOutputFilename(), - WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity())); + WriteFiles.to(makeSimpleSink())); checkFileContents(getBaseOutputFilename(), Collections.<String>emptyList(), Optional.of(1)); } @@ -208,7 +202,7 @@ public class WriteFilesTest { Arrays.asList("one", "two", "three", "four", "five", "six"), IDENTITY_MAP, getBaseOutputFilename(), - WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity()).withNumShards(1)); + WriteFiles.to(makeSimpleSink())); } private ResourceId getBaseOutputDirectory() { @@ -241,9 +235,7 @@ public class WriteFilesTest { } SimpleSink<Void> sink = makeSimpleSink(); - WriteFiles<String, ?, String> write = - WriteFiles.to(sink, SerializableFunctions.<String>identity()) - .withSharding(new LargestInt()); + WriteFiles<String, ?, String> write = WriteFiles.to(sink).withSharding(new LargestInt()); p.apply(Create.timestamped(inputs, timestamps).withCoder(StringUtf8Coder.of())) .apply(IDENTITY_MAP) .apply(write); @@ -264,8 +256,7 @@ public class WriteFilesTest { Arrays.asList("one", "two", "three", "four", "five", "six"), IDENTITY_MAP, getBaseOutputFilename(), - WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity()) - .withNumShards(20)); + WriteFiles.to(makeSimpleSink()).withNumShards(20)); } /** Test a WriteFiles transform with an empty PCollection. */ @@ -273,11 +264,7 @@ public class WriteFilesTest { @Category(NeedsRunner.class) public void testWriteWithEmptyPCollection() throws IOException { List<String> inputs = new ArrayList<>(); - runWrite( - inputs, - IDENTITY_MAP, - getBaseOutputFilename(), - WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity())); + runWrite(inputs, IDENTITY_MAP, getBaseOutputFilename(), WriteFiles.to(makeSimpleSink())); } /** Test a WriteFiles with a windowed PCollection. */ @@ -295,7 +282,7 @@ public class WriteFilesTest { inputs, new WindowAndReshuffle<>(Window.<String>into(FixedWindows.of(Duration.millis(2)))), getBaseOutputFilename(), - WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity())); + WriteFiles.to(makeSimpleSink())); } /** Test a WriteFiles with sessions. */ @@ -314,7 +301,7 @@ public class WriteFilesTest { inputs, new WindowAndReshuffle<>(Window.<String>into(Sessions.withGapDuration(Duration.millis(1)))), getBaseOutputFilename(), - WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity())); + WriteFiles.to(makeSimpleSink())); } @Test @@ -328,15 +315,12 @@ public class WriteFilesTest { inputs, Window.<String>into(FixedWindows.of(Duration.millis(2))), getBaseOutputFilename(), - WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity()) - .withMaxNumWritersPerBundle(2) - .withWindowedWrites()); + WriteFiles.to(makeSimpleSink()).withMaxNumWritersPerBundle(2).withWindowedWrites()); } public void testBuildWrite() { SimpleSink<Void> sink = makeSimpleSink(); - WriteFiles<String, ?, String> write = - WriteFiles.to(sink, SerializableFunctions.<String>identity()).withNumShards(3); + WriteFiles<String, ?, String> write = WriteFiles.to(sink).withNumShards(3); assertThat((SimpleSink<Void>) write.getSink(), is(sink)); PTransform<PCollection<String>, PCollectionView<Integer>> originalSharding = write.getSharding(); @@ -358,7 +342,7 @@ public class WriteFilesTest { @Test public void testDisplayData() { - DynamicDestinations<String, Void> dynamicDestinations = + DynamicDestinations<String, Void, String> dynamicDestinations = DynamicFileDestinations.constant( DefaultFilenamePolicy.fromParams( new Params() @@ -374,8 +358,7 @@ public class WriteFilesTest { builder.add(DisplayData.item("foo", "bar")); } }; - WriteFiles<String, ?, String> write = - WriteFiles.to(sink, SerializableFunctions.<String>identity()); + WriteFiles<String, ?, String> write = WriteFiles.to(sink); DisplayData displayData = DisplayData.from(write); @@ -391,9 +374,7 @@ public class WriteFilesTest { "Must use windowed writes when applying WriteFiles to an unbounded PCollection"); SimpleSink<Void> sink = makeSimpleSink(); - p.apply(Create.of("foo")) - .setIsBoundedInternal(IsBounded.UNBOUNDED) - .apply(WriteFiles.to(sink, SerializableFunctions.<String>identity())); + p.apply(Create.of("foo")).setIsBoundedInternal(IsBounded.UNBOUNDED).apply(WriteFiles.to(sink)); p.run(); } @@ -408,13 +389,13 @@ public class WriteFilesTest { SimpleSink<Void> sink = makeSimpleSink(); p.apply(Create.of("foo")) .setIsBoundedInternal(IsBounded.UNBOUNDED) - .apply(WriteFiles.to(sink, SerializableFunctions.<String>identity()).withWindowedWrites()); + .apply(WriteFiles.to(sink).withWindowedWrites()); p.run(); } // Test DynamicDestinations class. Expects user values to be string-encoded integers. // Stores the integer mod 5 as the destination, and uses that in the file prefix. - static class TestDestinations extends DynamicDestinations<String, Integer> { + static class TestDestinations extends DynamicDestinations<String, Integer, String> { private ResourceId baseOutputDirectory; TestDestinations(ResourceId baseOutputDirectory) { @@ -422,6 +403,11 @@ public class WriteFilesTest { } @Override + public String formatRecord(String record) { + return "record_" + record; + } + + @Override public Integer getDestination(String element) { return Integer.valueOf(element) % 5; } @@ -444,14 +430,6 @@ public class WriteFilesTest { } } - // Test format function. Prepend a string to each record before writing. - static class TestDynamicFormatFunction implements SerializableFunction<String, String> { - @Override - public String apply(String input) { - return "record_" + input; - } - } - @Test @Category(NeedsRunner.class) public void testDynamicDestinationsBounded() throws Exception { @@ -495,8 +473,7 @@ public class WriteFilesTest { // If emptyShards==true make numShards larger than the number of elements per destination. // This will force every destination to generate some empty shards. int numShards = emptyShards ? 2 * numInputs / 5 : 2; - WriteFiles<String, Integer, String> writeFiles = - WriteFiles.to(sink, new TestDynamicFormatFunction()).withNumShards(numShards); + WriteFiles<String, Integer, String> writeFiles = WriteFiles.to(sink).withNumShards(numShards); PCollection<String> input = p.apply(Create.timestamped(inputs, timestamps)); if (!bounded) { @@ -521,7 +498,7 @@ public class WriteFilesTest { @Test public void testShardedDisplayData() { - DynamicDestinations<String, Void> dynamicDestinations = + DynamicDestinations<String, Void, String> dynamicDestinations = DynamicFileDestinations.constant( DefaultFilenamePolicy.fromParams( new Params() @@ -537,8 +514,7 @@ public class WriteFilesTest { builder.add(DisplayData.item("foo", "bar")); } }; - WriteFiles<String, ?, String> write = - WriteFiles.to(sink, SerializableFunctions.<String>identity()).withNumShards(1); + WriteFiles<String, ?, String> write = WriteFiles.to(sink).withNumShards(1); DisplayData displayData = DisplayData.from(write); assertThat(displayData, hasDisplayItem("sink", sink.getClass())); assertThat(displayData, includesDisplayDataFor("sink", sink)); @@ -547,7 +523,7 @@ public class WriteFilesTest { @Test public void testCustomShardStrategyDisplayData() { - DynamicDestinations<String, Void> dynamicDestinations = + DynamicDestinations<String, Void, String> dynamicDestinations = DynamicFileDestinations.constant( DefaultFilenamePolicy.fromParams( new Params() @@ -564,7 +540,7 @@ public class WriteFilesTest { } }; WriteFiles<String, ?, String> write = - WriteFiles.to(sink, SerializableFunctions.<String>identity()) + WriteFiles.to(sink) .withSharding( new PTransform<PCollection<String>, PCollectionView<Integer>>() { @Override http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java index 442fba5..7255a94 100644 --- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java +++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java @@ -36,7 +36,6 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.SerializableFunctions; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; @@ -522,8 +521,7 @@ public class XmlIO { @Override public PDone expand(PCollection<T> input) { - return input.apply( - org.apache.beam.sdk.io.WriteFiles.to(createSink(), SerializableFunctions.<T>identity())); + return input.apply(org.apache.beam.sdk.io.WriteFiles.to(createSink())); } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java index 74e0bda..b663544 100644 --- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java +++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java @@ -35,7 +35,7 @@ import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.MimeTypes; /** Implementation of {@link XmlIO#write}. */ -class XmlSink<T> extends FileBasedSink<T, Void> { +class XmlSink<T> extends FileBasedSink<T, Void, T> { private static final String XML_EXTENSION = ".xml"; private final XmlIO.Write<T> spec; @@ -46,7 +46,7 @@ class XmlSink<T> extends FileBasedSink<T, Void> { } XmlSink(XmlIO.Write<T> spec) { - super(spec.getFilenamePrefix(), DynamicFileDestinations.constant(makeFilenamePolicy(spec))); + super(spec.getFilenamePrefix(), DynamicFileDestinations.<T>constant(makeFilenamePolicy(spec))); this.spec = spec; } @@ -77,7 +77,7 @@ class XmlSink<T> extends FileBasedSink<T, Void> { } /** {@link WriteOperation} for XML {@link FileBasedSink}s. */ - protected static final class XmlWriteOperation<T> extends WriteOperation<T, Void> { + protected static final class XmlWriteOperation<T> extends WriteOperation<Void, T> { public XmlWriteOperation(XmlSink<T> sink) { super(sink); } @@ -112,7 +112,7 @@ class XmlSink<T> extends FileBasedSink<T, Void> { } /** A {@link Writer} that can write objects as XML elements. */ - protected static final class XmlWriter<T> extends Writer<T, Void> { + protected static final class XmlWriter<T> extends Writer<Void, T> { final Marshaller marshaller; private OutputStream os = null;
