[ 
https://issues.apache.org/jira/browse/BEAM-3994?focusedWorklogId=90995&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-90995
 ]

ASF GitHub Bot logged work on BEAM-3994:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 13/Apr/18 22:28
            Start Date: 13/Apr/18 22:28
    Worklog Time Spent: 10m 
      Work Description: tgroh closed pull request #5008: [BEAM-3994] Use typed 
client pool sinks and sources
URL: https://github.com/apache/beam/pull/5008
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ControlClientPool.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ControlClientPool.java
new file mode 100644
index 00000000000..e348dc22cac
--- /dev/null
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ControlClientPool.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.control;
+
+import org.apache.beam.sdk.fn.function.ThrowingConsumer;
+import org.apache.beam.sdk.util.ThrowingSupplier;
+
+/** Control client pool that exposes a source and sink of control clients. */
+public interface ControlClientPool<T extends InstructionRequestHandler> {
+
+  /** Source of control clients. */
+  ThrowingSupplier<T> getSource();
+
+  /** Sink for control clients. */
+  ThrowingConsumer<T> getSink();
+
+}
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolService.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolService.java
index aa4fe5c058b..a3f59ed4f6c 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolService.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolService.java
@@ -19,13 +19,13 @@
 
 import io.grpc.stub.StreamObserver;
 import java.util.Collection;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc;
 import org.apache.beam.runners.fnexecution.FnService;
 import org.apache.beam.runners.fnexecution.HeaderAccessor;
