http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index a6be4fb..9468893 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -42,9 +42,7 @@ import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 import com.google.common.base.Function;
-import com.google.common.base.Functions;
 import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
@@ -71,31 +69,22 @@ import java.util.zip.GZIPOutputStream;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipOutputStream;
 import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.DefaultCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
-import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
-import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
-import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
 import org.apache.beam.sdk.io.TextIO.CompressionType;
 import org.apache.beam.sdk.io.fs.MatchResult;
 import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
-import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
-import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.SourceTestUtils;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
 import org.apache.beam.sdk.util.CoderUtils;
@@ -120,10 +109,10 @@ import org.junit.runners.JUnit4;
 public class TextIOTest {
   private static final String MY_HEADER = "myHeader";
   private static final String MY_FOOTER = "myFooter";
-  private static final List<String> EMPTY = Collections.emptyList();
-  private static final List<String> TINY =
-      Arrays.asList("Irritable eagle", "Optimistic jay", "Fanciful hawk");
-  private static final List<String> LARGE = makeLines(1000);
+  private static final String[] EMPTY = new String[] {};
+  private static final String[] TINY =
+      new String[] {"Irritable eagle", "Optimistic jay", "Fanciful hawk"};
+  private static final String[] LARGE = makeLines(1000);
 
   private static Path tempFolder;
   private static File emptyTxt;
@@ -148,7 +137,7 @@ public class TextIOTest {
   @Rule
   public ExpectedException expectedException = ExpectedException.none();
 
-  private static File writeToFile(List<String> lines, String filename, CompressionType compression)
+  private static File writeToFile(String[] lines, String filename, CompressionType compression)
       throws IOException {
     File file = tempFolder.resolve(filename).toFile();
     OutputStream output = new FileOutputStream(file);
@@ -216,7 +205,7 @@ public class TextIOTest {
     });
   }
 
-  private void runTestRead(String[] expected) throws Exception {
+  private <T> void runTestRead(String[] expected) throws Exception {
     File tmpFile = Files.createTempFile(tempFolder, "file", "txt").toFile();
     String filename = tmpFile.getPath();
 
@@ -285,213 +274,6 @@ public class TextIOTest {
         displayData, hasItem(hasDisplayItem(hasValue(startsWith("foobar")))));
   }
 
-  static class TestDynamicDestinations extends DynamicDestinations<String, 
String> {
-    ResourceId baseDir;
-
-    TestDynamicDestinations(ResourceId baseDir) {
-      this.baseDir = baseDir;
-    }
-
-    @Override
-    public String getDestination(String element) {
-      // Destination is based on first character of string.
-      return element.substring(0, 1);
-    }
-
-    @Override
-    public String getDefaultDestination() {
-      return "";
-    }
-
-    @Nullable
-    @Override
-    public Coder<String> getDestinationCoder() {
-      return StringUtf8Coder.of();
-    }
-
-    @Override
-    public FilenamePolicy getFilenamePolicy(String destination) {
-      return DefaultFilenamePolicy.fromStandardParameters(
-          StaticValueProvider.of(
-              baseDir.resolve("file_" + destination + ".txt", 
StandardResolveOptions.RESOLVE_FILE)),
-          null,
-          null,
-          false);
-    }
-  }
-
-  class StartsWith implements Predicate<String> {
-    String prefix;
-
-    StartsWith(String prefix) {
-      this.prefix = prefix;
-    }
-
-    @Override
-    public boolean apply(@Nullable String input) {
-      return input.startsWith(prefix);
-    }
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void testDynamicDestinations() throws Exception {
-    ResourceId baseDir =
-        FileSystems.matchNewResource(
-            Files.createTempDirectory(tempFolder, 
"testDynamicDestinations").toString(), true);
-
-    List<String> elements = Lists.newArrayList("aaaa", "aaab", "baaa", "baab", 
"caaa", "caab");
-    PCollection<String> input = 
p.apply(Create.of(elements).withCoder(StringUtf8Coder.of()));
-    input.apply(
-        TextIO.write()
-            .to(new TestDynamicDestinations(baseDir))
-            
.withTempDirectory(FileSystems.matchNewResource(baseDir.toString(), true)));
-    p.run();
-
-    assertOutputFiles(
-        Iterables.toArray(Iterables.filter(elements, new StartsWith("a")), 
String.class),
-        null,
-        null,
-        0,
-        baseDir.resolve("file_a.txt", StandardResolveOptions.RESOLVE_FILE),
-        DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE);
-    assertOutputFiles(
-        Iterables.toArray(Iterables.filter(elements, new StartsWith("b")), 
String.class),
-        null,
-        null,
-        0,
-        baseDir.resolve("file_b.txt", StandardResolveOptions.RESOLVE_FILE),
-        DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE);
-    assertOutputFiles(
-        Iterables.toArray(Iterables.filter(elements, new StartsWith("c")), 
String.class),
-        null,
-        null,
-        0,
-        baseDir.resolve("file_c.txt", StandardResolveOptions.RESOLVE_FILE),
-        DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE);
-  }
-
-  @DefaultCoder(AvroCoder.class)
-  private static class UserWriteType {
-    String destination;
-    String metadata;
-
-    UserWriteType() {
-      this.destination = "";
-      this.metadata = "";
-    }
-
-    UserWriteType(String destination, String metadata) {
-      this.destination = destination;
-      this.metadata = metadata;
-    }
-
-    @Override
-    public String toString() {
-      return String.format("destination: %s metadata : %s", destination, 
metadata);
-    }
-  }
-
-  private static class SerializeUserWrite implements 
SerializableFunction<UserWriteType, String> {
-    @Override
-    public String apply(UserWriteType input) {
-      return input.toString();
-    }
-  }
-
-  private static class UserWriteDestination implements 
SerializableFunction<UserWriteType, Params> {
-    private ResourceId baseDir;
-
-    UserWriteDestination(ResourceId baseDir) {
-      this.baseDir = baseDir;
-    }
-
-    @Override
-    public Params apply(UserWriteType input) {
-      return new Params()
-          .withBaseFilename(
-              baseDir.resolve(
-                  "file_" + input.destination.substring(0, 1) + ".txt",
-                  StandardResolveOptions.RESOLVE_FILE));
-    }
-  }
-
-  private static class ExtractWriteDestination implements 
Function<UserWriteType, String> {
-    @Override
-    public String apply(@Nullable UserWriteType input) {
-      return input.destination;
-    }
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void testDynamicDefaultFilenamePolicy() throws Exception {
-    ResourceId baseDir =
-        FileSystems.matchNewResource(
-            Files.createTempDirectory(tempFolder, 
"testDynamicDestinations").toString(), true);
-
-    List<UserWriteType> elements =
-        Lists.newArrayList(
-            new UserWriteType("aaaa", "first"),
-            new UserWriteType("aaab", "second"),
-            new UserWriteType("baaa", "third"),
-            new UserWriteType("baab", "fourth"),
-            new UserWriteType("caaa", "fifth"),
-            new UserWriteType("caab", "sixth"));
-    PCollection<UserWriteType> input = p.apply(Create.of(elements));
-    input.apply(
-        TextIO.writeCustomType(new SerializeUserWrite())
-            .to(new UserWriteDestination(baseDir), new Params())
-            
.withTempDirectory(FileSystems.matchNewResource(baseDir.toString(), true)));
-    p.run();
-
-    String[] aElements =
-        Iterables.toArray(
-            Iterables.transform(
-                Iterables.filter(
-                    elements,
-                    Predicates.compose(new StartsWith("a"), new 
ExtractWriteDestination())),
-                Functions.toStringFunction()),
-            String.class);
-    String[] bElements =
-        Iterables.toArray(
-            Iterables.transform(
-                Iterables.filter(
-                    elements,
-                    Predicates.compose(new StartsWith("b"), new 
ExtractWriteDestination())),
-                Functions.toStringFunction()),
-            String.class);
-    String[] cElements =
-        Iterables.toArray(
-            Iterables.transform(
-                Iterables.filter(
-                    elements,
-                    Predicates.compose(new StartsWith("c"), new 
ExtractWriteDestination())),
-                Functions.toStringFunction()),
-            String.class);
-    assertOutputFiles(
-        aElements,
-        null,
-        null,
-        0,
-        baseDir.resolve("file_a.txt", StandardResolveOptions.RESOLVE_FILE),
-        DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE);
-    assertOutputFiles(
-        bElements,
-        null,
-        null,
-        0,
-        baseDir.resolve("file_b.txt", StandardResolveOptions.RESOLVE_FILE),
-        DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE);
-    assertOutputFiles(
-        cElements,
-        null,
-        null,
-        0,
-        baseDir.resolve("file_c.txt", StandardResolveOptions.RESOLVE_FILE),
-        DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE);
-  }
-
   private void runTestWrite(String[] elems) throws Exception {
     runTestWrite(elems, null, null, 1);
   }
@@ -509,8 +291,7 @@ public class TextIOTest {
       String[] elems, String header, String footer, int numShards) throws Exception {
     String outputName = "file.txt";
     Path baseDir = Files.createTempDirectory(tempFolder, "testwrite");
-    ResourceId baseFilename =
-        
FileBasedSink.convertToFileResourceIfPossible(baseDir.resolve(outputName).toString());
+    String baseFilename = baseDir.resolve(outputName).toString();
 
     PCollection<String> input =
         
p.apply(Create.of(Arrays.asList(elems)).withCoder(StringUtf8Coder.of()));
@@ -530,14 +311,8 @@ public class TextIOTest {
 
     p.run();
 
-    assertOutputFiles(
-        elems,
-        header,
-        footer,
-        numShards,
-        baseFilename,
-        firstNonNull(
-            write.inner.getShardTemplate(),
+    assertOutputFiles(elems, header, footer, numShards, baseDir, outputName,
+        firstNonNull(write.getShardTemplate(),
             DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE));
   }
 
@@ -546,12 +321,13 @@ public class TextIOTest {
       final String header,
       final String footer,
       int numShards,
-      ResourceId outputPrefix,
+      Path rootLocation,
+      String outputName,
       String shardNameTemplate)
       throws Exception {
     List<File> expectedFiles = new ArrayList<>();
     if (numShards == 0) {
-      String pattern = outputPrefix.toString() + "*";
+      String pattern = rootLocation.toAbsolutePath().resolve(outputName + 
"*").toString();
       List<MatchResult> matches = 
FileSystems.match(Collections.singletonList(pattern));
       for (Metadata expectedFile : 
Iterables.getOnlyElement(matches).metadata()) {
         expectedFiles.add(new File(expectedFile.resourceId().toString()));
@@ -560,9 +336,9 @@ public class TextIOTest {
       for (int i = 0; i < numShards; i++) {
         expectedFiles.add(
             new File(
+                rootLocation.toString(),
                 DefaultFilenamePolicy.constructName(
-                        outputPrefix, shardNameTemplate, "", i, numShards, 
null, null)
-                    .toString()));
+                    outputName, shardNameTemplate, "", i, numShards, null, 
null)));
       }
     }
 
@@ -680,19 +456,14 @@ public class TextIOTest {
   public void testWriteWithWritableByteChannelFactory() throws Exception {
     Coder<String> coder = StringUtf8Coder.of();
     String outputName = "file.txt";
-    ResourceId baseDir =
-        FileSystems.matchNewResource(
-            Files.createTempDirectory(tempFolder, "testwrite").toString(), 
true);
+    Path baseDir = Files.createTempDirectory(tempFolder, "testwrite");
 
     PCollection<String> input = 
p.apply(Create.of(Arrays.asList(LINES2_ARRAY)).withCoder(coder));
 
     final WritableByteChannelFactory writableByteChannelFactory =
         new DrunkWritableByteChannelFactory();
-    TextIO.Write write =
-        TextIO.write()
-            .to(baseDir.resolve(outputName, 
StandardResolveOptions.RESOLVE_FILE).toString())
-            .withoutSharding()
-            .withWritableByteChannelFactory(writableByteChannelFactory);
+    TextIO.Write write = 
TextIO.write().to(baseDir.resolve(outputName).toString())
+        
.withoutSharding().withWritableByteChannelFactory(writableByteChannelFactory);
     DisplayData displayData = DisplayData.from(write);
     assertThat(displayData, hasDisplayItem("writableByteChannelFactory", 
"DRUNK"));
 
@@ -705,15 +476,8 @@ public class TextIOTest {
       drunkElems.add(elem);
       drunkElems.add(elem);
     }
-    assertOutputFiles(
-        drunkElems.toArray(new String[0]),
-        null,
-        null,
-        1,
-        baseDir.resolve(
-            outputName + 
writableByteChannelFactory.getSuggestedFilenameSuffix(),
-            StandardResolveOptions.RESOLVE_FILE),
-        write.inner.getShardTemplate());
+    assertOutputFiles(drunkElems.toArray(new String[0]), null, null, 1, 
baseDir,
+        outputName + writableByteChannelFactory.getFilenameSuffix(), 
write.getShardTemplate());
   }
 
   @Test
@@ -791,7 +555,7 @@ public class TextIOTest {
    * Helper that writes the given lines (adding a newline in between) to a 
stream, then closes the
    * stream.
    */
-  private static void writeToStreamAndClose(List<String> lines, OutputStream outputStream) {
+  private static void writeToStreamAndClose(String[] lines, OutputStream outputStream) {
     try (PrintStream writer = new PrintStream(outputStream)) {
       for (String line : lines) {
         writer.println(line);
@@ -800,33 +564,27 @@ public class TextIOTest {
   }
 
   /**
-   * Helper method that runs 
TextIO.read().from(filename).withCompressionType(compressionType) and
-   * TextIO.readAll().withCompressionType(compressionType) applied to the 
single filename,
+   * Helper method that runs 
TextIO.read().from(filename).withCompressionType(compressionType)
    * and asserts that the results match the given expected output.
    */
   private void assertReadingCompressedFileMatchesExpected(
-      File file, CompressionType compressionType, List<String> expected) {
-
-    TextIO.Read read = 
TextIO.read().from(file.getPath()).withCompressionType(compressionType);
-    PAssert.that(p.apply("Read_" + file + "_" + compressionType.toString(), 
read))
-        .containsInAnyOrder(expected);
-
-    TextIO.ReadAll readAll =
-        
TextIO.readAll().withCompressionType(compressionType).withDesiredBundleSizeBytes(10);
-    PAssert.that(
-            p.apply("Create_" + file, Create.of(file.getPath()))
-                .apply("Read_" + compressionType.toString(), readAll))
-        .containsInAnyOrder(expected);
+      File file, CompressionType compressionType, String[] expected) {
+
+    TextIO.Read read =
+        
TextIO.read().from(file.getPath()).withCompressionType(compressionType);
+    PCollection<String> output = p.apply("Read_" + file + "_" + 
compressionType.toString(), read);
+
+    PAssert.that(output).containsInAnyOrder(expected);
     p.run();
   }
 
   /**
    * Helper to make an array of compressible strings. Returns ["word"i] for i 
in range(0,n).
    */
-  private static List<String> makeLines(int n) {
-    List<String> ret = new ArrayList<>();
+  private static String[] makeLines(int n) {
+    String[] ret = new String[n];
     for (int i = 0; i < n; ++i) {
-      ret.add("word" + i);
+      ret[i] = "word" + i;
     }
     return ret;
   }
@@ -1010,7 +768,7 @@ public class TextIOTest {
 
     String filename = createZipFile(expected, "multiple entries", entry0, entry1, entry2);
     assertReadingCompressedFileMatchesExpected(
-        new File(filename), CompressionType.ZIP, expected);
+        new File(filename), CompressionType.ZIP, expected.toArray(new String[]{}));
   }
 
   /**
@@ -1029,7 +787,7 @@ public class TextIOTest {
         new String[]{"dog"});
 
     assertReadingCompressedFileMatchesExpected(
-        new File(filename), CompressionType.ZIP, Arrays.asList("cat", "dog"));
+        new File(filename), CompressionType.ZIP, new String[] {"cat", "dog"});
   }
 
   @Test
@@ -1346,21 +1104,5 @@ public class TextIOTest {
     SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options);
   }
 
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void testReadAll() throws IOException {
-    writeToFile(TINY, "readAllTiny1.zip", ZIP);
-    writeToFile(TINY, "readAllTiny2.zip", ZIP);
-    writeToFile(LARGE, "readAllLarge1.zip", ZIP);
-    writeToFile(LARGE, "readAllLarge2.zip", ZIP);
-    PCollection<String> lines =
-        p.apply(
-                Create.of(
-                    tempFolder.resolve("readAllTiny*").toString(),
-                    tempFolder.resolve("readAllLarge*").toString()))
-            .apply(TextIO.readAll().withCompressionType(AUTO));
-    PAssert.that(lines).containsInAnyOrder(Iterables.concat(TINY, TINY, LARGE, 
LARGE));
-    p.run();
-  }
 }
+

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
index 55f2a87..a5dacd1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.io;
 
-import static com.google.common.base.MoreObjects.firstNonNull;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFor;
 import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -42,11 +41,6 @@ import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
-import org.apache.beam.sdk.io.FileBasedSink.CompressionType;
-import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
-import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
-import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints;
 import org.apache.beam.sdk.io.SimpleSink.SimpleWriter;
 import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
@@ -63,24 +57,17 @@ import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.SerializableFunctions;
 import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.Top;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.Sessions;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.joda.time.Duration;
-import org.joda.time.format.DateTimeFormatter;
-import org.joda.time.format.ISODateTimeFormat;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -173,11 +160,7 @@ public class WriteFilesTest {
   public void testWrite() throws IOException {
     List<String> inputs = Arrays.asList("Critical canary", "Apprehensive 
eagle",
         "Intimidating pigeon", "Pedantic gull", "Frisky finch");
-    runWrite(
-        inputs,
-        IDENTITY_MAP,
-        getBaseOutputFilename(),
-        WriteFiles.to(makeSimpleSink(), 
SerializableFunctions.<String>identity()));
+    runWrite(inputs, IDENTITY_MAP, getBaseOutputFilename());
   }
 
   /**
@@ -186,11 +169,7 @@ public class WriteFilesTest {
   @Test
   @Category(NeedsRunner.class)
   public void testEmptyWrite() throws IOException {
-    runWrite(
-        Collections.<String>emptyList(),
-        IDENTITY_MAP,
-        getBaseOutputFilename(),
-        WriteFiles.to(makeSimpleSink(), 
SerializableFunctions.<String>identity()));
+    runWrite(Collections.<String>emptyList(), IDENTITY_MAP, 
getBaseOutputFilename());
     checkFileContents(getBaseOutputFilename(), Collections.<String>emptyList(),
         Optional.of(1));
   }
@@ -206,7 +185,7 @@ public class WriteFilesTest {
         Arrays.asList("one", "two", "three", "four", "five", "six"),
         IDENTITY_MAP,
         getBaseOutputFilename(),
-        WriteFiles.to(makeSimpleSink(), 
SerializableFunctions.<String>identity()).withNumShards(1));
+        Optional.of(1));
   }
 
   private ResourceId getBaseOutputDirectory() {
@@ -214,13 +193,8 @@ public class WriteFilesTest {
         .resolve("output", StandardResolveOptions.RESOLVE_DIRECTORY);
 
   }
-
-  private SimpleSink<Void> makeSimpleSink() {
-    FilenamePolicy filenamePolicy =
-        new PerWindowFiles(
-            getBaseOutputDirectory().resolve("file", 
StandardResolveOptions.RESOLVE_FILE),
-            "simple");
-    return SimpleSink.makeSimpleSink(getBaseOutputDirectory(), filenamePolicy);
+  private SimpleSink makeSimpleSink() {
+    return new SimpleSink(getBaseOutputDirectory(), "file", "-SS-of-NN", 
"simple");
   }
 
   @Test
@@ -239,10 +213,8 @@ public class WriteFilesTest {
       timestamps.add(i + 1);
     }
 
-    SimpleSink<Void> sink = makeSimpleSink();
-    WriteFiles<String, ?, String> write =
-        WriteFiles.to(sink, SerializableFunctions.<String>identity())
-            .withSharding(new LargestInt());
+    SimpleSink sink = makeSimpleSink();
+    WriteFiles<String> write = WriteFiles.to(sink).withSharding(new 
LargestInt());
     p.apply(Create.timestamped(inputs, 
timestamps).withCoder(StringUtf8Coder.of()))
         .apply(IDENTITY_MAP)
         .apply(write);
@@ -263,8 +235,7 @@ public class WriteFilesTest {
         Arrays.asList("one", "two", "three", "four", "five", "six"),
         IDENTITY_MAP,
         getBaseOutputFilename(),
-        WriteFiles.to(makeSimpleSink(), 
SerializableFunctions.<String>identity())
-            .withNumShards(20));
+        Optional.of(20));
   }
 
   /**
@@ -274,11 +245,7 @@ public class WriteFilesTest {
   @Category(NeedsRunner.class)
   public void testWriteWithEmptyPCollection() throws IOException {
     List<String> inputs = new ArrayList<>();
-    runWrite(
-        inputs,
-        IDENTITY_MAP,
-        getBaseOutputFilename(),
-        WriteFiles.to(makeSimpleSink(), 
SerializableFunctions.<String>identity()));
+    runWrite(inputs, IDENTITY_MAP, getBaseOutputFilename());
   }
 
   /**
@@ -290,10 +257,8 @@ public class WriteFilesTest {
     List<String> inputs = Arrays.asList("Critical canary", "Apprehensive 
eagle",
         "Intimidating pigeon", "Pedantic gull", "Frisky finch");
     runWrite(
-        inputs,
-        new 
WindowAndReshuffle<>(Window.<String>into(FixedWindows.of(Duration.millis(2)))),
-        getBaseOutputFilename(),
-        WriteFiles.to(makeSimpleSink(), 
SerializableFunctions.<String>identity()));
+        inputs, new 
WindowAndReshuffle<>(Window.<String>into(FixedWindows.of(Duration.millis(2)))),
+        getBaseOutputFilename());
   }
 
   /**
@@ -307,32 +272,16 @@ public class WriteFilesTest {
 
     runWrite(
         inputs,
-        new 
WindowAndReshuffle<>(Window.<String>into(Sessions.withGapDuration(Duration.millis(1)))),
-        getBaseOutputFilename(),
-        WriteFiles.to(makeSimpleSink(), 
SerializableFunctions.<String>identity()));
+        new WindowAndReshuffle<>(
+            Window.<String>into(Sessions.withGapDuration(Duration.millis(1)))),
+        getBaseOutputFilename());
   }
 
   @Test
-  @Category(NeedsRunner.class)
-  public void testWriteSpilling() throws IOException {
-    List<String> inputs = Lists.newArrayList();
-    for (int i = 0; i < 100; ++i) {
-      inputs.add("mambo_number_" + i);
-    }
-    runWrite(
-        inputs,
-        Window.<String>into(FixedWindows.of(Duration.millis(2))),
-        getBaseOutputFilename(),
-        WriteFiles.to(makeSimpleSink(), 
SerializableFunctions.<String>identity())
-            .withMaxNumWritersPerBundle(2)
-            .withWindowedWrites());
-  }
-
   public void testBuildWrite() {
-    SimpleSink<Void> sink = makeSimpleSink();
-    WriteFiles<String, ?, String> write =
-        WriteFiles.to(sink, 
SerializableFunctions.<String>identity()).withNumShards(3);
-    assertThat((SimpleSink<Void>) write.getSink(), is(sink));
+    SimpleSink sink = makeSimpleSink();
+    WriteFiles<String> write = WriteFiles.to(sink).withNumShards(3);
+    assertThat((SimpleSink) write.getSink(), is(sink));
     PTransform<PCollection<String>, PCollectionView<Integer>> originalSharding 
=
         write.getSharding();
 
@@ -341,37 +290,25 @@ public class WriteFilesTest {
     assertThat(write.getNumShards().get(), equalTo(3));
     assertThat(write.getSharding(), equalTo(originalSharding));
 
-    WriteFiles<String, ?, ?> write2 = write.withSharding(SHARDING_TRANSFORM);
-    assertThat((SimpleSink<Void>) write2.getSink(), is(sink));
+    WriteFiles<String> write2 = write.withSharding(SHARDING_TRANSFORM);
+    assertThat((SimpleSink) write2.getSink(), is(sink));
     assertThat(write2.getSharding(), equalTo(SHARDING_TRANSFORM));
     // original unchanged
 
-    WriteFiles<String, ?, ?> writeUnsharded = 
write2.withRunnerDeterminedSharding();
+    WriteFiles<String> writeUnsharded = write2.withRunnerDeterminedSharding();
     assertThat(writeUnsharded.getSharding(), nullValue());
     assertThat(write.getSharding(), equalTo(originalSharding));
   }
 
   @Test
   public void testDisplayData() {
-    DynamicDestinations<String, Void> dynamicDestinations =
-        DynamicFileDestinations.constant(
-            DefaultFilenamePolicy.fromParams(
-                new Params()
-                    .withBaseFilename(
-                        getBaseOutputDirectory()
-                            .resolve("file", 
StandardResolveOptions.RESOLVE_FILE))
-                    .withShardTemplate("-SS-of-NN")));
-    SimpleSink<Void> sink =
-        new SimpleSink<Void>(
-            getBaseOutputDirectory(), dynamicDestinations, 
CompressionType.UNCOMPRESSED) {
-          @Override
-          public void populateDisplayData(DisplayData.Builder builder) {
-            builder.add(DisplayData.item("foo", "bar"));
-          }
-        };
-    WriteFiles<String, ?, String> write =
-        WriteFiles.to(sink, SerializableFunctions.<String>identity());
-
+    SimpleSink sink = new SimpleSink(getBaseOutputDirectory(), "file", 
"-SS-of-NN", "") {
+      @Override
+      public void populateDisplayData(DisplayData.Builder builder) {
+        builder.add(DisplayData.item("foo", "bar"));
+      }
+    };
+    WriteFiles<String> write = WriteFiles.to(sink);
     DisplayData displayData = DisplayData.from(write);
 
     assertThat(displayData, hasDisplayItem("sink", sink.getClass()));
@@ -379,145 +316,14 @@ public class WriteFilesTest {
   }
 
   @Test
-  @Category(NeedsRunner.class)
-  public void testUnboundedNeedsWindowed() {
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage(
-        "Must use windowed writes when applying WriteFiles to an unbounded 
PCollection");
-
-    SimpleSink<Void> sink = makeSimpleSink();
-    p.apply(Create.of("foo"))
-        .setIsBoundedInternal(IsBounded.UNBOUNDED)
-        .apply(WriteFiles.to(sink, SerializableFunctions.<String>identity()));
-    p.run();
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void testUnboundedNeedsSharding() {
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage(
-        "When applying WriteFiles to an unbounded PCollection, "
-            + "must specify number of output shards explicitly");
-
-    SimpleSink<Void> sink = makeSimpleSink();
-    p.apply(Create.of("foo"))
-        .setIsBoundedInternal(IsBounded.UNBOUNDED)
-        .apply(WriteFiles.to(sink, 
SerializableFunctions.<String>identity()).withWindowedWrites());
-    p.run();
-  }
-
-  // Test DynamicDestinations class. Expects user values to be string-encoded 
integers.
-  // Stores the integer mod 5 as the destination, and uses that in the file 
prefix.
-  static class TestDestinations extends DynamicDestinations<String, Integer> {
-    private ResourceId baseOutputDirectory;
-
-    TestDestinations(ResourceId baseOutputDirectory) {
-      this.baseOutputDirectory = baseOutputDirectory;
-    }
-
-    @Override
-    public Integer getDestination(String element) {
-      return Integer.valueOf(element) % 5;
-    }
-
-    @Override
-    public Integer getDefaultDestination() {
-      return 0;
-    }
-
-    @Override
-    public FilenamePolicy getFilenamePolicy(Integer destination) {
-      return new PerWindowFiles(
-          baseOutputDirectory.resolve("file_" + destination, 
StandardResolveOptions.RESOLVE_FILE),
-          "simple");
-    }
-
-    @Override
-    public void populateDisplayData(Builder builder) {
-      super.populateDisplayData(builder);
-    }
-  }
-
-  // Test format function. Prepend a string to each record before writing.
-  static class TestDynamicFormatFunction implements 
SerializableFunction<String, String> {
-    @Override
-    public String apply(String input) {
-      return "record_" + input;
-    }
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void testDynamicDestinationsBounded() throws Exception {
-    testDynamicDestinationsHelper(true);
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void testDynamicDestinationsUnbounded() throws Exception {
-    testDynamicDestinationsHelper(false);
-  }
-
-  private void testDynamicDestinationsHelper(boolean bounded) throws 
IOException {
-    TestDestinations dynamicDestinations = new 
TestDestinations(getBaseOutputDirectory());
-    SimpleSink<Integer> sink =
-        new SimpleSink<>(
-            getBaseOutputDirectory(), dynamicDestinations, 
CompressionType.UNCOMPRESSED);
-
-    // Flag to validate that the pipeline options are passed to the Sink.
-    WriteOptions options = 
TestPipeline.testingPipelineOptions().as(WriteOptions.class);
-    options.setTestFlag("test_value");
-    Pipeline p = TestPipeline.create(options);
-
-    List<String> inputs = Lists.newArrayList("0", "1", "2", "3", "4", "5", 
"6", "7", "8", "9");
-    // Prepare timestamps for the elements.
-    List<Long> timestamps = new ArrayList<>();
-    for (long i = 0; i < inputs.size(); i++) {
-      timestamps.add(i + 1);
-    }
-
-    WriteFiles<String, Integer, String> writeFiles =
-        WriteFiles.to(sink, new TestDynamicFormatFunction()).withNumShards(1);
-
-    PCollection<String> input = p.apply(Create.timestamped(inputs, 
timestamps));
-    if (!bounded) {
-      input.setIsBoundedInternal(IsBounded.UNBOUNDED);
-      input = 
input.apply(Window.<String>into(FixedWindows.of(Duration.standardDays(1))));
-      input.apply(writeFiles.withWindowedWrites());
-    } else {
-      input.apply(writeFiles);
-    }
-    p.run();
-
-    for (int i = 0; i < 5; ++i) {
-      ResourceId base =
-          getBaseOutputDirectory().resolve("file_" + i, 
StandardResolveOptions.RESOLVE_FILE);
-      List<String> expected = Lists.newArrayList("record_" + i, "record_" + (i 
+ 5));
-      checkFileContents(base.toString(), expected, Optional.of(1));
-    }
-  }
-
-  @Test
   public void testShardedDisplayData() {
-    DynamicDestinations<String, Void> dynamicDestinations =
-        DynamicFileDestinations.constant(
-            DefaultFilenamePolicy.fromParams(
-                new Params()
-                    .withBaseFilename(
-                        getBaseOutputDirectory()
-                            .resolve("file", 
StandardResolveOptions.RESOLVE_FILE))
-                    .withShardTemplate("-SS-of-NN")));
-    SimpleSink<Void> sink =
-        new SimpleSink<Void>(
-            getBaseOutputDirectory(), dynamicDestinations, 
CompressionType.UNCOMPRESSED) {
-          @Override
-          public void populateDisplayData(DisplayData.Builder builder) {
-            builder.add(DisplayData.item("foo", "bar"));
-          }
-        };
-    WriteFiles<String, ?, String> write =
-        WriteFiles.to(sink, 
SerializableFunctions.<String>identity()).withNumShards(1);
+    SimpleSink sink = new SimpleSink(getBaseOutputDirectory(), "file", 
"-SS-of-NN", "") {
+      @Override
+      public void populateDisplayData(DisplayData.Builder builder) {
+        builder.add(DisplayData.item("foo", "bar"));
+      }
+    };
+    WriteFiles<String> write = WriteFiles.to(sink).withNumShards(1);
     DisplayData displayData = DisplayData.from(write);
     assertThat(displayData, hasDisplayItem("sink", sink.getClass()));
     assertThat(displayData, includesDisplayDataFor("sink", sink));
@@ -526,24 +332,14 @@ public class WriteFilesTest {
 
   @Test
   public void testCustomShardStrategyDisplayData() {
-    DynamicDestinations<String, Void> dynamicDestinations =
-        DynamicFileDestinations.constant(
-            DefaultFilenamePolicy.fromParams(
-                new Params()
-                    .withBaseFilename(
-                        getBaseOutputDirectory()
-                            .resolve("file", 
StandardResolveOptions.RESOLVE_FILE))
-                    .withShardTemplate("-SS-of-NN")));
-    SimpleSink<Void> sink =
-        new SimpleSink<Void>(
-            getBaseOutputDirectory(), dynamicDestinations, 
CompressionType.UNCOMPRESSED) {
-          @Override
-          public void populateDisplayData(DisplayData.Builder builder) {
-            builder.add(DisplayData.item("foo", "bar"));
-          }
-        };
-    WriteFiles<String, ?, String> write =
-        WriteFiles.to(sink, SerializableFunctions.<String>identity())
+    SimpleSink sink = new SimpleSink(getBaseOutputDirectory(), "file", 
"-SS-of-NN", "") {
+      @Override
+      public void populateDisplayData(DisplayData.Builder builder) {
+        builder.add(DisplayData.item("foo", "bar"));
+      }
+    };
+    WriteFiles<String> write =
+        WriteFiles.to(sink)
             .withSharding(
                 new PTransform<PCollection<String>, 
PCollectionView<Integer>>() {
                   @Override
@@ -568,77 +364,22 @@ public class WriteFilesTest {
    * PCollection are written to the sink.
    */
   private void runWrite(
-      List<String> inputs,
-      PTransform<PCollection<String>, PCollection<String>> transform,
-      String baseName,
-      WriteFiles<String, ?, String> write)
-      throws IOException {
-    runShardedWrite(inputs, transform, baseName, write);
-  }
-
-  private static class PerWindowFiles extends FilenamePolicy {
-    private static final DateTimeFormatter FORMATTER = 
ISODateTimeFormat.hourMinuteSecondMillis();
-    private final ResourceId baseFilename;
-    private final String suffix;
-
-    public PerWindowFiles(ResourceId baseFilename, String suffix) {
-      this.baseFilename = baseFilename;
-      this.suffix = suffix;
-    }
-
-    public String filenamePrefixForWindow(IntervalWindow window) {
-      String prefix =
-          baseFilename.isDirectory() ? "" : 
firstNonNull(baseFilename.getFilename(), "");
-      return String.format("%s%s-%s",
-          prefix, FORMATTER.print(window.start()), 
FORMATTER.print(window.end()));
-    }
-
-    @Override
-    public ResourceId windowedFilename(WindowedContext context, 
OutputFileHints outputFileHints) {
-      IntervalWindow window = (IntervalWindow) context.getWindow();
-      String filename =
-          String.format(
-              "%s-%s-of-%s%s%s",
-              filenamePrefixForWindow(window),
-              context.getShardNumber(),
-              context.getNumShards(),
-              outputFileHints.getSuggestedFilenameSuffix(),
-              suffix);
-      return baseFilename
-          .getCurrentDirectory()
-          .resolve(filename, StandardResolveOptions.RESOLVE_FILE);
-    }
-
-    @Override
-    public ResourceId unwindowedFilename(Context context, OutputFileHints 
outputFileHints) {
-      String prefix =
-          baseFilename.isDirectory() ? "" : 
firstNonNull(baseFilename.getFilename(), "");
-      String filename =
-          String.format(
-              "%s-%s-of-%s%s%s",
-              prefix,
-              context.getShardNumber(),
-              context.getNumShards(),
-              outputFileHints.getSuggestedFilenameSuffix(),
-              suffix);
-      return baseFilename
-          .getCurrentDirectory()
-          .resolve(filename, StandardResolveOptions.RESOLVE_FILE);
-    }
+      List<String> inputs, PTransform<PCollection<String>, 
PCollection<String>> transform,
+      String baseName) throws IOException {
+    runShardedWrite(inputs, transform, baseName, Optional.<Integer>absent());
   }
 
   /**
    * Performs a WriteFiles transform with the desired number of shards. 
Verifies the WriteFiles
    * transform calls the appropriate methods on a test sink in the correct 
order, as well as
-   * verifies that the elements of a PCollection are written to the sink. If 
numConfiguredShards is
-   * not null, also verifies that the output number of shards is correct.
+   * verifies that the elements of a PCollection are written to the sink. If 
numConfiguredShards
+   * is not null, also verifies that the output number of shards is correct.
    */
   private void runShardedWrite(
       List<String> inputs,
       PTransform<PCollection<String>, PCollection<String>> transform,
       String baseName,
-      WriteFiles<String, ?, String> write)
-      throws IOException {
+      Optional<Integer> numConfiguredShards) throws IOException {
     // Flag to validate that the pipeline options are passed to the Sink
     WriteOptions options = 
TestPipeline.testingPipelineOptions().as(WriteOptions.class);
     options.setTestFlag("test_value");
@@ -649,15 +390,18 @@ public class WriteFilesTest {
     for (long i = 0; i < inputs.size(); i++) {
       timestamps.add(i + 1);
     }
+
+    SimpleSink sink = makeSimpleSink();
+    WriteFiles<String> write = WriteFiles.to(sink);
+    if (numConfiguredShards.isPresent()) {
+      write = write.withNumShards(numConfiguredShards.get());
+    }
     p.apply(Create.timestamped(inputs, 
timestamps).withCoder(StringUtf8Coder.of()))
         .apply(transform)
         .apply(write);
     p.run();
 
-    Optional<Integer> numShards =
-        (write.getNumShards() != null)
-            ? Optional.of(write.getNumShards().get()) : 
Optional.<Integer>absent();
-    checkFileContents(baseName, inputs, numShards);
+    checkFileContents(baseName, inputs, numConfiguredShards);
   }
 
   static void checkFileContents(String baseName, List<String> inputs,

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsValidatorTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsValidatorTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsValidatorTest.java
index f8cd00f..120d5ed 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsValidatorTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsValidatorTest.java
@@ -60,18 +60,6 @@ public class PipelineOptionsValidatorTest {
   }
 
   @Test
-  public void testWhenRequiredOptionIsSetAndClearedCli() {
-    expectedException.expect(IllegalArgumentException.class);
-    expectedException.expectMessage("Missing required value for "
-        + "[--object, \"Fake Description\"].");
-
-    Required required = PipelineOptionsFactory.fromArgs(new 
String[]{"--object=blah"})
-        .as(Required.class);
-    required.setObject(null);
-    PipelineOptionsValidator.validateCli(Required.class, required);
-  }
-
-  @Test
   public void testWhenRequiredOptionIsNeverSet() {
     expectedException.expect(IllegalArgumentException.class);
     expectedException.expectMessage("Missing required value for "
@@ -82,17 +70,6 @@ public class PipelineOptionsValidatorTest {
     PipelineOptionsValidator.validate(Required.class, required);
   }
 
-
-  @Test
-  public void testWhenRequiredOptionIsNeverSetCli() {
-    expectedException.expect(IllegalArgumentException.class);
-    expectedException.expectMessage("Missing required value for "
-        + "[--object, \"Fake Description\"].");
-
-    Required required = PipelineOptionsFactory.fromArgs(new 
String[]{}).as(Required.class);
-    PipelineOptionsValidator.validateCli(Required.class, required);
-  }
-
   @Test
   public void testWhenRequiredOptionIsNeverSetOnSuperInterface() {
     expectedException.expect(IllegalArgumentException.class);
@@ -104,16 +81,6 @@ public class PipelineOptionsValidatorTest {
     PipelineOptionsValidator.validate(Required.class, options);
   }
 
-  @Test
-  public void testWhenRequiredOptionIsNeverSetOnSuperInterfaceCli() {
-    expectedException.expect(IllegalArgumentException.class);
-    expectedException.expectMessage("Missing required value for "
-        + "[--object, \"Fake Description\"].");
-
-    PipelineOptions options = PipelineOptionsFactory.fromArgs(new 
String[]{}).create();
-    PipelineOptionsValidator.validateCli(Required.class, options);
-  }
-
   /** A test interface that overrides the parent's method. */
   public interface SubClassValidation extends Required {
     @Override
@@ -133,17 +100,6 @@ public class PipelineOptionsValidatorTest {
     PipelineOptionsValidator.validate(Required.class, required);
   }
 
-  @Test
-  public void testValidationOnOverriddenMethodsCli() throws Exception {
-    expectedException.expect(IllegalArgumentException.class);
-    expectedException.expectMessage("Missing required value for "
-        + "[--object, \"Fake Description\"].");
-
-    SubClassValidation required = PipelineOptionsFactory.fromArgs(new 
String[]{})
-        .as(SubClassValidation.class);
-    PipelineOptionsValidator.validateCli(Required.class, required);
-  }
-
   /** A test interface with a required group. */
   public interface GroupRequired extends PipelineOptions {
     @Validation.Required(groups = {"ham"})

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
index fb0a0d7..2c43f57 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
@@ -44,7 +44,6 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
 import com.google.common.testing.EqualsTester;
 import java.io.IOException;
-import java.io.NotSerializableException;
 import java.io.Serializable;
 import java.util.HashSet;
 import java.util.List;
@@ -55,7 +54,6 @@ import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.common.ReflectHelpers;
 import org.hamcrest.Matchers;
 import org.joda.time.Instant;
@@ -1021,21 +1019,4 @@ public class ProxyInvocationHandlerTest {
     DisplayData data = DisplayData.from(options);
     assertThat(data, not(hasDisplayItem("value")));
   }
-
-  private static class CapturesOptions implements Serializable {
-    PipelineOptions options = PipelineOptionsFactory.create();
-  }
-
-  @Test
-  public void testOptionsAreNotSerializable() {
-    
expectedException.expectCause(Matchers.<Throwable>instanceOf(NotSerializableException.class));
-    SerializableUtils.clone(new CapturesOptions());
-  }
-
-  @Test
-  public void testGetOptionNameFromMethod() throws NoSuchMethodException {
-    ProxyInvocationHandler handler = new ProxyInvocationHandler(Maps.<String, 
Object>newHashMap());
-    handler.as(BaseOptions.class);
-    assertEquals("foo", 
handler.getOptionName(BaseOptions.class.getMethod("getFoo")));
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
index 93650dd..1197d1b 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
@@ -19,7 +19,6 @@ package org.apache.beam.sdk.runners;
 
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertThat;
@@ -33,8 +32,6 @@ import java.util.Map.Entry;
 import java.util.Set;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor.Defaults;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.io.CountingSource;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.io.Read;
@@ -495,198 +492,4 @@ public class TransformHierarchyTest implements 
Serializable {
     assertThat(visitedPrimitiveNodes, containsInAnyOrder(upstreamNode, 
replacementParNode));
     assertThat(visitedValues, Matchers.<PValue>containsInAnyOrder(upstream, 
output));
   }
-
-  @Test
-  public void visitIsTopologicallyOrdered() {
-    PCollection<String> one =
-        PCollection.<String>createPrimitiveOutputInternal(
-                pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)
-            .setCoder(StringUtf8Coder.of());
-    final PCollection<Integer> two =
-        PCollection.<Integer>createPrimitiveOutputInternal(
-                pipeline, WindowingStrategy.globalDefault(), 
IsBounded.UNBOUNDED)
-            .setCoder(VarIntCoder.of());
-    final PDone done = PDone.in(pipeline);
-    final TupleTag<String> oneTag = new TupleTag<String>() {};
-    final TupleTag<Integer> twoTag = new TupleTag<Integer>() {};
-    final PCollectionTuple oneAndTwo = PCollectionTuple.of(oneTag, 
one).and(twoTag, two);
-
-    PTransform<PCollection<String>, PDone> multiConsumer =
-        new PTransform<PCollection<String>, PDone>() {
-          @Override
-          public PDone expand(PCollection<String> input) {
-            return done;
-          }
-
-          @Override
-          public Map<TupleTag<?>, PValue> getAdditionalInputs() {
-            return Collections.<TupleTag<?>, PValue>singletonMap(twoTag, two);
-          }
-        };
-    hierarchy.pushNode("consumes_both", one, multiConsumer);
-    hierarchy.setOutput(done);
-    hierarchy.popNode();
-
-    final PTransform<PBegin, PCollectionTuple> producer =
-        new PTransform<PBegin, PCollectionTuple>() {
-          @Override
-          public PCollectionTuple expand(PBegin input) {
-            return oneAndTwo;
-          }
-        };
-    hierarchy.pushNode(
-        "encloses_producer",
-        PBegin.in(pipeline),
-        new PTransform<PBegin, PCollectionTuple>() {
-          @Override
-          public PCollectionTuple expand(PBegin input) {
-            return input.apply(producer);
-          }
-        });
-    hierarchy.pushNode(
-        "creates_one_and_two",
-        PBegin.in(pipeline), producer);
-    hierarchy.setOutput(oneAndTwo);
-    hierarchy.popNode();
-    hierarchy.setOutput(oneAndTwo);
-    hierarchy.popNode();
-
-    hierarchy.pushNode("second_copy_of_consumes_both", one, multiConsumer);
-    hierarchy.setOutput(done);
-    hierarchy.popNode();
-
-    final Set<Node> visitedNodes = new HashSet<>();
-    final Set<Node> exitedNodes = new HashSet<>();
-    final Set<PValue> visitedValues = new HashSet<>();
-    hierarchy.visit(
-        new PipelineVisitor.Defaults() {
-
-          @Override
-          public CompositeBehavior enterCompositeTransform(Node node) {
-            for (PValue input : node.getInputs().values()) {
-              assertThat(visitedValues, hasItem(input));
-            }
-            assertThat(
-                "Nodes should not be visited more than once", visitedNodes, 
not(hasItem(node)));
-            if (!node.isRootNode()) {
-              assertThat(
-                  "Nodes should always be visited after their enclosing nodes",
-                  visitedNodes,
-                  hasItem(node.getEnclosingNode()));
-            }
-            visitedNodes.add(node);
-            return CompositeBehavior.ENTER_TRANSFORM;
-          }
-
-          @Override
-          public void leaveCompositeTransform(Node node) {
-            assertThat(visitedNodes, hasItem(node));
-            if (!node.isRootNode()) {
-              assertThat(
-                  "Nodes should always be left before their enclosing nodes 
are left",
-                  exitedNodes,
-                  not(hasItem(node.getEnclosingNode())));
-            }
-            assertThat(exitedNodes, not(hasItem(node)));
-            exitedNodes.add(node);
-          }
-
-          @Override
-          public void visitPrimitiveTransform(Node node) {
-            assertThat(visitedNodes, hasItem(node.getEnclosingNode()));
-            assertThat(exitedNodes, not(hasItem(node.getEnclosingNode())));
-            assertThat(
-                "Nodes should not be visited more than once", visitedNodes, 
not(hasItem(node)));
-            for (PValue input : node.getInputs().values()) {
-              assertThat(visitedValues, hasItem(input));
-            }
-            visitedNodes.add(node);
-          }
-
-          @Override
-          public void visitValue(PValue value, Node producer) {
-            assertThat(visitedNodes, hasItem(producer));
-            assertThat(visitedValues, not(hasItem(value)));
-            visitedValues.add(value);
-          }
-        });
-    assertThat("Should have visited all the nodes", visitedNodes.size(), 
equalTo(5));
-    assertThat("Should have left all of the visited composites", 
exitedNodes.size(), equalTo(2));
-  }
-
-  @Test
-  public void visitDoesNotVisitSkippedNodes() {
-    PCollection<String> one =
-        PCollection.<String>createPrimitiveOutputInternal(
-                pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)
-            .setCoder(StringUtf8Coder.of());
-    final PCollection<Integer> two =
-        PCollection.<Integer>createPrimitiveOutputInternal(
-                pipeline, WindowingStrategy.globalDefault(), 
IsBounded.UNBOUNDED)
-            .setCoder(VarIntCoder.of());
-    final PDone done = PDone.in(pipeline);
-    final TupleTag<String> oneTag = new TupleTag<String>() {};
-    final TupleTag<Integer> twoTag = new TupleTag<Integer>() {};
-    final PCollectionTuple oneAndTwo = PCollectionTuple.of(oneTag, 
one).and(twoTag, two);
-
-    hierarchy.pushNode(
-        "consumes_both",
-        one,
-        new PTransform<PCollection<String>, PDone>() {
-          @Override
-          public PDone expand(PCollection<String> input) {
-            return done;
-          }
-
-          @Override
-          public Map<TupleTag<?>, PValue> getAdditionalInputs() {
-            return Collections.<TupleTag<?>, PValue>singletonMap(twoTag, two);
-          }
-        });
-    hierarchy.setOutput(done);
-    hierarchy.popNode();
-
-    final PTransform<PBegin, PCollectionTuple> producer =
-        new PTransform<PBegin, PCollectionTuple>() {
-          @Override
-          public PCollectionTuple expand(PBegin input) {
-            return oneAndTwo;
-          }
-        };
-    final Node enclosing =
-        hierarchy.pushNode(
-            "encloses_producer",
-            PBegin.in(pipeline),
-            new PTransform<PBegin, PCollectionTuple>() {
-              @Override
-              public PCollectionTuple expand(PBegin input) {
-                return input.apply(producer);
-              }
-            });
-    Node enclosed = hierarchy.pushNode("creates_one_and_two", 
PBegin.in(pipeline), producer);
-    hierarchy.setOutput(oneAndTwo);
-    hierarchy.popNode();
-    hierarchy.setOutput(oneAndTwo);
-    hierarchy.popNode();
-
-    final Set<Node> visitedNodes = new HashSet<>();
-    hierarchy.visit(
-        new PipelineVisitor.Defaults() {
-          @Override
-          public CompositeBehavior enterCompositeTransform(Node node) {
-            visitedNodes.add(node);
-            return node.equals(enclosing)
-                ? CompositeBehavior.DO_NOT_ENTER_TRANSFORM
-                : CompositeBehavior.ENTER_TRANSFORM;
-          }
-
-          @Override
-          public void visitPrimitiveTransform(Node node) {
-            visitedNodes.add(node);
-          }
-        });
-
-    assertThat(visitedNodes, hasItem(enclosing));
-    assertThat(visitedNodes, not(hasItem(enclosed)));
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
index aaf8b91..adf27f8 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
@@ -22,9 +22,7 @@ import com.google.common.base.Function;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
-import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
@@ -39,7 +37,6 @@ import 
org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.PValueBase;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
@@ -352,10 +349,5 @@ public final class PCollectionViewTesting {
           .add("viewFn", viewFn)
           .toString();
     }
-
-    @Override
-    public Map<TupleTag<?>, PValue> expand() {
-      return Collections.<TupleTag<?>, PValue>singletonMap(tag, pCollection);
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index b24d82d..dc9788f 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.sdk.transforms;
 
-import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import static org.apache.beam.sdk.TestUtils.checkCombineFn;
 import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
@@ -29,13 +29,11 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 
 import com.google.common.base.MoreObjects;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
@@ -47,6 +45,7 @@ import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.BigEndianLongCoder;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.DoubleCoder;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -86,6 +85,7 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.mockito.Mock;
 
 /**
  * Tests for Combine transforms.
@@ -95,8 +95,18 @@ public class CombineTest implements Serializable {
   // This test is Serializable, just so that it's easy to have
   // anonymous inner classes inside the non-static test methods.
 
+  static final List<KV<String, Integer>> TABLE = Arrays.asList(
+    KV.of("a", 1),
+    KV.of("a", 1),
+    KV.of("a", 4),
+    KV.of("b", 1),
+    KV.of("b", 13)
+  );
+
   static final List<KV<String, Integer>> EMPTY_TABLE = Collections.emptyList();
 
+  @Mock private DoFn<?, ?>.ProcessContext processContext;
+
   @Rule
   public final transient TestPipeline pipeline = TestPipeline.create();
 
@@ -140,12 +150,12 @@ public class CombineTest implements Serializable {
     PCollection<KV<String, String>> combinePerKey =
         perKeyInput.apply(
             Combine.<String, Integer, String>perKey(new 
TestCombineFnWithContext(globallySumView))
-                .withSideInputs(globallySumView));
+                .withSideInputs(Arrays.asList(globallySumView)));
 
     PCollection<String> combineGlobally = globallyInput
         .apply(Combine.globally(new TestCombineFnWithContext(globallySumView))
             .withoutDefaults()
-            .withSideInputs(globallySumView));
+            .withSideInputs(Arrays.asList(globallySumView)));
 
     PAssert.that(sum).containsInAnyOrder(globalSum);
     PAssert.that(combinePerKey).containsInAnyOrder(perKeyCombines);
@@ -158,28 +168,16 @@ public class CombineTest implements Serializable {
   @Category(ValidatesRunner.class)
   @SuppressWarnings({"rawtypes", "unchecked"})
   public void testSimpleCombine() {
-    runTestSimpleCombine(Arrays.asList(
-      KV.of("a", 1),
-      KV.of("a", 1),
-      KV.of("a", 4),
-      KV.of("b", 1),
-      KV.of("b", 13)
-    ), 20, Arrays.asList(KV.of("a", "114"), KV.of("b", "113")));
+    runTestSimpleCombine(TABLE, 20, Arrays.asList(KV.of("a", "114"), 
KV.of("b", "113")));
   }
 
   @Test
   @Category(ValidatesRunner.class)
   @SuppressWarnings({"rawtypes", "unchecked"})
   public void testSimpleCombineWithContext() {
-    runTestSimpleCombineWithContext(Arrays.asList(
-      KV.of("a", 1),
-      KV.of("a", 1),
-      KV.of("a", 4),
-      KV.of("b", 1),
-      KV.of("b", 13)
-    ), 20,
-        Arrays.asList(KV.of("a", "20:114"), KV.of("b", "20:113")),
-        new String[] {"20:111134"});
+    runTestSimpleCombineWithContext(TABLE, 20,
+        Arrays.asList(KV.of("a", "01124"), KV.of("b", "01123")),
+        new String[] {"01111234"});
   }
 
   @Test
@@ -218,13 +216,7 @@ public class CombineTest implements Serializable {
   @Test
   @Category(ValidatesRunner.class)
   public void testBasicCombine() {
-    runTestBasicCombine(Arrays.asList(
-      KV.of("a", 1),
-      KV.of("a", 1),
-      KV.of("a", 4),
-      KV.of("b", 1),
-      KV.of("b", 13)
-    ), ImmutableSet.of(1, 13, 4), Arrays.asList(
+    runTestBasicCombine(TABLE, ImmutableSet.of(1, 13, 4), Arrays.asList(
         KV.of("a", (Set<Integer>) ImmutableSet.of(1, 4)),
         KV.of("b", (Set<Integer>) ImmutableSet.of(1, 13))));
   }
@@ -259,16 +251,9 @@ public class CombineTest implements Serializable {
   @Category(ValidatesRunner.class)
   public void testFixedWindowsCombine() {
     PCollection<KV<String, Integer>> input =
-        pipeline
-            .apply(
-                Create.timestamped(
-                        TimestampedValue.of(KV.of("a", 1), new Instant(0L)),
-                        TimestampedValue.of(KV.of("a", 1), new Instant(1L)),
-                        TimestampedValue.of(KV.of("a", 4), new Instant(6L)),
-                        TimestampedValue.of(KV.of("b", 1), new Instant(7L)),
-                        TimestampedValue.of(KV.of("b", 13), new Instant(8L)))
-                    .withCoder(KvCoder.of(StringUtf8Coder.of(), 
BigEndianIntegerCoder.of())))
-            .apply(Window.<KV<String, 
Integer>>into(FixedWindows.of(Duration.millis(2))));
+        pipeline.apply(Create.timestamped(TABLE, Arrays.asList(0L, 1L, 6L, 7L, 
8L))
+                .withCoder(KvCoder.of(StringUtf8Coder.of(), 
BigEndianIntegerCoder.of())))
+         .apply(Window.<KV<String, 
Integer>>into(FixedWindows.of(Duration.millis(2))));
 
     PCollection<Integer> sum = input
         .apply(Values.<Integer>create())
@@ -278,9 +263,11 @@ public class CombineTest implements Serializable {
         .apply(Combine.<String, Integer, String>perKey(new TestCombineFn()));
 
     PAssert.that(sum).containsInAnyOrder(2, 5, 13);
-    PAssert.that(sumPerKey)
-        .containsInAnyOrder(
-            Arrays.asList(KV.of("a", "11"), KV.of("a", "4"), KV.of("b", "1"), 
KV.of("b", "13")));
+    PAssert.that(sumPerKey).containsInAnyOrder(
+        KV.of("a", "11"),
+        KV.of("a", "4"),
+        KV.of("b", "1"),
+        KV.of("b", "13"));
     pipeline.run();
   }
 
@@ -288,16 +275,9 @@ public class CombineTest implements Serializable {
   @Category(ValidatesRunner.class)
   public void testFixedWindowsCombineWithContext() {
     PCollection<KV<String, Integer>> perKeyInput =
-        pipeline
-            .apply(
-                Create.timestamped(
-                        TimestampedValue.of(KV.of("a", 1), new Instant(0L)),
-                        TimestampedValue.of(KV.of("a", 1), new Instant(1L)),
-                        TimestampedValue.of(KV.of("a", 4), new Instant(6L)),
-                        TimestampedValue.of(KV.of("b", 1), new Instant(7L)),
-                        TimestampedValue.of(KV.of("b", 13), new Instant(8L)))
-                    .withCoder(KvCoder.of(StringUtf8Coder.of(), 
BigEndianIntegerCoder.of())))
-            .apply(Window.<KV<String, 
Integer>>into(FixedWindows.of(Duration.millis(2))));
+        pipeline.apply(Create.timestamped(TABLE, Arrays.asList(0L, 1L, 6L, 7L, 
8L))
+                .withCoder(KvCoder.of(StringUtf8Coder.of(), 
BigEndianIntegerCoder.of())))
+         .apply(Window.<KV<String, 
Integer>>into(FixedWindows.of(Duration.millis(2))));
 
     PCollection<Integer> globallyInput = 
perKeyInput.apply(Values.<Integer>create());
 
@@ -309,129 +289,60 @@ public class CombineTest implements Serializable {
     PCollection<KV<String, String>> combinePerKeyWithContext =
         perKeyInput.apply(
             Combine.<String, Integer, String>perKey(new 
TestCombineFnWithContext(globallySumView))
-                .withSideInputs(globallySumView));
+                .withSideInputs(Arrays.asList(globallySumView)));
 
     PCollection<String> combineGloballyWithContext = globallyInput
         .apply(Combine.globally(new TestCombineFnWithContext(globallySumView))
             .withoutDefaults()
-            .withSideInputs(globallySumView));
+            .withSideInputs(Arrays.asList(globallySumView)));
 
     PAssert.that(sum).containsInAnyOrder(2, 5, 13);
-    PAssert.that(combinePerKeyWithContext)
-        .containsInAnyOrder(
-            Arrays.asList(
-                KV.of("a", "2:11"), KV.of("a", "5:4"), KV.of("b", "5:1"), 
KV.of("b", "13:13")));
-    PAssert.that(combineGloballyWithContext).containsInAnyOrder("2:11", 
"5:14", "13:13");
-    pipeline.run();
-  }
-
-  @Test
-  @Category(ValidatesRunner.class)
-  public void testSlidingWindowsCombine() {
-    PCollection<String> input =
-        pipeline
-            .apply(
-                Create.timestamped(
-                    TimestampedValue.of("a", new Instant(1L)),
-                    TimestampedValue.of("b", new Instant(2L)),
-                    TimestampedValue.of("c", new Instant(3L))))
-            .apply(
-                Window.<String>into(
-                    
SlidingWindows.of(Duration.millis(3)).every(Duration.millis(1L))));
-    PCollection<List<String>> combined =
-        input.apply(
-            Combine.globally(
-                    new CombineFn<String, List<String>, List<String>>() {
-                      @Override
-                      public List<String> createAccumulator() {
-                        return new ArrayList<>();
-                      }
-
-                      @Override
-                      public List<String> addInput(List<String> accumulator, 
String input) {
-                        accumulator.add(input);
-                        return accumulator;
-                      }
-
-                      @Override
-                      public List<String> 
mergeAccumulators(Iterable<List<String>> accumulators) {
-                        // Mutate all of the accumulators. Instances should be 
used in only one
-                        // place, and not
-                        // reused after merging.
-                        List<String> cur = createAccumulator();
-                        for (List<String> accumulator : accumulators) {
-                          accumulator.addAll(cur);
-                          cur = accumulator;
-                        }
-                        return cur;
-                      }
-
-                      @Override
-                      public List<String> extractOutput(List<String> 
accumulator) {
-                        List<String> result = new ArrayList<>(accumulator);
-                        Collections.sort(result);
-                        return result;
-                      }
-                    })
-                .withoutDefaults());
-
-    PAssert.that(combined)
-        .containsInAnyOrder(
-            ImmutableList.of("a"),
-            ImmutableList.of("a", "b"),
-            ImmutableList.of("a", "b", "c"),
-            ImmutableList.of("b", "c"),
-            ImmutableList.of("c"));
-
+    PAssert.that(combinePerKeyWithContext).containsInAnyOrder(
+        KV.of("a", "112"),
+        KV.of("a", "45"),
+        KV.of("b", "15"),
+        KV.of("b", "1133"));
+    PAssert.that(combineGloballyWithContext).containsInAnyOrder("112", "145", 
"1133");
     pipeline.run();
   }
 
   @Test
   @Category(ValidatesRunner.class)
   public void testSlidingWindowsCombineWithContext() {
-    // [a: 1, 1], [a: 4; b: 1], [b: 13]
     PCollection<KV<String, Integer>> perKeyInput =
-        pipeline
-            .apply(
-                Create.timestamped(
-                        TimestampedValue.of(KV.of("a", 1), new Instant(2L)),
-                        TimestampedValue.of(KV.of("a", 1), new Instant(3L)),
-                        TimestampedValue.of(KV.of("a", 4), new Instant(8L)),
-                        TimestampedValue.of(KV.of("b", 1), new Instant(9L)),
-                        TimestampedValue.of(KV.of("b", 13), new Instant(10L)))
-                    .withCoder(KvCoder.of(StringUtf8Coder.of(), 
BigEndianIntegerCoder.of())))
-            .apply(Window.<KV<String, 
Integer>>into(SlidingWindows.of(Duration.millis(2))));
+        pipeline.apply(Create.timestamped(TABLE, Arrays.asList(2L, 3L, 8L, 9L, 
10L))
+                .withCoder(KvCoder.of(StringUtf8Coder.of(), 
BigEndianIntegerCoder.of())))
+         .apply(Window.<KV<String, 
Integer>>into(SlidingWindows.of(Duration.millis(2))));
 
     PCollection<Integer> globallyInput = 
perKeyInput.apply(Values.<Integer>create());
 
-    PCollection<Integer> sum = globallyInput.apply("Sum", 
Sum.integersGlobally().withoutDefaults());
+    PCollection<Integer> sum = globallyInput
+        .apply("Sum", Combine.globally(new SumInts()).withoutDefaults());
 
     PCollectionView<Integer> globallySumView = 
sum.apply(View.<Integer>asSingleton());
 
     PCollection<KV<String, String>> combinePerKeyWithContext =
         perKeyInput.apply(
             Combine.<String, Integer, String>perKey(new 
TestCombineFnWithContext(globallySumView))
-                .withSideInputs(globallySumView));
+                .withSideInputs(Arrays.asList(globallySumView)));
 
     PCollection<String> combineGloballyWithContext = globallyInput
         .apply(Combine.globally(new TestCombineFnWithContext(globallySumView))
             .withoutDefaults()
-            .withSideInputs(globallySumView));
+            .withSideInputs(Arrays.asList(globallySumView)));
 
     PAssert.that(sum).containsInAnyOrder(1, 2, 1, 4, 5, 14, 13);
-    PAssert.that(combinePerKeyWithContext)
-        .containsInAnyOrder(
-            Arrays.asList(
-                KV.of("a", "1:1"),
-                KV.of("a", "2:11"),
-                KV.of("a", "1:1"),
-                KV.of("a", "4:4"),
-                KV.of("a", "5:4"),
-                KV.of("b", "5:1"),
-                KV.of("b", "14:113"),
-                KV.of("b", "13:13")));
+    PAssert.that(combinePerKeyWithContext).containsInAnyOrder(
+        KV.of("a", "11"),
+        KV.of("a", "112"),
+        KV.of("a", "11"),
+        KV.of("a", "44"),
+        KV.of("a", "45"),
+        KV.of("b", "15"),
+        KV.of("b", "11134"),
+        KV.of("b", "1133"));
     PAssert.that(combineGloballyWithContext).containsInAnyOrder(
-      "1:1", "2:11", "1:1", "4:4", "5:14", "14:113", "13:13");
+      "11", "112", "11", "44", "145", "11134", "1133");
     pipeline.run();
   }
 
@@ -472,16 +383,9 @@ public class CombineTest implements Serializable {
   @Category(ValidatesRunner.class)
   public void testSessionsCombine() {
     PCollection<KV<String, Integer>> input =
-        pipeline
-            .apply(
-                Create.timestamped(
-                        TimestampedValue.of(KV.of("a", 1), new Instant(0L)),
-                        TimestampedValue.of(KV.of("a", 1), new Instant(4L)),
-                        TimestampedValue.of(KV.of("a", 4), new Instant(7L)),
-                        TimestampedValue.of(KV.of("b", 1), new Instant(10L)),
-                        TimestampedValue.of(KV.of("b", 13), new Instant(16L)))
-                    .withCoder(KvCoder.of(StringUtf8Coder.of(), 
BigEndianIntegerCoder.of())))
-            .apply(Window.<KV<String, 
Integer>>into(Sessions.withGapDuration(Duration.millis(5))));
+        pipeline.apply(Create.timestamped(TABLE, Arrays.asList(0L, 4L, 7L, 
10L, 16L))
+                .withCoder(KvCoder.of(StringUtf8Coder.of(), 
BigEndianIntegerCoder.of())))
+         .apply(Window.<KV<String, 
Integer>>into(Sessions.withGapDuration(Duration.millis(5))));
 
     PCollection<Integer> sum = input
         .apply(Values.<Integer>create())
@@ -491,8 +395,10 @@ public class CombineTest implements Serializable {
         .apply(Combine.<String, Integer, String>perKey(new TestCombineFn()));
 
     PAssert.that(sum).containsInAnyOrder(7, 13);
-    PAssert.that(sumPerKey)
-        .containsInAnyOrder(Arrays.asList(KV.of("a", "114"), KV.of("b", "1"), 
KV.of("b", "13")));
+    PAssert.that(sumPerKey).containsInAnyOrder(
+        KV.of("a", "114"),
+        KV.of("b", "1"),
+        KV.of("b", "13"));
     pipeline.run();
   }
 
@@ -500,13 +406,7 @@ public class CombineTest implements Serializable {
   @Category(ValidatesRunner.class)
   public void testSessionsCombineWithContext() {
     PCollection<KV<String, Integer>> perKeyInput =
-        pipeline.apply(
-            Create.timestamped(
-                    TimestampedValue.of(KV.of("a", 1), new Instant(0L)),
-                    TimestampedValue.of(KV.of("a", 1), new Instant(4L)),
-                    TimestampedValue.of(KV.of("a", 4), new Instant(7L)),
-                    TimestampedValue.of(KV.of("b", 1), new Instant(10L)),
-                    TimestampedValue.of(KV.of("b", 13), new Instant(16L)))
+        pipeline.apply(Create.timestamped(TABLE, Arrays.asList(0L, 4L, 7L, 
10L, 16L))
                 .withCoder(KvCoder.of(StringUtf8Coder.of(), 
BigEndianIntegerCoder.of())));
 
     PCollection<Integer> globallyInput = 
perKeyInput.apply(Values.<Integer>create());
@@ -527,23 +427,21 @@ public class CombineTest implements Serializable {
             .apply(
                 Combine.<String, Integer, String>perKey(
                         new TestCombineFnWithContext(globallyFixedWindowsView))
-                    .withSideInputs(globallyFixedWindowsView));
+                    .withSideInputs(Arrays.asList(globallyFixedWindowsView)));
 
-    PCollection<String> sessionsCombineGlobally =
-        globallyInput
-            .apply(
-                "Globally Input Sessions",
-                
Window.<Integer>into(Sessions.withGapDuration(Duration.millis(5))))
-            .apply(
-                Combine.globally(new 
TestCombineFnWithContext(globallyFixedWindowsView))
-                    .withoutDefaults()
-                    .withSideInputs(globallyFixedWindowsView));
+    PCollection<String> sessionsCombineGlobally = globallyInput
+        .apply("Globally Input Sessions",
+            Window.<Integer>into(Sessions.withGapDuration(Duration.millis(5))))
+        .apply(Combine.globally(new 
TestCombineFnWithContext(globallyFixedWindowsView))
+            .withoutDefaults()
+            .withSideInputs(Arrays.asList(globallyFixedWindowsView)));
 
     PAssert.that(fixedWindowsSum).containsInAnyOrder(2, 4, 1, 13);
-    PAssert.that(sessionsCombinePerKey)
-        .containsInAnyOrder(
-            Arrays.asList(KV.of("a", "1:114"), KV.of("b", "1:1"), KV.of("b", 
"0:13")));
-    PAssert.that(sessionsCombineGlobally).containsInAnyOrder("1:1114", "0:13");
+    PAssert.that(sessionsCombinePerKey).containsInAnyOrder(
+        KV.of("a", "1114"),
+        KV.of("b", "11"),
+        KV.of("b", "013"));
+    PAssert.that(sessionsCombineGlobally).containsInAnyOrder("11114", "013");
     pipeline.run();
   }
 
@@ -563,13 +461,7 @@ public class CombineTest implements Serializable {
   @Test
   @Category(ValidatesRunner.class)
   public void testAccumulatingCombine() {
-    runTestAccumulatingCombine(Arrays.asList(
-      KV.of("a", 1),
-      KV.of("a", 1),
-      KV.of("a", 4),
-      KV.of("b", 1),
-      KV.of("b", 13)
-    ), 4.0, Arrays.asList(KV.of("a", 2.0), KV.of("b", 7.0)));
+    runTestAccumulatingCombine(TABLE, 4.0, Arrays.asList(KV.of("a", 2.0), 
KV.of("b", 7.0)));
   }
 
   @Test
@@ -611,13 +503,7 @@ public class CombineTest implements Serializable {
   @Test
   @Category(ValidatesRunner.class)
   public void testHotKeyCombining() {
-    PCollection<KV<String, Integer>> input = copy(createInput(pipeline, 
Arrays.asList(
-      KV.of("a", 1),
-      KV.of("a", 1),
-      KV.of("a", 4),
-      KV.of("b", 1),
-      KV.of("b", 13)
-    )), 10);
+    PCollection<KV<String, Integer>> input = copy(createInput(pipeline, 
TABLE), 10);
 
     CombineFn<Integer, ?, Double> mean = new MeanInts();
     PCollection<KV<String, Double>> coldMean = input.apply("ColdMean",
@@ -674,13 +560,7 @@ public class CombineTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testBinaryCombineFn() {
-    PCollection<KV<String, Integer>> input = copy(createInput(pipeline, 
Arrays.asList(
-      KV.of("a", 1),
-      KV.of("a", 1),
-      KV.of("a", 4),
-      KV.of("b", 1),
-      KV.of("b", 13)
-    )), 2);
+    PCollection<KV<String, Integer>> input = copy(createInput(pipeline, 
TABLE), 2);
     PCollection<KV<String, Integer>> intProduct = input
         .apply("IntProduct", Combine.<String, Integer, Integer>perKey(new 
TestProdInt()));
     PCollection<KV<String, Integer>> objProduct = input
@@ -771,7 +651,7 @@ public class CombineTest implements Serializable {
         pipeline
             .apply(
                 "CreateMainInput",
-                Create.timestamped(nonEmptyElement, 
emptyElement).withCoder(VoidCoder.of()))
+                Create.<Void>timestamped(nonEmptyElement, 
emptyElement).withCoder(VoidCoder.of()))
             .apply("WindowMainInput", Window.<Void>into(windowFn))
             .apply(
                 "OutputSideInput",
@@ -996,13 +876,15 @@ public class CombineTest implements Serializable {
      */
     private class CountSumCoder extends AtomicCoder<CountSum> {
       @Override
-      public void encode(CountSum value, OutputStream outStream) throws 
IOException {
+      public void encode(CountSum value, OutputStream outStream)
+          throws CoderException, IOException {
         LONG_CODER.encode(value.count, outStream);
         DOUBLE_CODER.encode(value.sum, outStream);
       }
 
       @Override
-      public CountSum decode(InputStream inStream) throws IOException {
+      public CountSum decode(InputStream inStream)
+          throws CoderException, IOException {
         long count = LONG_CODER.decode(inStream);
         double sum = DOUBLE_CODER.decode(inStream);
         return new CountSum(count, sum);
@@ -1035,26 +917,34 @@ public class CombineTest implements Serializable {
 
     // Not serializable.
     static class Accumulator {
-      final String seed;
       String value;
-      public Accumulator(String seed, String value) {
-        this.seed = seed;
+      public Accumulator(String value) {
         this.value = value;
       }
 
       public static Coder<Accumulator> getCoder() {
         return new AtomicCoder<Accumulator>() {
           @Override
-          public void encode(Accumulator accumulator, OutputStream outStream) 
throws IOException {
-            StringUtf8Coder.of().encode(accumulator.seed, outStream);
-            StringUtf8Coder.of().encode(accumulator.value, outStream);
+          public void encode(Accumulator accumulator, OutputStream outStream)
+              throws CoderException, IOException {
+            encode(accumulator, outStream, Coder.Context.NESTED);
           }
 
           @Override
-          public Accumulator decode(InputStream inStream) throws IOException {
-            String seed = StringUtf8Coder.of().decode(inStream);
-            String value = StringUtf8Coder.of().decode(inStream);
-            return new Accumulator(seed, value);
+          public void encode(Accumulator accumulator, OutputStream outStream, 
Coder.Context context)
+              throws CoderException, IOException {
+            StringUtf8Coder.of().encode(accumulator.value, outStream, context);
+          }
+
+          @Override
+          public Accumulator decode(InputStream inStream) throws 
CoderException, IOException {
+            return decode(inStream, Coder.Context.NESTED);
+          }
+
+          @Override
+          public Accumulator decode(InputStream inStream, Coder.Context 
context)
+              throws CoderException, IOException {
+            return new Accumulator(StringUtf8Coder.of().decode(inStream, 
context));
           }
         };
       }
@@ -1068,13 +958,13 @@ public class CombineTest implements Serializable {
 
     @Override
     public Accumulator createAccumulator() {
-      return new Accumulator("", "");
+      return new Accumulator("");
     }
 
     @Override
     public Accumulator addInput(Accumulator accumulator, Integer value) {
       try {
-        return new Accumulator(accumulator.seed, accumulator.value + 
String.valueOf(value));
+        return new Accumulator(accumulator.value + String.valueOf(value));
       } finally {
         accumulator.value = "cleared in addInput";
       }
@@ -1082,22 +972,12 @@ public class CombineTest implements Serializable {
 
     @Override
     public Accumulator mergeAccumulators(Iterable<Accumulator> accumulators) {
-      Accumulator seedAccumulator = null;
-      StringBuilder all = new StringBuilder();
+      String all = "";
       for (Accumulator accumulator : accumulators) {
-        if (seedAccumulator == null) {
-          seedAccumulator = accumulator;
-        } else {
-          assertEquals(
-              String.format(
-                  "Different seed values in accumulator: %s vs. %s", 
seedAccumulator, accumulator),
-              seedAccumulator.seed,
-              accumulator.seed);
-        }
-        all.append(accumulator.value);
+        all += accumulator.value;
         accumulator.value = "cleared in mergeAccumulators";
       }
-      return new Accumulator(checkNotNull(seedAccumulator).seed, 
all.toString());
+      return new Accumulator(all);
     }
 
     @Override
@@ -1127,47 +1007,40 @@ public class CombineTest implements Serializable {
 
     @Override
     public TestCombineFn.Accumulator createAccumulator(Context c) {
-      Integer sideInputValue = c.sideInput(view);
-      return new TestCombineFn.Accumulator(sideInputValue.toString(), "");
+      return new TestCombineFn.Accumulator(c.sideInput(view).toString());
     }
 
     @Override
     public TestCombineFn.Accumulator addInput(
         TestCombineFn.Accumulator accumulator, Integer value, Context c) {
       try {
-        assertThat(
-            "Not expecting view contents to change",
-            accumulator.seed,
-            Matchers.equalTo(Integer.toString(c.sideInput(view))));
-        return new TestCombineFn.Accumulator(
-            accumulator.seed, accumulator.value + String.valueOf(value));
+        assertThat(accumulator.value, 
Matchers.startsWith(c.sideInput(view).toString()));
+        return new TestCombineFn.Accumulator(accumulator.value + 
String.valueOf(value));
       } finally {
         accumulator.value = "cleared in addInput";
       }
+
     }
 
     @Override
     public TestCombineFn.Accumulator mergeAccumulators(
         Iterable<TestCombineFn.Accumulator> accumulators, Context c) {
-      String sideInputValue = c.sideInput(view).toString();
-      StringBuilder all = new StringBuilder();
+      String prefix = c.sideInput(view).toString();
+      String all = prefix;
       for (TestCombineFn.Accumulator accumulator : accumulators) {
-        assertThat(
-            "Accumulators should all have the same Side Input Value",
-            accumulator.seed,
-            Matchers.equalTo(sideInputValue));
-        all.append(accumulator.value);
+        assertThat(accumulator.value, Matchers.startsWith(prefix));
+        all += accumulator.value.substring(prefix.length());
         accumulator.value = "cleared in mergeAccumulators";
       }
-      return new TestCombineFn.Accumulator(sideInputValue, all.toString());
+      return new TestCombineFn.Accumulator(all);
     }
 
     @Override
     public String extractOutput(TestCombineFn.Accumulator accumulator, Context 
c) {
-      assertThat(accumulator.seed, 
Matchers.startsWith(c.sideInput(view).toString()));
+      assertThat(accumulator.value, 
Matchers.startsWith(c.sideInput(view).toString()));
       char[] chars = accumulator.value.toCharArray();
       Arrays.sort(chars);
-      return accumulator.seed + ":" + new String(chars);
+      return new String(chars);
     }
   }
 
@@ -1205,7 +1078,7 @@ public class CombineTest implements Serializable {
       @Override
       public void mergeAccumulator(Counter accumulator) {
         checkState(outputs == 0);
-        assertEquals(0, accumulator.outputs);
+        checkArgument(accumulator.outputs == 0);
 
         merges += accumulator.merges + 1;
         inputs += accumulator.inputs;

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
index 5cb9e18..1bb71bb 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
@@ -360,38 +360,6 @@ public class DoFnTesterTest {
     }
   }
 
-  @Test
-  public void testSupportsFinishBundleOutput() throws Exception {
-    for (DoFnTester.CloningBehavior cloning : 
DoFnTester.CloningBehavior.values()) {
-      try (DoFnTester<Integer, Integer> tester = DoFnTester.of(new 
BundleCounterDoFn())) {
-        tester.setCloningBehavior(cloning);
-
-        assertThat(tester.processBundle(1, 2, 3, 4), contains(4));
-        assertThat(tester.processBundle(5, 6, 7), contains(3));
-        assertThat(tester.processBundle(8, 9), contains(2));
-      }
-    }
-  }
-
-  private static class BundleCounterDoFn extends DoFn<Integer, Integer> {
-    private int elements;
-
-    @StartBundle
-    public void startBundle() {
-      elements = 0;
-    }
-
-    @ProcessElement
-    public void processElement(ProcessContext c) {
-      elements++;
-    }
-
-    @FinishBundle
-    public void finishBundle(FinishBundleContext c) {
-      c.output(elements, Instant.now(), GlobalWindow.INSTANCE);
-    }
-  }
-
   private static class SideInputDoFn extends DoFn<Integer, Integer> {
     private final PCollectionView<Integer> value;
 

Reply via email to