Repository: beam
Updated Branches:
  refs/heads/master 1f2634d23 -> 540fa9b42


http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
index 60088de..1d4ce08 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
@@ -68,8 +68,6 @@ import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.SerializableFunctions;
 import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.Top;
 import org.apache.beam.sdk.transforms.View;
@@ -178,11 +176,7 @@ public class WriteFilesTest {
             "Intimidating pigeon",
             "Pedantic gull",
             "Frisky finch");
-    runWrite(
-        inputs,
-        IDENTITY_MAP,
-        getBaseOutputFilename(),
-        WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity()));
+    runWrite(inputs, IDENTITY_MAP, getBaseOutputFilename(), WriteFiles.to(makeSimpleSink()));
   }
 
   /** Test that WriteFiles with an empty input still produces one shard. */
@@ -193,7 +187,7 @@ public class WriteFilesTest {
         Collections.<String>emptyList(),
         IDENTITY_MAP,
         getBaseOutputFilename(),
-        WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity()));
+        WriteFiles.to(makeSimpleSink()));
     checkFileContents(getBaseOutputFilename(), Collections.<String>emptyList(), Optional.of(1));
   }
 
@@ -208,7 +202,7 @@ public class WriteFilesTest {
         Arrays.asList("one", "two", "three", "four", "five", "six"),
         IDENTITY_MAP,
         getBaseOutputFilename(),
-        WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity()).withNumShards(1));
+        WriteFiles.to(makeSimpleSink()));
   }
 
   private ResourceId getBaseOutputDirectory() {
@@ -241,9 +235,7 @@ public class WriteFilesTest {
     }
 
     SimpleSink<Void> sink = makeSimpleSink();
-    WriteFiles<String, ?, String> write =
-        WriteFiles.to(sink, SerializableFunctions.<String>identity())
-            .withSharding(new LargestInt());
+    WriteFiles<String, ?, String> write = WriteFiles.to(sink).withSharding(new LargestInt());
     p.apply(Create.timestamped(inputs, timestamps).withCoder(StringUtf8Coder.of()))
         .apply(IDENTITY_MAP)
         .apply(write);
@@ -264,8 +256,7 @@ public class WriteFilesTest {
         Arrays.asList("one", "two", "three", "four", "five", "six"),
         IDENTITY_MAP,
         getBaseOutputFilename(),
-        WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity())
-            .withNumShards(20));
+        WriteFiles.to(makeSimpleSink()).withNumShards(20));
   }
 
   /** Test a WriteFiles transform with an empty PCollection. */
@@ -273,11 +264,7 @@ public class WriteFilesTest {
   @Category(NeedsRunner.class)
   public void testWriteWithEmptyPCollection() throws IOException {
     List<String> inputs = new ArrayList<>();
-    runWrite(
-        inputs,
-        IDENTITY_MAP,
-        getBaseOutputFilename(),
-        WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity()));
+    runWrite(inputs, IDENTITY_MAP, getBaseOutputFilename(), WriteFiles.to(makeSimpleSink()));
   }
 
   /** Test a WriteFiles with a windowed PCollection. */
@@ -295,7 +282,7 @@ public class WriteFilesTest {
         inputs,
         new WindowAndReshuffle<>(Window.<String>into(FixedWindows.of(Duration.millis(2)))),
         getBaseOutputFilename(),
-        WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity()));
+        WriteFiles.to(makeSimpleSink()));
   }
 
   /** Test a WriteFiles with sessions. */
@@ -314,7 +301,7 @@ public class WriteFilesTest {
         inputs,
         new WindowAndReshuffle<>(Window.<String>into(Sessions.withGapDuration(Duration.millis(1)))),
         getBaseOutputFilename(),
-        WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity()));
+        WriteFiles.to(makeSimpleSink()));
   }
 
   @Test
