lukecwik commented on a change in pull request #16769:
URL: https://github.com/apache/beam/pull/16769#discussion_r801863810
##########
File path:
sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java
##########
@@ -157,15 +161,18 @@ public void flush() throws IOException {
if (elements.getDataCount() > 0 || elements.getTimersCount() > 0) {
Review comment:
Let's make `flush` private, or package-private and marked with
`@VisibleForTesting` if it is used in tests.
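A minimal sketch of the suggested visibility, using a hypothetical, simplified aggregator class rather than the real `BeamFnDataOutboundAggregator`. To keep it compilable without Guava, it declares a local stand-in for `@VisibleForTesting`; real code would use Guava's `com.google.common.annotations.VisibleForTesting` (Beam uses a vendored copy of Guava).

```java
import java.util.ArrayList;
import java.util.List;

/** Local stand-in for Guava's @VisibleForTesting so this sketch compiles without Guava. */
@interface VisibleForTesting {}

/** Hypothetical, simplified aggregator; only the visibility of flush() matters here. */
class OutboundAggregatorSketch {
  private final List<String> buffer = new ArrayList<>();
  private final List<String> sent = new ArrayList<>();

  void accept(String element) {
    buffer.add(element);
  }

  /** Package-private: reachable from tests in the same package, hidden from other packages. */
  @VisibleForTesting
  void flush() {
    if (!buffer.isEmpty()) {
      sent.addAll(buffer);
      buffer.clear();
    }
  }

  List<String> sent() {
    return sent;
  }
}
```

If no test calls `flush`, dropping the annotation and making the method `private` is the tighter option.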
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
##########
@@ -555,6 +575,38 @@ public BundleFinalizer getBundleFinalizer() {
}
}
+ private void embedOutboundElementsIfApplicable(
+ ProcessBundleResponse.Builder response, BundleProcessor bundleProcessor) {
+ List<Elements> collectedElements =
+ new ArrayList<>(bundleProcessor.getOutboundAggregators().size());
+ boolean hasFlushedAggregator = false;
+ for (BeamFnDataOutboundAggregator aggregator :
Review comment:
Add a comment at the instantiation of the outbound aggregator map noting
that you're using a `LinkedHashMap` because you rely on a stable iteration order.
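A minimal sketch of the kind of comment being asked for, on a hypothetical holder class with `String` keys and values standing in for the real aggregator map types.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical holder standing in for the bundle processor's outbound aggregator map. */
class AggregatorMapSketch {
  // NOTE: a LinkedHashMap is used on purpose. Code that walks values() and later
  // pairs results back up by position relies on a stable (insertion) iteration
  // order, which a plain HashMap does not guarantee.
  private final Map<String, String> outboundAggregators = new LinkedHashMap<>();

  void register(String endpoint, String aggregatorName) {
    outboundAggregators.put(endpoint, aggregatorName);
  }

  /** Returns the registered aggregators in iteration order (insertion order here). */
  List<String> aggregatorsInOrder() {
    return new ArrayList<>(outboundAggregators.values());
  }
}
```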
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
##########
@@ -555,6 +575,38 @@ public BundleFinalizer getBundleFinalizer() {
}
}
+ private void embedOutboundElementsIfApplicable(
+ ProcessBundleResponse.Builder response, BundleProcessor bundleProcessor) {
+ List<Elements> collectedElements =
+ new ArrayList<>(bundleProcessor.getOutboundAggregators().size());
+ boolean hasFlushedAggregator = false;
+ for (BeamFnDataOutboundAggregator aggregator :
+ bundleProcessor.getOutboundAggregators().values()) {
+ Elements elements = aggregator.sendOrCollectBufferedDataAndFinishOutboundStreams();
+ if (elements == null) {
+ hasFlushedAggregator = true;
+ }
+ collectedElements.add(elements);
+ }
+ if (!hasFlushedAggregator && !collectedElements.isEmpty()) {
+ Elements.Builder elementsToEmbed = Elements.newBuilder();
+ for (Elements collectedElement : collectedElements) {
+ elementsToEmbed.mergeFrom(collectedElement);
+ }
+ response.setElements(elementsToEmbed.build());
+ } else {
+ // If there's flushed aggregator, flush all other aggregators as well.
Review comment:
```suggestion
// Since there was at least one flushed aggregator, we have to use the
// aggregators that were able to successfully collect their elements to emit them,
// since those elements cannot be sent as part of the ProcessBundleResponse.
```
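A minimal sketch of the fallback path this comment describes, assuming hypothetical types (`SketchAggregator`, `RecordingAggregator`, `List<String>` in place of `Elements`) and a hypothetical `send` method; it is not the PR's actual code. It also shows why stable iteration order matters: results are paired back with their aggregators by position.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical aggregator: returns its buffered elements, or null if it already flushed. */
interface SketchAggregator {
  List<String> sendOrCollect();

  /** Hypothetical: emits elements over this aggregator's own data stream. */
  void send(List<String> elements);
}

/** Test double that records what it was asked to send. */
class RecordingAggregator implements SketchAggregator {
  private final List<String> buffered; // null models an aggregator that already flushed
  final List<String> sent = new ArrayList<>();

  RecordingAggregator(List<String> buffered) {
    this.buffered = buffered;
  }

  @Override
  public List<String> sendOrCollect() {
    return buffered;
  }

  @Override
  public void send(List<String> elements) {
    sent.addAll(elements);
  }
}

class FlushFallbackSketch {
  /**
   * Fallback when at least one aggregator already flushed: nothing is embedded in the
   * response, so each aggregator that did collect elements emits them itself.
   * collected.get(i) must be the result from aggregators.get(i), which is why the
   * aggregators must be iterated in a stable order.
   */
  static void emitCollected(List<SketchAggregator> aggregators, List<List<String>> collected) {
    for (int i = 0; i < aggregators.size(); i++) {
      List<String> elements = collected.get(i);
      if (elements != null) {
        aggregators.get(i).send(elements);
      }
    }
  }
}
```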
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
##########
@@ -555,6 +575,38 @@ public BundleFinalizer getBundleFinalizer() {
}
}
+ private void embedOutboundElementsIfApplicable(
+ ProcessBundleResponse.Builder response, BundleProcessor bundleProcessor) {
+ List<Elements> collectedElements =
+ new ArrayList<>(bundleProcessor.getOutboundAggregators().size());
+ boolean hasFlushedAggregator = false;
+ for (BeamFnDataOutboundAggregator aggregator :
+ bundleProcessor.getOutboundAggregators().values()) {
+ Elements elements = aggregator.sendOrCollectBufferedDataAndFinishOutboundStreams();
+ if (elements == null) {
+ hasFlushedAggregator = true;
+ }
+ collectedElements.add(elements);
+ }
+ if (!hasFlushedAggregator && !collectedElements.isEmpty()) {
Review comment:
Move the `!collectedElements.isEmpty()` check to the start of this
method as an early return:
`if (bundleProcessor.getOutboundAggregators().isEmpty()) { return; }`
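A minimal sketch of the suggested restructuring, using hypothetical simplified types (`Collector`, `List<String>` in place of `Elements`) and returning an `Optional` instead of mutating a response builder so that it can run standalone. The guard clause at the top makes the `!collectedElements.isEmpty()` condition unnecessary.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

class EarlyReturnSketch {
  /** Hypothetical aggregator: returns collected elements, or null if it already flushed. */
  interface Collector {
    List<String> sendOrCollect();
  }

  /**
   * Returns the merged elements to embed, or Optional.empty() when nothing should be embedded.
   * Optional.empty() stands in for the early {@code return;} of the void method under review.
   */
  static Optional<List<String>> elementsToEmbed(List<Collector> aggregators) {
    // Guard clause: with no aggregators there is nothing to collect, so bail out first.
    if (aggregators.isEmpty()) {
      return Optional.empty();
    }
    List<List<String>> collected = new ArrayList<>(aggregators.size());
    boolean hasFlushedAggregator = false;
    for (Collector aggregator : aggregators) {
      List<String> elements = aggregator.sendOrCollect();
      if (elements == null) {
        hasFlushedAggregator = true;
      }
      collected.add(elements);
    }
    // Thanks to the guard, this condition no longer needs !collected.isEmpty().
    if (!hasFlushedAggregator) {
      List<String> merged = new ArrayList<>();
      for (List<String> elements : collected) {
        merged.addAll(elements);
      }
      return Optional.of(merged);
    }
    // Fallback path (each aggregator emits its own collected elements) is omitted here.
    return Optional.empty();
  }
}
```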
--
This is an automated message from the Apache Git Service.