+import org.apache.beam.sdk.fn.function.ThrowingConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,13 +34,14 @@
     implements FnService {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(FnApiControlClientPoolService.class);
 
-  private final BlockingQueue<FnApiControlClient> clientPool;
+  private final ThrowingConsumer<? super FnApiControlClient> clientPool;
   private final Collection<FnApiControlClient> vendedClients = new 
CopyOnWriteArrayList<>();
   private final HeaderAccessor headerAccessor;
   private AtomicBoolean closed = new AtomicBoolean();
 
   private FnApiControlClientPoolService(
-      BlockingQueue<FnApiControlClient> clientPool, HeaderAccessor 
headerAccessor) {
+      ThrowingConsumer<? super FnApiControlClient> clientPool,
+      HeaderAccessor headerAccessor) {
     this.clientPool = clientPool;
     this.headerAccessor = headerAccessor;
   }
@@ -53,7 +54,8 @@ private FnApiControlClientPoolService(
    * That consumer is responsible for closing the clients when they are no 
longer needed.
    */
   public static FnApiControlClientPoolService offeringClientsToPool(
-      BlockingQueue<FnApiControlClient> clientPool, HeaderAccessor 
headerAccessor) {
+      ThrowingConsumer<? super FnApiControlClient> clientPool,
+      HeaderAccessor headerAccessor) {
     return new FnApiControlClientPoolService(clientPool, headerAccessor);
   }
 
@@ -77,10 +79,12 @@ public static FnApiControlClientPoolService 
offeringClientsToPool(
       // discarded, which should be performed by a call to #shutdownNow. The 
remote caller must be
       // able to handle an unexpectedly terminated connection.
       vendedClients.add(newClient);
-      clientPool.put(newClient);
+      clientPool.accept(newClient);
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       throw new RuntimeException(e);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
     }
     return newClient.asResponseObserver();
   }
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/QueueControlClientPool.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/QueueControlClientPool.java
new file mode 100644
index 00000000000..9744a612b70
--- /dev/null
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/QueueControlClientPool.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.control;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import org.apache.beam.sdk.fn.function.ThrowingConsumer;
+import org.apache.beam.sdk.util.ThrowingSupplier;
+
+/** Control client pool backed by a blocking queue. */
+public class QueueControlClientPool<T extends InstructionRequestHandler>
+    implements ControlClientPool {
+
+  private final BlockingQueue<T> queue;
+
+  /**
+   * Creates a client pool backed by a {@link SynchronousQueue}. Client 
submission blocks until
+   * there is a receiving thread waiting on the source.
+   */
+  public static QueueControlClientPool createSynchronous() {
+      return new QueueControlClientPool<>(new SynchronousQueue<>(true));
+  }
+
+  /**
+   * Creates a client pool backed by an unbounded {@link LinkedBlockingQueue}. 
Clients are buffered
+   * until consumed.
+   */
+  public static QueueControlClientPool createBuffering() {
+      return new QueueControlClientPool<>(new LinkedBlockingQueue<>());
+  }
+
+  private QueueControlClientPool(BlockingQueue<T> queue) {
+      this.queue = queue;
+  }
+
+  @Override
+  public ThrowingSupplier<T> getSource() {
+    return queue::take;
+  }
+
+  @Override
+  public ThrowingConsumer<T> getSink() {
+      return queue::put;
+  }
+
+}
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientControlService.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientControlService.java
index dd7023651c5..3f1d14809c9 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientControlService.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientControlService.java
@@ -20,9 +20,7 @@
 
 import io.grpc.ServerServiceDefinition;
 import java.util.Collection;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.SynchronousQueue;
 import java.util.function.Supplier;
 import org.apache.beam.runners.fnexecution.FnService;
 import org.apache.beam.runners.fnexecution.HeaderAccessor;
@@ -34,7 +32,7 @@
  */
 public class SdkHarnessClientControlService implements FnService {
   private final FnApiControlClientPoolService clientPoolService;
-  private final BlockingQueue<FnApiControlClient> pendingClients;
+  private final ControlClientPool<FnApiControlClient> pendingClients;
 
   private final Supplier<FnDataService> dataService;
 
@@ -49,19 +47,22 @@ private SdkHarnessClientControlService(
       Supplier<FnDataService> dataService, HeaderAccessor headerAccessor) {
     this.dataService = dataService;
     activeClients = new ConcurrentLinkedQueue<>();
-    pendingClients = new SynchronousQueue<>();
+    pendingClients = QueueControlClientPool.createSynchronous();
     clientPoolService =
-        FnApiControlClientPoolService.offeringClientsToPool(pendingClients, 
headerAccessor);
+        
FnApiControlClientPoolService.offeringClientsToPool(pendingClients.getSink(),
+            headerAccessor);
   }
 
   public SdkHarnessClient getClient() {
     try {
       // Block until a client is available.
-      FnApiControlClient getClient = pendingClients.take();
+      FnApiControlClient getClient = pendingClients.getSource().get();
       return SdkHarnessClient.usingFnApiClient(getClient, dataService.get());
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       throw new RuntimeException("Interrupted while waiting for client", e);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
     }
   }
 
diff --git 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/InProcessSdkHarness.java
 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/InProcessSdkHarness.java
index 137a0283c89..5b507b9b16a 100644
--- 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/InProcessSdkHarness.java
+++ 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/InProcessSdkHarness.java
@@ -21,14 +21,14 @@
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.grpc.ManagedChannel;
 import io.grpc.inprocess.InProcessChannelBuilder;
-import java.io.IOException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.SynchronousQueue;
 import org.apache.beam.fn.harness.FnHarness;
 import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+import org.apache.beam.runners.fnexecution.control.ControlClientPool;
 import org.apache.beam.runners.fnexecution.control.FnApiControlClient;
 import 
org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
+import org.apache.beam.runners.fnexecution.control.QueueControlClientPool;
 import org.apache.beam.runners.fnexecution.control.SdkHarnessClient;
 import org.apache.beam.runners.fnexecution.data.GrpcDataService;
 import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
@@ -67,13 +67,13 @@ public ApiServiceDescriptor dataEndpoint() {
     return dataServer.getApiServiceDescriptor();
   }
 
-  protected void before() throws IOException, InterruptedException {
+  protected void before() throws Exception {
     InProcessServerFactory serverFactory = InProcessServerFactory.create();
     executor = Executors.newCachedThreadPool(new 
ThreadFactoryBuilder().setDaemon(true).build());
-    SynchronousQueue<FnApiControlClient> clientPool = new SynchronousQueue<>();
+    ControlClientPool<FnApiControlClient> clientPool = 
QueueControlClientPool.createSynchronous();
     FnApiControlClientPoolService clientPoolService =
         FnApiControlClientPoolService.offeringClientsToPool(
-            clientPool, GrpcContextHeaderAccessorProvider.getHeaderAccessor());
+            clientPool.getSink(), 
GrpcContextHeaderAccessorProvider.getHeaderAccessor());
 
     loggingServer =
         GrpcFnServer.allocatePortAndCreateFor(
@@ -98,7 +98,8 @@ public ManagedChannel forDescriptor(ApiServiceDescriptor 
apiServiceDescriptor) {
           return null;
         });
 
-    client = SdkHarnessClient.usingFnApiClient(clientPool.take(), 
dataServer.getService());
+    client = SdkHarnessClient.usingFnApiClient(clientPool.getSource().get(),
+        dataServer.getService());
   }
 
   protected void after() {
diff --git 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolServiceTest.java
 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolServiceTest.java
index d08bcfc1edf..d259a96e3d1 100644
--- 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolServiceTest.java
+++ 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolServiceTest.java
@@ -26,10 +26,8 @@
 import io.grpc.inprocess.InProcessChannelBuilder;
 import io.grpc.stub.StreamObserver;
 import java.io.IOException;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionRequest;
@@ -51,10 +49,11 @@
 
   // For ease of straight-line testing, we use a LinkedBlockingQueue; in 
practice a SynchronousQueue
   // for matching incoming connections and server threads is likely.
-  private final BlockingQueue<FnApiControlClient> pool = new 
LinkedBlockingQueue<>();
+  private final ControlClientPool<FnApiControlClient> pool =
+      QueueControlClientPool.createBuffering();
   private final FnApiControlClientPoolService controlService =
       FnApiControlClientPoolService.offeringClientsToPool(
-          pool, GrpcContextHeaderAccessorProvider.getHeaderAccessor());
+          pool.getSink(), 
GrpcContextHeaderAccessorProvider.getHeaderAccessor());
   private GrpcFnServer<FnApiControlClientPoolService> server;
   private BeamFnControlGrpc.BeamFnControlStub stub;
 
@@ -77,7 +76,7 @@ public void testIncomingConnection() throws Exception {
     StreamObserver<BeamFnApi.InstructionResponse> responseObserver =
         controlService.control(requestObserver);
 
-    FnApiControlClient client = pool.take();
+    FnApiControlClient client = pool.getSource().get();
 
     // Check that the client is wired up to the request channel
     String id = "fakeInstruction";
@@ -115,7 +114,7 @@ public void onCompleted() {
           }
         });
 
-    pool.take();
+    pool.getSource().get();
     server.close();
 
     latch.await();
diff --git 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
index c9ec7a4f050..80236f1a7be 100644
--- 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
+++ 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
@@ -33,10 +33,8 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
 import org.apache.beam.fn.harness.FnHarness;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.Target;
@@ -108,11 +106,11 @@ public void setup() throws Exception {
         GrpcFnServer.allocatePortAndCreateFor(
             GrpcLoggingService.forWriter(Slf4jLogWriter.getDefault()), 
serverFactory);
 
-    BlockingQueue<FnApiControlClient> clientPool = new SynchronousQueue<>();
+    ControlClientPool<FnApiControlClient> clientPool = 
QueueControlClientPool.createSynchronous();
     controlServer =
         GrpcFnServer.allocatePortAndCreateFor(
             FnApiControlClientPoolService.offeringClientsToPool(
-                clientPool, 
GrpcContextHeaderAccessorProvider.getHeaderAccessor()),
+                clientPool.getSink(), 
GrpcContextHeaderAccessorProvider.getHeaderAccessor()),
             serverFactory);
 
     // Create the SDK harness, and wait until it connects
@@ -130,7 +128,7 @@ public ManagedChannel forDescriptor(ApiServiceDescriptor 
apiServiceDescriptor) {
                   }
                 },
                 StreamObserverFactory.direct()));
