Repository: incubator-beam
Updated Branches:
  refs/heads/master 659f0b877 -> ac63fd6d4


Move Java 8 tests to their own module

This allows easy setting of the compiler source and target version to
1.8 without any fragile plugin configuration. It also allows the
import into Eclipse to easily set separate compliance levels, as this
is done per-Eclipse-project.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e8b71008
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e8b71008
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e8b71008

Branch: refs/heads/master
Commit: e8b71008c18439d4e30edf7e29bdaa58fa6e65f3
Parents: f06e2a9
Author: Kenneth Knowles <k...@google.com>
Authored: Mon Feb 29 22:40:01 2016 -0800
Committer: bchambers <bchamb...@google.com>
Committed: Thu Mar 17 11:12:36 2016 -0700

----------------------------------------------------------------------
 java8tests/pom.xml                              | 183 +++++++++++++++++++
 .../sdk/transforms/CombineJava8Test.java        | 133 ++++++++++++++
 .../sdk/transforms/FilterJava8Test.java         | 118 ++++++++++++
 .../transforms/FlatMapElementsJava8Test.java    |  84 +++++++++
 .../sdk/transforms/MapElementsJava8Test.java    |  77 ++++++++
 .../sdk/transforms/PartitionJava8Test.java      |  74 ++++++++
 .../transforms/RemoveDuplicatesJava8Test.java   |  98 ++++++++++
 .../sdk/transforms/WithKeysJava8Test.java       |  73 ++++++++
 .../sdk/transforms/WithTimestampsJava8Test.java |  65 +++++++
 pom.xml                                         |   9 +
 sdk/pom.xml                                     |  71 -------
 .../sdk/transforms/CombineJava8Test.java        | 133 --------------
 .../sdk/transforms/FilterJava8Test.java         | 118 ------------
 .../transforms/FlatMapElementsJava8Test.java    |  84 ---------
 .../sdk/transforms/MapElementsJava8Test.java    |  77 --------
 .../sdk/transforms/PartitionJava8Test.java      |  74 --------
 .../transforms/RemoveDuplicatesJava8Test.java   |  99 ----------
 .../sdk/transforms/WithKeysJava8Test.java       |  74 --------
 .../sdk/transforms/WithTimestampsJava8Test.java |  66 -------
 19 files changed, 914 insertions(+), 796 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e8b71008/java8tests/pom.xml
