lukecwik commented on a change in pull request #16439:
URL: https://github.com/apache/beam/pull/16439#discussion_r785219792



##########
File path: 
sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.fn.data;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.Elements;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
+import 
org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString.Output;
+import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.StreamObserver;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An outbound data buffering aggregator with size-based buffer and time-based 
buffer if
+ * corresponding options are set.
+ *
+ * <p>The default size-based buffer threshold can be overridden by specifying 
the experiment {@code
+ * data_buffer_size_limit=<bytes>}
+ *
+ * <p>The default time-based buffer threshold can be overridden by specifying 
the experiment {@code
+ * data_buffer_time_limit_ms=<milliseconds>}
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+public class BeamFnDataOutboundAggregator implements AutoCloseable {
+
+  public static final String DATA_BUFFER_SIZE_LIMIT = 
"data_buffer_size_limit=";
+  public static final int DEFAULT_BUFFER_LIMIT_BYTES = 1_000_000;
+  public static final String DATA_BUFFER_TIME_LIMIT_MS = 
"data_buffer_time_limit_ms=";
+  public static final long DEFAULT_BUFFER_LIMIT_TIME_MS = -1L;
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(BeamFnDataOutboundAggregator.class);
+  private final int sizeLimit;
+  private final long timeLimit;
+  private final Map<LogicalEndpoint, Coder<?>> outputLocations;
+  private final StreamObserver<Elements> outboundObserver;
+  private final Map<LogicalEndpoint, ByteString.Output> buffer;
+  @VisibleForTesting ScheduledFuture<?> flushFuture;
+  private long byteCounter;
+
+  public BeamFnDataOutboundAggregator(
+      PipelineOptions options, StreamObserver<BeamFnApi.Elements> 
outboundObserver) {
+    this.sizeLimit = getSizeLimit(options);
+    this.timeLimit = getTimeLimit(options);
+    this.outputLocations = new HashMap<>();
+    this.buffer = new ConcurrentHashMap<>();
+    this.outboundObserver = outboundObserver;
+    if (timeLimit > 0) {
+      this.flushFuture =
+          Executors.newSingleThreadScheduledExecutor(
+                  new ThreadFactoryBuilder()
+                      .setDaemon(true)
+                      .setNameFormat("DataBufferOutboundFlusher-thread")
+                      .build())
+              .scheduleAtFixedRate(
+                  this::periodicFlush, timeLimit, timeLimit, 
TimeUnit.MILLISECONDS);
+    } else {
+      this.flushFuture = null;
+    }
+  }
+
+  /**
+   * Register the outbound logical endpoint alongside it's value coder. All 
registered endpoints
+   * will eventually be removed when {@link #close()} is called at the end of 
a bundle.
+   */
+  public <T> void registerOutputLocation(LogicalEndpoint endpoint, Coder<T> 
coder) {
+    LOG.debug("Registering endpoint: {}", endpoint);
+    outputLocations.put(endpoint, coder);
+  }
+
+  public void flush() throws IOException {
+    if (byteCounter > 0) {
+      outboundObserver.onNext(convertBufferForTransmission().build());
+    }
+  }
+
+  /**
+   * Closes the streams for all registered outbound endpoints. Should be 
called at the end of each
+   * bundle.
+   */
+  @Override
+  public void close() {
+    LOG.debug("Closing streams for outbound endpoints {}", outputLocations);
+    if (byteCounter == 0 && outputLocations.isEmpty()) {
+      return;
+    }
+    Elements.Builder bufferedElements = convertBufferForTransmission();
+    for (LogicalEndpoint outputLocation : outputLocations.keySet()) {
+      if (outputLocation.isTimer()) {
+        bufferedElements
+            .addTimersBuilder()
+            .setInstructionId(outputLocation.getInstructionId())
+            .setTransformId(outputLocation.getTransformId())
+            .setTimerFamilyId(outputLocation.getTimerFamilyId())
+            .setIsLast(true);
+      } else {
+        bufferedElements
+            .addDataBuilder()
+            .setInstructionId(outputLocation.getInstructionId())
+            .setTransformId(outputLocation.getTransformId())
+            .setIsLast(true);
+      }
+    }
+    outboundObserver.onNext(bufferedElements.build());
+    outputLocations.clear();
+  }
+
+  public <T> void accept(LogicalEndpoint endpoint, T data) throws Exception {

Review comment:
       In a follow-up PR it will make sense to give a `ThrowingConsumer` 
to the BeamFnDataWriteRunner so we can avoid the map lookup on each element 
call. If it's trivial, feel free to do it now.

##########
File path: 
sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.fn.data;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.Elements;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
+import 
org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString.Output;
+import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.StreamObserver;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An outbound data buffering aggregator with size-based buffer and time-based 
buffer if
+ * corresponding options are set.
+ *
+ * <p>The default size-based buffer threshold can be overridden by specifying 
the experiment {@code
+ * data_buffer_size_limit=<bytes>}
+ *
+ * <p>The default time-based buffer threshold can be overridden by specifying 
the experiment {@code
+ * data_buffer_time_limit_ms=<milliseconds>}
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+public class BeamFnDataOutboundAggregator implements AutoCloseable {
+
+  public static final String DATA_BUFFER_SIZE_LIMIT = 
"data_buffer_size_limit=";
+  public static final int DEFAULT_BUFFER_LIMIT_BYTES = 1_000_000;
+  public static final String DATA_BUFFER_TIME_LIMIT_MS = 
"data_buffer_time_limit_ms=";
+  public static final long DEFAULT_BUFFER_LIMIT_TIME_MS = -1L;
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(BeamFnDataOutboundAggregator.class);
+  private final int sizeLimit;
+  private final long timeLimit;
+  private final Map<LogicalEndpoint, Coder<?>> outputLocations;
+  private final StreamObserver<Elements> outboundObserver;
+  private final Map<LogicalEndpoint, ByteString.Output> buffer;
+  @VisibleForTesting ScheduledFuture<?> flushFuture;
+  private long byteCounter;
+
+  public BeamFnDataOutboundAggregator(
+      PipelineOptions options, StreamObserver<BeamFnApi.Elements> 
outboundObserver) {
+    this.sizeLimit = getSizeLimit(options);
+    this.timeLimit = getTimeLimit(options);
+    this.outputLocations = new HashMap<>();
+    this.buffer = new ConcurrentHashMap<>();
+    this.outboundObserver = outboundObserver;
+    if (timeLimit > 0) {
+      this.flushFuture =
+          Executors.newSingleThreadScheduledExecutor(
+                  new ThreadFactoryBuilder()
+                      .setDaemon(true)
+                      .setNameFormat("DataBufferOutboundFlusher-thread")
+                      .build())
+              .scheduleAtFixedRate(
+                  this::periodicFlush, timeLimit, timeLimit, 
TimeUnit.MILLISECONDS);
+    } else {
+      this.flushFuture = null;
+    }
+  }
+
+  /**
+   * Register the outbound logical endpoint alongside it's value coder. All 
registered endpoints
+   * will eventually be removed when {@link #close()} is called at the end of 
a bundle.
+   */
+  public <T> void registerOutputLocation(LogicalEndpoint endpoint, Coder<T> 
coder) {
+    LOG.debug("Registering endpoint: {}", endpoint);
+    outputLocations.put(endpoint, coder);
+  }
+
+  public void flush() throws IOException {
+    if (byteCounter > 0) {
+      outboundObserver.onNext(convertBufferForTransmission().build());
+    }
+  }
+
+  /**
+   * Closes the streams for all registered outbound endpoints. Should be 
called at the end of each
+   * bundle.
+   */
+  @Override
+  public void close() {
+    LOG.debug("Closing streams for outbound endpoints {}", outputLocations);
+    if (byteCounter == 0 && outputLocations.isEmpty()) {
+      return;
+    }
+    Elements.Builder bufferedElements = convertBufferForTransmission();
+    for (LogicalEndpoint outputLocation : outputLocations.keySet()) {
+      if (outputLocation.isTimer()) {
+        bufferedElements
+            .addTimersBuilder()
+            .setInstructionId(outputLocation.getInstructionId())
+            .setTransformId(outputLocation.getTransformId())
+            .setTimerFamilyId(outputLocation.getTimerFamilyId())
+            .setIsLast(true);
+      } else {
+        bufferedElements
+            .addDataBuilder()
+            .setInstructionId(outputLocation.getInstructionId())
+            .setTransformId(outputLocation.getTransformId())
+            .setIsLast(true);
+      }
+    }
+    outboundObserver.onNext(bufferedElements.build());
+    outputLocations.clear();
+  }
+
+  public <T> void accept(LogicalEndpoint endpoint, T data) throws Exception {
+    if (timeLimit > 0) {
+      checkFlushThreadException();
+    }
+    Output output = buffer.computeIfAbsent(endpoint, e -> 
ByteString.newOutput());
+    int size = output.size();
+    ((Coder<T>) outputLocations.get(endpoint)).encode(data, output);
+    byteCounter += output.size() - size;

Review comment:
       All elements must encode to at least one byte in length to ensure 
that we can't enter an infinite loop when encoding/decoding, since otherwise 
we could encode/decode an infinite number of 0-byte-length objects.
   
   See 
https://github.com/apache/beam/blob/e38c6ac5760f55494e71f61d0271a624ca96ea17/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DataStreams.java#L239
   
   Was this logic missing in the existing implementation?

##########
File path: 
sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.fn.data;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.Elements;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
+import 
org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString.Output;
+import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.StreamObserver;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An outbound data buffering aggregator with size-based buffer and time-based 
buffer if
+ * corresponding options are set.
+ *
+ * <p>The default size-based buffer threshold can be overridden by specifying 
the experiment {@code
+ * data_buffer_size_limit=<bytes>}
+ *
+ * <p>The default time-based buffer threshold can be overridden by specifying 
the experiment {@code
+ * data_buffer_time_limit_ms=<milliseconds>}
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+public class BeamFnDataOutboundAggregator implements AutoCloseable {
+
+  public static final String DATA_BUFFER_SIZE_LIMIT = 
"data_buffer_size_limit=";
+  public static final int DEFAULT_BUFFER_LIMIT_BYTES = 1_000_000;
+  public static final String DATA_BUFFER_TIME_LIMIT_MS = 
"data_buffer_time_limit_ms=";
+  public static final long DEFAULT_BUFFER_LIMIT_TIME_MS = -1L;
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(BeamFnDataOutboundAggregator.class);
+  private final int sizeLimit;
+  private final long timeLimit;
+  private final Map<LogicalEndpoint, Coder<?>> outputLocations;
+  private final StreamObserver<Elements> outboundObserver;
+  private final Map<LogicalEndpoint, ByteString.Output> buffer;
+  @VisibleForTesting ScheduledFuture<?> flushFuture;
+  private long byteCounter;
+
+  public BeamFnDataOutboundAggregator(
+      PipelineOptions options, StreamObserver<BeamFnApi.Elements> 
outboundObserver) {
+    this.sizeLimit = getSizeLimit(options);
+    this.timeLimit = getTimeLimit(options);
+    this.outputLocations = new HashMap<>();
+    this.buffer = new ConcurrentHashMap<>();

Review comment:
       ConcurrentHashMap does not provide thread safety over mutation of the 
ByteString.Output within.

##########
File path: 
sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.fn.data;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.Elements;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
+import 
org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString.Output;
+import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.StreamObserver;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An outbound data buffering aggregator with size-based buffer and time-based 
buffer if
+ * corresponding options are set.
+ *
+ * <p>The default size-based buffer threshold can be overridden by specifying 
the experiment {@code
+ * data_buffer_size_limit=<bytes>}
+ *
+ * <p>The default time-based buffer threshold can be overridden by specifying 
the experiment {@code
+ * data_buffer_time_limit_ms=<milliseconds>}
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+public class BeamFnDataOutboundAggregator implements AutoCloseable {
+
+  public static final String DATA_BUFFER_SIZE_LIMIT = 
"data_buffer_size_limit=";
+  public static final int DEFAULT_BUFFER_LIMIT_BYTES = 1_000_000;
+  public static final String DATA_BUFFER_TIME_LIMIT_MS = 
"data_buffer_time_limit_ms=";
+  public static final long DEFAULT_BUFFER_LIMIT_TIME_MS = -1L;
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(BeamFnDataOutboundAggregator.class);
+  private final int sizeLimit;
+  private final long timeLimit;
+  private final Map<LogicalEndpoint, Coder<?>> outputLocations;
+  private final StreamObserver<Elements> outboundObserver;
+  private final Map<LogicalEndpoint, ByteString.Output> buffer;
+  @VisibleForTesting ScheduledFuture<?> flushFuture;
+  private long byteCounter;
+
+  public BeamFnDataOutboundAggregator(
+      PipelineOptions options, StreamObserver<BeamFnApi.Elements> 
outboundObserver) {
+    this.sizeLimit = getSizeLimit(options);
+    this.timeLimit = getTimeLimit(options);
+    this.outputLocations = new HashMap<>();
+    this.buffer = new ConcurrentHashMap<>();
+    this.outboundObserver = outboundObserver;
+    if (timeLimit > 0) {
+      this.flushFuture =
+          Executors.newSingleThreadScheduledExecutor(
+                  new ThreadFactoryBuilder()
+                      .setDaemon(true)
+                      .setNameFormat("DataBufferOutboundFlusher-thread")
+                      .build())
+              .scheduleAtFixedRate(
+                  this::periodicFlush, timeLimit, timeLimit, 
TimeUnit.MILLISECONDS);
+    } else {
+      this.flushFuture = null;
+    }
+  }
+
+  /**
+   * Register the outbound logical endpoint alongside it's value coder. All 
registered endpoints
+   * will eventually be removed when {@link #close()} is called at the end of 
a bundle.
+   */
+  public <T> void registerOutputLocation(LogicalEndpoint endpoint, Coder<T> 
coder) {
+    LOG.debug("Registering endpoint: {}", endpoint);
+    outputLocations.put(endpoint, coder);
+  }
+
+  public void flush() throws IOException {
+    if (byteCounter > 0) {
+      outboundObserver.onNext(convertBufferForTransmission().build());
+    }
+  }
+
+  /**
+   * Closes the streams for all registered outbound endpoints. Should be 
called at the end of each
+   * bundle.
+   */
+  @Override
+  public void close() {

Review comment:
       I'm not sure you gain much by making this AutoCloseable since your next 
PR will be `steal output and do nothing if no data has yet to be sent` so it 
may make more sense to choose a clearer name here than `close()`.

##########
File path: 
sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java
##########
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.fn.data;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.Elements;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.StreamObserver;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An outbound data buffering aggregator with size-based buffer and time-based 
buffer if
+ * corresponding options are set.
+ *
+ * <p>The default size-based buffer threshold can be overridden by specifying 
the experiment {@code
+ * data_buffer_size_limit=<bytes>}
+ *
+ * <p>The default time-based buffer threshold can be overridden by specifying 
the experiment {@code
+ * data_buffer_time_limit_ms=<milliseconds>}
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+public class BeamFnDataOutboundAggregator {
+
+  public static final String DATA_BUFFER_SIZE_LIMIT = 
"data_buffer_size_limit=";
+  public static final int DEFAULT_BUFFER_LIMIT_BYTES = 1_000_000;
+  public static final String DATA_BUFFER_TIME_LIMIT_MS = 
"data_buffer_time_limit_ms=";
+  public static final long DEFAULT_BUFFER_LIMIT_TIME_MS = -1L;
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(BeamFnDataOutboundAggregator.class);
+  private final int sizeLimit;
+  private final long timeLimit;
+  private final Set<LogicalEndpoint> outputLocations;
+  private final Set<LogicalEndpoint> unregisteredOutputLocations;
+  private final StreamObserver<Elements> outboundObserver;
+  private final Map<LogicalEndpoint, ByteString.Output> buffer;
+  @VisibleForTesting ScheduledFuture<?> flushFuture;
+  private long byteCounter;
+
+  public BeamFnDataOutboundAggregator(
+      PipelineOptions options, StreamObserver<BeamFnApi.Elements> 
outboundObserver) {
+    this.sizeLimit = getSizeLimit(options);
+    this.timeLimit = getTimeLimit(options);
+    this.outputLocations = new HashSet<>();
+    this.unregisteredOutputLocations = new HashSet<>();
+    this.buffer = new ConcurrentHashMap<>();
+    this.outboundObserver = outboundObserver;
+    if (timeLimit > 0) {
+      this.flushFuture =
+          Executors.newSingleThreadScheduledExecutor(
+                  new ThreadFactoryBuilder()
+                      .setDaemon(true)
+                      .setNameFormat("DataBufferOutboundFlusher-thread")
+                      .build())
+              .scheduleAtFixedRate(
+                  this::periodicFlush, timeLimit, timeLimit, 
TimeUnit.MILLISECONDS);
+    } else {
+      this.flushFuture = null;
+    }
+  }
+
+  public synchronized void registerOutputLocation(LogicalEndpoint endpoint) {
+    LOG.debug("Registering endpoint: {}", endpoint);
+    outputLocations.add(endpoint);
+  }
+
+  public synchronized void unregisterOutputLocation(LogicalEndpoint endpoint) {
+    LOG.debug("UnRegistering endpoint: {}", endpoint);

Review comment:
       ```suggestion
       LOG.debug("Unregistering endpoint: {}", endpoint);
   ```

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PTransformRunnerFactory.java
##########
@@ -91,6 +88,18 @@
     /** Returns a {@link FnDataReceiver} to send output to for the specified 
PCollection id. */
     <T> FnDataReceiver<T> getPCollectionConsumer(String pCollectionId);
 
+    /**
+     * Returns all {@link Endpoints.ApiServiceDescriptor} to {@link 
BeamFnDataOutboundAggregator}
+     * mappings.
+     */
+    Map<Endpoints.ApiServiceDescriptor, BeamFnDataOutboundAggregator> 
getOutboundAggregators();

Review comment:
       Instead of exposing the aggregator, consider exposing a function like 
   ```
   FnDataReceiver<T> 
addOutgoingDataEndpoint(Endpoints.ApiServiceDescriptor apiServiceDescriptor, 
Coder<T> coder)
   ```
   and ditto for timers with:
   ```
   FnDataReceiver<Timer<T>> addOutgoingTimerEndpoint(String 
timerFamilyId, Coder<Timer<T>> coder)
   ```
   
   this will make testing the `BeamFnDataWriteRunner` and `FnApiDoFnRunner` 
simpler.

##########
File path: 
sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.fn.data;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.Elements;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
+import 
org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString.Output;
+import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.StreamObserver;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An outbound data buffering aggregator with size-based buffer and time-based 
buffer if
+ * corresponding options are set.
+ *
+ * <p>The default size-based buffer threshold can be overridden by specifying 
the experiment {@code
+ * data_buffer_size_limit=<bytes>}
+ *
+ * <p>The default time-based buffer threshold can be overridden by specifying 
the experiment {@code
+ * data_buffer_time_limit_ms=<milliseconds>}
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+public class BeamFnDataOutboundAggregator implements AutoCloseable {
+
+  public static final String DATA_BUFFER_SIZE_LIMIT = 
"data_buffer_size_limit=";
+  public static final int DEFAULT_BUFFER_LIMIT_BYTES = 1_000_000;
+  public static final String DATA_BUFFER_TIME_LIMIT_MS = 
"data_buffer_time_limit_ms=";
+  public static final long DEFAULT_BUFFER_LIMIT_TIME_MS = -1L;
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(BeamFnDataOutboundAggregator.class);
+  private final int sizeLimit;
+  private final long timeLimit;
+  private final Map<LogicalEndpoint, Coder<?>> outputLocations;
+  private final StreamObserver<Elements> outboundObserver;
+  private final Map<LogicalEndpoint, ByteString.Output> buffer;
+  @VisibleForTesting ScheduledFuture<?> flushFuture;
+  private long byteCounter;
+
+  public BeamFnDataOutboundAggregator(
+      PipelineOptions options, StreamObserver<BeamFnApi.Elements> 
outboundObserver) {
+    this.sizeLimit = getSizeLimit(options);
+    this.timeLimit = getTimeLimit(options);
+    this.outputLocations = new HashMap<>();
+    this.buffer = new ConcurrentHashMap<>();
+    this.outboundObserver = outboundObserver;
+    if (timeLimit > 0) {
+      this.flushFuture =
+          Executors.newSingleThreadScheduledExecutor(
+                  new ThreadFactoryBuilder()
+                      .setDaemon(true)
+                      .setNameFormat("DataBufferOutboundFlusher-thread")
+                      .build())
+              .scheduleAtFixedRate(
+                  this::periodicFlush, timeLimit, timeLimit, 
TimeUnit.MILLISECONDS);
+    } else {
+      this.flushFuture = null;
+    }
+  }
+
+  /**
+   * Register the outbound logical endpoint alongside it's value coder. All 
registered endpoints
+   * will eventually be removed when {@link #close()} is called at the end of 
a bundle.
+   */
+  public <T> void registerOutputLocation(LogicalEndpoint endpoint, Coder<T> 
coder) {
+    LOG.debug("Registering endpoint: {}", endpoint);
+    outputLocations.put(endpoint, coder);
+  }
+
+  public void flush() throws IOException {
+    if (byteCounter > 0) {
+      outboundObserver.onNext(convertBufferForTransmission().build());
+    }
+  }
+
+  /**
+   * Closes the streams for all registered outbound endpoints. Should be 
called at the end of each
+   * bundle.
+   */
+  @Override
+  public void close() {
+    LOG.debug("Closing streams for outbound endpoints {}", outputLocations);
+    if (byteCounter == 0 && outputLocations.isEmpty()) {
+      return;
+    }
+    Elements.Builder bufferedElements = convertBufferForTransmission();
+    for (LogicalEndpoint outputLocation : outputLocations.keySet()) {
+      if (outputLocation.isTimer()) {
+        bufferedElements
+            .addTimersBuilder()
+            .setInstructionId(outputLocation.getInstructionId())
+            .setTransformId(outputLocation.getTransformId())
+            .setTimerFamilyId(outputLocation.getTimerFamilyId())
+            .setIsLast(true);
+      } else {
+        bufferedElements
+            .addDataBuilder()
+            .setInstructionId(outputLocation.getInstructionId())
+            .setTransformId(outputLocation.getTransformId())
+            .setIsLast(true);
+      }
+    }
+    outboundObserver.onNext(bufferedElements.build());
+    outputLocations.clear();
+  }
+
+  public <T> void accept(LogicalEndpoint endpoint, T data) throws Exception {
+    if (timeLimit > 0) {
+      checkFlushThreadException();
+    }
+    Output output = buffer.computeIfAbsent(endpoint, e -> 
ByteString.newOutput());
+    int size = output.size();
+    ((Coder<T>) outputLocations.get(endpoint)).encode(data, output);
+    byteCounter += output.size() - size;
+
+    if (byteCounter >= sizeLimit) {
+      flush();
+    }
+  }
+
+  private Elements.Builder convertBufferForTransmission() {
+    Elements.Builder bufferedElements = Elements.newBuilder();
+    for (Map.Entry<LogicalEndpoint, ByteString.Output> bufferEntry : 
buffer.entrySet()) {
+      LogicalEndpoint endpoint = bufferEntry.getKey();
+      if (endpoint.isTimer()) {
+        bufferedElements
+            .addTimersBuilder()
+            .setInstructionId(endpoint.getInstructionId())
+            .setTransformId(endpoint.getTransformId())
+            .setTimerFamilyId(endpoint.getTimerFamilyId())
+            .setTimers(bufferEntry.getValue().toByteString());
+      } else {
+        bufferedElements
+            .addDataBuilder()
+            .setInstructionId(endpoint.getInstructionId())
+            .setTransformId(endpoint.getTransformId())
+            .setData(bufferEntry.getValue().toByteString());
+      }
+    }
+    byteCounter = 0;
+    buffer.clear();
+    return bufferedElements;
+  }
+
+  private void periodicFlush() {
+    try {
+      flush();

Review comment:
       How do you provide thread safety between the thread that is flushing 
periodically and the one that is writing into the map?
   
   I believe there were two implementations of the 
`BeamFnDataBufferingOutboundObserver`: one with synchronization points because of 
the additional thread, and one without synchronization for the performance 
benefit.

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
##########
@@ -72,87 +69,67 @@
     public BeamFnDataWriteRunner<InputT> createRunnerForPTransform(Context 
context)
         throws IOException {
 
-      BeamFnDataWriteRunner<InputT> runner =
-          new BeamFnDataWriteRunner<>(
-              context.getBundleCacheSupplier(),
-              context.getPTransformId(),
-              context.getPTransform(),
-              context.getProcessBundleInstructionIdSupplier(),
-              context.getCoders(),
-              context.getBeamFnDataClient(),
-              context.getBeamFnStateClient());
-      context.addStartBundleFunction(runner::registerForOutput);
+      RemoteGrpcPort port = 
RemoteGrpcPortWrite.fromPTransform(context.getPTransform()).getPort();
+      RehydratedComponents components =
+          RehydratedComponents.forComponents(
+              
Components.newBuilder().putAllCoders(context.getCoders()).build());
+      Coder<WindowedValue<InputT>> coder =
+          (Coder<WindowedValue<InputT>>)
+              CoderTranslation.fromProto(
+                  context.getCoders().get(port.getCoderId()),
+                  components,
+                  new StateBackedIterableTranslationContext() {
+                    @Override
+                    public Supplier<Cache<?, ?>> getCache() {
+                      return context.getBundleCacheSupplier();
+                    }
+
+                    @Override
+                    public BeamFnStateClient getStateClient() {
+                      return context.getBeamFnStateClient();
+                    }
+
+                    @Override
+                    public Supplier<String> getCurrentInstructionId() {
+                      return context.getProcessBundleInstructionIdSupplier();
+                    }
+                  });
+      BeamFnDataOutboundAggregator outboundAggregator =
+          context
+              .getOutboundAggregators()
+              .computeIfAbsent(
+                  port.getApiServiceDescriptor(),
+                  apiServiceDescriptor ->
+                      
context.getBeamFnDataClient().createOutboundAggregator(apiServiceDescriptor));
+      Supplier<LogicalEndpoint> endpointSupplier =
+          () ->
+              LogicalEndpoint.data(
+                  context.getProcessBundleInstructionIdSupplier().get(), 
context.getPTransformId());
+      BeamFnDataWriteRunner<InputT> runner = new 
BeamFnDataWriteRunner<>(outboundAggregator);
+      context.addStartBundleFunction(() -> 
runner.registerForOutput(endpointSupplier, coder));
+
       context.addPCollectionConsumer(
           getOnlyElement(context.getPTransform().getInputsMap().values()),
           (FnDataReceiver) (FnDataReceiver<WindowedValue<InputT>>) 
runner::consume,
-          ((WindowedValueCoder<InputT>) runner.coder).getValueCoder());
+          ((WindowedValueCoder<InputT>) coder).getValueCoder());
 
-      context.addFinishBundleFunction(runner::close);
       return runner;
     }
   }
 
-  private final Endpoints.ApiServiceDescriptor apiServiceDescriptor;
-  private final String pTransformId;
-  private final Coder<WindowedValue<InputT>> coder;
-  private final BeamFnDataClient beamFnDataClientFactory;
-  private final Supplier<String> processBundleInstructionIdSupplier;
-
-  private CloseableFnDataReceiver<WindowedValue<InputT>> consumer;
-
-  BeamFnDataWriteRunner(
-      Supplier<Cache<?, ?>> cache,
-      String pTransformId,
-      RunnerApi.PTransform remoteWriteNode,
-      Supplier<String> processBundleInstructionIdSupplier,
-      Map<String, RunnerApi.Coder> coders,
-      BeamFnDataClient beamFnDataClientFactory,
-      BeamFnStateClient beamFnStateClient)
-      throws IOException {
-    this.pTransformId = pTransformId;
-    RemoteGrpcPort port = 
RemoteGrpcPortWrite.fromPTransform(remoteWriteNode).getPort();
-    this.apiServiceDescriptor = port.getApiServiceDescriptor();
-    this.beamFnDataClientFactory = beamFnDataClientFactory;
-    this.processBundleInstructionIdSupplier = 
processBundleInstructionIdSupplier;
-
-    RehydratedComponents components =
-        
RehydratedComponents.forComponents(Components.newBuilder().putAllCoders(coders).build());
-    this.coder =
-        (Coder<WindowedValue<InputT>>)
-            CoderTranslation.fromProto(
-                coders.get(port.getCoderId()),
-                components,
-                new StateBackedIterableTranslationContext() {
-                  @Override
-                  public Supplier<Cache<?, ?>> getCache() {
-                    return cache;
-                  }
-
-                  @Override
-                  public BeamFnStateClient getStateClient() {
-                    return beamFnStateClient;
-                  }
-
-                  @Override
-                  public Supplier<String> getCurrentInstructionId() {
-                    return processBundleInstructionIdSupplier;
-                  }
-                });
-  }
+  private final BeamFnDataOutboundAggregator outboundAggregator;
+  private LogicalEndpoint endpoint;
 
-  public void registerForOutput() {
-    consumer =
-        beamFnDataClientFactory.send(
-            apiServiceDescriptor,
-            LogicalEndpoint.data(processBundleInstructionIdSupplier.get(), 
pTransformId),
-            coder);
+  BeamFnDataWriteRunner(BeamFnDataOutboundAggregator outboundAggregator) 
throws IOException {
+    this.outboundAggregator = outboundAggregator;
   }
 
-  public void close() throws Exception {
-    consumer.close();
+  public void registerForOutput(Supplier<LogicalEndpoint> outputLocation, 
Coder<?> coder) {
+    endpoint = outputLocation.get();

Review comment:
       If you provide the process bundle instruction id supplier to the 
`BeamFnDataOutboundAggregator`, you can register once during construction of the 
`BeamFnDataWriteRunner` with the `transform id` or `transform id and timer id`; 
the aggregator can then own the logic of constructing the `LogicalEndpoint` if 
it still needs to.

##########
File path: 
sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.fn.data;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.Elements;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
+import 
org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString.Output;
+import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.StreamObserver;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An outbound data buffering aggregator with size-based buffer and time-based 
buffer if
+ * corresponding options are set.
+ *
+ * <p>The default size-based buffer threshold can be overridden by specifying 
the experiment {@code
+ * data_buffer_size_limit=<bytes>}
+ *
+ * <p>The default time-based buffer threshold can be overridden by specifying 
the experiment {@code
+ * data_buffer_time_limit_ms=<milliseconds>}
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+public class BeamFnDataOutboundAggregator implements AutoCloseable {
+
+  public static final String DATA_BUFFER_SIZE_LIMIT = 
"data_buffer_size_limit=";
+  public static final int DEFAULT_BUFFER_LIMIT_BYTES = 1_000_000;
+  public static final String DATA_BUFFER_TIME_LIMIT_MS = 
"data_buffer_time_limit_ms=";
+  public static final long DEFAULT_BUFFER_LIMIT_TIME_MS = -1L;
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(BeamFnDataOutboundAggregator.class);
+  private final int sizeLimit;
+  private final long timeLimit;
+  private final Map<LogicalEndpoint, Coder<?>> outputLocations;
+  private final StreamObserver<Elements> outboundObserver;
+  private final Map<LogicalEndpoint, ByteString.Output> buffer;
+  @VisibleForTesting ScheduledFuture<?> flushFuture;
+  private long byteCounter;
+
+  public BeamFnDataOutboundAggregator(
+      PipelineOptions options, StreamObserver<BeamFnApi.Elements> 
outboundObserver) {
+    this.sizeLimit = getSizeLimit(options);
+    this.timeLimit = getTimeLimit(options);
+    this.outputLocations = new HashMap<>();
+    this.buffer = new ConcurrentHashMap<>();
+    this.outboundObserver = outboundObserver;
+    if (timeLimit > 0) {
+      this.flushFuture =
+          Executors.newSingleThreadScheduledExecutor(
+                  new ThreadFactoryBuilder()
+                      .setDaemon(true)
+                      .setNameFormat("DataBufferOutboundFlusher-thread")
+                      .build())
+              .scheduleAtFixedRate(
+                  this::periodicFlush, timeLimit, timeLimit, 
TimeUnit.MILLISECONDS);
+    } else {
+      this.flushFuture = null;
+    }
+  }
+
+  /**
+   * Register the outbound logical endpoint alongside it's value coder. All 
registered endpoints
+   * will eventually be removed when {@link #close()} is called at the end of 
a bundle.
+   */
+  public <T> void registerOutputLocation(LogicalEndpoint endpoint, Coder<T> 
coder) {
+    LOG.debug("Registering endpoint: {}", endpoint);
+    outputLocations.put(endpoint, coder);
+  }
+
+  public void flush() throws IOException {
+    if (byteCounter > 0) {
+      outboundObserver.onNext(convertBufferForTransmission().build());
+    }
+  }
+
+  /**
+   * Closes the streams for all registered outbound endpoints. Should be 
called at the end of each
+   * bundle.
+   */
+  @Override
+  public void close() {
+    LOG.debug("Closing streams for outbound endpoints {}", outputLocations);
+    if (byteCounter == 0 && outputLocations.isEmpty()) {
+      return;
+    }
+    Elements.Builder bufferedElements = convertBufferForTransmission();
+    for (LogicalEndpoint outputLocation : outputLocations.keySet()) {
+      if (outputLocation.isTimer()) {
+        bufferedElements
+            .addTimersBuilder()
+            .setInstructionId(outputLocation.getInstructionId())
+            .setTransformId(outputLocation.getTransformId())
+            .setTimerFamilyId(outputLocation.getTimerFamilyId())
+            .setIsLast(true);
+      } else {
+        bufferedElements
+            .addDataBuilder()
+            .setInstructionId(outputLocation.getInstructionId())
+            .setTransformId(outputLocation.getTransformId())
+            .setIsLast(true);
+      }
+    }
+    outboundObserver.onNext(bufferedElements.build());

Review comment:
       It's useful to have the log statement that says how many elements and 
how many bytes were sent per endpoint for pipeline debugging reasons.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to