-    FnApiControlClient controlClient = clientPool.take();
+    FnApiControlClient controlClient = clientPool.getSource().get();
     this.controlClient = SdkHarnessClient.usingFnApiClient(controlClient, 
dataServer.getService());
   }
 
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/CloseableThrowingConsumer.java
 
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/function/CloseableThrowingConsumer.java
similarity index 95%
rename from 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/CloseableThrowingConsumer.java
rename to 
sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/function/CloseableThrowingConsumer.java
index 59ab149e9ae..43f550bad31 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/CloseableThrowingConsumer.java
+++ 
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/function/CloseableThrowingConsumer.java
@@ -15,8 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-package org.apache.beam.fn.harness.fn;
+package org.apache.beam.sdk.fn.function;
 
 /** A {@link ThrowingConsumer} that can be closed. */
 public interface CloseableThrowingConsumer<T> extends AutoCloseable, 
ThrowingConsumer<T> {
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingBiConsumer.java
 
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/function/ThrowingBiConsumer.java
similarity index 96%
rename from 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingBiConsumer.java
rename to 
sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/function/ThrowingBiConsumer.java
index fca8f3ce049..6757824385b 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingBiConsumer.java
+++ 
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/function/ThrowingBiConsumer.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.beam.fn.harness.fn;
+package org.apache.beam.sdk.fn.function;
 
 import java.util.function.BiConsumer;
 
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingBiFunction.java
 
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/function/ThrowingBiFunction.java
similarity index 96%
rename from 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingBiFunction.java
rename to 
sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/function/ThrowingBiFunction.java
index 9d505dab215..cb4c60fc911 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingBiFunction.java
+++ 
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/function/ThrowingBiFunction.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.beam.fn.harness.fn;
+package org.apache.beam.sdk.fn.function;
 
 import java.util.function.BiFunction;
 
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingConsumer.java
 
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/function/ThrowingConsumer.java
similarity index 96%
rename from 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingConsumer.java
rename to 
sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/function/ThrowingConsumer.java
index b34e8579b49..071bfcc13e6 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingConsumer.java
+++ 
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/function/ThrowingConsumer.java
@@ -15,8 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-package org.apache.beam.fn.harness.fn;
+package org.apache.beam.sdk.fn.function;
 
 import java.util.function.Consumer;
 
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingFunction.java
 
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/function/ThrowingFunction.java
similarity index 96%
rename from 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingFunction.java
rename to 
sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/function/ThrowingFunction.java
index 446ff6072d8..0829c0007cd 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingFunction.java
+++ 
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/function/ThrowingFunction.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.beam.fn.harness.fn;
+package org.apache.beam.sdk.fn.function;
 
 import java.util.function.Function;
 
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingRunnable.java
 
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/function/ThrowingRunnable.java
similarity index 96%
rename from 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingRunnable.java
rename to 
sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/function/ThrowingRunnable.java
index c7fc29e3556..fbfb9ae12f5 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingRunnable.java
+++ 
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/function/ThrowingRunnable.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.beam.fn.harness.fn;
+package org.apache.beam.sdk.fn.function;
 
 /**
  * A {@link Runnable} which can throw {@link Exception}s.
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/package-info.java
 
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/function/package-info.java
similarity index 89%
rename from 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/package-info.java
rename to 
sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/function/package-info.java
index bbf339684d4..0652dffe816 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/package-info.java
+++ 
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/function/package-info.java
@@ -16,7 +16,5 @@
  * limitations under the License.
  */
 
-/**
- * Java 8 functional interface extensions.
- */
-package org.apache.beam.fn.harness.fn;
+/** Java 8 functional interface extensions. */
+package org.apache.beam.sdk.fn.function;
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
index 4324bf4cb48..db255811920 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
@@ -30,7 +30,6 @@
 import java.util.function.Supplier;
 import org.apache.beam.fn.harness.data.BeamFnDataClient;
 import org.apache.beam.fn.harness.data.MultiplexingFnDataReceiver;
-import org.apache.beam.fn.harness.fn.ThrowingRunnable;
 import org.apache.beam.fn.harness.state.BeamFnStateClient;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.RemoteGrpcPort;
@@ -46,6 +45,7 @@
 import org.apache.beam.sdk.fn.data.InboundDataClient;
 import org.apache.beam.sdk.fn.data.LogicalEndpoint;
 import org.apache.beam.sdk.fn.data.RemoteGrpcPortRead;
+import org.apache.beam.sdk.fn.function.ThrowingRunnable;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.slf4j.Logger;
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
index 09bbc6330af..34d704d21bd 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
@@ -28,7 +28,6 @@
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 import org.apache.beam.fn.harness.data.BeamFnDataClient;
-import org.apache.beam.fn.harness.fn.ThrowingRunnable;
 import org.apache.beam.fn.harness.state.BeamFnStateClient;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.RemoteGrpcPort;
@@ -44,6 +43,7 @@
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
 import org.apache.beam.sdk.fn.data.LogicalEndpoint;
 import org.apache.beam.sdk.fn.data.RemoteGrpcPortWrite;
+import org.apache.beam.sdk.fn.function.ThrowingRunnable;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.WindowedValue;
 
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java
index f09c77d4ee0..5df178f0df2 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java
@@ -30,7 +30,6 @@
 import java.util.function.Supplier;
 import org.apache.beam.fn.harness.control.ProcessBundleHandler;
 import org.apache.beam.fn.harness.data.BeamFnDataClient;
-import org.apache.beam.fn.harness.fn.ThrowingRunnable;
 import org.apache.beam.fn.harness.state.BeamFnStateClient;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Coder;
@@ -40,6 +39,7 @@
 import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.core.construction.ReadTranslation;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.fn.function.ThrowingRunnable;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.Source.Reader;
 import org.apache.beam.sdk.options.PipelineOptions;
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FlattenRunner.java 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FlattenRunner.java
index 840c9106a27..52d94354e15 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FlattenRunner.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FlattenRunner.java
@@ -29,13 +29,13 @@
 import java.util.function.Supplier;
 import org.apache.beam.fn.harness.data.BeamFnDataClient;
 import org.apache.beam.fn.harness.data.MultiplexingFnDataReceiver;
-import org.apache.beam.fn.harness.fn.ThrowingRunnable;
 import org.apache.beam.fn.harness.state.BeamFnStateClient;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Coder;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.fn.function.ThrowingRunnable;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.WindowedValue;
 
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index abfc9a36ed7..b899c16767a 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -44,7 +44,6 @@
 import java.util.function.Function;
 import java.util.function.Supplier;
 import org.apache.beam.fn.harness.data.BeamFnDataClient;
-import org.apache.beam.fn.harness.fn.ThrowingRunnable;
 import org.apache.beam.fn.harness.state.BagUserState;
 import org.apache.beam.fn.harness.state.BeamFnStateClient;
 import org.apache.beam.fn.harness.state.MultimapSideInput;
@@ -61,6 +60,7 @@
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.fn.function.ThrowingRunnable;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.state.BagState;
 import org.apache.beam.sdk.state.CombiningState;
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
index 7c2ffe75c88..b3d6107a9cf 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
@@ -26,7 +26,6 @@
 import org.apache.beam.fn.harness.control.ProcessBundleHandler;
 import org.apache.beam.fn.harness.control.RegisterHandler;
 import org.apache.beam.fn.harness.data.BeamFnDataGrpcClient;
-import org.apache.beam.fn.harness.fn.ThrowingFunction;
 import org.apache.beam.fn.harness.logging.BeamFnLoggingClient;
 import org.apache.beam.fn.harness.state.BeamFnStateGrpcClientCache;
 import org.apache.beam.fn.harness.stream.HarnessStreamObserverFactories;
@@ -36,6 +35,7 @@
 import org.apache.beam.model.pipeline.v1.Endpoints;
 import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
 import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
+import org.apache.beam.sdk.fn.function.ThrowingFunction;
 import org.apache.beam.sdk.fn.stream.StreamObserverFactory;
 import org.apache.beam.sdk.options.ExperimentalOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/MapFnRunner.java 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/MapFnRunner.java
index 33b37878658..e619ce1f3c9 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/MapFnRunner.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/MapFnRunner.java
@@ -29,13 +29,13 @@
 import java.util.function.Supplier;
 import org.apache.beam.fn.harness.data.BeamFnDataClient;
 import org.apache.beam.fn.harness.data.MultiplexingFnDataReceiver;
-import org.apache.beam.fn.harness.fn.ThrowingFunction;
-import org.apache.beam.fn.harness.fn.ThrowingRunnable;
 import org.apache.beam.fn.harness.state.BeamFnStateClient;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.fn.function.ThrowingFunction;
+import org.apache.beam.sdk.fn.function.ThrowingRunnable;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.WindowedValue;
 
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PTransformRunnerFactory.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PTransformRunnerFactory.java
index a432332d84d..c130d4da699 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PTransformRunnerFactory.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PTransformRunnerFactory.java
@@ -23,13 +23,13 @@
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 import org.apache.beam.fn.harness.data.BeamFnDataClient;
-import org.apache.beam.fn.harness.fn.ThrowingRunnable;
 import org.apache.beam.fn.harness.state.BeamFnStateClient;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Coder;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.fn.function.ThrowingRunnable;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.WindowedValue;
 
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/WindowMappingFnRunner.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/WindowMappingFnRunner.java
index f6e1deffd43..06b3c418de4 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/WindowMappingFnRunner.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/WindowMappingFnRunner.java
@@ -23,10 +23,10 @@
 import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
 import java.util.Map;
-import org.apache.beam.fn.harness.fn.ThrowingFunction;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
 import org.apache.beam.model.pipeline.v1.RunnerApi.SdkFunctionSpec;
 import org.apache.beam.runners.core.construction.PCollectionViewTranslation;
+import org.apache.beam.sdk.fn.function.ThrowingFunction;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
 import org.apache.beam.sdk.values.KV;
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java
index 730a865c86b..c84dda854fe 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java
@@ -31,11 +31,11 @@
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.function.BiFunction;
 import java.util.function.Function;
-import org.apache.beam.fn.harness.fn.ThrowingFunction;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionRequest;
 import org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc;
 import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.apache.beam.sdk.fn.function.ThrowingFunction;
 import 
org.apache.beam.sdk.fn.stream.StreamObserverFactory.StreamObserverClientFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
index 0d2e0d0f384..6551570433a 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
@@ -40,7 +40,6 @@
 import org.apache.beam.fn.harness.PTransformRunnerFactory;
 import org.apache.beam.fn.harness.PTransformRunnerFactory.Registrar;
 import org.apache.beam.fn.harness.data.BeamFnDataClient;
-import org.apache.beam.fn.harness.fn.ThrowingRunnable;
 import org.apache.beam.fn.harness.state.BeamFnStateClient;
 import org.apache.beam.fn.harness.state.BeamFnStateGrpcClientCache;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
@@ -54,6 +53,7 @@
 import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.fn.function.ThrowingRunnable;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.common.ReflectHelpers;
diff --git 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
index 748a7f2e806..4df6e1ec7e3 100644
--- 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
+++ 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
@@ -46,7 +46,6 @@
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.beam.fn.harness.PTransformRunnerFactory.Registrar;
 import org.apache.beam.fn.harness.data.BeamFnDataClient;
-import org.apache.beam.fn.harness.fn.ThrowingRunnable;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.pipeline.v1.Endpoints;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
@@ -59,6 +58,7 @@
 import org.apache.beam.sdk.fn.data.InboundDataClient;
 import org.apache.beam.sdk.fn.data.LogicalEndpoint;
 import org.apache.beam.sdk.fn.data.RemoteGrpcPortRead;
+import org.apache.beam.sdk.fn.function.ThrowingRunnable;
 import org.apache.beam.sdk.fn.test.TestExecutors;
 import org.apache.beam.sdk.fn.test.TestExecutors.TestExecutorService;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
diff --git 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java
 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java
index 68d42894ea1..12540c980b3 100644
--- 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java
+++ 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java
@@ -45,7 +45,6 @@
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.beam.fn.harness.PTransformRunnerFactory.Registrar;
 import org.apache.beam.fn.harness.data.BeamFnDataClient;
-import org.apache.beam.fn.harness.fn.ThrowingRunnable;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.pipeline.v1.Endpoints;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
@@ -57,6 +56,7 @@
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
 import org.apache.beam.sdk.fn.data.LogicalEndpoint;
 import org.apache.beam.sdk.fn.data.RemoteGrpcPortWrite;
+import org.apache.beam.sdk.fn.function.ThrowingRunnable;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.WindowedValue;
diff --git 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BoundedSourceRunnerTest.java
 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BoundedSourceRunnerTest.java
index 4189f9b7aa2..044de10b3c1 100644
--- 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BoundedSourceRunnerTest.java
+++ 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BoundedSourceRunnerTest.java
@@ -38,9 +38,9 @@
 import java.util.ServiceLoader;
 import org.apache.beam.fn.harness.PTransformRunnerFactory.Registrar;
 import org.apache.beam.fn.harness.control.ProcessBundleHandler;
-import org.apache.beam.fn.harness.fn.ThrowingRunnable;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.fn.function.ThrowingRunnable;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.CountingSource;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
diff --git 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
index 22bcebd842e..722561fc6fa 100644
--- 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
+++ 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
@@ -41,7 +41,6 @@
 import java.util.List;
 import java.util.ServiceLoader;
 import org.apache.beam.fn.harness.PTransformRunnerFactory.Registrar;
-import org.apache.beam.fn.harness.fn.ThrowingRunnable;
 import org.apache.beam.fn.harness.state.FakeBeamFnStateClient;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
@@ -52,6 +51,7 @@
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.fn.function.ThrowingRunnable;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.state.BagState;
 import org.apache.beam.sdk.state.CombiningState;
diff --git 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/MapFnRunnerTest.java
 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/MapFnRunnerTest.java
index 09b9b6b7297..57ef321cb19 100644
--- 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/MapFnRunnerTest.java
+++ 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/MapFnRunnerTest.java
@@ -32,11 +32,11 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import org.apache.beam.fn.harness.fn.ThrowingFunction;
-import org.apache.beam.fn.harness.fn.ThrowingRunnable;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.fn.function.ThrowingFunction;
+import org.apache.beam.sdk.fn.function.ThrowingRunnable;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.junit.Test;
diff --git 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/WindowMappingFnRunnerTest.java
 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/WindowMappingFnRunnerTest.java
index 1aa6e15ceac..81491ce1e45 100644
--- 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/WindowMappingFnRunnerTest.java
+++ 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/WindowMappingFnRunnerTest.java
@@ -19,10 +19,10 @@
 
 import static org.junit.Assert.assertEquals;
 
-import org.apache.beam.fn.harness.fn.ThrowingFunction;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.construction.ParDoTranslation;
 import org.apache.beam.runners.core.construction.SdkComponents;
+import org.apache.beam.sdk.fn.function.ThrowingFunction;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
diff --git 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java
 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java
index 620dca6e496..4b4fe1ca90a 100644
--- 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java
+++ 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java
@@ -36,10 +36,10 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.beam.fn.harness.fn.ThrowingFunction;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc;
 import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.apache.beam.sdk.fn.function.ThrowingFunction;
 import 
org.apache.beam.sdk.fn.stream.StreamObserverFactory.StreamObserverClientFactory;
 import org.apache.beam.sdk.fn.test.TestStreams;
 import org.junit.Test;
diff --git 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
index b8cd7066a75..48d56fd7731 100644
--- 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
+++ 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
@@ -40,8 +40,6 @@
 import java.util.function.Supplier;
 import org.apache.beam.fn.harness.PTransformRunnerFactory;
 import org.apache.beam.fn.harness.data.BeamFnDataClient;
-import org.apache.beam.fn.harness.fn.ThrowingConsumer;
-import org.apache.beam.fn.harness.fn.ThrowingRunnable;
 import org.apache.beam.fn.harness.state.BeamFnStateClient;
 import org.apache.beam.fn.harness.state.BeamFnStateGrpcClientCache;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
@@ -53,6 +51,8 @@
 import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.fn.function.ThrowingConsumer;
+import org.apache.beam.sdk.fn.function.ThrowingRunnable;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.util.WindowedValue;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 90995)
    Time Spent: 2h 20m  (was: 2h 10m)

> Use typed sinks and sources for FnApiControlClientPoolService
> -------------------------------------------------------------
>
>                 Key: BEAM-3994
>                 URL: https://issues.apache.org/jira/browse/BEAM-3994
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-core
>            Reporter: Ben Sidhom
>            Assignee: Ben Sidhom
>            Priority: Minor
>          Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> We operate with blocking queues directly when managing control clients with 
> the FnApiControlClientPoolService. This makes interactions with the client 
> pool difficult to understand. We should instead make client sources and sinks 
> explicit.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to