@@ -328,15 +315,12 @@ public class WriteFilesTest {
         inputs,
         Window.<String>into(FixedWindows.of(Duration.millis(2))),
         getBaseOutputFilename(),
-        WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity())
-            .withMaxNumWritersPerBundle(2)
-            .withWindowedWrites());
+        WriteFiles.to(makeSimpleSink()).withMaxNumWritersPerBundle(2).withWindowedWrites());
   }
 
   public void testBuildWrite() {
     SimpleSink<Void> sink = makeSimpleSink();
-    WriteFiles<String, ?, String> write =
-        WriteFiles.to(sink, SerializableFunctions.<String>identity()).withNumShards(3);
+    WriteFiles<String, ?, String> write = WriteFiles.to(sink).withNumShards(3);
     assertThat((SimpleSink<Void>) write.getSink(), is(sink));
     PTransform<PCollection<String>, PCollectionView<Integer>> originalSharding =
         write.getSharding();
@@ -358,7 +342,7 @@ public class WriteFilesTest {
 
   @Test
   public void testDisplayData() {
-    DynamicDestinations<String, Void> dynamicDestinations =
+    DynamicDestinations<String, Void, String> dynamicDestinations =
         DynamicFileDestinations.constant(
             DefaultFilenamePolicy.fromParams(
                 new Params()
@@ -374,8 +358,7 @@ public class WriteFilesTest {
             builder.add(DisplayData.item("foo", "bar"));
           }
         };
-    WriteFiles<String, ?, String> write =
-        WriteFiles.to(sink, SerializableFunctions.<String>identity());
+    WriteFiles<String, ?, String> write = WriteFiles.to(sink);
 
     DisplayData displayData = DisplayData.from(write);
 
@@ -391,9 +374,7 @@ public class WriteFilesTest {
         "Must use windowed writes when applying WriteFiles to an unbounded PCollection");
 
     SimpleSink<Void> sink = makeSimpleSink();
-    p.apply(Create.of("foo"))
-        .setIsBoundedInternal(IsBounded.UNBOUNDED)
-        .apply(WriteFiles.to(sink, SerializableFunctions.<String>identity()));
+    p.apply(Create.of("foo")).setIsBoundedInternal(IsBounded.UNBOUNDED).apply(WriteFiles.to(sink));
     p.run();
   }
 
@@ -408,13 +389,13 @@ public class WriteFilesTest {
     SimpleSink<Void> sink = makeSimpleSink();
     p.apply(Create.of("foo"))
         .setIsBoundedInternal(IsBounded.UNBOUNDED)
-        .apply(WriteFiles.to(sink, SerializableFunctions.<String>identity()).withWindowedWrites());
+        .apply(WriteFiles.to(sink).withWindowedWrites());
     p.run();
   }
 
   // Test DynamicDestinations class. Expects user values to be string-encoded integers.
   // Stores the integer mod 5 as the destination, and uses that in the file prefix.
-  static class TestDestinations extends DynamicDestinations<String, Integer> {
+  static class TestDestinations extends DynamicDestinations<String, Integer, String> {
     private ResourceId baseOutputDirectory;
 
     TestDestinations(ResourceId baseOutputDirectory) {
@@ -422,6 +403,11 @@ public class WriteFilesTest {
     }
 
     @Override
+    public String formatRecord(String record) {
+      return "record_" + record;
+    }
+
+    @Override
     public Integer getDestination(String element) {
       return Integer.valueOf(element) % 5;
     }
@@ -444,14 +430,6 @@ public class WriteFilesTest {
     }
   }
 
-  // Test format function. Prepend a string to each record before writing.
-  static class TestDynamicFormatFunction implements SerializableFunction<String, String> {
-    @Override
-    public String apply(String input) {
-      return "record_" + input;
-    }
-  }
-
   @Test
   @Category(NeedsRunner.class)
   public void testDynamicDestinationsBounded() throws Exception {
@@ -495,8 +473,7 @@ public class WriteFilesTest {
     // If emptyShards==true make numShards larger than the number of elements per destination.
     // This will force every destination to generate some empty shards.
     int numShards = emptyShards ? 2 * numInputs / 5 : 2;
-    WriteFiles<String, Integer, String> writeFiles =
-        WriteFiles.to(sink, new TestDynamicFormatFunction()).withNumShards(numShards);
+    WriteFiles<String, Integer, String> writeFiles = WriteFiles.to(sink).withNumShards(numShards);
 
     PCollection<String> input = p.apply(Create.timestamped(inputs, timestamps));
     if (!bounded) {
@@ -521,7 +498,7 @@ public class WriteFilesTest {
 
   @Test
   public void testShardedDisplayData() {
-    DynamicDestinations<String, Void> dynamicDestinations =
+    DynamicDestinations<String, Void, String> dynamicDestinations =
         DynamicFileDestinations.constant(
             DefaultFilenamePolicy.fromParams(
                 new Params()
@@ -537,8 +514,7 @@ public class WriteFilesTest {
             builder.add(DisplayData.item("foo", "bar"));
           }
         };
-    WriteFiles<String, ?, String> write =
-        WriteFiles.to(sink, SerializableFunctions.<String>identity()).withNumShards(1);
+    WriteFiles<String, ?, String> write = WriteFiles.to(sink).withNumShards(1);
     DisplayData displayData = DisplayData.from(write);
     assertThat(displayData, hasDisplayItem("sink", sink.getClass()));
     assertThat(displayData, includesDisplayDataFor("sink", sink));
@@ -547,7 +523,7 @@ public class WriteFilesTest {
 
   @Test
   public void testCustomShardStrategyDisplayData() {
-    DynamicDestinations<String, Void> dynamicDestinations =
+    DynamicDestinations<String, Void, String> dynamicDestinations =
         DynamicFileDestinations.constant(
             DefaultFilenamePolicy.fromParams(
                 new Params()
@@ -564,7 +540,7 @@ public class WriteFilesTest {
           }
         };
     WriteFiles<String, ?, String> write =
-        WriteFiles.to(sink, SerializableFunctions.<String>identity())
+        WriteFiles.to(sink)
             .withSharding(
                 new PTransform<PCollection<String>, PCollectionView<Integer>>() {
                   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
index 442fba5..7255a94 100644
--- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
@@ -36,7 +36,6 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.SerializableFunctions;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
@@ -522,8 +521,7 @@ public class XmlIO {
 
     @Override
     public PDone expand(PCollection<T> input) {
-      return input.apply(
-          org.apache.beam.sdk.io.WriteFiles.to(createSink(), SerializableFunctions.<T>identity()));
+      return input.apply(org.apache.beam.sdk.io.WriteFiles.to(createSink()));
     }
 
     @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
index 74e0bda..b663544 100644
--- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
@@ -35,7 +35,7 @@ import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.MimeTypes;
 
 /** Implementation of {@link XmlIO#write}. */
-class XmlSink<T> extends FileBasedSink<T, Void> {
+class XmlSink<T> extends FileBasedSink<T, Void, T> {
   private static final String XML_EXTENSION = ".xml";
 
   private final XmlIO.Write<T> spec;
@@ -46,7 +46,7 @@ class XmlSink<T> extends FileBasedSink<T, Void> {
   }
 
   XmlSink(XmlIO.Write<T> spec) {
-    super(spec.getFilenamePrefix(), DynamicFileDestinations.constant(makeFilenamePolicy(spec)));
+    super(spec.getFilenamePrefix(), DynamicFileDestinations.<T>constant(makeFilenamePolicy(spec)));
     this.spec = spec;
   }
 
@@ -77,7 +77,7 @@ class XmlSink<T> extends FileBasedSink<T, Void> {
   }
 
   /** {@link WriteOperation} for XML {@link FileBasedSink}s. */
-  protected static final class XmlWriteOperation<T> extends WriteOperation<T, Void> {
+  protected static final class XmlWriteOperation<T> extends WriteOperation<Void, T> {
     public XmlWriteOperation(XmlSink<T> sink) {
       super(sink);
     }
@@ -112,7 +112,7 @@ class XmlSink<T> extends FileBasedSink<T, Void> {
   }
 
   /** A {@link Writer} that can write objects as XML elements. */
-  protected static final class XmlWriter<T> extends Writer<T, Void> {
+  protected static final class XmlWriter<T> extends Writer<Void, T> {
     final Marshaller marshaller;
     private OutputStream os = null;
 

Reply via email to