Repository: incubator-beam
Updated Branches:
  refs/heads/master bba4c64d3 -> b9116ac42


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java
deleted file mode 100644
index dfd857e..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java
+++ /dev/null
@@ -1,324 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.not;
-import static org.hamcrest.Matchers.nullValue;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.runners.inprocess.InMemoryWatermarkManager.TimerUpdate;
-import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.common.CounterSet;
-import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.StateNamespace;
-import org.apache.beam.sdk.util.state.StateNamespaces;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.StateTags;
-import org.apache.beam.sdk.util.state.WatermarkHoldState;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TupleTag;
-
-import org.hamcrest.Matchers;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.io.Serializable;
-
-/**
- * Tests for {@link ParDoSingleEvaluatorFactory}.
- */
-@RunWith(JUnit4.class)
-public class ParDoSingleEvaluatorFactoryTest implements Serializable {
-  private transient BundleFactory bundleFactory = InProcessBundleFactory.create();
-
-  @Test
-  public void testParDoInMemoryTransformEvaluator() throws Exception {
-    TestPipeline p = TestPipeline.create();
-
-    PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
-    PCollection<Integer> collection =
-        input.apply(
-            ParDo.of(
-                new DoFn<String, Integer>() {
-                  @Override
-                  public void processElement(ProcessContext c) {
-                    c.output(c.element().length());
-                  }
-                }));
-    CommittedBundle<String> inputBundle =
-        bundleFactory.createRootBundle(input).commit(Instant.now());
-
-    InProcessEvaluationContext evaluationContext = 
mock(InProcessEvaluationContext.class);
-    UncommittedBundle<Integer> outputBundle = 
bundleFactory.createRootBundle(collection);
-    when(evaluationContext.createBundle(inputBundle, 
collection)).thenReturn(outputBundle);
-    InProcessExecutionContext executionContext =
-        new InProcessExecutionContext(null, null, null, null);
-    
when(evaluationContext.getExecutionContext(collection.getProducingTransformInternal(),
 null))
-        .thenReturn(executionContext);
-    CounterSet counters = new CounterSet();
-    when(evaluationContext.createCounterSet()).thenReturn(counters);
-
-    org.apache.beam.sdk.runners.inprocess.TransformEvaluator<String> evaluator 
=
-        new ParDoSingleEvaluatorFactory()
-            .forApplication(
-                collection.getProducingTransformInternal(), inputBundle, 
evaluationContext);
-
-    evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
-    evaluator.processElement(
-        WindowedValue.timestampedValueInGlobalWindow("bara", new 
Instant(1000)));
-    evaluator.processElement(
-        WindowedValue.valueInGlobalWindow("bazam", 
PaneInfo.ON_TIME_AND_ONLY_FIRING));
-
-    InProcessTransformResult result = evaluator.finishBundle();
-    assertThat(result.getOutputBundles(), 
Matchers.<UncommittedBundle<?>>contains(outputBundle));
-    assertThat(result.getWatermarkHold(), 
equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
-    assertThat(result.getCounters(), equalTo(counters));
-
-    assertThat(
-        outputBundle.commit(Instant.now()).getElements(),
-        Matchers.<WindowedValue<Integer>>containsInAnyOrder(
-            WindowedValue.valueInGlobalWindow(3),
-            WindowedValue.timestampedValueInGlobalWindow(4, new Instant(1000)),
-            WindowedValue.valueInGlobalWindow(5, 
PaneInfo.ON_TIME_AND_ONLY_FIRING)));
-  }
-
-  @Test
-  public void testSideOutputToUndeclaredSideOutputSucceeds() throws Exception {
-    TestPipeline p = TestPipeline.create();
-
-    PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
-    final TupleTag<Integer> sideOutputTag = new TupleTag<Integer>() {};
-    PCollection<Integer> collection =
-        input.apply(
-            ParDo.of(
-                new DoFn<String, Integer>() {
-                  @Override
-                  public void processElement(ProcessContext c) {
-                    c.sideOutput(sideOutputTag, c.element().length());
-                  }
-                }));
-    CommittedBundle<String> inputBundle =
-        bundleFactory.createRootBundle(input).commit(Instant.now());
-
-    InProcessEvaluationContext evaluationContext = 
mock(InProcessEvaluationContext.class);
-    UncommittedBundle<Integer> outputBundle = 
bundleFactory.createRootBundle(collection);
-    when(evaluationContext.createBundle(inputBundle, 
collection)).thenReturn(outputBundle);
-    InProcessExecutionContext executionContext =
-        new InProcessExecutionContext(null, null, null, null);
-    
when(evaluationContext.getExecutionContext(collection.getProducingTransformInternal(),
 null))
-        .thenReturn(executionContext);
-    CounterSet counters = new CounterSet();
-    when(evaluationContext.createCounterSet()).thenReturn(counters);
-
-    TransformEvaluator<String> evaluator =
-        new ParDoSingleEvaluatorFactory()
-            .forApplication(
-                collection.getProducingTransformInternal(), inputBundle, 
evaluationContext);
-
-    evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
-    evaluator.processElement(
-        WindowedValue.timestampedValueInGlobalWindow("bara", new 
Instant(1000)));
-    evaluator.processElement(
-        WindowedValue.valueInGlobalWindow("bazam", 
PaneInfo.ON_TIME_AND_ONLY_FIRING));
-
-    InProcessTransformResult result = evaluator.finishBundle();
-    assertThat(
-        result.getOutputBundles(), 
Matchers.<UncommittedBundle<?>>containsInAnyOrder(outputBundle));
-    assertThat(result.getWatermarkHold(), 
equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
-    assertThat(result.getCounters(), equalTo(counters));
-  }
-
-  @Test
-  public void finishBundleWithStatePutsStateInResult() throws Exception {
-    TestPipeline p = TestPipeline.create();
-
-    PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
-
-    final StateTag<Object, WatermarkHoldState<BoundedWindow>> watermarkTag =
-        StateTags.watermarkStateInternal("myId", 
OutputTimeFns.outputAtEarliestInputTimestamp());
-    final StateTag<Object, BagState<String>> bagTag = StateTags.bag("myBag", 
StringUtf8Coder.of());
-    final StateNamespace windowNs =
-        StateNamespaces.window(GlobalWindow.Coder.INSTANCE, 
GlobalWindow.INSTANCE);
-    ParDo.Bound<String, KV<String, Integer>> pardo =
-        ParDo.of(
-            new DoFn<String, KV<String, Integer>>() {
-              @Override
-              public void processElement(ProcessContext c) {
-                c.windowingInternals()
-                    .stateInternals()
-                    .state(StateNamespaces.global(), watermarkTag)
-                    .add(new Instant(124443L - c.element().length()));
-                c.windowingInternals()
-                    .stateInternals()
-                    .state(
-                        StateNamespaces.window(GlobalWindow.Coder.INSTANCE, 
GlobalWindow.INSTANCE),
-                        bagTag)
-                    .add(c.element());
-              }
-            });
-    PCollection<KV<String, Integer>> mainOutput = input.apply(pardo);
-
-    CommittedBundle<String> inputBundle =
-        bundleFactory.createRootBundle(input).commit(Instant.now());
-
-    InProcessEvaluationContext evaluationContext = 
mock(InProcessEvaluationContext.class);
-    UncommittedBundle<KV<String, Integer>> mainOutputBundle =
-        bundleFactory.createRootBundle(mainOutput);
-
-    when(evaluationContext.createBundle(inputBundle, 
mainOutput)).thenReturn(mainOutputBundle);
-
-    InProcessExecutionContext executionContext =
-        new InProcessExecutionContext(null, "myKey", null, null);
-    
when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(),
 null))
-        .thenReturn(executionContext);
-    CounterSet counters = new CounterSet();
-    when(evaluationContext.createCounterSet()).thenReturn(counters);
-
-    org.apache.beam.sdk.runners.inprocess.TransformEvaluator<String> evaluator 
=
-        new ParDoSingleEvaluatorFactory()
-            .forApplication(
-                mainOutput.getProducingTransformInternal(), inputBundle, 
evaluationContext);
-
-    evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
-    evaluator.processElement(
-        WindowedValue.timestampedValueInGlobalWindow("bara", new 
Instant(1000)));
-    evaluator.processElement(
-        WindowedValue.valueInGlobalWindow("bazam", 
PaneInfo.ON_TIME_AND_ONLY_FIRING));
-
-    InProcessTransformResult result = evaluator.finishBundle();
-    assertThat(result.getWatermarkHold(), equalTo(new Instant(124438L)));
-    assertThat(result.getState(), not(nullValue()));
-    assertThat(
-        result.getState().state(StateNamespaces.global(), watermarkTag).read(),
-        equalTo(new Instant(124438L)));
-    assertThat(
-        result.getState().state(windowNs, bagTag).read(),
-        containsInAnyOrder("foo", "bara", "bazam"));
-  }
-
-  @Test
-  public void finishBundleWithStateAndTimersPutsTimersInResult() throws Exception {
-    TestPipeline p = TestPipeline.create();
-
-    PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
-
-    final TimerData addedTimer =
-        TimerData.of(
-            StateNamespaces.window(
-                IntervalWindow.getCoder(),
-                new IntervalWindow(
-                    new Instant(0).plus(Duration.standardMinutes(5)),
-                    new Instant(1)
-                        .plus(Duration.standardMinutes(5))
-                        .plus(Duration.standardHours(1)))),
-            new Instant(54541L),
-            TimeDomain.EVENT_TIME);
-    final TimerData deletedTimer =
-        TimerData.of(
-            StateNamespaces.window(
-                IntervalWindow.getCoder(),
-                new IntervalWindow(new Instant(0), new 
Instant(0).plus(Duration.standardHours(1)))),
-            new Instant(3400000),
-            TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
-
-    ParDo.Bound<String, KV<String, Integer>> pardo =
-        ParDo.of(
-            new DoFn<String, KV<String, Integer>>() {
-              @Override
-              public void processElement(ProcessContext c) {
-                c.windowingInternals().stateInternals();
-                c.windowingInternals()
-                    .timerInternals()
-                    .setTimer(
-                        TimerData.of(
-                            StateNamespaces.window(
-                                IntervalWindow.getCoder(),
-                                new IntervalWindow(
-                                    new 
Instant(0).plus(Duration.standardMinutes(5)),
-                                    new Instant(1)
-                                        .plus(Duration.standardMinutes(5))
-                                        .plus(Duration.standardHours(1)))),
-                            new Instant(54541L),
-                            TimeDomain.EVENT_TIME));
-                c.windowingInternals()
-                    .timerInternals()
-                    .deleteTimer(
-                        TimerData.of(
-                            StateNamespaces.window(
-                                IntervalWindow.getCoder(),
-                                new IntervalWindow(
-                                    new Instant(0),
-                                    new 
Instant(0).plus(Duration.standardHours(1)))),
-                            new Instant(3400000),
-                            TimeDomain.SYNCHRONIZED_PROCESSING_TIME));
-              }
-            });
-    PCollection<KV<String, Integer>> mainOutput = input.apply(pardo);
-
-    CommittedBundle<String> inputBundle =
-        bundleFactory.createRootBundle(input).commit(Instant.now());
-
-    InProcessEvaluationContext evaluationContext = 
mock(InProcessEvaluationContext.class);
-    UncommittedBundle<KV<String, Integer>> mainOutputBundle =
-        bundleFactory.createRootBundle(mainOutput);
-
-    when(evaluationContext.createBundle(inputBundle, 
mainOutput)).thenReturn(mainOutputBundle);
-
-    InProcessExecutionContext executionContext =
-        new InProcessExecutionContext(null, "myKey", null, null);
-    
when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(),
 null))
-        .thenReturn(executionContext);
-    CounterSet counters = new CounterSet();
-    when(evaluationContext.createCounterSet()).thenReturn(counters);
-
-    TransformEvaluator<String> evaluator =
-        new ParDoSingleEvaluatorFactory()
-            .forApplication(
-                mainOutput.getProducingTransformInternal(), inputBundle, 
evaluationContext);
-
-    evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
-
-    InProcessTransformResult result = evaluator.finishBundle();
-    assertThat(
-        result.getTimerUpdate(),
-        equalTo(
-            
TimerUpdate.builder("myKey").setTimer(addedTimer).deletedTimer(deletedTimer).build()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/TextIOShardedWriteFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/TextIOShardedWriteFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/TextIOShardedWriteFactoryTest.java
deleted file mode 100644
index 239ce27..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/TextIOShardedWriteFactoryTest.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import static org.hamcrest.Matchers.not;
-import static org.hamcrest.Matchers.theInstance;
-import static org.junit.Assert.assertThat;
-
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.io.TextIOTest;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-
-import org.hamcrest.Matchers;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.io.File;
-
-/**
- * Tests for {@link TextIOShardedWriteFactory}.
- */
-@RunWith(JUnit4.class)
-public class TextIOShardedWriteFactoryTest {
-  @Rule public TemporaryFolder tmp = new TemporaryFolder();
-  private TextIOShardedWriteFactory factory;
-
-  @Before
-  public void setup() {
-    factory = new TextIOShardedWriteFactory();
-  }
-
-  @Test
-  public void originalWithoutShardingReturnsOriginal() throws Exception {
-    File file = tmp.newFile("foo");
-    PTransform<PCollection<String>, PDone> original =
-        TextIO.Write.to(file.getAbsolutePath()).withoutSharding();
-    PTransform<PCollection<String>, PDone> overridden = 
factory.override(original);
-
-    assertThat(overridden, theInstance(original));
-  }
-
-  @Test
-  public void originalShardingNotSpecifiedReturnsOriginal() throws Exception {
-    File file = tmp.newFile("foo");
-    PTransform<PCollection<String>, PDone> original = 
TextIO.Write.to(file.getAbsolutePath());
-    PTransform<PCollection<String>, PDone> overridden = 
factory.override(original);
-
-    assertThat(overridden, theInstance(original));
-  }
-
-  @Test
-  public void originalShardedToOneReturnsExplicitlySharded() throws Exception {
-    File file = tmp.newFile("foo");
-    TextIO.Write.Bound<String> original =
-        TextIO.Write.to(file.getAbsolutePath()).withNumShards(1);
-    PTransform<PCollection<String>, PDone> overridden = 
factory.override(original);
-
-    assertThat(overridden, not(Matchers.<PTransform<PCollection<String>, 
PDone>>equalTo(original)));
-
-    TestPipeline p = TestPipeline.create();
-    String[] elems = new String[] {"foo", "bar", "baz"};
-    p.apply(Create.<String>of(elems)).apply(overridden);
-
-    file.delete();
-
-    p.run();
-    TextIOTest.assertOutputFiles(
-        elems, StringUtf8Coder.of(), 1, tmp, "foo", 
original.getShardNameTemplate());
-  }
-
-  @Test
-  public void originalShardedToManyReturnsExplicitlySharded() throws Exception {
-    File file = tmp.newFile("foo");
-    TextIO.Write.Bound<String> original = 
TextIO.Write.to(file.getAbsolutePath()).withNumShards(3);
-    PTransform<PCollection<String>, PDone> overridden = 
factory.override(original);
-
-    assertThat(overridden, not(Matchers.<PTransform<PCollection<String>, 
PDone>>equalTo(original)));
-
-    TestPipeline p = TestPipeline.create();
-    String[] elems = new String[] {"foo", "bar", "baz", "spam", "ham", "eggs"};
-    p.apply(Create.<String>of(elems)).apply(overridden);
-
-    file.delete();
-    p.run();
-    TextIOTest.assertOutputFiles(
-        elems, StringUtf8Coder.of(), 3, tmp, "foo", 
original.getShardNameTemplate());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorServicesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorServicesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorServicesTest.java
deleted file mode 100644
index 33dbbdc..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorServicesTest.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import static org.hamcrest.Matchers.any;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.not;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-
-import com.google.common.util.concurrent.MoreExecutors;
-
-import org.hamcrest.Matchers;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-
-/**
- * Tests for {@link TransformExecutorServices}.
- */
-@RunWith(JUnit4.class)
-public class TransformExecutorServicesTest {
-  @Rule public ExpectedException thrown = ExpectedException.none();
-
-  private ExecutorService executorService;
-  private Map<TransformExecutor<?>, Boolean> scheduled;
-
-  @Before
-  public void setup() {
-    executorService = MoreExecutors.newDirectExecutorService();
-    scheduled = new ConcurrentHashMap<>();
-  }
-
-  @Test
-  public void parallelScheduleMultipleSchedulesBothImmediately() {
-    @SuppressWarnings("unchecked")
-    TransformExecutor<Object> first = mock(TransformExecutor.class);
-    @SuppressWarnings("unchecked")
-    TransformExecutor<Object> second = mock(TransformExecutor.class);
-
-    TransformExecutorService parallel =
-        TransformExecutorServices.parallel(executorService, scheduled);
-    parallel.schedule(first);
-    parallel.schedule(second);
-
-    verify(first).call();
-    verify(second).call();
-    assertThat(
-        scheduled,
-        Matchers.allOf(
-            Matchers.<TransformExecutor<?>, Boolean>hasEntry(first, true),
-            Matchers.<TransformExecutor<?>, Boolean>hasEntry(second, true)));
-
-    parallel.complete(first);
-    assertThat(scheduled, Matchers.<TransformExecutor<?>, 
Boolean>hasEntry(second, true));
-    assertThat(
-        scheduled,
-        not(
-            Matchers.<TransformExecutor<?>, Boolean>hasEntry(
-                Matchers.<TransformExecutor<?>>equalTo(first), 
any(Boolean.class))));
-    parallel.complete(second);
-    assertThat(scheduled.isEmpty(), is(true));
-  }
-
-  @Test
-  public void serialScheduleTwoWaitsForFirstToComplete() {
-    @SuppressWarnings("unchecked")
-    TransformExecutor<Object> first = mock(TransformExecutor.class);
-    @SuppressWarnings("unchecked")
-    TransformExecutor<Object> second = mock(TransformExecutor.class);
-
-    TransformExecutorService serial = 
TransformExecutorServices.serial(executorService, scheduled);
-    serial.schedule(first);
-    verify(first).call();
-
-    serial.schedule(second);
-    verify(second, never()).call();
-
-    assertThat(scheduled, Matchers.<TransformExecutor<?>, 
Boolean>hasEntry(first, true));
-    assertThat(
-        scheduled,
-        not(
-            Matchers.<TransformExecutor<?>, Boolean>hasEntry(
-                Matchers.<TransformExecutor<?>>equalTo(second), 
any(Boolean.class))));
-
-    serial.complete(first);
-    verify(second).call();
-    assertThat(scheduled, Matchers.<TransformExecutor<?>, 
Boolean>hasEntry(second, true));
-    assertThat(
-        scheduled,
-        not(
-            Matchers.<TransformExecutor<?>, Boolean>hasEntry(
-                Matchers.<TransformExecutor<?>>equalTo(first), 
any(Boolean.class))));
-
-    serial.complete(second);
-  }
-
-  @Test
-  public void serialCompleteNotExecutingTaskThrows() {
-    @SuppressWarnings("unchecked")
-    TransformExecutor<Object> first = mock(TransformExecutor.class);
-    @SuppressWarnings("unchecked")
-    TransformExecutor<Object> second = mock(TransformExecutor.class);
-
-    TransformExecutorService serial = 
TransformExecutorServices.serial(executorService, scheduled);
-    serial.schedule(first);
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("unexpected currently executing");
-
-    serial.complete(second);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorTest.java
deleted file mode 100644
index 31cb29a..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorTest.java
+++ /dev/null
@@ -1,538 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.isA;
-import static org.hamcrest.Matchers.not;
-import static org.hamcrest.Matchers.nullValue;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.sdk.coders.ByteArrayCoder;
-import 
org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.WithKeys;
-import org.apache.beam.sdk.util.IllegalMutationException;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-
-import com.google.common.util.concurrent.MoreExecutors;
-
-import org.hamcrest.Matchers;
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * Tests for {@link TransformExecutor}.
- */
-@RunWith(JUnit4.class)
-public class TransformExecutorTest {
-  @Rule public ExpectedException thrown = ExpectedException.none();
-  private PCollection<String> created;
-  private PCollection<KV<Integer, String>> downstream;
-
-  private CountDownLatch evaluatorCompleted;
-
-  private RegisteringCompletionCallback completionCallback;
-  private TransformExecutorService transformEvaluationState;
-  private BundleFactory bundleFactory;
-  @Mock private InProcessEvaluationContext evaluationContext;
-  @Mock private TransformEvaluatorRegistry registry;
-  private Map<TransformExecutor<?>, Boolean> scheduled;
-
-  @Before
-  public void setup() {
-    MockitoAnnotations.initMocks(this);
-
-    bundleFactory = InProcessBundleFactory.create();
-
-    scheduled = new HashMap<>();
-    transformEvaluationState =
-        
TransformExecutorServices.parallel(MoreExecutors.newDirectExecutorService(), 
scheduled);
-
-    evaluatorCompleted = new CountDownLatch(1);
-    completionCallback = new RegisteringCompletionCallback(evaluatorCompleted);
-
-    TestPipeline p = TestPipeline.create();
-    created = p.apply(Create.of("foo", "spam", "third"));
-    downstream = created.apply(WithKeys.<Integer, String>of(3));
-  }
-
-  @Test
-  public void callWithNullInputBundleFinishesBundleAndCompletes() throws 
Exception {
-    final InProcessTransformResult result =
-        
StepTransformResult.withoutHold(created.getProducingTransformInternal()).build();
-    final AtomicBoolean finishCalled = new AtomicBoolean(false);
-    TransformEvaluator<Object> evaluator =
-        new TransformEvaluator<Object>() {
-          @Override
-          public void processElement(WindowedValue<Object> element) throws 
Exception {
-            throw new IllegalArgumentException("Shouldn't be called");
-          }
-
-          @Override
-          public InProcessTransformResult finishBundle() throws Exception {
-            finishCalled.set(true);
-            return result;
-          }
-        };
-
-    when(registry.forApplication(created.getProducingTransformInternal(), 
null, evaluationContext))
-        .thenReturn(evaluator);
-
-    TransformExecutor<Object> executor =
-        TransformExecutor.create(
-            registry,
-            Collections.<ModelEnforcementFactory>emptyList(),
-            evaluationContext,
-            null,
-            created.getProducingTransformInternal(),
-            completionCallback,
-            transformEvaluationState);
-    executor.call();
-
-    assertThat(finishCalled.get(), is(true));
-    assertThat(completionCallback.handledResult, equalTo(result));
-    assertThat(completionCallback.handledThrowable, is(nullValue()));
-    assertThat(scheduled, 
not(Matchers.<TransformExecutor<?>>hasKey(executor)));
-  }
-
-  @Test
-  public void inputBundleProcessesEachElementFinishesAndCompletes() throws 
Exception {
-    final InProcessTransformResult result =
-        
StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build();
-    final Collection<WindowedValue<String>> elementsProcessed = new 
ArrayList<>();
-    TransformEvaluator<String> evaluator =
-        new TransformEvaluator<String>() {
-          @Override
-          public void processElement(WindowedValue<String> element) throws 
Exception {
-            elementsProcessed.add(element);
-            return;
-          }
-
-          @Override
-          public InProcessTransformResult finishBundle() throws Exception {
-            return result;
-          }
-        };
-
-    WindowedValue<String> foo = WindowedValue.valueInGlobalWindow("foo");
-    WindowedValue<String> spam = WindowedValue.valueInGlobalWindow("spam");
-    WindowedValue<String> third = WindowedValue.valueInGlobalWindow("third");
-    CommittedBundle<String> inputBundle =
-        
bundleFactory.createRootBundle(created).add(foo).add(spam).add(third).commit(Instant.now());
-    when(
-            registry.<String>forApplication(
-                downstream.getProducingTransformInternal(), inputBundle, 
evaluationContext))
-        .thenReturn(evaluator);
-
-    TransformExecutor<String> executor =
-        TransformExecutor.create(
-            registry,
-            Collections.<ModelEnforcementFactory>emptyList(),
-            evaluationContext,
-            inputBundle,
-            downstream.getProducingTransformInternal(),
-            completionCallback,
-            transformEvaluationState);
-
-    Executors.newSingleThreadExecutor().submit(executor);
-
-    evaluatorCompleted.await();
-
-    assertThat(elementsProcessed, containsInAnyOrder(spam, third, foo));
-    assertThat(completionCallback.handledResult, equalTo(result));
-    assertThat(completionCallback.handledThrowable, is(nullValue()));
-    assertThat(scheduled, 
not(Matchers.<TransformExecutor<?>>hasKey(executor)));
-  }
-
-  @Test
-  public void processElementThrowsExceptionCallsback() throws Exception {
-    final InProcessTransformResult result =
-        
StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build();
-    final Exception exception = new Exception();
-    TransformEvaluator<String> evaluator =
-        new TransformEvaluator<String>() {
-          @Override
-          public void processElement(WindowedValue<String> element) throws 
Exception {
-            throw exception;
-          }
-
-          @Override
-          public InProcessTransformResult finishBundle() throws Exception {
-            return result;
-          }
-        };
-
-    WindowedValue<String> foo = WindowedValue.valueInGlobalWindow("foo");
-    CommittedBundle<String> inputBundle =
-        bundleFactory.createRootBundle(created).add(foo).commit(Instant.now());
-    when(
-            registry.<String>forApplication(
-                downstream.getProducingTransformInternal(), inputBundle, 
evaluationContext))
-        .thenReturn(evaluator);
-
-    TransformExecutor<String> executor =
-        TransformExecutor.create(
-            registry,
-            Collections.<ModelEnforcementFactory>emptyList(),
-            evaluationContext,
-            inputBundle,
-            downstream.getProducingTransformInternal(),
-            completionCallback,
-            transformEvaluationState);
-    Executors.newSingleThreadExecutor().submit(executor);
-
-    evaluatorCompleted.await();
-
-    assertThat(completionCallback.handledResult, is(nullValue()));
-    assertThat(completionCallback.handledThrowable, 
Matchers.<Throwable>equalTo(exception));
-    assertThat(scheduled, 
not(Matchers.<TransformExecutor<?>>hasKey(executor)));
-  }
-
-  @Test
-  public void finishBundleThrowsExceptionCallsback() throws Exception {
-    final Exception exception = new Exception();
-    TransformEvaluator<String> evaluator =
-        new TransformEvaluator<String>() {
-          @Override
-          public void processElement(WindowedValue<String> element) throws 
Exception {}
-
-          @Override
-          public InProcessTransformResult finishBundle() throws Exception {
-            throw exception;
-          }
-        };
-
-    CommittedBundle<String> inputBundle =
-        bundleFactory.createRootBundle(created).commit(Instant.now());
-    when(
-            registry.<String>forApplication(
-                downstream.getProducingTransformInternal(), inputBundle, 
evaluationContext))
-        .thenReturn(evaluator);
-
-    TransformExecutor<String> executor =
-        TransformExecutor.create(
-            registry,
-            Collections.<ModelEnforcementFactory>emptyList(),
-            evaluationContext,
-            inputBundle,
-            downstream.getProducingTransformInternal(),
-            completionCallback,
-            transformEvaluationState);
-    Executors.newSingleThreadExecutor().submit(executor);
-
-    evaluatorCompleted.await();
-
-    assertThat(completionCallback.handledResult, is(nullValue()));
-    assertThat(completionCallback.handledThrowable, 
Matchers.<Throwable>equalTo(exception));
-    assertThat(scheduled, 
not(Matchers.<TransformExecutor<?>>hasKey(executor)));
-  }
-
-  @Test
-  public void duringCallGetThreadIsNonNull() throws Exception {
-    final InProcessTransformResult result =
-        
StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build();
-    final CountDownLatch testLatch = new CountDownLatch(1);
-    final CountDownLatch evaluatorLatch = new CountDownLatch(1);
-    TransformEvaluator<Object> evaluator =
-        new TransformEvaluator<Object>() {
-          @Override
-          public void processElement(WindowedValue<Object> element) throws 
Exception {
-            throw new IllegalArgumentException("Shouldn't be called");
-          }
-
-          @Override
-          public InProcessTransformResult finishBundle() throws Exception {
-            testLatch.countDown();
-            evaluatorLatch.await();
-            return result;
-          }
-        };
-
-    when(registry.forApplication(created.getProducingTransformInternal(), 
null, evaluationContext))
-        .thenReturn(evaluator);
-
-    TransformExecutor<String> executor =
-        TransformExecutor.create(
-            registry,
-            Collections.<ModelEnforcementFactory>emptyList(),
-            evaluationContext,
-            null,
-            created.getProducingTransformInternal(),
-            completionCallback,
-            transformEvaluationState);
-
-    Executors.newSingleThreadExecutor().submit(executor);
-    testLatch.await();
-    assertThat(executor.getThread(), not(nullValue()));
-
-    // Finish the execution so everything can get closed down cleanly.
-    evaluatorLatch.countDown();
-  }
-
-  @Test
-  public void callWithEnforcementAppliesEnforcement() throws Exception {
-    final InProcessTransformResult result =
-        
StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build();
-
-    TransformEvaluator<Object> evaluator =
-        new TransformEvaluator<Object>() {
-          @Override
-          public void processElement(WindowedValue<Object> element) throws 
Exception {
-          }
-
-          @Override
-          public InProcessTransformResult finishBundle() throws Exception {
-            return result;
-          }
-        };
-
-    WindowedValue<String> fooElem = WindowedValue.valueInGlobalWindow("foo");
-    WindowedValue<String> barElem = WindowedValue.valueInGlobalWindow("bar");
-    CommittedBundle<String> inputBundle =
-        
bundleFactory.createRootBundle(created).add(fooElem).add(barElem).commit(Instant.now());
-    when(
-            registry.forApplication(
-                downstream.getProducingTransformInternal(), inputBundle, 
evaluationContext))
-        .thenReturn(evaluator);
-
-    TestEnforcementFactory enforcement = new TestEnforcementFactory();
-    TransformExecutor<String> executor =
-        TransformExecutor.create(
-            registry,
-            Collections.<ModelEnforcementFactory>singleton(enforcement),
-            evaluationContext,
-            inputBundle,
-            downstream.getProducingTransformInternal(),
-            completionCallback,
-            transformEvaluationState);
-
-    executor.call();
-    TestEnforcement<?> testEnforcement = enforcement.instance;
-    assertThat(
-        testEnforcement.beforeElements,
-        Matchers.<WindowedValue<?>>containsInAnyOrder(barElem, fooElem));
-    assertThat(
-        testEnforcement.afterElements,
-        Matchers.<WindowedValue<?>>containsInAnyOrder(barElem, fooElem));
-    assertThat(testEnforcement.finishedBundles, contains(result));
-  }
-
-  @Test
-  public void callWithEnforcementThrowsOnFinishPropagates() throws Exception {
-    PCollection<byte[]> pcBytes =
-        created.apply(
-            new PTransform<PCollection<String>, PCollection<byte[]>>() {
-              @Override
-              public PCollection<byte[]> apply(PCollection<String> input) {
-                return PCollection.<byte[]>createPrimitiveOutputInternal(
-                        input.getPipeline(), input.getWindowingStrategy(), 
input.isBounded())
-                    .setCoder(ByteArrayCoder.of());
-              }
-            });
-
-    final InProcessTransformResult result =
-        
StepTransformResult.withoutHold(pcBytes.getProducingTransformInternal()).build();
-    final CountDownLatch testLatch = new CountDownLatch(1);
-    final CountDownLatch evaluatorLatch = new CountDownLatch(1);
-
-    TransformEvaluator<Object> evaluator =
-        new TransformEvaluator<Object>() {
-          @Override
-          public void processElement(WindowedValue<Object> element) throws 
Exception {}
-
-          @Override
-          public InProcessTransformResult finishBundle() throws Exception {
-            testLatch.countDown();
-            evaluatorLatch.await();
-            return result;
-          }
-        };
-
-    WindowedValue<byte[]> fooBytes = 
WindowedValue.valueInGlobalWindow("foo".getBytes());
-    CommittedBundle<byte[]> inputBundle =
-        
bundleFactory.createRootBundle(pcBytes).add(fooBytes).commit(Instant.now());
-    when(
-            registry.forApplication(
-                pcBytes.getProducingTransformInternal(), inputBundle, 
evaluationContext))
-        .thenReturn(evaluator);
-
-    TransformExecutor<byte[]> executor =
-        TransformExecutor.create(
-            registry,
-            
Collections.<ModelEnforcementFactory>singleton(ImmutabilityEnforcementFactory.create()),
-            evaluationContext,
-            inputBundle,
-            pcBytes.getProducingTransformInternal(),
-            completionCallback,
-            transformEvaluationState);
-
-    Future<InProcessTransformResult> task = 
Executors.newSingleThreadExecutor().submit(executor);
-    testLatch.await();
-    fooBytes.getValue()[0] = 'b';
-    evaluatorLatch.countDown();
-
-    thrown.expectCause(isA(IllegalMutationException.class));
-    task.get();
-  }
-
-  @Test
-  public void callWithEnforcementThrowsOnElementPropagates() throws Exception {
-    PCollection<byte[]> pcBytes =
-        created.apply(
-            new PTransform<PCollection<String>, PCollection<byte[]>>() {
-              @Override
-              public PCollection<byte[]> apply(PCollection<String> input) {
-                return PCollection.<byte[]>createPrimitiveOutputInternal(
-                        input.getPipeline(), input.getWindowingStrategy(), 
input.isBounded())
-                    .setCoder(ByteArrayCoder.of());
-              }
-            });
-
-    final InProcessTransformResult result =
-        
StepTransformResult.withoutHold(pcBytes.getProducingTransformInternal()).build();
-    final CountDownLatch testLatch = new CountDownLatch(1);
-    final CountDownLatch evaluatorLatch = new CountDownLatch(1);
-
-    TransformEvaluator<Object> evaluator =
-        new TransformEvaluator<Object>() {
-          @Override
-          public void processElement(WindowedValue<Object> element) throws 
Exception {
-            testLatch.countDown();
-            evaluatorLatch.await();
-          }
-
-          @Override
-          public InProcessTransformResult finishBundle() throws Exception {
-            return result;
-          }
-        };
-
-    WindowedValue<byte[]> fooBytes = 
WindowedValue.valueInGlobalWindow("foo".getBytes());
-    CommittedBundle<byte[]> inputBundle =
-        
bundleFactory.createRootBundle(pcBytes).add(fooBytes).commit(Instant.now());
-    when(
-            registry.forApplication(
-                pcBytes.getProducingTransformInternal(), inputBundle, 
evaluationContext))
-        .thenReturn(evaluator);
-
-    TransformExecutor<byte[]> executor =
-        TransformExecutor.create(
-            registry,
-            
Collections.<ModelEnforcementFactory>singleton(ImmutabilityEnforcementFactory.create()),
-            evaluationContext,
-            inputBundle,
-            pcBytes.getProducingTransformInternal(),
-            completionCallback,
-            transformEvaluationState);
-
-    Future<InProcessTransformResult> task = 
Executors.newSingleThreadExecutor().submit(executor);
-    testLatch.await();
-    fooBytes.getValue()[0] = 'b';
-    evaluatorLatch.countDown();
-
-    thrown.expectCause(isA(IllegalMutationException.class));
-    task.get();
-  }
-
-  private static class RegisteringCompletionCallback implements 
CompletionCallback {
-    private InProcessTransformResult handledResult = null;
-    private Throwable handledThrowable = null;
-    private final CountDownLatch onMethod;
-
-    private RegisteringCompletionCallback(CountDownLatch onMethod) {
-      this.onMethod = onMethod;
-    }
-
-    @Override
-    public CommittedResult handleResult(
-        CommittedBundle<?> inputBundle, InProcessTransformResult result) {
-      handledResult = result;
-      onMethod.countDown();
-      return CommittedResult.create(result, 
Collections.<CommittedBundle<?>>emptyList());
-    }
-
-    @Override
-    public void handleThrowable(CommittedBundle<?> inputBundle, Throwable t) {
-      handledThrowable = t;
-      onMethod.countDown();
-    }
-  }
-
-  private static class TestEnforcementFactory implements 
ModelEnforcementFactory {
-    private TestEnforcement<?> instance;
-    @Override
-    public <T> TestEnforcement<T> forBundle(
-        CommittedBundle<T> input, AppliedPTransform<?, ?, ?> consumer) {
-      TestEnforcement<T> newEnforcement = new TestEnforcement<>();
-      instance = newEnforcement;
-      return newEnforcement;
-    }
-  }
-
-  private static class TestEnforcement<T> implements ModelEnforcement<T> {
-    private final List<WindowedValue<T>> beforeElements = new ArrayList<>();
-    private final List<WindowedValue<T>> afterElements = new ArrayList<>();
-    private final List<InProcessTransformResult> finishedBundles = new 
ArrayList<>();
-
-    @Override
-    public void beforeElement(WindowedValue<T> element) {
-      beforeElements.add(element);
-    }
-
-    @Override
-    public void afterElement(WindowedValue<T> element) {
-      afterElements.add(element);
-    }
-
-    @Override
-    public void afterFinish(
-        CommittedBundle<T> input,
-        InProcessTransformResult result,
-        Iterable<? extends CommittedBundle<?>> outputs) {
-      finishedBundles.add(result);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java
deleted file mode 100644
index 82657c0..0000000
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java
+++ /dev/null
@@ -1,334 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.emptyIterable;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.hasSize;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.BigEndianLongCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.io.CountingSource;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
-import org.apache.beam.sdk.options.PipelineOptions;
-import 
org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import 
org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollection;
-
-import com.google.common.collect.ImmutableList;
-
-import org.hamcrest.Matchers;
-import org.joda.time.DateTime;
-import org.joda.time.Instant;
-import org.joda.time.ReadableInstant;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-import javax.annotation.Nullable;
-/**
- * Tests for {@link UnboundedReadEvaluatorFactory}.
- */
-@RunWith(JUnit4.class)
-public class UnboundedReadEvaluatorFactoryTest {
-  private PCollection<Long> longs;
-  private TransformEvaluatorFactory factory;
-  private InProcessEvaluationContext context;
-  private UncommittedBundle<Long> output;
-
-  private BundleFactory bundleFactory = InProcessBundleFactory.create();
-
-  @Before
-  public void setup() {
-    UnboundedSource<Long, ?> source =
-        CountingSource.unboundedWithTimestampFn(new LongToInstantFn());
-    TestPipeline p = TestPipeline.create();
-    longs = p.apply(Read.from(source));
-
-    factory = new UnboundedReadEvaluatorFactory();
-    context = mock(InProcessEvaluationContext.class);
-    output = bundleFactory.createRootBundle(longs);
-    when(context.createRootBundle(longs)).thenReturn(output);
-  }
-
-  @Test
-  public void unboundedSourceInMemoryTransformEvaluatorProducesElements() 
throws Exception {
-    TransformEvaluator<?> evaluator =
-        factory.forApplication(longs.getProducingTransformInternal(), null, 
context);
-
-    InProcessTransformResult result = evaluator.finishBundle();
-    assertThat(
-        result.getWatermarkHold(), 
Matchers.<ReadableInstant>lessThan(DateTime.now().toInstant()));
-    assertThat(
-        output.commit(Instant.now()).getElements(),
-        containsInAnyOrder(
-            tgw(1L), tgw(2L), tgw(4L), tgw(8L), tgw(9L), tgw(7L), tgw(6L), 
tgw(5L), tgw(3L),
-            tgw(0L)));
-  }
-
-  /**
-   * Demonstrate that multiple sequential creations will produce additional 
elements if the source
-   * can provide them.
-   */
-  @Test
-  public void 
unboundedSourceInMemoryTransformEvaluatorMultipleSequentialCalls() throws 
Exception {
-    TransformEvaluator<?> evaluator =
-        factory.forApplication(longs.getProducingTransformInternal(), null, 
context);
-
-    InProcessTransformResult result = evaluator.finishBundle();
-    assertThat(
-        result.getWatermarkHold(), 
Matchers.<ReadableInstant>lessThan(DateTime.now().toInstant()));
-    assertThat(
-        output.commit(Instant.now()).getElements(),
-        containsInAnyOrder(
-            tgw(1L), tgw(2L), tgw(4L), tgw(8L), tgw(9L), tgw(7L), tgw(6L), 
tgw(5L), tgw(3L),
-            tgw(0L)));
-
-    UncommittedBundle<Long> secondOutput = 
bundleFactory.createRootBundle(longs);
-    when(context.createRootBundle(longs)).thenReturn(secondOutput);
-    TransformEvaluator<?> secondEvaluator =
-        factory.forApplication(longs.getProducingTransformInternal(), null, 
context);
-    InProcessTransformResult secondResult = secondEvaluator.finishBundle();
-    assertThat(
-        secondResult.getWatermarkHold(),
-        Matchers.<ReadableInstant>lessThan(DateTime.now().toInstant()));
-    assertThat(
-        secondOutput.commit(Instant.now()).getElements(),
-        containsInAnyOrder(tgw(11L), tgw(12L), tgw(14L), tgw(18L), tgw(19L), 
tgw(17L), tgw(16L),
-            tgw(15L), tgw(13L), tgw(10L)));
-  }
-
-  @Test
-  public void boundedSourceEvaluatorClosesReader() throws Exception {
-    TestUnboundedSource<Long> source =
-        new TestUnboundedSource<>(BigEndianLongCoder.of(), 1L, 2L, 3L);
-
-    TestPipeline p = TestPipeline.create();
-    PCollection<Long> pcollection = p.apply(Read.from(source));
-    AppliedPTransform<?, ?, ?> sourceTransform = 
pcollection.getProducingTransformInternal();
-
-    UncommittedBundle<Long> output = 
bundleFactory.createRootBundle(pcollection);
-    when(context.createRootBundle(pcollection)).thenReturn(output);
-
-    TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, 
null, context);
-    evaluator.finishBundle();
-    CommittedBundle<Long> committed = output.commit(Instant.now());
-    assertThat(ImmutableList.copyOf(committed.getElements()), hasSize(3));
-    assertThat(TestUnboundedSource.readerClosedCount, equalTo(1));
-  }
-
-  @Test
-  public void boundedSourceEvaluatorNoElementsClosesReader() throws Exception {
-    TestUnboundedSource<Long> source = new 
TestUnboundedSource<>(BigEndianLongCoder.of());
-
-    TestPipeline p = TestPipeline.create();
-    PCollection<Long> pcollection = p.apply(Read.from(source));
-    AppliedPTransform<?, ?, ?> sourceTransform = 
pcollection.getProducingTransformInternal();
-
-    UncommittedBundle<Long> output = 
bundleFactory.createRootBundle(pcollection);
-    when(context.createRootBundle(pcollection)).thenReturn(output);
-
-    TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, 
null, context);
-    evaluator.finishBundle();
-    CommittedBundle<Long> committed = output.commit(Instant.now());
-    assertThat(committed.getElements(), emptyIterable());
-    assertThat(TestUnboundedSource.readerClosedCount, equalTo(1));
-  }
-
-  // TODO: Once the source is split into multiple sources before evaluating, 
this test will have to
-  // be updated.
-  /**
-   * Demonstrate that only a single unfinished instance of TransformEvaluator 
can be created at a
-   * time, with other calls returning an empty evaluator.
-   */
-  @Test
-  public void unboundedSourceWithMultipleSimultaneousEvaluatorsIndependent() 
throws Exception {
-    UncommittedBundle<Long> secondOutput = 
bundleFactory.createRootBundle(longs);
-
-    TransformEvaluator<?> evaluator =
-        factory.forApplication(longs.getProducingTransformInternal(), null, 
context);
-
-    TransformEvaluator<?> secondEvaluator =
-        factory.forApplication(longs.getProducingTransformInternal(), null, 
context);
-
-    InProcessTransformResult secondResult = secondEvaluator.finishBundle();
-    InProcessTransformResult result = evaluator.finishBundle();
-
-    assertThat(
-        result.getWatermarkHold(), 
Matchers.<ReadableInstant>lessThan(DateTime.now().toInstant()));
-    assertThat(
-        output.commit(Instant.now()).getElements(),
-        containsInAnyOrder(
-            tgw(1L), tgw(2L), tgw(4L), tgw(8L), tgw(9L), tgw(7L), tgw(6L), 
tgw(5L), tgw(3L),
-            tgw(0L)));
-
-    assertThat(secondResult.getWatermarkHold(), 
equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
-    assertThat(secondOutput.commit(Instant.now()).getElements(), 
emptyIterable());
-  }
-
-  /**
-   * A terse alias for producing timestamped longs in the {@link 
GlobalWindow}, where
-   * the timestamp is the epoch offset by the value of the element.
-   */
-  private static WindowedValue<Long> tgw(Long elem) {
-    return WindowedValue.timestampedValueInGlobalWindow(elem, new 
Instant(elem));
-  }
-
-  private static class LongToInstantFn implements SerializableFunction<Long, 
Instant> {
-    @Override
-    public Instant apply(Long input) {
-      return new Instant(input);
-    }
-  }
-
-  private static class TestUnboundedSource<T> extends UnboundedSource<T, 
TestCheckpointMark> {
-    static int readerClosedCount;
-    private final Coder<T> coder;
-    private final List<T> elems;
-
-    public TestUnboundedSource(Coder<T> coder, T... elems) {
-      readerClosedCount = 0;
-      this.coder = coder;
-      this.elems = Arrays.asList(elems);
-    }
-
-    @Override
-    public List<? extends UnboundedSource<T, TestCheckpointMark>> 
generateInitialSplits(
-        int desiredNumSplits, PipelineOptions options) throws Exception {
-      return ImmutableList.of(this);
-    }
-
-    @Override
-    public UnboundedSource.UnboundedReader<T> createReader(
-        PipelineOptions options, TestCheckpointMark checkpointMark) {
-      return new TestUnboundedReader(elems);
-    }
-
-    @Override
-    @Nullable
-    public Coder<TestCheckpointMark> getCheckpointMarkCoder() {
-      return new TestCheckpointMark.Coder();
-    }
-
-    @Override
-    public void validate() {}
-
-    @Override
-    public Coder<T> getDefaultOutputCoder() {
-      return coder;
-    }
-
-    private class TestUnboundedReader extends UnboundedReader<T> {
-      private final List<T> elems;
-      private int index;
-
-      public TestUnboundedReader(List<T> elems) {
-        this.elems = elems;
-        this.index = -1;
-      }
-
-      @Override
-      public boolean start() throws IOException {
-        return advance();
-      }
-
-      @Override
-      public boolean advance() throws IOException {
-        if (index + 1 < elems.size()) {
-          index++;
-          return true;
-        }
-        return false;
-      }
-
-      @Override
-      public Instant getWatermark() {
-        return Instant.now();
-      }
-
-      @Override
-      public CheckpointMark getCheckpointMark() {
-        return new TestCheckpointMark();
-      }
-
-      @Override
-      public UnboundedSource<T, ?> getCurrentSource() {
-        TestUnboundedSource<T> source = TestUnboundedSource.this;
-        return source;
-      }
-
-      @Override
-      public T getCurrent() throws NoSuchElementException {
-        return elems.get(index);
-      }
-
-      @Override
-      public Instant getCurrentTimestamp() throws NoSuchElementException {
-        return Instant.now();
-      }
-
-      @Override
-      public void close() throws IOException {
-        readerClosedCount++;
-      }
-    }
-  }
-
-  private static class TestCheckpointMark implements CheckpointMark {
-    @Override
-    public void finalizeCheckpoint() throws IOException {}
-
-    public static class Coder extends AtomicCoder<TestCheckpointMark> {
-      @Override
-      public void encode(
-          TestCheckpointMark value,
-          OutputStream outStream,
-          org.apache.beam.sdk.coders.Coder.Context context)
-          throws CoderException, IOException {}
-
-      @Override
-      public TestCheckpointMark decode(
-          InputStream inStream, org.apache.beam.sdk.coders.Coder.Context 
context)
-          throws CoderException, IOException {
-        return new TestCheckpointMark();
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ViewEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ViewEvaluatorFactoryTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ViewEvaluatorFactoryTest.java
deleted file mode 100644
index 05346dc..0000000
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ViewEvaluatorFactoryTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.nullValue;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VoidCoder;
-import 
org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import 
org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.PCollectionViewWriter;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.Values;
-import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
-import org.apache.beam.sdk.transforms.WithKeys;
-import org.apache.beam.sdk.util.PCollectionViews;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-
-import com.google.common.collect.ImmutableList;
-
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link ViewEvaluatorFactory}.
- */
-@RunWith(JUnit4.class)
-public class ViewEvaluatorFactoryTest {
-  private BundleFactory bundleFactory = InProcessBundleFactory.create();
-
-  @Test
-  public void testInMemoryEvaluator() throws Exception {
-    TestPipeline p = TestPipeline.create();
-
-    PCollection<String> input = p.apply(Create.of("foo", "bar"));
-    CreatePCollectionView<String, Iterable<String>> createView =
-        CreatePCollectionView.of(
-            PCollectionViews.iterableView(p, input.getWindowingStrategy(), 
StringUtf8Coder.of()));
-    PCollection<Iterable<String>> concat =
-        input.apply(WithKeys.<Void, String>of((Void) null))
-            .setCoder(KvCoder.of(VoidCoder.of(), StringUtf8Coder.of()))
-            .apply(GroupByKey.<Void, String>create())
-            .apply(Values.<Iterable<String>>create());
-    PCollectionView<Iterable<String>> view =
-        concat.apply(new ViewEvaluatorFactory.WriteView<>(createView));
-
-    InProcessEvaluationContext context = 
mock(InProcessEvaluationContext.class);
-    TestViewWriter<String, Iterable<String>> viewWriter = new 
TestViewWriter<>();
-    when(context.createPCollectionViewWriter(concat, 
view)).thenReturn(viewWriter);
-
-    CommittedBundle<String> inputBundle =
-        bundleFactory.createRootBundle(input).commit(Instant.now());
-    TransformEvaluator<Iterable<String>> evaluator =
-        new ViewEvaluatorFactory()
-            .forApplication(view.getProducingTransformInternal(), inputBundle, 
context);
-
-    evaluator.processElement(
-        
WindowedValue.<Iterable<String>>valueInGlobalWindow(ImmutableList.of("foo", 
"bar")));
-    assertThat(viewWriter.latest, nullValue());
-
-    evaluator.finishBundle();
-    assertThat(
-        viewWriter.latest,
-        containsInAnyOrder(
-            WindowedValue.valueInGlobalWindow("foo"), 
WindowedValue.valueInGlobalWindow("bar")));
-  }
-
-  private static class TestViewWriter<ElemT, ViewT> implements 
PCollectionViewWriter<ElemT, ViewT> {
-    private Iterable<WindowedValue<ElemT>> latest;
-
-    @Override
-    public void add(Iterable<WindowedValue<ElemT>> values) {
-      latest = values;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/WatermarkCallbackExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/WatermarkCallbackExecutorTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/WatermarkCallbackExecutorTest.java
deleted file mode 100644
index 3b36bc5..0000000
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/WatermarkCallbackExecutorTest.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertThat;
-
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.PCollection;
-
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Tests for {@link WatermarkCallbackExecutor}.
- */
-@RunWith(JUnit4.class)
-public class WatermarkCallbackExecutorTest {
-  private WatermarkCallbackExecutor executor = 
WatermarkCallbackExecutor.create();
-  private AppliedPTransform<?, ?, ?> create;
-  private AppliedPTransform<?, ?, ?> sum;
-
-  @Before
-  public void setup() {
-    TestPipeline p = TestPipeline.create();
-    PCollection<Integer> created = p.apply(Create.of(1, 2, 3));
-    create = created.getProducingTransformInternal();
-    sum = 
created.apply(Sum.integersGlobally()).getProducingTransformInternal();
-  }
-
-  @Test
-  public void onGuaranteedFiringFiresAfterTrigger() throws Exception {
-    CountDownLatch latch = new CountDownLatch(1);
-    executor.callOnGuaranteedFiring(
-        create,
-        GlobalWindow.INSTANCE,
-        WindowingStrategy.globalDefault(),
-        new CountDownLatchCallback(latch));
-
-    executor.fireForWatermark(create, BoundedWindow.TIMESTAMP_MAX_VALUE);
-    assertThat(latch.await(500, TimeUnit.MILLISECONDS), equalTo(true));
-  }
-
-  @Test
-  public void multipleCallbacksShouldFireFires() throws Exception {
-    CountDownLatch latch = new CountDownLatch(2);
-    WindowFn<Object, IntervalWindow> windowFn = 
FixedWindows.of(Duration.standardMinutes(10));
-    IntervalWindow window =
-        new IntervalWindow(new Instant(0L), new 
Instant(0L).plus(Duration.standardMinutes(10)));
-    executor.callOnGuaranteedFiring(
-        create, window, WindowingStrategy.of(windowFn), new 
CountDownLatchCallback(latch));
-    executor.callOnGuaranteedFiring(
-        create, window, WindowingStrategy.of(windowFn), new 
CountDownLatchCallback(latch));
-
-    executor.fireForWatermark(create, new 
Instant(0L).plus(Duration.standardMinutes(10)));
-    assertThat(latch.await(500, TimeUnit.MILLISECONDS), equalTo(true));
-  }
-
-  @Test
-  public void noCallbacksShouldFire() throws Exception {
-    CountDownLatch latch = new CountDownLatch(1);
-    WindowFn<Object, IntervalWindow> windowFn = 
FixedWindows.of(Duration.standardMinutes(10));
-    IntervalWindow window =
-        new IntervalWindow(new Instant(0L), new 
Instant(0L).plus(Duration.standardMinutes(10)));
-    executor.callOnGuaranteedFiring(
-        create, window, WindowingStrategy.of(windowFn), new 
CountDownLatchCallback(latch));
-
-    executor.fireForWatermark(create, new 
Instant(0L).plus(Duration.standardMinutes(5)));
-    assertThat(latch.await(500, TimeUnit.MILLISECONDS), equalTo(false));
-  }
-
-  @Test
-  public void unrelatedStepShouldNotFire() throws Exception {
-    CountDownLatch latch = new CountDownLatch(1);
-    WindowFn<Object, IntervalWindow> windowFn = 
FixedWindows.of(Duration.standardMinutes(10));
-    IntervalWindow window =
-        new IntervalWindow(new Instant(0L), new 
Instant(0L).plus(Duration.standardMinutes(10)));
-    executor.callOnGuaranteedFiring(
-        sum, window, WindowingStrategy.of(windowFn), new 
CountDownLatchCallback(latch));
-
-    executor.fireForWatermark(create, new 
Instant(0L).plus(Duration.standardMinutes(20)));
-    assertThat(latch.await(500, TimeUnit.MILLISECONDS), equalTo(false));
-  }
-
-  private static class CountDownLatchCallback implements Runnable {
-    private final CountDownLatch latch;
-
-    public CountDownLatchCallback(CountDownLatch latch) {
-      this.latch = latch;
-    }
-
-    @Override
-    public void run() {
-      latch.countDown();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/WindowEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/WindowEvaluatorFactoryTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/WindowEvaluatorFactoryTest.java
deleted file mode 100644
index d41825d..0000000
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/WindowEvaluatorFactoryTest.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.when;
-
-import 
org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import 
org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.windowing.AfterPane;
-import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.transforms.windowing.Window.Bound;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollection;
-
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-
-import org.hamcrest.Matchers;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-/**
- * Tests for {@link WindowEvaluatorFactory}.
- */
-@RunWith(JUnit4.class)
-public class WindowEvaluatorFactoryTest {
-  private static final Instant EPOCH = new Instant(0);
-
-  private PCollection<Long> input;
-  private WindowEvaluatorFactory factory;
-
-  @Mock private InProcessEvaluationContext evaluationContext;
-
-  private BundleFactory bundleFactory;
-
-  private WindowedValue<Long> first =
-      WindowedValue.timestampedValueInGlobalWindow(3L, new Instant(2L));
-  private WindowedValue<Long> second =
-      WindowedValue.timestampedValueInGlobalWindow(
-          Long.valueOf(1L), EPOCH.plus(Duration.standardDays(3)));
-  private WindowedValue<Long> third =
-      WindowedValue.of(
-          Long.valueOf(2L),
-          new Instant(-10L),
-          new IntervalWindow(new Instant(-100), EPOCH),
-          PaneInfo.NO_FIRING);
-
-  @Before
-  public void setup() {
-    MockitoAnnotations.initMocks(this);
-    TestPipeline p = TestPipeline.create();
-    input = p.apply(Create.of(1L, 2L, 3L));
-
-    bundleFactory = InProcessBundleFactory.create();
-    factory = new WindowEvaluatorFactory();
-  }
-
-  @Test
-  public void nullWindowFunSucceeds() throws Exception {
-    Bound<Long> transform =
-        Window.<Long>triggering(
-                
AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1)))
-            .accumulatingFiredPanes();
-    PCollection<Long> triggering = input.apply(transform);
-
-    CommittedBundle<Long> inputBundle = createInputBundle();
-
-    UncommittedBundle<Long> outputBundle = createOutputBundle(triggering, 
inputBundle);
-
-    InProcessTransformResult result = runEvaluator(triggering, inputBundle, 
transform);
-
-    assertThat(
-        Iterables.getOnlyElement(result.getOutputBundles()),
-        Matchers.<UncommittedBundle<?>>equalTo(outputBundle));
-    CommittedBundle<Long> committed = outputBundle.commit(Instant.now());
-    assertThat(committed.getElements(), containsInAnyOrder(third, first, 
second));
-  }
-
-  @Test
-  public void singleWindowFnSucceeds() throws Exception {
-    Duration windowDuration = Duration.standardDays(7);
-    Bound<Long> transform = Window.<Long>into(FixedWindows.of(windowDuration));
-    PCollection<Long> windowed = input.apply(transform);
-
-    CommittedBundle<Long> inputBundle = createInputBundle();
-
-    UncommittedBundle<Long> outputBundle = createOutputBundle(windowed, 
inputBundle);
-
-    BoundedWindow firstSecondWindow = new IntervalWindow(EPOCH, 
EPOCH.plus(windowDuration));
-    BoundedWindow thirdWindow = new 
IntervalWindow(EPOCH.minus(windowDuration), EPOCH);
-
-    InProcessTransformResult result = runEvaluator(windowed, inputBundle, 
transform);
-
-    assertThat(
-        Iterables.getOnlyElement(result.getOutputBundles()),
-        Matchers.<UncommittedBundle<?>>equalTo(outputBundle));
-    CommittedBundle<Long> committed = outputBundle.commit(Instant.now());
-
-    WindowedValue<Long> expectedNewFirst =
-        WindowedValue.of(3L, new Instant(2L), firstSecondWindow, 
PaneInfo.NO_FIRING);
-    WindowedValue<Long> expectedNewSecond =
-        WindowedValue.of(
-            1L, EPOCH.plus(Duration.standardDays(3)), firstSecondWindow, 
PaneInfo.NO_FIRING);
-    WindowedValue<Long> expectedNewThird =
-        WindowedValue.of(2L, new Instant(-10L), thirdWindow, 
PaneInfo.NO_FIRING);
-    assertThat(
-        committed.getElements(),
-        containsInAnyOrder(expectedNewFirst, expectedNewSecond, 
expectedNewThird));
-  }
-
-  @Test
-  public void multipleWindowsWindowFnSucceeds() throws Exception {
-    Duration windowDuration = Duration.standardDays(6);
-    Duration slidingBy = Duration.standardDays(3);
-    Bound<Long> transform = 
Window.into(SlidingWindows.of(windowDuration).every(slidingBy));
-    PCollection<Long> windowed = input.apply(transform);
-
-    CommittedBundle<Long> inputBundle = createInputBundle();
-    UncommittedBundle<Long> outputBundle = createOutputBundle(windowed, 
inputBundle);
-
-    InProcessTransformResult result = runEvaluator(windowed, inputBundle, 
transform);
-
-    assertThat(
-        Iterables.getOnlyElement(result.getOutputBundles()),
-        Matchers.<UncommittedBundle<?>>equalTo(outputBundle));
-    CommittedBundle<Long> committed = outputBundle.commit(Instant.now());
-
-    BoundedWindow w1 = new IntervalWindow(EPOCH, EPOCH.plus(windowDuration));
-    BoundedWindow w2 =
-        new IntervalWindow(EPOCH.plus(slidingBy), 
EPOCH.plus(slidingBy).plus(windowDuration));
-    BoundedWindow wMinus1 = new IntervalWindow(EPOCH.minus(windowDuration), 
EPOCH);
-    BoundedWindow wMinusSlide =
-        new IntervalWindow(EPOCH.minus(windowDuration).plus(slidingBy), 
EPOCH.plus(slidingBy));
-
-    WindowedValue<Long> expectedFirst =
-        WindowedValue.of(
-            first.getValue(),
-            first.getTimestamp(),
-            ImmutableSet.of(w1, wMinusSlide),
-            PaneInfo.NO_FIRING);
-    WindowedValue<Long> expectedSecond =
-        WindowedValue.of(
-            second.getValue(), second.getTimestamp(), ImmutableSet.of(w1, w2), 
PaneInfo.NO_FIRING);
-    WindowedValue<Long> expectedThird =
-        WindowedValue.of(
-            third.getValue(),
-            third.getTimestamp(),
-            ImmutableSet.of(wMinus1, wMinusSlide),
-            PaneInfo.NO_FIRING);
-
-    assertThat(
-        committed.getElements(), containsInAnyOrder(expectedFirst, 
expectedSecond, expectedThird));
-  }
-
-  private CommittedBundle<Long> createInputBundle() {
-    CommittedBundle<Long> inputBundle =
-        bundleFactory
-            .createRootBundle(input)
-            .add(first)
-            .add(second)
-            .add(third)
-            .commit(Instant.now());
-    return inputBundle;
-  }
-
-  private UncommittedBundle<Long> createOutputBundle(
-      PCollection<Long> output, CommittedBundle<Long> inputBundle) {
-    UncommittedBundle<Long> outputBundle = 
bundleFactory.createBundle(inputBundle, output);
-    when(evaluationContext.createBundle(inputBundle, 
output)).thenReturn(outputBundle);
-    return outputBundle;
-  }
-
-  private InProcessTransformResult runEvaluator(
-      PCollection<Long> windowed,
-      CommittedBundle<Long> inputBundle,
-      Window.Bound<Long> windowTransform /* Required while Window.Bound is a 
composite */)
-      throws Exception {
-    TransformEvaluator<Long> evaluator =
-        factory.forApplication(
-            AppliedPTransform.of("Window", input, windowed, windowTransform),
-            inputBundle,
-            evaluationContext);
-
-    evaluator.processElement(first);
-    evaluator.processElement(second);
-    evaluator.processElement(third);
-    InProcessTransformResult result = evaluator.finishBundle();
-    return result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/pom.xml b/sdks/java/pom.xml
index a8972c2..54c841e 100644
--- a/sdks/java/pom.xml
+++ b/sdks/java/pom.xml
@@ -56,5 +56,4 @@
       </modules>
     </profile>
   </profiles>
-
 </project>

Reply via email to