----------------------------------------------------------------------
diff --git a/java8tests/pom.xml b/java8tests/pom.xml
new file mode 100644
index 0000000..de44ed4
--- /dev/null
+++ b/java8tests/pom.xml
@@ -0,0 +1,183 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+  ~ Copyright (C) 2015 Google Inc.
+  ~
+  ~ Licensed under the Apache License, Version 2.0 (the "License"); you may not
+  ~ use this file except in compliance with the License. You may obtain a copy of
+  ~ the License at
+  ~
+  ~ http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+  ~ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+  ~ License for the specific language governing permissions and limitations under
+  ~ the License.
+  ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>com.google.cloud.dataflow</groupId>
+    <artifactId>google-cloud-dataflow-java-sdk-parent</artifactId>
+    <version>1.6.0-SNAPSHOT</version>
+  </parent>
+
+  <groupId>com.google.cloud.dataflow</groupId>
+  <artifactId>google-cloud-dataflow-java-java8tests-all</artifactId>
+  <name>Google Cloud Dataflow Java 8 Tests - All</name>
+  <description>Google Cloud Dataflow Java SDK provides a simple, Java-based
+    interface for processing virtually any size data using Google cloud
+    resources. This artifact includes tests of the SDK from a Java 8
+    user.</description>
+  <url>http://cloud.google.com/dataflow</url>
+
+  <packaging>jar</packaging>
+
+  <profiles>
+    <profile>
+      <id>DataflowPipelineTests</id>
+      <properties>
+        <runIntegrationTestOnService>true</runIntegrationTestOnService>
+        <testGroups>com.google.cloud.dataflow.sdk.testing.RunnableOnService</testGroups>
+        <testParallelValue>both</testParallelValue>
+      </properties>
+    </profile>
+  </profiles>
+
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <testSource>1.8</testSource>
+          <testTarget>1.8</testTarget>
+        </configuration>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals><goal>analyze-only</goal></goals>
+            <configuration>
+              <failOnWarning>true</failOnWarning>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+        <version>2.12</version>
+        <dependencies>
+          <dependency>
+            <groupId>com.puppycrawl.tools</groupId>
+            <artifactId>checkstyle</artifactId>
+            <version>6.6</version>
+          </dependency>
+        </dependencies>
+        <configuration>
+          <configLocation>../checkstyle.xml</configLocation>
+          <consoleOutput>true</consoleOutput>
+          <failOnViolation>true</failOnViolation>
+          <includeTestSourceDirectory>true</includeTestSourceDirectory>
+          <includeResources>false</includeResources>
+        </configuration>
+        <executions>
+          <execution>
+            <goals>
+              <goal>check</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
+      <!-- Source plugin for generating source and test-source JARs. -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-source-plugin</artifactId>
+        <version>2.4</version>
+        <executions>
+          <execution>
+            <id>attach-sources</id>
+            <phase>compile</phase>
+            <goals>
+              <goal>jar</goal>
+            </goals>
+          </execution>
+          <execution>
+            <id>attach-test-sources</id>
+            <phase>test-compile</phase>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>default-jar</id>
+            <goals>
+              <goal>jar</goal>
+            </goals>
+          </execution>
+          <execution>
+            <id>default-test-jar</id>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
+      <!-- Coverage analysis for unit tests. -->
+      <plugin>
+        <groupId>org.jacoco</groupId>
+        <artifactId>jacoco-maven-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>com.google.cloud.dataflow</groupId>
+      <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>${guava.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
+      <version>${joda.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <version>${hamcrest.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>${junit.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e8b71008/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/CombineJava8Test.java
----------------------------------------------------------------------
diff --git a/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/CombineJava8Test.java b/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/CombineJava8Test.java
new file mode 100644
index 0000000..b569e49
--- /dev/null
+++ b/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/CombineJava8Test.java
@@ -0,0 +1,133 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.sdk.transforms;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.Serializable;
+
+/**
+ * Java 8 Tests for {@link Combine}.
+ */
+@RunWith(JUnit4.class)
+@SuppressWarnings("serial")
+public class CombineJava8Test implements Serializable {
+
+  @Rule
+  public transient ExpectedException thrown = ExpectedException.none();
+
+  /**
+   * Class for use in testing use of Java 8 method references.
+   */
+  private static class Summer implements Serializable {
+    public int sum(Iterable<Integer> integers) {
+      int sum = 0;
+      for (int i : integers) {
+        sum += i;
+      }
+      return sum;
+    }
+  }
+
+  /**
+   * Tests creation of a global {@link Combine} via Java 8 lambda.
+   */
+  @Test
+  public void testCombineGloballyLambda() {
+    Pipeline pipeline = TestPipeline.create();
+
+    PCollection<Integer> output = pipeline
+        .apply(Create.of(1, 2, 3, 4))
+        .apply(Combine.globally(integers -> {
+          int sum = 0;
+          for (int i : integers) {
+            sum += i;
+          }
+          return sum;
+        }));
+
+    DataflowAssert.that(output).containsInAnyOrder(10);
+    pipeline.run();
+  }
+
+  /**
+   * Tests creation of a global {@link Combine} via a Java 8 method reference.
+   */
+  @Test
+  public void testCombineGloballyInstanceMethodReference() {
+    Pipeline pipeline = TestPipeline.create();
+
+    PCollection<Integer> output = pipeline
+        .apply(Create.of(1, 2, 3, 4))
+        .apply(Combine.globally(new Summer()::sum));
+
+    DataflowAssert.that(output).containsInAnyOrder(10);
+    pipeline.run();
+  }
+
+  /**
+   * Tests creation of a per-key {@link Combine} via a Java 8 lambda.
+   */
+  @Test
+  public void testCombinePerKeyLambda() {
+    Pipeline pipeline = TestPipeline.create();
+
+    PCollection<KV<String, Integer>> output = pipeline
+        .apply(Create.of(KV.of("a", 1), KV.of("b", 2), KV.of("a", 3), KV.of("c", 4)))
+        .apply(Combine.perKey(integers -> {
+          int sum = 0;
+          for (int i : integers) {
+            sum += i;
+          }
+          return sum;
+        }));
+
+    DataflowAssert.that(output).containsInAnyOrder(
+        KV.of("a", 4),
+        KV.of("b", 2),
+        KV.of("c", 4));
+    pipeline.run();
+  }
+
+  /**
+   * Tests creation of a per-key {@link Combine} via a Java 8 method reference.
+   */
+  @Test
+  public void testCombinePerKeyInstanceMethodReference() {
+    Pipeline pipeline = TestPipeline.create();
+
+    PCollection<KV<String, Integer>> output = pipeline
+        .apply(Create.of(KV.of("a", 1), KV.of("b", 2), KV.of("a", 3), KV.of("c", 4)))
+        .apply(Combine.perKey(new Summer()::sum));
+
+    DataflowAssert.that(output).containsInAnyOrder(
+        KV.of("a", 4),
+        KV.of("b", 2),
+        KV.of("c", 4));
+    pipeline.run();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e8b71008/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/FilterJava8Test.java
----------------------------------------------------------------------
diff --git a/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/FilterJava8Test.java b/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/FilterJava8Test.java
new file mode 100644
index 0000000..db65932
--- /dev/null
+++ b/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/FilterJava8Test.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.sdk.transforms;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
+import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
+import com.google.cloud.dataflow.sdk.testing.RunnableOnService;
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.Serializable;
+
+/**
+ * Java 8 Tests for {@link Filter}.
+ */
+@RunWith(JUnit4.class)
+@SuppressWarnings("serial")
+public class FilterJava8Test implements Serializable {
+
+  @Rule
+  public transient ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testIdentityFilterByPredicate() {
+    Pipeline pipeline = TestPipeline.create();
+
+    PCollection<Integer> output = pipeline
+        .apply(Create.of(591, 11789, 1257, 24578, 24799, 307))
+        .apply(Filter.byPredicate(i -> true));
+
+    DataflowAssert.that(output).containsInAnyOrder(591, 11789, 1257, 24578, 24799, 307);
+    pipeline.run();
+  }
+
+  @Test
+  public void testNoFilterByPredicate() {
+    Pipeline pipeline = TestPipeline.create();
+
+    PCollection<Integer> output = pipeline
+        .apply(Create.of(1, 2, 4, 5))
+        .apply(Filter.byPredicate(i -> false));
+
+    DataflowAssert.that(output).empty();
+    pipeline.run();
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testFilterByPredicate() {
+    Pipeline pipeline = TestPipeline.create();
+
+    PCollection<Integer> output = pipeline
+        .apply(Create.of(1, 2, 3, 4, 5, 6, 7))
+        .apply(Filter.byPredicate(i -> i % 2 == 0));
+
+    DataflowAssert.that(output).containsInAnyOrder(2, 4, 6);
+    pipeline.run();
+  }
+
+  /**
+   * Confirms that in Java 8 style, where a lambda results in a rawtype, the output type token is
+   * not useful. If this test ever fails there may be simplifications available to us.
+   */
+  @Test
+  public void testFilterParDoOutputTypeDescriptorRaw() throws Exception {
+    Pipeline pipeline = TestPipeline.create();
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    PCollection<String> output = pipeline
+        .apply(Create.of("hello"))
+        .apply(Filter.by(s -> true));
+
+    thrown.expect(CannotProvideCoderException.class);
+    pipeline.getCoderRegistry().getDefaultCoder(output.getTypeDescriptor());
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testFilterByMethodReference() {
+    Pipeline pipeline = TestPipeline.create();
+
+    PCollection<Integer> output = pipeline
+        .apply(Create.of(1, 2, 3, 4, 5, 6, 7))
+        .apply(Filter.byPredicate(new EvenFilter()::isEven));
+
+    DataflowAssert.that(output).containsInAnyOrder(2, 4, 6);
+    pipeline.run();
+  }
+
+  private static class EvenFilter implements Serializable {
+    public boolean isEven(int i) {
+      return i % 2 == 0;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e8b71008/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/FlatMapElementsJava8Test.java
----------------------------------------------------------------------
diff --git a/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/FlatMapElementsJava8Test.java b/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/FlatMapElementsJava8Test.java
new file mode 100644
index 0000000..e0b946b
--- /dev/null
+++ b/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/FlatMapElementsJava8Test.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.sdk.transforms;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
+import com.google.common.collect.ImmutableList;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Java 8 Tests for {@link FlatMapElements}.
+ */
+@RunWith(JUnit4.class)
+public class FlatMapElementsJava8Test implements Serializable {
+
+  @Rule
+  public transient ExpectedException thrown = ExpectedException.none();
+
+  /**
+   * Basic test of {@link FlatMapElements} with a lambda (which is instantiated as a
+   * {@link SerializableFunction}).
+   */
+  @Test
+  public void testFlatMapBasic() throws Exception {
+    Pipeline pipeline = TestPipeline.create();
+    PCollection<Integer> output = pipeline
+        .apply(Create.of(1, 2, 3))
+        .apply(FlatMapElements
+            // Note that the input type annotation is required.
+            .via((Integer i) -> ImmutableList.of(i, -i))
+            .withOutputType(new TypeDescriptor<Integer>() {}));
+
+    DataflowAssert.that(output).containsInAnyOrder(1, 3, -1, -3, 2, -2);
+    pipeline.run();
+  }
+
+  /**
+   * Basic test of {@link FlatMapElements} with a method reference.
+   */
+  @Test
+  public void testFlatMapMethodReference() throws Exception {
+    Pipeline pipeline = TestPipeline.create();
+    PCollection<Integer> output = pipeline
+        .apply(Create.of(1, 2, 3))
+        .apply(FlatMapElements
+            // Note that the input type annotation is required.
+            .via(new Negater()::numAndNegation)
+            .withOutputType(new TypeDescriptor<Integer>() {}));
+
+    DataflowAssert.that(output).containsInAnyOrder(1, 3, -1, -3, 2, -2);
+    pipeline.run();
+  }
+
+  private static class Negater implements Serializable {
+    public List<Integer> numAndNegation(int input) {
+      return ImmutableList.of(input, -input);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e8b71008/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/MapElementsJava8Test.java
----------------------------------------------------------------------
diff --git a/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/MapElementsJava8Test.java b/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/MapElementsJava8Test.java
new file mode 100644
index 0000000..123e680
--- /dev/null
+++ b/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/MapElementsJava8Test.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.sdk.transforms;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.Serializable;
+
+/**
+ * Java 8 tests for {@link MapElements}.
+ */
+@RunWith(JUnit4.class)
+public class MapElementsJava8Test implements Serializable {
+
+  /**
+   * Basic test of {@link MapElements} with a lambda (which is instantiated as a
+   * {@link SerializableFunction}).
+   */
+  @Test
+  public void testMapBasic() throws Exception {
+    Pipeline pipeline = TestPipeline.create();
+    PCollection<Integer> output = pipeline
+        .apply(Create.of(1, 2, 3))
+        .apply(MapElements
+            // Note that the type annotation is required (for Java, not for Dataflow)
+            .via((Integer i) -> i * 2)
+            .withOutputType(new TypeDescriptor<Integer>() {}));
+
+    DataflowAssert.that(output).containsInAnyOrder(6, 2, 4);
+    pipeline.run();
+  }
+
+  /**
+   * Basic test of {@link MapElements} with a method reference.
+   */
+  @Test
+  public void testMapMethodReference() throws Exception {
+    Pipeline pipeline = TestPipeline.create();
+    PCollection<Integer> output = pipeline
+        .apply(Create.of(1, 2, 3))
+        .apply(MapElements
+            // Note that the type annotation is required (for Java, not for Dataflow)
+            .via(new Doubler()::doubleIt)
+            .withOutputType(new TypeDescriptor<Integer>() {}));
+
+    DataflowAssert.that(output).containsInAnyOrder(6, 2, 4);
+    pipeline.run();
+  }
+
+  private static class Doubler implements Serializable {
+    public int doubleIt(int val) {
+      return val * 2;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e8b71008/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/PartitionJava8Test.java
----------------------------------------------------------------------
diff --git a/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/PartitionJava8Test.java b/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/PartitionJava8Test.java
new file mode 100644
index 0000000..c459ada
--- /dev/null
+++ b/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/PartitionJava8Test.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.sdk.transforms;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
+import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.values.PCollectionList;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.Serializable;
+
+/**
+ * Java 8 Tests for {@link Partition}.
+ */
+@RunWith(JUnit4.class)
+@SuppressWarnings("serial")
+public class PartitionJava8Test implements Serializable {
+
+  @Rule
+  public transient ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void testModPartition() {
+    Pipeline pipeline = TestPipeline.create();
+
+    PCollectionList<Integer> outputs = pipeline
+        .apply(Create.of(1, 2, 4, 5))
+        .apply(Partition.of(3, (element, numPartitions) -> element % numPartitions));
+    assertEquals(3, outputs.size());
+    DataflowAssert.that(outputs.get(0)).empty();
+    DataflowAssert.that(outputs.get(1)).containsInAnyOrder(1, 4);
+    DataflowAssert.that(outputs.get(2)).containsInAnyOrder(2, 5);
+    pipeline.run();
+  }
+
+  /**
+   * Confirms that in Java 8 style, where a lambda results in a rawtype, the output type token is
+   * not useful. If this test ever fails there may be simplifications available to us.
+   */
+  @Test
+  public void testPartitionFnOutputTypeDescriptorRaw() throws Exception {
+    Pipeline pipeline = TestPipeline.create();
+
+    PCollectionList<String> output = pipeline
+        .apply(Create.of("hello"))
+        .apply(Partition.of(1, (element, numPartitions) -> 0));
+
+    thrown.expect(CannotProvideCoderException.class);
+    pipeline.getCoderRegistry().getDefaultCoder(output.get(0).getTypeDescriptor());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e8b71008/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/RemoveDuplicatesJava8Test.java
----------------------------------------------------------------------
diff --git a/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/RemoveDuplicatesJava8Test.java b/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/RemoveDuplicatesJava8Test.java
new file mode 100644
index 0000000..dfa1ca6
--- /dev/null
+++ b/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/RemoveDuplicatesJava8Test.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.transforms;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertThat;
+
+import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Java 8 tests for {@link RemoveDuplicates}.
+ */
+@RunWith(JUnit4.class)
+public class RemoveDuplicatesJava8Test {
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void withLambdaRepresentativeValuesFnAndTypeDescriptorShouldApplyFn() {
+    TestPipeline p = TestPipeline.create();
+
+    Multimap<Integer, String> predupedContents = HashMultimap.create();
+    predupedContents.put(3, "foo");
+    predupedContents.put(4, "foos");
+    predupedContents.put(6, "barbaz");
+    predupedContents.put(6, "bazbar");
+    PCollection<String> dupes =
+        p.apply(Create.of("foo", "foos", "barbaz", "barbaz", "bazbar", "foo"));
+    PCollection<String> deduped =
+        dupes.apply(RemoveDuplicates.withRepresentativeValueFn((String s) -> s.length())
+                                    .withRepresentativeType(TypeDescriptor.of(Integer.class)));
+
+    DataflowAssert.that(deduped).satisfies((Iterable<String> strs) -> {
+      Set<Integer> seenLengths = new HashSet<>();
+      for (String s : strs) {
+        assertThat(predupedContents.values(), hasItem(s));
+        assertThat(seenLengths, not(contains(s.length())));
+        seenLengths.add(s.length());
+      }
+      return null;
+    });
+
+    p.run();
+  }
+
+  @Test
+  public void withLambdaRepresentativeValuesFnNoTypeDescriptorShouldThrow() {
+    TestPipeline p = TestPipeline.create();
+
+    Multimap<Integer, String> predupedContents = HashMultimap.create();
+    predupedContents.put(3, "foo");
+    predupedContents.put(4, "foos");
+    predupedContents.put(6, "barbaz");
+    predupedContents.put(6, "bazbar");
+    PCollection<String> dupes =
+        p.apply(Create.of("foo", "foos", "barbaz", "barbaz", "bazbar", "foo"));
+
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("Unable to return a default Coder for RemoveRepresentativeDupes");
+    thrown.expectMessage("Cannot provide a coder for type variable K");
+    thrown.expectMessage("the actual type is unknown due to erasure.");
+
+    // Thrown when applying a transform to the internal WithKeys that withRepresentativeValueFn is
+    // implemented with
+    dupes.apply("RemoveRepresentativeDupes",
+        RemoveDuplicates.withRepresentativeValueFn((String s) -> s.length()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e8b71008/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/WithKeysJava8Test.java
----------------------------------------------------------------------
diff --git a/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/WithKeysJava8Test.java b/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/WithKeysJava8Test.java
new file mode 100644
index 0000000..3771f78
--- /dev/null
+++ b/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/WithKeysJava8Test.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy 
of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.transforms;
+
+import com.google.cloud.dataflow.sdk.Pipeline.PipelineExecutionException;
+import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
+import com.google.cloud.dataflow.sdk.testing.RunnableOnService;
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Java 8 Tests for {@link WithKeys}.
+ */
+@RunWith(JUnit4.class)
+public class WithKeysJava8Test {
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  /**
+   * Keys each string by its parsed {@link Integer} value using a lambda, supplying the key type
+   * explicitly through {@code withKeyType}, and checks the resulting key/value pairs.
+   */
+  @Test
+  @Category(RunnableOnService.class)
+  public void withLambdaAndTypeDescriptorShouldSucceed() {
+    TestPipeline p = TestPipeline.create();
+
+    PCollection<String> values = p.apply(Create.of("1234", "3210", "0", "-12"));
+    PCollection<KV<Integer, String>> kvs = values.apply(
+        WithKeys.of((String s) -> Integer.valueOf(s))
+                .withKeyType(TypeDescriptor.of(Integer.class)));
+
+    DataflowAssert.that(kvs).containsInAnyOrder(
+        KV.of(1234, "1234"), KV.of(0, "0"), KV.of(-12, "-12"), KV.of(3210, "3210"));
+
+    p.run();
+  }
+
+  /**
+   * Applies the same lambda without a key type and expects running the pipeline to fail with a
+   * {@link PipelineExecutionException} carrying the coder-inference messages.
+   */
+  @Test
+  public void withLambdaAndNoTypeDescriptorShouldThrow() {
+    TestPipeline p = TestPipeline.create();
+
+    PCollection<String> values = p.apply(Create.of("1234", "3210", "0", "-12"));
+
+    values.apply("ApplyKeysWithWithKeys", WithKeys.of((String s) -> Integer.valueOf(s)));
+
+    // The expectation is registered after apply(), so only a failure raised by run() satisfies it.
+    thrown.expect(PipelineExecutionException.class);
+    thrown.expectMessage("Unable to return a default Coder for ApplyKeysWithWithKeys");
+    thrown.expectMessage("Cannot provide a coder for type variable K");
+    thrown.expectMessage("the actual type is unknown due to erasure.");
+
+    p.run();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e8b71008/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/WithTimestampsJava8Test.java
----------------------------------------------------------------------
diff --git 
a/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/WithTimestampsJava8Test.java
 
b/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/WithTimestampsJava8Test.java
new file mode 100644
index 0000000..b2b6dbc
--- /dev/null
+++ 
b/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/WithTimestampsJava8Test.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy 
of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.transforms;
+
+import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
+import com.google.cloud.dataflow.sdk.testing.RunnableOnService;
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.Serializable;
+
+/**
+ * Java 8 tests for {@link WithTimestamps}.
+ */
+@RunWith(JUnit4.class)
+public class WithTimestampsJava8Test implements Serializable {
+  /**
+   * Stamps each element with the instant obtained by parsing the element as epoch milliseconds
+   * via a lambda, then checks that the elements pass through unchanged and that every element
+   * reports its own parsed value as its timestamp.
+   */
+  @Test
+  @Category(RunnableOnService.class)
+  public void withTimestampsLambdaShouldApplyTimestamps() {
+    TestPipeline p = TestPipeline.create();
+
+    String yearTwoThousand = "946684800000";
+    PCollection<String> timestamped =
+        p.apply(Create.of("1234", "0", Integer.toString(Integer.MAX_VALUE), yearTwoThousand))
+         // Parse the element itself; a constant timestamp could never produce the distinct
+         // per-element instants asserted below.
+         .apply(WithTimestamps.of((String input) -> new Instant(Long.parseLong(input))));
+
+    // Pair every element with the timestamp reported by its ProcessContext.
+    PCollection<KV<String, Instant>> timestampedVals =
+        timestamped.apply(ParDo.of(new DoFn<String, KV<String, Instant>>() {
+          @Override
+          public void processElement(DoFn<String, KV<String, Instant>>.ProcessContext c)
+              throws Exception {
+            c.output(KV.of(c.element(), c.timestamp()));
+          }
+        }));
+
+    DataflowAssert.that(timestamped)
+        .containsInAnyOrder(yearTwoThousand, "0", "1234", Integer.toString(Integer.MAX_VALUE));
+    DataflowAssert.that(timestampedVals)
+        .containsInAnyOrder(
+            KV.of("0", new Instant(0)),
+            // Instant(long), not Instant(Object): a String argument is parsed as an ISO-8601
+            // date-time rather than as a millisecond count.
+            KV.of("1234", new Instant(1234L)),
+            KV.of(Integer.toString(Integer.MAX_VALUE), new Instant(Integer.MAX_VALUE)),
+            KV.of(yearTwoThousand, new Instant(Long.parseLong(yearTwoThousand))));
+
+    // Without running the pipeline none of the assertions above are ever evaluated.
+    p.run();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e8b71008/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6fb0b32..8a44ea9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -100,6 +100,15 @@
 
   <profiles>
     <profile>
+      <!-- Adds the java8tests module to the build only when running on JDK 1.8 or newer. -->
+      <id>java8-tests</id>
+      <activation>
+        <jdk>[1.8,)</jdk>
+      </activation>
+      <modules>
+        <module>java8tests</module>
+      </modules>
+    </profile>
+    <profile>
       <id>doclint-java8-disable</id>
       <activation>
         <jdk>[1.8,)</jdk>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e8b71008/sdk/pom.xml
----------------------------------------------------------------------
diff --git a/sdk/pom.xml b/sdk/pom.xml
index d7e10a5..d652d80 100644
--- a/sdk/pom.xml
+++ b/sdk/pom.xml
@@ -54,77 +54,6 @@
         <testParallelValue>both</testParallelValue>
       </properties>
     </profile>
-
-    <profile>
-      <id>java8tests</id>
-      <activation>
-        <jdk>[1.8,)</jdk>
-      </activation>
-
-      <build>
-        <plugins>
-          <!-- Tells Maven about the Java 8 test source root. -->
-          <plugin>
-            <groupId>org.codehaus.mojo</groupId>
-            <artifactId>build-helper-maven-plugin</artifactId>
-            <executions>
-              <execution>
-                <id>add-java8-test-source</id>
-                <phase>initialize</phase>
-                <goals>
-                  <goal>add-test-source</goal>
-                </goals>
-                <configuration>
-                  <sources>
-                    <source>${project.basedir}/src/test/java8</source>
-                  </sources>
-                </configuration>
-              </execution>
-            </executions>
-          </plugin>
-
-          <!-- Set `-source 1.8 -target 1.8` for Java 8 tests
-               and `-source 1.7 -target 1.7` for Java 7 tests -->
-          <plugin>
-            <groupId>org.apache.maven.plugins</groupId>
-            <artifactId>maven-compiler-plugin</artifactId>
-            <executions>
-              <execution>
-                <id>default-testCompile</id>
-                <phase>test-compile</phase>
-                <goals>
-                  <goal>testCompile</goal>
-                </goals>
-                <configuration>
-                  <testSource>1.7</testSource>
-                  <testTarget>1.7</testTarget>
-                  <testExcludes>
-                    <!-- This pattern is brittle; we would prefer to filter on the directory
-                         but that seems to be unavailable to us. -->
-                    <exclude>**/*Java8Test.java</exclude>
-                  </testExcludes>
-                </configuration>
-              </execution>
-
-              <execution>
-                <id>java8-testCompile</id>
-                <phase>test-compile</phase>
-                <goals>
-                  <goal>testCompile</goal>
-                </goals>
-                <configuration>
-                  <testSource>1.8</testSource>
-                  <testTarget>1.8</testTarget>
-                  <testIncludes>
-                    <include>**/*Java8Test.java</include>
-                  </testIncludes>
-                </configuration>
-              </execution>
-            </executions>
-          </plugin>
-        </plugins>
-      </build>
-    </profile>
   </profiles>
 
   <build>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e8b71008/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/CombineJava8Test.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/CombineJava8Test.java
 
b/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/CombineJava8Test.java
deleted file mode 100644
index b569e49..0000000
--- 
a/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/CombineJava8Test.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy 
of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
-import com.google.cloud.dataflow.sdk.testing.TestPipeline;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.io.Serializable;
-
-/**
- * Java 8 Tests for {@link Combine}.
- */
-@RunWith(JUnit4.class)
-@SuppressWarnings("serial")
-public class CombineJava8Test implements Serializable {
-
-  @Rule
-  public transient ExpectedException thrown = ExpectedException.none();
-
-  /**
-   * Class for use in testing use of Java 8 method references.
-   */
-  private static class Summer implements Serializable {
-    public int sum(Iterable<Integer> integers) {
-      int sum = 0;
-      for (int i : integers) {
-        sum += i;
-      }
-      return sum;
-    }
-  }
-
-  /**
-   * Tests creation of a global {@link Combine} via Java 8 lambda.
-   */
-  @Test
-  public void testCombineGloballyLambda() {
-    Pipeline pipeline = TestPipeline.create();
-
-    PCollection<Integer> output = pipeline
-        .apply(Create.of(1, 2, 3, 4))
-        .apply(Combine.globally(integers -> {
-          int sum = 0;
-          for (int i : integers) {
-            sum += i;
-          }
-          return sum;
-        }));
-
-    DataflowAssert.that(output).containsInAnyOrder(10);
-    pipeline.run();
-  }
-
-  /**
-   * Tests creation of a global {@link Combine} via a Java 8 method reference.
-   */
-  @Test
-  public void testCombineGloballyInstanceMethodReference() {
-    Pipeline pipeline = TestPipeline.create();
-
-    PCollection<Integer> output = pipeline
-        .apply(Create.of(1, 2, 3, 4))
-        .apply(Combine.globally(new Summer()::sum));
-
-    DataflowAssert.that(output).containsInAnyOrder(10);
-    pipeline.run();
-  }
-
-  /**
-   * Tests creation of a per-key {@link Combine} via a Java 8 lambda.
-   */
-  @Test
-  public void testCombinePerKeyLambda() {
-    Pipeline pipeline = TestPipeline.create();
-
-    PCollection<KV<String, Integer>> output = pipeline
-        .apply(Create.of(KV.of("a", 1), KV.of("b", 2), KV.of("a", 3), KV.of("c", 4)))
-        .apply(Combine.perKey(integers -> {
-          int sum = 0;
-          for (int i : integers) {
-            sum += i;
-          }
-          return sum;
-        }));
-
-    DataflowAssert.that(output).containsInAnyOrder(
-        KV.of("a", 4),
-        KV.of("b", 2),
-        KV.of("c", 4));
-    pipeline.run();
-  }
-
-  /**
-   * Tests creation of a per-key {@link Combine} via a Java 8 method reference.
-   */
-  @Test
-  public void testCombinePerKeyInstanceMethodReference() {
-    Pipeline pipeline = TestPipeline.create();
-
-    PCollection<KV<String, Integer>> output = pipeline
-        .apply(Create.of(KV.of("a", 1), KV.of("b", 2), KV.of("a", 3), KV.of("c", 4)))
-        .apply(Combine.perKey(new Summer()::sum));
-
-    DataflowAssert.that(output).containsInAnyOrder(
-        KV.of("a", 4),
-        KV.of("b", 2),
-        KV.of("c", 4));
-    pipeline.run();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e8b71008/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/FilterJava8Test.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/FilterJava8Test.java
 
b/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/FilterJava8Test.java
deleted file mode 100644
index db65932..0000000
--- 
a/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/FilterJava8Test.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy 
of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
-import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
-import com.google.cloud.dataflow.sdk.testing.RunnableOnService;
-import com.google.cloud.dataflow.sdk.testing.TestPipeline;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.io.Serializable;
-
-/**
- * Java 8 Tests for {@link Filter}.
- */
-@RunWith(JUnit4.class)
-@SuppressWarnings("serial")
-public class FilterJava8Test implements Serializable {
-
-  @Rule
-  public transient ExpectedException thrown = ExpectedException.none();
-
-  @Test
-  @Category(RunnableOnService.class)
-  public void testIdentityFilterByPredicate() {
-    Pipeline pipeline = TestPipeline.create();
-
-    PCollection<Integer> output = pipeline
-        .apply(Create.of(591, 11789, 1257, 24578, 24799, 307))
-        .apply(Filter.byPredicate(i -> true));
-
-    DataflowAssert.that(output).containsInAnyOrder(591, 11789, 1257, 24578, 24799, 307);
-    pipeline.run();
-  }
-
-  @Test
-  public void testNoFilterByPredicate() {
-    Pipeline pipeline = TestPipeline.create();
-
-    PCollection<Integer> output = pipeline
-        .apply(Create.of(1, 2, 4, 5))
-        .apply(Filter.byPredicate(i -> false));
-
-    DataflowAssert.that(output).empty();
-    pipeline.run();
-  }
-
-  @Test
-  @Category(RunnableOnService.class)
-  public void testFilterByPredicate() {
-    Pipeline pipeline = TestPipeline.create();
-
-    PCollection<Integer> output = pipeline
-        .apply(Create.of(1, 2, 3, 4, 5, 6, 7))
-        .apply(Filter.byPredicate(i -> i % 2 == 0));
-
-    DataflowAssert.that(output).containsInAnyOrder(2, 4, 6);
-    pipeline.run();
-  }
-
-  /**
-   * Confirms that in Java 8 style, where a lambda results in a rawtype, the output type token is
-   * not useful. If this test ever fails there may be simplifications available to us.
-   */
-  @Test
-  public void testFilterParDoOutputTypeDescriptorRaw() throws Exception {
-    Pipeline pipeline = TestPipeline.create();
-
-    @SuppressWarnings({"unchecked", "rawtypes"})
-    PCollection<String> output = pipeline
-        .apply(Create.of("hello"))
-        .apply(Filter.by(s -> true));
-
-    thrown.expect(CannotProvideCoderException.class);
-    pipeline.getCoderRegistry().getDefaultCoder(output.getTypeDescriptor());
-  }
-
-  @Test
-  @Category(RunnableOnService.class)
-  public void testFilterByMethodReference() {
-    Pipeline pipeline = TestPipeline.create();
-
-    PCollection<Integer> output = pipeline
-        .apply(Create.of(1, 2, 3, 4, 5, 6, 7))
-        .apply(Filter.byPredicate(new EvenFilter()::isEven));
-
-    DataflowAssert.that(output).containsInAnyOrder(2, 4, 6);
-    pipeline.run();
-  }
-
-  private static class EvenFilter implements Serializable {
-    public boolean isEven(int i) {
-      return i % 2 == 0;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e8b71008/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/FlatMapElementsJava8Test.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/FlatMapElementsJava8Test.java
 
b/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/FlatMapElementsJava8Test.java
deleted file mode 100644
index e0b946b..0000000
--- 
a/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/FlatMapElementsJava8Test.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy 
of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
-import com.google.cloud.dataflow.sdk.testing.TestPipeline;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
-import com.google.common.collect.ImmutableList;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.io.Serializable;
-import java.util.List;
-
-/**
- * Java 8 Tests for {@link FlatMapElements}.
- */
-@RunWith(JUnit4.class)
-public class FlatMapElementsJava8Test implements Serializable {
-
-  @Rule
-  public transient ExpectedException thrown = ExpectedException.none();
-
-  /**
-   * Basic test of {@link FlatMapElements} with a lambda (which is instantiated as a
-   * {@link SerializableFunction}).
-   */
-  @Test
-  public void testFlatMapBasic() throws Exception {
-    Pipeline pipeline = TestPipeline.create();
-    PCollection<Integer> output = pipeline
-        .apply(Create.of(1, 2, 3))
-        .apply(FlatMapElements
-            // Note that the input type annotation is required.
-            .via((Integer i) -> ImmutableList.of(i, -i))
-            .withOutputType(new TypeDescriptor<Integer>() {}));
-
-    DataflowAssert.that(output).containsInAnyOrder(1, 3, -1, -3, 2, -2);
-    pipeline.run();
-  }
-
-  /**
-   * Basic test of {@link FlatMapElements} with a method reference.
-   */
-  @Test
-  public void testFlatMapMethodReference() throws Exception {
-    Pipeline pipeline = TestPipeline.create();
-    PCollection<Integer> output = pipeline
-        .apply(Create.of(1, 2, 3))
-        .apply(FlatMapElements
-            // Note that the input type annotation is required.
-            .via(new Negater()::numAndNegation)
-            .withOutputType(new TypeDescriptor<Integer>() {}));
-
-    DataflowAssert.that(output).containsInAnyOrder(1, 3, -1, -3, 2, -2);
-    pipeline.run();
-  }
-
-  private static class Negater implements Serializable {
-    public List<Integer> numAndNegation(int input) {
-      return ImmutableList.of(input, -input);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e8b71008/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/MapElementsJava8Test.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/MapElementsJava8Test.java
 
b/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/MapElementsJava8Test.java
deleted file mode 100644
index 123e680..0000000
--- 
a/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/MapElementsJava8Test.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy 
of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
-import com.google.cloud.dataflow.sdk.testing.TestPipeline;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.io.Serializable;
-
-/**
- * Java 8 tests for {@link MapElements}.
- */
-@RunWith(JUnit4.class)
-public class MapElementsJava8Test implements Serializable {
-
-  /**
-   * Basic test of {@link MapElements} with a lambda (which is instantiated as a
-   * {@link SerializableFunction}).
-   */
-  @Test
-  public void testMapBasic() throws Exception {
-    Pipeline pipeline = TestPipeline.create();
-    PCollection<Integer> output = pipeline
-        .apply(Create.of(1, 2, 3))
-        .apply(MapElements
-            // Note that the type annotation is required (for Java, not for Dataflow)
-            .via((Integer i) -> i * 2)
-            .withOutputType(new TypeDescriptor<Integer>() {}));
-
-    DataflowAssert.that(output).containsInAnyOrder(6, 2, 4);
-    pipeline.run();
-  }
-
-  /**
-   * Basic test of {@link MapElements} with a method reference.
-   */
-  @Test
-  public void testMapMethodReference() throws Exception {
-    Pipeline pipeline = TestPipeline.create();
-    PCollection<Integer> output = pipeline
-        .apply(Create.of(1, 2, 3))
-        .apply(MapElements
-            // Note that the type annotation is required (for Java, not for Dataflow)
-            .via(new Doubler()::doubleIt)
-            .withOutputType(new TypeDescriptor<Integer>() {}));
-
-    DataflowAssert.that(output).containsInAnyOrder(6, 2, 4);
-    pipeline.run();
-  }
-
-  private static class Doubler implements Serializable {
-    public int doubleIt(int val) {
-      return val * 2;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e8b71008/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/PartitionJava8Test.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/PartitionJava8Test.java
 
b/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/PartitionJava8Test.java
deleted file mode 100644
index c459ada..0000000
--- 
a/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/PartitionJava8Test.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy 
of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms;
-
-import static org.junit.Assert.assertEquals;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
-import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
-import com.google.cloud.dataflow.sdk.testing.TestPipeline;
-import com.google.cloud.dataflow.sdk.values.PCollectionList;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.io.Serializable;
-
-/**
- * Java 8 Tests for {@link Filter}.
- */
-@RunWith(JUnit4.class)
-@SuppressWarnings("serial")
-public class PartitionJava8Test implements Serializable {
-
-  @Rule
-  public transient ExpectedException thrown = ExpectedException.none();
-
-  @Test
-  public void testModPartition() {
-    Pipeline pipeline = TestPipeline.create();
-
-    PCollectionList<Integer> outputs = pipeline
-        .apply(Create.of(1, 2, 4, 5))
-        .apply(Partition.of(3, (element, numPartitions) -> element % numPartitions));
-    assertEquals(3, outputs.size());
-    DataflowAssert.that(outputs.get(0)).empty();
-    DataflowAssert.that(outputs.get(1)).containsInAnyOrder(1, 4);
-    DataflowAssert.that(outputs.get(2)).containsInAnyOrder(2, 5);
-    pipeline.run();
-  }
-
-  /**
-   * Confirms that in Java 8 style, where a lambda results in a rawtype, the output type token is
-   * not useful. If this test ever fails there may be simplifications available to us.
-   */
-  @Test
-  public void testPartitionFnOutputTypeDescriptorRaw() throws Exception {
-    Pipeline pipeline = TestPipeline.create();
-
-    PCollectionList<String> output = pipeline
-        .apply(Create.of("hello"))
-        .apply(Partition.of(1, (element, numPartitions) -> 0));
-
-    thrown.expect(CannotProvideCoderException.class);
-    pipeline.getCoderRegistry().getDefaultCoder(output.get(0).getTypeDescriptor());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e8b71008/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/RemoveDuplicatesJava8Test.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/RemoveDuplicatesJava8Test.java
 
b/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/RemoveDuplicatesJava8Test.java
deleted file mode 100644
index d9e2180..0000000
--- 
a/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/RemoveDuplicatesJava8Test.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy 
of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.transforms;
-
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.hasItem;
-import static org.hamcrest.Matchers.not;
-import static org.junit.Assert.assertThat;
-
-import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
-import com.google.cloud.dataflow.sdk.testing.TestPipeline;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * Java 8 tests for {@link RemoveDuplicates}.
- */
-@RunWith(JUnit4.class)
-public class RemoveDuplicatesJava8Test {
-
-  @Rule
-  public ExpectedException thrown = ExpectedException.none();
-
-  @Test
-  public void withLambdaRepresentativeValuesFnAndTypeDescriptorShouldApplyFn() {
-    TestPipeline p = TestPipeline.create();
-
-    Multimap<Integer, String> predupedContents = HashMultimap.create();
-    predupedContents.put(3, "foo");
-    predupedContents.put(4, "foos");
-    predupedContents.put(6, "barbaz");
-    predupedContents.put(6, "bazbar");
-    PCollection<String> dupes =
-        p.apply(Create.of("foo", "foos", "barbaz", "barbaz", "bazbar", "foo"));
-    PCollection<String> deduped =
-        dupes.apply(RemoveDuplicates.withRepresentativeValueFn((String s) -> s.length())
-                                    .withRepresentativeType(TypeDescriptor.of(Integer.class)));
-
-    DataflowAssert.that(deduped).satisfies((Iterable<String> strs) -> {
-      Set<Integer> seenLengths = new HashSet<>();
-      for (String s : strs) {
-        assertThat(predupedContents.values(), hasItem(s));
-        assertThat(seenLengths, not(contains(s.length())));
-        seenLengths.add(s.length());
-      }
-      return null;
-    });
-
-    p.run();
-  }
-
-  @Test
-  public void withLambdaRepresentativeValuesFnNoTypeDescriptorShouldThrow() {
-    TestPipeline p = TestPipeline.create();
-
-    Multimap<Integer, String> predupedContents = HashMultimap.create();
-    predupedContents.put(3, "foo");
-    predupedContents.put(4, "foos");
-    predupedContents.put(6, "barbaz");
-    predupedContents.put(6, "bazbar");
-    PCollection<String> dupes =
-        p.apply(Create.of("foo", "foos", "barbaz", "barbaz", "bazbar", "foo"));
-
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("Unable to return a default Coder for RemoveRepresentativeDupes");
-    thrown.expectMessage("Cannot provide a coder for type variable K");
-    thrown.expectMessage("the actual type is unknown due to erasure.");
-
-    // Thrown when applying a transform to the internal WithKeys that withRepresentativeValueFn is
-    // implemented with
-    dupes.apply("RemoveRepresentativeDupes",
-        RemoveDuplicates.withRepresentativeValueFn((String s) -> s.length()));
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e8b71008/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/WithKeysJava8Test.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/WithKeysJava8Test.java
 
b/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/WithKeysJava8Test.java
deleted file mode 100644
index c10af29..0000000
--- 
a/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/WithKeysJava8Test.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.transforms;
-
-import com.google.cloud.dataflow.sdk.Pipeline.PipelineExecutionException;
-import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
-import com.google.cloud.dataflow.sdk.testing.RunnableOnService;
-import com.google.cloud.dataflow.sdk.testing.TestPipeline;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Java 8 Tests for {@link WithKeys}.
- */
-@RunWith(JUnit4.class)
-public class WithKeysJava8Test {
-
-  @Rule
-  public ExpectedException thrown = ExpectedException.none();
-
-  @Test
-  @Category(RunnableOnService.class)
-  public void withLambdaAndTypeDescriptorShouldSucceed() {
-    TestPipeline p = TestPipeline.create();
-
-    PCollection<String> values = p.apply(Create.of("1234", "3210", "0", "-12"));
-    PCollection<KV<Integer, String>> kvs = values.apply(
-        WithKeys.of((String s) -> Integer.valueOf(s))
-                .withKeyType(TypeDescriptor.of(Integer.class)));
-
-    DataflowAssert.that(kvs).containsInAnyOrder(
-        KV.of(1234, "1234"), KV.of(0, "0"), KV.of(-12, "-12"), KV.of(3210, "3210"));
-
-    p.run();
-  }
-
-  @Test
-  public void withLambdaAndNoTypeDescriptorShouldThrow() {
-    TestPipeline p = TestPipeline.create();
-
-    PCollection<String> values = p.apply(Create.of("1234", "3210", "0", "-12"));
-
-    values.apply("ApplyKeysWithWithKeys", WithKeys.of((String s) -> Integer.valueOf(s)));
-
-    thrown.expect(PipelineExecutionException.class);
-    thrown.expectMessage("Unable to return a default Coder for ApplyKeysWithWithKeys");
-    thrown.expectMessage("Cannot provide a coder for type variable K");
-    thrown.expectMessage("the actual type is unknown due to erasure.");
-
-    p.run();
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e8b71008/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/WithTimestampsJava8Test.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/WithTimestampsJava8Test.java b/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/WithTimestampsJava8Test.java
deleted file mode 100644
index 50b5ff7..0000000
--- a/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/WithTimestampsJava8Test.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.transforms;
-
-import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
-import com.google.cloud.dataflow.sdk.testing.RunnableOnService;
-import com.google.cloud.dataflow.sdk.testing.TestPipeline;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.io.Serializable;
-
-/**
- * Java 8 tests for {@link WithTimestamps}.
- */
-@RunWith(JUnit4.class)
-public class WithTimestampsJava8Test implements Serializable {
-  @Test
-  @Category(RunnableOnService.class)
-  public void withTimestampsLambdaShouldApplyTimestamps() {
-    TestPipeline p = TestPipeline.create();
-
-    String yearTwoThousand = "946684800000";
-    PCollection<String> timestamped =
-        p.apply(Create.of("1234", "0", Integer.toString(Integer.MAX_VALUE), yearTwoThousand))
-         .apply(WithTimestamps.of((String input) -> new Instant(Long.valueOf(yearTwoThousand))));
-
-    PCollection<KV<String, Instant>> timestampedVals =
-        timestamped.apply(ParDo.of(new DoFn<String, KV<String, Instant>>() {
-          @Override
-          public void processElement(DoFn<String, KV<String, Instant>>.ProcessContext c)
-              throws Exception {
-            c.output(KV.of(c.element(), c.timestamp()));
-          }
-        }));
-
-    DataflowAssert.that(timestamped)
-        .containsInAnyOrder(yearTwoThousand, "0", "1234", Integer.toString(Integer.MAX_VALUE));
-    DataflowAssert.that(timestampedVals)
-        .containsInAnyOrder(
-            KV.of("0", new Instant(0)),
-            KV.of("1234", new Instant("1234")),
-            KV.of(Integer.toString(Integer.MAX_VALUE), new Instant(Integer.MAX_VALUE)),
-            KV.of(yearTwoThousand, new Instant(Long.valueOf(yearTwoThousand))));
-  }
-}
-

Reply via email to