[ 
https://issues.apache.org/jira/browse/BEAM-4269?focusedWorklogId=101907&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-101907
 ]

ASF GitHub Bot logged work on BEAM-4269:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 14/May/18 22:06
            Start Date: 14/May/18 22:06
    Worklog Time Spent: 10m 
      Work Description: lukecwik closed pull request #5329: [BEAM-4269, 
BEAM-3970] Implement AssignWindows in the Java SDK harness
URL: https://github.com/apache/beam/pull/5329
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the
original diff of a foreign pull request (one opened from a fork) once it
has been merged, the diff is reproduced below for the sake of provenance:

diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/AssignWindowsRunner.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/AssignWindowsRunner.java
new file mode 100644
index 00000000000..84a2771d3b0
--- /dev/null
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/AssignWindowsRunner.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.service.AutoService;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.beam.fn.harness.MapFnRunners.WindowedValueMapFnFactory;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
+import org.apache.beam.model.pipeline.v1.RunnerApi.WindowIntoPayload;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
+import org.apache.beam.sdk.fn.function.ThrowingFunction;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.joda.time.Instant;
+
+/** The Java SDK Harness implementation of the {@link Window.Assign} 
primitive. */
+class AssignWindowsRunner<T, W extends BoundedWindow> {
+
+  /** A registrar which provides a factory to handle Java {@link WindowFn 
WindowFns}. */
+  @AutoService(PTransformRunnerFactory.Registrar.class)
+  public static class Registrar implements PTransformRunnerFactory.Registrar {
+    @Override
+    public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() 
{
+      return ImmutableMap.of(
+          PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN,
+          MapFnRunners.forWindowedValueMapFnFactory(new 
AssignWindowsMapFnFactory<>()));
+    }
+  }
+
+  @VisibleForTesting
+  static class AssignWindowsMapFnFactory<T> implements 
WindowedValueMapFnFactory<T, T> {
+    @Override
+    public ThrowingFunction<WindowedValue<T>, WindowedValue<T>> forPTransform(
+        String ptransformId, PTransform ptransform) throws IOException {
+      checkArgument(
+          
PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN.equals(ptransform.getSpec().getUrn()));
+      checkArgument(ptransform.getInputsCount() == 1, "Expected only one 
input");
+      checkArgument(ptransform.getOutputsCount() == 1, "Expected only one 
output");
+      WindowIntoPayload payload = 
WindowIntoPayload.parseFrom(ptransform.getSpec().getPayload());
+
+      WindowFn<T, ?> windowFn =
+          (WindowFn<T, ?>) 
WindowingStrategyTranslation.windowFnFromProto(payload.getWindowFn());
+
+      return AssignWindowsRunner.create(windowFn)::assignWindows;
+    }
+  }
+
+  
////////////////////////////////////////////////////////////////////////////////////////////////
+
+  static <T, W extends BoundedWindow> AssignWindowsRunner<T, W> create(
+      WindowFn<? super T, W> windowFn) {
+    // Safe contravariant cast
+    WindowFn<T, W> typedWindowFn = (WindowFn<T, W>) windowFn;
+    return new AssignWindowsRunner<>(typedWindowFn);
+  }
+
+  private final WindowFn<T, W> windowFn;
+
+  private AssignWindowsRunner(WindowFn<T, W> windowFn) {
+    this.windowFn = windowFn;
+  }
+
+  WindowedValue<T> assignWindows(WindowedValue<T> input) throws Exception {
+    // TODO: BEAM-4272 consider allocating only once and updating the current 
value per call.
+    WindowFn<T, W>.AssignContext ctxt =
+        windowFn.new AssignContext() {
+          @Override
+          public T element() {
+            return input.getValue();
+          }
+
+          @Override
+          public Instant timestamp() {
+            return input.getTimestamp();
+          }
+
+          @Override
+          public BoundedWindow window() {
+            return Iterables.getOnlyElement(input.getWindows());
+          }
+        };
+    Collection<W> windows = windowFn.assignWindows(ctxt);
+    return WindowedValue.of(input.getValue(), input.getTimestamp(), windows, 
input.getPane());
+  }
+}
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/MapFnRunner.java 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/MapFnRunner.java
deleted file mode 100644
index e619ce1f3c9..00000000000
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/MapFnRunner.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.fn.harness;
-
-import static com.google.common.collect.Iterables.getOnlyElement;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Multimap;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-import java.util.function.Consumer;
-import java.util.function.Supplier;
-import org.apache.beam.fn.harness.data.BeamFnDataClient;
-import org.apache.beam.fn.harness.data.MultiplexingFnDataReceiver;
-import org.apache.beam.fn.harness.state.BeamFnStateClient;
-import org.apache.beam.model.pipeline.v1.RunnerApi;
-import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
-import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
-import org.apache.beam.sdk.fn.data.FnDataReceiver;
-import org.apache.beam.sdk.fn.function.ThrowingFunction;
-import org.apache.beam.sdk.fn.function.ThrowingRunnable;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.WindowedValue;
-
-/**
- * A {@code PTransformRunner} which executes simple map functions.
- *
- * <p>Simple map functions are used in a large number of transforms, 
especially runner-managed
- * transforms, such as map_windows.
- *
- * <p>TODO: Add support for DoFns which are actually user supplied map/lambda 
functions instead
- * of using the {@link FnApiDoFnRunner} instance.
- */
-public class MapFnRunner<InputT, OutputT> {
-
-  public static <InputT, OutputT> PTransformRunnerFactory<?>
-      createMapFnRunnerFactoryWith(
-          CreateMapFunctionForPTransform<InputT, OutputT> fnFactory) {
-    return new Factory<>(fnFactory);
-  }
-
-  /** A function factory which given a PTransform returns a map function. */
-  public interface CreateMapFunctionForPTransform<InputT, OutputT> {
-    ThrowingFunction<InputT, OutputT> createMapFunctionForPTransform(
-        String ptransformId,
-        PTransform pTransform) throws IOException;
-  }
-
-  /** A factory for {@link MapFnRunner}s. */
-  static class Factory<InputT, OutputT>
-      implements PTransformRunnerFactory<MapFnRunner<InputT, OutputT>> {
-
-    private final CreateMapFunctionForPTransform<InputT, OutputT> fnFactory;
-
-    Factory(CreateMapFunctionForPTransform<InputT, OutputT> fnFactory) {
-      this.fnFactory = fnFactory;
-    }
-
-    @Override
-    public MapFnRunner<InputT, OutputT> createRunnerForPTransform(
-        PipelineOptions pipelineOptions,
-        BeamFnDataClient beamFnDataClient,
-        BeamFnStateClient beamFnStateClient,
-        String pTransformId,
-        PTransform pTransform,
-        Supplier<String> processBundleInstructionId,
-        Map<String, PCollection> pCollections,
-        Map<String, RunnerApi.Coder> coders,
-        Map<String, RunnerApi.WindowingStrategy> windowingStrategies,
-        Multimap<String, FnDataReceiver<WindowedValue<?>>> 
pCollectionIdsToConsumers,
-        Consumer<ThrowingRunnable> addStartFunction,
-        Consumer<ThrowingRunnable> addFinishFunction) throws IOException {
-
-      Collection<FnDataReceiver<WindowedValue<OutputT>>> consumers =
-          (Collection) pCollectionIdsToConsumers.get(
-              getOnlyElement(pTransform.getOutputsMap().values()));
-
-      MapFnRunner<InputT, OutputT> runner = new MapFnRunner<>(
-          fnFactory.createMapFunctionForPTransform(pTransformId, pTransform),
-          MultiplexingFnDataReceiver.forConsumers(consumers));
-
-      pCollectionIdsToConsumers.put(
-          Iterables.getOnlyElement(pTransform.getInputsMap().values()),
-          (FnDataReceiver) (FnDataReceiver<WindowedValue<InputT>>) 
runner::map);
-      return runner;
-    }
-  }
-
-  private final ThrowingFunction<InputT, OutputT> mapFunction;
-  private final FnDataReceiver<WindowedValue<OutputT>> consumer;
-
-  MapFnRunner(
-      ThrowingFunction<InputT, OutputT> mapFunction,
-      FnDataReceiver<WindowedValue<OutputT>> consumer) {
-    this.mapFunction = mapFunction;
-    this.consumer = consumer;
-  }
-
-  public void map(WindowedValue<InputT> element) throws Exception {
-    WindowedValue<OutputT> output = 
element.withValue(mapFunction.apply(element.getValue()));
-    consumer.accept(output);
-  }
-}
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/MapFnRunners.java 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/MapFnRunners.java
new file mode 100644
index 00000000000..7819726f7e4
--- /dev/null
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/MapFnRunners.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness;
+
+import static com.google.common.collect.Iterables.getOnlyElement;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Multimap;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import org.apache.beam.fn.harness.data.BeamFnDataClient;
+import org.apache.beam.fn.harness.data.MultiplexingFnDataReceiver;
+import org.apache.beam.fn.harness.state.BeamFnStateClient;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.fn.function.ThrowingFunction;
+import org.apache.beam.sdk.fn.function.ThrowingRunnable;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.WindowedValue;
+
+/**
+ * Utilities to create {@code PTransformRunners} which execute simple map 
functions.
+ *
+ * <p>Simple map functions are used in a large number of transforms, 
especially runner-managed
+ * transforms, such as map_windows.
+ *
+ * <p>TODO: Add support for DoFns which are actually user supplied map/lambda 
functions instead of
+ * using the {@link FnApiDoFnRunner} instance.
+ */
+public abstract class MapFnRunners {
+
+  /** Create a {@link MapFnRunners} where the map function consumes elements 
directly. */
+  public static <InputT, OutputT> PTransformRunnerFactory<?> 
forValueMapFnFactory(
+      ValueMapFnFactory<InputT, OutputT> fnFactory) {
+    return new Factory<>(new CompressedValueOnlyMapperFactory<>(fnFactory));
+  }
+
+  /**
+   * Create a {@link MapFnRunners} where the map function consumes {@link 
WindowedValue Windowed
+   * Values} and produces {@link WindowedValue Windowed Values}.
+   *
+   * <p>Each {@link WindowedValue} provided to the function produced by the 
{@link
+   * WindowedValueMapFnFactory} will be in exactly one {@link
+   * org.apache.beam.sdk.transforms.windowing.BoundedWindow window}.
+   */
+  public static <InputT, OutputT> PTransformRunnerFactory<?> 
forWindowedValueMapFnFactory(
+      WindowedValueMapFnFactory<InputT, OutputT> fnFactory) {
+    return new Factory<>(new ExplodedWindowedValueMapperFactory<>(fnFactory));
+  }
+
+  /** A function factory which given a PTransform returns a map function. */
+  public interface ValueMapFnFactory<InputT, OutputT> {
+    ThrowingFunction<InputT, OutputT> forPTransform(String ptransformId, 
PTransform pTransform)
+        throws IOException;
+  }
+
+  /**
+   * A function factory which given a PTransform returns a map function over 
the entire {@link
+   * WindowedValue} of input and output elements.
+   *
+   * <p>{@link WindowedValue Windowed Values} will only ever be in a single 
window.
+   */
+  public interface WindowedValueMapFnFactory<InputT, OutputT> {
+    ThrowingFunction<WindowedValue<InputT>, WindowedValue<OutputT>> 
forPTransform(
+        String ptransformId, PTransform ptransform) throws IOException;
+  }
+
+  /** A factory for {@link MapFnRunners}. */
+  private static class Factory<InputT, OutputT>
+      implements PTransformRunnerFactory<Mapper<InputT, OutputT>> {
+
+    private final MapperFactory mapperFactory;
+
+    private Factory(MapperFactory<InputT, OutputT> mapperFactory) {
+      this.mapperFactory = mapperFactory;
+    }
+
+    @Override
+    public Mapper<InputT, OutputT> createRunnerForPTransform(
+        PipelineOptions pipelineOptions,
+        BeamFnDataClient beamFnDataClient,
+        BeamFnStateClient beamFnStateClient,
+        String pTransformId,
+        PTransform pTransform,
+        Supplier<String> processBundleInstructionId,
+        Map<String, PCollection> pCollections,
+        Map<String, RunnerApi.Coder> coders,
+        Map<String, RunnerApi.WindowingStrategy> windowingStrategies,
+        Multimap<String, FnDataReceiver<WindowedValue<?>>> 
pCollectionIdsToConsumers,
+        Consumer<ThrowingRunnable> addStartFunction,
+        Consumer<ThrowingRunnable> addFinishFunction)
+        throws IOException {
+
+      Collection<FnDataReceiver<WindowedValue<OutputT>>> consumers =
+          (Collection)
+              
pCollectionIdsToConsumers.get(getOnlyElement(pTransform.getOutputsMap().values()));
+
+      Mapper<InputT, OutputT> mapper =
+          mapperFactory.create(
+              pTransformId, pTransform, 
MultiplexingFnDataReceiver.forConsumers(consumers));
+
+      pCollectionIdsToConsumers.put(
+          Iterables.getOnlyElement(pTransform.getInputsMap().values()),
+          (FnDataReceiver) (FnDataReceiver<WindowedValue<InputT>>) 
mapper::map);
+      return mapper;
+    }
+  }
+
+  @FunctionalInterface
+  private interface MapperFactory<InputT, OutputT> {
+    Mapper<InputT, OutputT> create(
+        String ptransformId, PTransform ptransform, 
FnDataReceiver<WindowedValue<OutputT>> outputs)
+        throws IOException;
+  }
+
+  private interface Mapper<InputT, OutputT> {
+    void map(WindowedValue<InputT> input) throws Exception;
+  }
+
+  private static class ExplodedWindowedValueMapperFactory<InputT, OutputT>
+      implements MapperFactory<InputT, OutputT> {
+    private final WindowedValueMapFnFactory<InputT, OutputT> fnFactory;
+
+    private ExplodedWindowedValueMapperFactory(
+        WindowedValueMapFnFactory<InputT, OutputT> fnFactory) {
+      this.fnFactory = fnFactory;
+    }
+
+    @Override
+    public Mapper<InputT, OutputT> create(
+        String ptransformId, PTransform ptransform, 
FnDataReceiver<WindowedValue<OutputT>> outputs)
+        throws IOException {
+      ThrowingFunction<WindowedValue<InputT>, WindowedValue<OutputT>> fn =
+          fnFactory.forPTransform(ptransformId, ptransform);
+      return input -> {
+        for (WindowedValue<InputT> exploded : input.explodeWindows()) {
+          outputs.accept(fn.apply(exploded));
+        }
+      };
+    }
+  }
+
+  private static class CompressedValueOnlyMapperFactory<InputT, OutputT>
+      implements MapperFactory<InputT, OutputT> {
+    private final ValueMapFnFactory<InputT, OutputT> fnFactory;
+
+    private CompressedValueOnlyMapperFactory(ValueMapFnFactory<InputT, 
OutputT> fnFactory) {
+      this.fnFactory = fnFactory;
+    }
+
+    @Override
+    public Mapper<InputT, OutputT> create(
+        String ptransformId, PTransform ptransform, 
FnDataReceiver<WindowedValue<OutputT>> outputs)
+        throws IOException {
+      ThrowingFunction<InputT, OutputT> fn = 
fnFactory.forPTransform(ptransformId, ptransform);
+      return input -> 
outputs.accept(input.withValue(fn.apply(input.getValue())));
+    }
+  }
+}
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/WindowMappingFnRunner.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/WindowMappingFnRunner.java
index ae2a6c73c70..c43680b0d96 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/WindowMappingFnRunner.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/WindowMappingFnRunner.java
@@ -32,9 +32,9 @@
 import org.apache.beam.sdk.values.KV;
 
 /**
- * Maps windows using a window mapping fn. The input is {@link KV} with the 
key being a nonce
- * and the value being a window, the output must be a {@link KV} with the key 
being the same nonce
- * as the input and the value being the mapped window.
+ * Maps windows using a window mapping fn. The input is {@link KV} with the 
key being a nonce and
+ * the value being a window, the output must be a {@link KV} with the key 
being the same nonce as
+ * the input and the value being the mapped window.
  */
 public class WindowMappingFnRunner {
   static final String URN = 
BeamUrns.getUrn(StandardPTransforms.Primitives.MAP_WINDOWS);
@@ -48,19 +48,20 @@
 
     @Override
     public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() 
{
-      return ImmutableMap.of(URN, MapFnRunner.createMapFnRunnerFactoryWith(
-          WindowMappingFnRunner::createMapFunctionForPTransform));
+      return ImmutableMap.of(
+          URN,
+          
MapFnRunners.forValueMapFnFactory(WindowMappingFnRunner::createMapFunctionForPTransform));
     }
   }
 
   static <T, W1 extends BoundedWindow, W2 extends BoundedWindow>
-  ThrowingFunction<KV<T, W1>, KV<T, W2>> createMapFunctionForPTransform(
-      String ptransformId, PTransform pTransform) throws IOException {
+      ThrowingFunction<KV<T, W1>, KV<T, W2>> createMapFunctionForPTransform(
+          String ptransformId, PTransform pTransform) throws IOException {
     SdkFunctionSpec windowMappingFnPayload =
         SdkFunctionSpec.parseFrom(pTransform.getSpec().getPayload());
     WindowMappingFn<W2> windowMappingFn =
-        (WindowMappingFn<W2>) 
PCollectionViewTranslation.windowMappingFnFromProto(
-            windowMappingFnPayload);
+        (WindowMappingFn<W2>)
+            
PCollectionViewTranslation.windowMappingFnFromProto(windowMappingFnPayload);
     return (KV<T, W1> input) ->
         KV.of(input.getKey(), 
windowMappingFn.getSideInputWindow(input.getValue()));
   }
diff --git 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/AssignWindowsRunnerTest.java
 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/AssignWindowsRunnerTest.java
new file mode 100644
index 00000000000..bfe9fbcd021
--- /dev/null
+++ 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/AssignWindowsRunnerTest.java
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.ListMultimap;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import 
org.apache.beam.fn.harness.AssignWindowsRunner.AssignWindowsMapFnFactory;
+import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
+import org.apache.beam.model.pipeline.v1.RunnerApi.WindowIntoPayload;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.SdkComponents;
+import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.fn.function.ThrowingFunction;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import 
org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.PartitioningWindowFn;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link org.apache.beam.fn.harness.AssignWindowsRunner}. */
+@RunWith(JUnit4.class)
+public class AssignWindowsRunnerTest implements Serializable {
+  @Rule public transient ExpectedException thrown = ExpectedException.none();
+  private transient AssignWindowsRunner.AssignWindowsMapFnFactory<?> factory =
+      new AssignWindowsRunner.AssignWindowsMapFnFactory<>();
+
+  @Test
+  public void singleInputSingleOutputSucceeds() throws Exception {
+    FixedWindows windowFn = FixedWindows.of(Duration.standardMinutes(10L));
+
+    AssignWindowsRunner<Integer, IntervalWindow> runner = 
AssignWindowsRunner.create(windowFn);
+
+    assertThat(
+        runner.assignWindows(WindowedValue.valueInGlobalWindow(1)),
+        equalTo(
+            WindowedValue.of(
+                1,
+                BoundedWindow.TIMESTAMP_MIN_VALUE,
+                windowFn.assignWindow(BoundedWindow.TIMESTAMP_MIN_VALUE),
+                PaneInfo.NO_FIRING)));
+    assertThat(
+        runner.assignWindows(
+            WindowedValue.of(
+                2,
+                new Instant(-10L),
+                new IntervalWindow(new Instant(-120000L), 
Duration.standardMinutes(3L)),
+                PaneInfo.ON_TIME_AND_ONLY_FIRING)),
+        equalTo(
+            WindowedValue.of(
+                2,
+                new Instant(-10L),
+                windowFn.assignWindow(new Instant(-10L)),
+                PaneInfo.ON_TIME_AND_ONLY_FIRING)));
+  }
+
+  @Test
+  public void singleInputMultipleOutputSucceeds() throws Exception {
+    WindowFn<Object, IntervalWindow> windowFn =
+        
SlidingWindows.of(Duration.standardMinutes(4L)).every(Duration.standardMinutes(2L));
+
+    AssignWindowsRunner<Integer, IntervalWindow> runner = 
AssignWindowsRunner.create(windowFn);
+
+    IntervalWindow firstWindow =
+        new IntervalWindow(
+            new Instant(0).minus(Duration.standardMinutes(4L)), 
Duration.standardMinutes(4L));
+    IntervalWindow secondWindow =
+        new IntervalWindow(
+            new Instant(0).minus(Duration.standardMinutes(2L)), 
Duration.standardMinutes(4L));
+    IntervalWindow thirdWindow =
+        new IntervalWindow(
+            new Instant(0).minus(Duration.standardMinutes(0L)), 
Duration.standardMinutes(4L));
+
+    WindowedValue<Integer> firstValue =
+        WindowedValue.timestampedValueInGlobalWindow(-3, new Instant(-12));
+    assertThat(
+        runner.assignWindows(firstValue),
+        equalTo(
+            WindowedValue.of(
+                -3,
+                new Instant(-12),
+                ImmutableSet.of(firstWindow, secondWindow),
+                firstValue.getPane())));
+    WindowedValue<Integer> secondValue =
+        WindowedValue.of(
+            3,
+            new Instant(12),
+            new IntervalWindow(new Instant(-12), Duration.standardMinutes(24)),
+            PaneInfo.ON_TIME_AND_ONLY_FIRING);
+
+    assertThat(
+        runner.assignWindows(secondValue),
+        equalTo(
+            WindowedValue.of(
+                3,
+                new Instant(12),
+                ImmutableSet.of(secondWindow, thirdWindow),
+                secondValue.getPane())));
+  }
+
+  @Test
+  public void multipleInputWindowsAsMapFnSucceeds() throws Exception {
+    WindowFn<Object, BoundedWindow> windowFn =
+        new WindowFn<Object, BoundedWindow>() {
+          @Override
+          public Collection<BoundedWindow> assignWindows(AssignContext c) {
+            c.window();
+            return ImmutableSet.of(
+                GlobalWindow.INSTANCE,
+                new IntervalWindow(new Instant(-500), 
Duration.standardMinutes(3)));
+          }
+
+          @Override
+          public void mergeWindows(MergeContext c) {
+            throw new UnsupportedOperationException();
+          }
+
+          @Override
+          public WindowMappingFn<BoundedWindow> getDefaultWindowMappingFn() {
+            throw new UnsupportedOperationException();
+          }
+
+          @Override
+          public boolean isCompatible(WindowFn<?, ?> other) {
+            throw new UnsupportedOperationException();
+          }
+
+          @Override
+          public Coder<BoundedWindow> windowCoder() {
+            throw new UnsupportedOperationException();
+          }
+        };
+    Collection<WindowedValue<?>> outputs = new ArrayList<>();
+    ListMultimap<String, FnDataReceiver<WindowedValue<?>>> receivers = 
ArrayListMultimap.create();
+    receivers.put("output", outputs::add);
+    MapFnRunners.forWindowedValueMapFnFactory(new 
AssignWindowsMapFnFactory<>())
+        .createRunnerForPTransform(
+            null /* pipelineOptions */,
+            null /* beamFnDataClient */,
+            null /* beamFnStateClient */,
+            null /* pTransformId */,
+            PTransform.newBuilder()
+                .putInputs("in", "input")
+                .putOutputs("out", "output")
+                .setSpec(
+                    FunctionSpec.newBuilder()
+                        
.setUrn(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN)
+                        .setPayload(
+                            WindowIntoPayload.newBuilder()
+                                .setWindowFn(
+                                    WindowingStrategyTranslation.toProto(
+                                        windowFn, SdkComponents.create()))
+                                .build()
+                                .toByteString()))
+                .build(),
+            null /* processBundleInstructionId */,
+            null /* pCollections */,
+            null /* coders */,
+            null /* windowingStrategies */,
+            receivers,
+            null /* addStartFunction */,
+            null /* addFinishFunction */);
+
+    WindowedValue<Integer> value =
+        WindowedValue.of(
+            2,
+            new Instant(-10L),
+            ImmutableList.of(
+                new IntervalWindow(new Instant(-22L), 
Duration.standardMinutes(5L)),
+                new IntervalWindow(new Instant(-120000L), 
Duration.standardMinutes(3L))),
+            PaneInfo.ON_TIME_AND_ONLY_FIRING);
+    Iterables.getOnlyElement(receivers.get("input")).accept(value);
+    assertThat(
+        outputs,
+        containsInAnyOrder(
+            WindowedValue.of(
+                2,
+                new Instant(-10L),
+                ImmutableSet.of(
+                    GlobalWindow.INSTANCE,
+                    new IntervalWindow(new Instant(-500), 
Duration.standardMinutes(3))),
+                PaneInfo.ON_TIME_AND_ONLY_FIRING),
+            WindowedValue.of(
+                2,
+                new Instant(-10L),
+                ImmutableSet.of(
+                    GlobalWindow.INSTANCE,
+                    new IntervalWindow(new Instant(-500), 
Duration.standardMinutes(3))),
+                PaneInfo.ON_TIME_AND_ONLY_FIRING)));
+  }
+
+  @Test
+  public void multipleInputWindowsThrows() throws Exception {
+    WindowFn<Object, BoundedWindow> windowFn =
+        new WindowFn<Object, BoundedWindow>() {
+          @Override
+          public Collection<BoundedWindow> assignWindows(AssignContext c) 
throws Exception {
+            return Collections.singleton(c.window());
+          }
+
+          @Override
+          public void mergeWindows(MergeContext c) throws Exception {
+            throw new UnsupportedOperationException();
+          }
+
+          @Override
+          public WindowMappingFn<BoundedWindow> getDefaultWindowMappingFn() {
+            throw new UnsupportedOperationException();
+          }
+
+          @Override
+          public boolean isCompatible(WindowFn<?, ?> other) {
+            throw new UnsupportedOperationException();
+          }
+
+          @Override
+          public Coder<BoundedWindow> windowCoder() {
+            throw new UnsupportedOperationException();
+          }
+        };
+    AssignWindowsRunner<Integer, BoundedWindow> runner = 
AssignWindowsRunner.create(windowFn);
+
+    thrown.expect(IllegalArgumentException.class);
+    runner.assignWindows(
+        WindowedValue.of(
+            2,
+            new Instant(-10L),
+            ImmutableList.of(
+                new IntervalWindow(new Instant(-22L), 
Duration.standardMinutes(5L)),
+                new IntervalWindow(new Instant(-120000L), 
Duration.standardMinutes(3L))),
+            PaneInfo.ON_TIME_AND_ONLY_FIRING));
+  }
+
+  @Test
+  public void factoryCreatesFromJavaWindowFn() throws Exception {
+    PTransform windowPTransform =
+        PTransform.newBuilder()
+            .putInputs("in", "input")
+            .putOutputs("out", "output")
+            .setSpec(
+                FunctionSpec.newBuilder()
+                    .setUrn(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN)
+                    .setPayload(
+                        WindowIntoPayload.newBuilder()
+                            .setWindowFn(
+                                WindowingStrategyTranslation.toProto(
+                                    new TestWindowFn(), 
SdkComponents.create()))
+                            .build()
+                            .toByteString())
+                    .build())
+            .build();
+
+    ThrowingFunction<WindowedValue<?>, WindowedValue<?>> fn =
+        (ThrowingFunction) factory.forPTransform("transform", 
windowPTransform);
+
+    assertThat(
+        fn.apply(
+            WindowedValue.of(
+                22L,
+                new Instant(5),
+                new IntervalWindow(new Instant(0L), new Instant(20027L)),
+                PaneInfo.ON_TIME_AND_ONLY_FIRING)),
+        equalTo(
+            WindowedValue.of(
+                22L,
+                new Instant(5),
+                new TestWindowFn().assignWindow(new Instant(5)),
+                PaneInfo.ON_TIME_AND_ONLY_FIRING)));
+  }
+
+  @Test
+  public void factoryCreatesFromKnownWindowFn() throws Exception {
+    PTransform windowPTransform =
+        PTransform.newBuilder()
+            .putInputs("in", "input")
+            .putOutputs("out", "output")
+            .setSpec(
+                FunctionSpec.newBuilder()
+                    .setUrn(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN)
+                    .setPayload(
+                        WindowIntoPayload.newBuilder()
+                            .setWindowFn(
+                                WindowingStrategyTranslation.toProto(
+                                    
Sessions.withGapDuration(Duration.standardMinutes(12L)),
+                                    SdkComponents.create()))
+                            .build()
+                            .toByteString())
+                    .build())
+            .build();
+    ThrowingFunction<WindowedValue<?>, WindowedValue<?>> fn =
+        (ThrowingFunction) factory.forPTransform("transform", 
windowPTransform);
+    WindowedValue<?> output =
+        fn.apply(
+            WindowedValue.of(
+                22L,
+                new Instant(5),
+                new IntervalWindow(new Instant(0L), new Instant(20027L)),
+                PaneInfo.ON_TIME_AND_ONLY_FIRING));
+
+    assertThat(
+        output,
+        equalTo(
+            WindowedValue.of(
+                22L,
+                new Instant(5),
+                new IntervalWindow(new Instant(5L), 
Duration.standardMinutes(12L)),
+                PaneInfo.ON_TIME_AND_ONLY_FIRING)));
+  }
+
+  private static class TestWindowFn extends PartitioningWindowFn<Object, 
IntervalWindow> {
+    @Override
+    public IntervalWindow assignWindow(Instant timestamp) {
+      return new IntervalWindow(
+          BoundedWindow.TIMESTAMP_MIN_VALUE, 
GlobalWindow.INSTANCE.maxTimestamp());
+    }
+
+    @Override
+    public boolean isCompatible(WindowFn<?, ?> other) {
+      return equals(other);
+    }
+
+    @Override
+    public Coder<IntervalWindow> windowCoder() {
+      return IntervalWindowCoder.of();
+    }
+  }
+}
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/MapFnRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/MapFnRunnerTest.java
deleted file mode 100644
index 57ef321cb19..00000000000
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/MapFnRunnerTest.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.fn.harness;
-
-import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow;
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.empty;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-
-import com.google.common.base.Suppliers;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Multimap;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import org.apache.beam.model.pipeline.v1.RunnerApi;
-import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
-import org.apache.beam.sdk.fn.data.FnDataReceiver;
-import org.apache.beam.sdk.fn.function.ThrowingFunction;
-import org.apache.beam.sdk.fn.function.ThrowingRunnable;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link MapFnRunner}. */
-@RunWith(JUnit4.class)
-public class MapFnRunnerTest {
-  private static final String EXPECTED_ID = "pTransformId";
-  private static final RunnerApi.PTransform EXPECTED_PTRANSFORM = 
RunnerApi.PTransform.newBuilder()
-      .putInputs("input", "inputPC")
-      .putOutputs("output", "outputPC")
-      .build();
-
-  @Test
-  public void testWindowMapping() throws Exception {
-
-    List<WindowedValue<?>> outputConsumer = new ArrayList<>();
-    Multimap<String, FnDataReceiver<WindowedValue<?>>> consumers = 
HashMultimap.create();
-    consumers.put("outputPC", outputConsumer::add);
-
-    List<ThrowingRunnable> startFunctions = new ArrayList<>();
-    List<ThrowingRunnable> finishFunctions = new ArrayList<>();
-
-    new MapFnRunner.Factory<>(this::createMapFunctionForPTransform)
-        .createRunnerForPTransform(
-            PipelineOptionsFactory.create(),
-            null /* beamFnDataClient */,
-            null /* beamFnStateClient */,
-            EXPECTED_ID,
-            EXPECTED_PTRANSFORM,
-            Suppliers.ofInstance("57L")::get,
-            Collections.emptyMap(),
-            Collections.emptyMap(),
-            Collections.emptyMap(),
-            consumers,
-            startFunctions::add,
-            finishFunctions::add);
-
-    assertThat(startFunctions, empty());
-    assertThat(finishFunctions, empty());
-
-    assertThat(consumers.keySet(), containsInAnyOrder("inputPC", "outputPC"));
-
-    Iterables.getOnlyElement(
-        consumers.get("inputPC")).accept(valueInGlobalWindow("abc"));
-
-    assertThat(outputConsumer, contains(valueInGlobalWindow("ABC")));
-  }
-
-  public ThrowingFunction<String, String> 
createMapFunctionForPTransform(String ptransformId,
-      PTransform pTransform) throws IOException {
-    assertEquals(EXPECTED_ID, ptransformId);
-    assertEquals(EXPECTED_PTRANSFORM, pTransform);
-    return (String str) -> str.toUpperCase();
-  }
-}
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/MapFnRunnersTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/MapFnRunnersTest.java
new file mode 100644
index 00000000000..906bdebb674
--- /dev/null
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/MapFnRunnersTest.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness;
+
+import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.empty;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.base.Suppliers;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Multimap;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.fn.harness.MapFnRunners.ValueMapFnFactory;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.fn.function.ThrowingFunction;
+import org.apache.beam.sdk.fn.function.ThrowingRunnable;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link MapFnRunners}. */
+@RunWith(JUnit4.class)
+public class MapFnRunnersTest {
+  private static final String EXPECTED_ID = "pTransformId";
+  private static final RunnerApi.PTransform EXPECTED_PTRANSFORM =
+      RunnerApi.PTransform.newBuilder()
+          .putInputs("input", "inputPC")
+          .putOutputs("output", "outputPC")
+          .build();
+
+  @Test
+  public void testValueOnlyMapping() throws Exception {
+    List<WindowedValue<?>> outputConsumer = new ArrayList<>();
+    Multimap<String, FnDataReceiver<WindowedValue<?>>> consumers = 
HashMultimap.create();
+    consumers.put("outputPC", outputConsumer::add);
+
+    List<ThrowingRunnable> startFunctions = new ArrayList<>();
+    List<ThrowingRunnable> finishFunctions = new ArrayList<>();
+
+    ValueMapFnFactory<String, String> factory = (ptId, pt) -> 
String::toUpperCase;
+    MapFnRunners.forValueMapFnFactory(factory)
+        .createRunnerForPTransform(
+            PipelineOptionsFactory.create(),
+            null /* beamFnDataClient */,
+            null /* beamFnStateClient */,
+            EXPECTED_ID,
+            EXPECTED_PTRANSFORM,
+            Suppliers.ofInstance("57L")::get,
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            consumers,
+            startFunctions::add,
+            finishFunctions::add);
+
+    assertThat(startFunctions, empty());
+    assertThat(finishFunctions, empty());
+
+    assertThat(consumers.keySet(), containsInAnyOrder("inputPC", "outputPC"));
+
+    
Iterables.getOnlyElement(consumers.get("inputPC")).accept(valueInGlobalWindow("abc"));
+
+    assertThat(outputConsumer, contains(valueInGlobalWindow("ABC")));
+  }
+
+  @Test
+  public void testFullWindowedValueMapping() throws Exception {
+    List<WindowedValue<?>> outputConsumer = new ArrayList<>();
+    Multimap<String, FnDataReceiver<WindowedValue<?>>> consumers = 
HashMultimap.create();
+    consumers.put("outputPC", outputConsumer::add);
+
+    List<ThrowingRunnable> startFunctions = new ArrayList<>();
+    List<ThrowingRunnable> finishFunctions = new ArrayList<>();
+
+    
MapFnRunners.forWindowedValueMapFnFactory(this::createMapFunctionForPTransform)
+        .createRunnerForPTransform(
+            PipelineOptionsFactory.create(),
+            null /* beamFnDataClient */,
+            null /* beamFnStateClient */,
+            EXPECTED_ID,
+            EXPECTED_PTRANSFORM,
+            Suppliers.ofInstance("57L")::get,
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            consumers,
+            startFunctions::add,
+            finishFunctions::add);
+
+    assertThat(startFunctions, empty());
+    assertThat(finishFunctions, empty());
+
+    assertThat(consumers.keySet(), containsInAnyOrder("inputPC", "outputPC"));
+
+    
Iterables.getOnlyElement(consumers.get("inputPC")).accept(valueInGlobalWindow("abc"));
+
+    assertThat(outputConsumer, contains(valueInGlobalWindow("ABC")));
+  }
+
+  @Test
+  public void testFullWindowedValueMappingWithCompressedWindow() throws 
Exception {
+    List<WindowedValue<?>> outputConsumer = new ArrayList<>();
+    Multimap<String, FnDataReceiver<WindowedValue<?>>> consumers = 
HashMultimap.create();
+    consumers.put("outputPC", outputConsumer::add);
+
+    List<ThrowingRunnable> startFunctions = new ArrayList<>();
+    List<ThrowingRunnable> finishFunctions = new ArrayList<>();
+
+    
MapFnRunners.forWindowedValueMapFnFactory(this::createMapFunctionForPTransform)
+        .createRunnerForPTransform(
+            PipelineOptionsFactory.create(),
+            null /* beamFnDataClient */,
+            null /* beamFnStateClient */,
+            EXPECTED_ID,
+            EXPECTED_PTRANSFORM,
+            Suppliers.ofInstance("57L")::get,
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            consumers,
+            startFunctions::add,
+            finishFunctions::add);
+
+    assertThat(startFunctions, empty());
+    assertThat(finishFunctions, empty());
+
+    assertThat(consumers.keySet(), containsInAnyOrder("inputPC", "outputPC"));
+
+    IntervalWindow firstWindow = new IntervalWindow(new Instant(0L), 
Duration.standardMinutes(10L));
+    IntervalWindow secondWindow =
+        new IntervalWindow(new Instant(-10L), Duration.standardSeconds(22L));
+    Iterables.getOnlyElement(consumers.get("inputPC"))
+        .accept(
+            WindowedValue.of(
+                "abc",
+                new Instant(12),
+                ImmutableSet.of(firstWindow, GlobalWindow.INSTANCE, 
secondWindow),
+                PaneInfo.NO_FIRING));
+
+    assertThat(
+        outputConsumer,
+        containsInAnyOrder(
+            WindowedValue.timestampedValueInGlobalWindow("ABC", new 
Instant(12)),
+            WindowedValue.of("ABC", new Instant(12), secondWindow, 
PaneInfo.NO_FIRING),
+            WindowedValue.of("ABC", new Instant(12), firstWindow, 
PaneInfo.NO_FIRING)));
+  }
+
+  public ThrowingFunction<WindowedValue<String>, WindowedValue<String>>
+      createMapFunctionForPTransform(String ptransformId, PTransform 
pTransform) {
+    assertEquals(EXPECTED_ID, ptransformId);
+    assertEquals(EXPECTED_PTRANSFORM, pTransform);
+    return (WindowedValue<String> str) -> 
str.withValue(str.getValue().toUpperCase());
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 101907)
    Time Spent: 3h 20m  (was: 3h 10m)

> Implement Assign Windows in the Java SDK Harness
> ------------------------------------------------
>
>                 Key: BEAM-4269
>                 URL: https://issues.apache.org/jira/browse/BEAM-4269
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-java-harness
>            Reporter: Thomas Groh
>            Assignee: Thomas Groh
>            Priority: Major
>             Fix For: Not applicable
>
>          Time Spent: 3h 20m
>  Remaining Estimate: 0h
>
> This allows execution of Java WindowFns over the Fn API



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to