lukecwik commented on a change in pull request #16439:
URL: https://github.com/apache/beam/pull/16439#discussion_r795890916



##########
File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java
##########
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.fn.data;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.Elements;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.grpc.v1p43p2.io.grpc.stub.StreamObserver;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An outbound data buffering aggregator with size-based buffer and time-based buffer if
+ * corresponding options are set.
+ *
+ * <p>The default size-based buffer threshold can be overridden by specifying the experiment {@code
+ * data_buffer_size_limit=<bytes>}
+ *
+ * <p>The default time-based buffer threshold can be overridden by specifying the experiment {@code
+ * data_buffer_time_limit_ms=<milliseconds>}
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+public class BeamFnDataOutboundAggregator {
+
+  public static final String DATA_BUFFER_SIZE_LIMIT = "data_buffer_size_limit=";
+  public static final int DEFAULT_BUFFER_LIMIT_BYTES = 1_000_000;
+  public static final String DATA_BUFFER_TIME_LIMIT_MS = "data_buffer_time_limit_ms=";
+  public static final long DEFAULT_BUFFER_LIMIT_TIME_MS = -1L;
+
+  private static final Logger LOG = LoggerFactory.getLogger(BeamFnDataOutboundAggregator.class);
+  private final int sizeLimit;
+  private final long timeLimit;
+  private final Supplier<String> processBundleRequestIdSupplier;
+  private final Map<LogicalEndpoint, Coder<?>> outputLocations;
+  private final Map<LogicalEndpoint, Long> perEndpointByteCount;
+  private final Map<LogicalEndpoint, Long> perEndpointElementCount;
+  private final StreamObserver<Elements> outboundObserver;
+  private final Map<LogicalEndpoint, ByteString.Output> buffer;
+  @VisibleForTesting ScheduledFuture<?> flushFuture;
+  private final AtomicLong byteCounter = new AtomicLong(0L);

Review comment:
       Consider using LongAdder instead of AtomicLong, since LongAdder is designed for use cases with many writes and few reads.
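   
   A minimal sketch of that swap (the class and method names here are hypothetical, not the PR's code):
   
   ```java
   import java.util.concurrent.atomic.LongAdder;
   
   // LongAdder stripes its internal cells, so concurrent add() calls (one per
   // accepted element) contend far less than getAndAdd() on a single AtomicLong;
   // the total is only read at the size-limit check and when flushing.
   class ByteCounterSketch {
     private final LongAdder byteCounter = new LongAdder();
   
     void onElementEncoded(long delta) {
       byteCounter.add(delta); // was: byteCounter.getAndAdd(delta)
     }
   
     boolean shouldFlush(int sizeLimit) {
       return byteCounter.sum() >= sizeLimit; // was: byteCounter.get() >= sizeLimit
     }
   }
   ```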

##########
File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java
##########
@@ -0,0 +1,290 @@
+
+  public BeamFnDataOutboundAggregator(
+      PipelineOptions options,
+      Supplier<String> processBundleRequestIdSupplier,
+      StreamObserver<BeamFnApi.Elements> outboundObserver) {
+    this.sizeLimit = getSizeLimit(options);
+    this.timeLimit = getTimeLimit(options);
+    this.outputLocations = new HashMap<>();
+    this.perEndpointByteCount = new HashMap<>();
+    this.perEndpointElementCount = new HashMap<>();
+    this.outboundObserver = outboundObserver;
+    this.buffer = new ConcurrentHashMap<>();
+    if (timeLimit > 0) {
+      this.flushFuture =
+          Executors.newSingleThreadScheduledExecutor(
+                  new ThreadFactoryBuilder()
+                      .setDaemon(true)
+                      .setNameFormat("DataBufferOutboundFlusher-thread")
+                      .build())
+              .scheduleAtFixedRate(
+                  this::periodicFlush, timeLimit, timeLimit, TimeUnit.MILLISECONDS);
+    } else {
+      this.flushFuture = null;
+    }
+    this.processBundleRequestIdSupplier = processBundleRequestIdSupplier;
+  }
+
+  /**
+   * Register the outbound data logical endpoint for the transform alongside its output value
+   * coder. All registered endpoints will eventually be removed when {@link
+   * #sendBufferedDataAndFinishOutboundStreams()} is called at the end of a bundle.
+   */
+  public <T> void registerOutputDataLocation(String pTransformId, Coder<T> coder) {
+    registerOutputLocation(
+        LogicalEndpoint.data(processBundleRequestIdSupplier.get(), pTransformId), coder);
+  }
+
+  public <T> void registerOutputTimersLocation(
+      String pTransformId, String timerFamilyId, Coder<T> coder) {
+    registerOutputLocation(
+        LogicalEndpoint.timer(processBundleRequestIdSupplier.get(), pTransformId, timerFamilyId),
+        coder);
+  }
+
+  public <T> void registerOutputLocation(LogicalEndpoint endpoint, Coder<T> coder) {
+    LOG.debug("Registering endpoint: {}", endpoint);
+    outputLocations.put(endpoint, coder);
+  }
+
+  public synchronized void flush() throws IOException {
+    Elements.Builder elements = convertBufferForTransmission();
+    if (elements.getDataCount() > 0 || elements.getTimersCount() > 0) {
+      outboundObserver.onNext(elements.build());
+    }
+  }
+
+  /**
+   * Closes the streams for all registered outbound endpoints. Should be called at the end of each
+   * bundle.
+   */
+  public void sendBufferedDataAndFinishOutboundStreams() {
+    LOG.debug("Closing streams for outbound endpoints {}", outputLocations);
+    LOG.debug("Sent outbound data size : {}", perEndpointByteCount);
+    LOG.debug("Sent outbound element count : {}", perEndpointElementCount);
+    if (byteCounter.get() == 0 && outputLocations.isEmpty()) {
+      return;
+    }
+    Elements.Builder bufferedElements = convertBufferForTransmission();
+    for (LogicalEndpoint outputLocation : outputLocations.keySet()) {
+      if (outputLocation.isTimer()) {
+        bufferedElements
+            .addTimersBuilder()
+            .setInstructionId(outputLocation.getInstructionId())
+            .setTransformId(outputLocation.getTransformId())
+            .setTimerFamilyId(outputLocation.getTimerFamilyId())
+            .setIsLast(true);
+      } else {
+        bufferedElements
+            .addDataBuilder()
+            .setInstructionId(outputLocation.getInstructionId())
+            .setTransformId(outputLocation.getTransformId())
+            .setIsLast(true);
+      }
+    }
+    outboundObserver.onNext(bufferedElements.build());
+    outputLocations.clear();
+    perEndpointByteCount.clear();
+    perEndpointElementCount.clear();
+  }
+
+  public <T> void acceptData(String pTransformId, T data) throws Exception {
+    accept(LogicalEndpoint.data(processBundleRequestIdSupplier.get(), pTransformId), data);
+  }
+
+  public <T> void acceptTimers(String pTransformId, String timerFamilyId, T timers)
+      throws Exception {
+    accept(
+        LogicalEndpoint.timer(processBundleRequestIdSupplier.get(), pTransformId, timerFamilyId),
+        timers);
+  }
+
+  public <T> void accept(LogicalEndpoint endpoint, T data) throws Exception {
+    if (timeLimit > 0) {
+      checkFlushThreadException();
+    }
+    buffer.compute(
+        endpoint,
+        (e, output) -> {
+          if (output == null) {
+            output = ByteString.newOutput();
+          }
+          int size = output.size();
+          try {
+            ((Coder<T>) outputLocations.get(e)).encode(data, output);
+          } catch (IOException ex) {
+            throw new RuntimeException("Failed to encode data.");
+          }
+          if (output.size() - size == 0) {
+            output.write(0);
+          }
+          final long delta = (long) output.size() - size;
+          byteCounter.getAndAdd(delta);
+          perEndpointByteCount.compute(
+              e, (e1, byteCount) -> byteCount == null ? delta : delta + byteCount);
+          return output;
+        });
+
+    if (byteCounter.get() >= sizeLimit) {
+      flush();
+    }
+  }
+
+  private Elements.Builder convertBufferForTransmission() {
+    Elements.Builder bufferedElements = Elements.newBuilder();
+    for (LogicalEndpoint endpoint : buffer.keySet()) {
+      ByteString.Output output = buffer.remove(endpoint);
+      if (output == null) {
+        continue;
+      }
+      byteCounter.getAndAdd(-1 * output.size());

Review comment:
       Sum up all your subtractions locally and only do one atomic update at the end of the method.
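   
   A sketch of that change to `convertBufferForTransmission()`, using the names from the diff above; the per-endpoint entry building is elided:
   
   ```java
   private Elements.Builder convertBufferForTransmission() {
     Elements.Builder bufferedElements = Elements.newBuilder();
     long removedBytes = 0;
     for (LogicalEndpoint endpoint : buffer.keySet()) {
       ByteString.Output output = buffer.remove(endpoint);
       if (output == null) {
         continue;
       }
       removedBytes += output.size();
       // ... append this endpoint's data/timers entry to bufferedElements as before ...
     }
     // One atomic update instead of one getAndAdd per endpoint.
     byteCounter.getAndAdd(-removedBytes);
     return bufferedElements;
   }
   ```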

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
##########
@@ -66,93 +61,57 @@
   }
 
   /** A factory for {@link BeamFnDataWriteRunner}s. */
-  static class Factory<InputT> implements PTransformRunnerFactory<BeamFnDataWriteRunner<InputT>> {
+  static class Factory<InputT> implements PTransformRunnerFactory<BeamFnDataWriteRunner> {
 
     @Override
-    public BeamFnDataWriteRunner<InputT> createRunnerForPTransform(Context context)
-        throws IOException {
-
-      BeamFnDataWriteRunner<InputT> runner =
-          new BeamFnDataWriteRunner<>(
-              context.getBundleCacheSupplier(),
-              context.getPTransformId(),
-              context.getPTransform(),
-              context.getProcessBundleInstructionIdSupplier(),
-              context.getCoders(),
-              context.getBeamFnDataClient(),
-              context.getBeamFnStateClient());
-      context.addStartBundleFunction(runner::registerForOutput);
+    public BeamFnDataWriteRunner createRunnerForPTransform(Context context) throws IOException {
+
+      RemoteGrpcPort port = RemoteGrpcPortWrite.fromPTransform(context.getPTransform()).getPort();
+      RehydratedComponents components =
+          RehydratedComponents.forComponents(
+              Components.newBuilder().putAllCoders(context.getCoders()).build());
+      Coder<WindowedValue<InputT>> coder =
+          (Coder<WindowedValue<InputT>>)
+              CoderTranslation.fromProto(
+                  context.getCoders().get(port.getCoderId()),
+                  components,
+                  new StateBackedIterableTranslationContext() {
+                    @Override
+                    public Supplier<Cache<?, ?>> getCache() {
+                      return context.getBundleCacheSupplier();
+                    }
+
+                    @Override
+                    public BeamFnStateClient getStateClient() {
+                      return context.getBeamFnStateClient();
+                    }
+
+                    @Override
+                    public Supplier<String> getCurrentInstructionId() {
+                      return context.getProcessBundleInstructionIdSupplier();
+                    }
+                  });
+      BeamFnDataWriteRunner<InputT> runner = new BeamFnDataWriteRunner<>();
+      context.addStartBundleFunction(

Review comment:
       I was thinking that `context#addOutgoingDataEndpoint` would happen now and not within start bundle. This would also allow us to pass the FnDataReceiver to the `context#addPCollectionConsumer` directly instead of registering `runner::consume`.
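   
   A hypothetical sketch of that flow; the `addOutgoingDataEndpoint` and `addPCollectionConsumer` signatures and the `inputPCollectionId` variable are assumed from this discussion, not copied from the actual Context API:
   
   ```java
   // At construction time, not within start bundle: register the outgoing
   // endpoint and hand the returned receiver straight to the consumer registry.
   FnDataReceiver<WindowedValue<InputT>> receiver =
       context.addOutgoingDataEndpoint(port, coder); // assumed to return the receiver
   context.addPCollectionConsumer(inputPCollectionId, receiver); // replaces runner::consume
   ```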

##########
File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java
##########
@@ -0,0 +1,290 @@
+  public <T> void acceptData(String pTransformId, T data) throws Exception {
+    accept(LogicalEndpoint.data(processBundleRequestIdSupplier.get(), pTransformId), data);
+  }
+
+  public <T> void acceptTimers(String pTransformId, String timerFamilyId, T timers)
+      throws Exception {
+    accept(
+        LogicalEndpoint.timer(processBundleRequestIdSupplier.get(), pTransformId, timerFamilyId),
+        timers);
+  }

Review comment:
       This will create a new `LogicalEndpoint` object on each timer/element that is output.
   
   Note that the process bundle request id is only needed when converting the buffer for transmission, which allows us to return a `FnDataReceiver` from `registerOutputLocation` that writes into a map keyed by `ptransformId`. This is convenient because we can avoid the map lookup and the `LogicalEndpoint` object creation on each element that is output, since the returned `FnDataReceiver` can store a reference to the ByteString.Output locally. Finally, this can solve our concurrency issue as well, since we can return two variants of this FnDataReceiver, one that synchronizes on a lock object and one that doesn't, allowing the flush method to conditionally lock as well when there is a flushing thread.
   
   We can have a similar map for timers keyed by `ptransformId` and `timerFamilyId` (e.g. `Map<KV<String, String>, ByteString.Output>`).
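   
   A sketch only, under the assumptions above; the `outputData` and `lock` fields are hypothetical, and byte counting is elided:
   
   ```java
   // The returned receiver captures the coder and its ByteString.Output directly,
   // so each element avoids a LogicalEndpoint allocation and a map lookup; the map
   // is only consulted when converting the buffer for transmission.
   public <T> FnDataReceiver<T> registerOutputDataLocation(String pTransformId, Coder<T> coder) {
     ByteString.Output output = ByteString.newOutput();
     outputData.put(pTransformId, output); // assumed Map<String, ByteString.Output> field
     if (timeLimit > 0) {
       // Synchronized variant, handed out when a periodic flushing thread exists;
       // flush() would synchronize on the same lock object.
       return data -> {
         synchronized (lock) {
           coder.encode(data, output);
         }
       };
     }
     // Unsynchronized variant for the single-threaded case.
     return data -> coder.encode(data, output);
   }
   ```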

##########
File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java
##########
@@ -0,0 +1,290 @@
+  private Elements.Builder convertBufferForTransmission() {
+    Elements.Builder bufferedElements = Elements.newBuilder();
+    for (LogicalEndpoint endpoint : buffer.keySet()) {
+      ByteString.Output output = buffer.remove(endpoint);
+      if (output == null) {
+        continue;
+      }
+      byteCounter.getAndAdd(-1 * output.size());
+      if (endpoint.isTimer()) {
+        bufferedElements
+            .addTimersBuilder()
+            .setInstructionId(endpoint.getInstructionId())
+            .setTransformId(endpoint.getTransformId())
+            .setTimerFamilyId(endpoint.getTimerFamilyId())
+            .setTimers(output.toByteString());
+      } else {
+        bufferedElements
+            .addDataBuilder()
+            .setInstructionId(endpoint.getInstructionId())
+            .setTransformId(endpoint.getTransformId())
+            .setData(output.toByteString());
+      }
+      perEndpointElementCount.compute(endpoint, (e, count) -> count == null ? 1 : count + 1);
+      output.reset();

Review comment:
       Since you removed the output there is no point in resetting it, since the next `accept` call will add it again.
   ```suggestion
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

