scwhittle commented on code in PR #26370:
URL: https://github.com/apache/beam/pull/26370#discussion_r1182329325
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java:
##########
@@ -130,13 +132,31 @@ public void asyncClose() throws Exception {
     if (!newValues.isEmpty()) {
       ByteStringOutputStream out = new ByteStringOutputStream();
       for (T newValue : newValues) {
-        // TODO: Replace with chunking output stream
+        int previousSize = out.size();
         valueCoder.encode(newValue, out);
+        if (out.size() > BAG_APPEND_BATCHING_LIMIT && previousSize > 0) {
+          // Respect the limit by outputting the previous batch of elements.
+          beamFnStateClient.handle(
+              request
+                  .toBuilder()
+                  .setAppend(
+                      StateAppendRequest.newBuilder()
+                          .setData(out.consumePrefixToByteString(previousSize))));
+        }
+        if (out.size() > BAG_APPEND_BATCHING_LIMIT) {
Review Comment:
The 10MB is a soft limit: it seems large enough to get the efficiency wins
of batching RPCs while keeping things granular enough for paging in later.
The Dataflow streaming backend, however, has a hard limit of 100MB. Incoming
elements can themselves be up to that limit, so if those elements are written
directly to a bag I want to ensure we don't batch beyond it.
I will add some comments.
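
For readers following along, here is a minimal standalone sketch of the batching pattern under discussion. `SOFT_LIMIT`, the `flush` helper, and the plain `ByteArrayOutputStream` are illustrative stand-ins for the harness's `BAG_APPEND_BATCHING_LIMIT`, `BeamFnStateClient.handle(...)`, and `ByteStringOutputStream`; this is a sketch of the idea, not the harness implementation:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;

public class BatchingSketch {
  // Hypothetical soft limit; the PR uses BAG_APPEND_BATCHING_LIMIT (10MB).
  // A tiny value is used here so the demo actually flushes.
  private static final int SOFT_LIMIT = 16;

  public static void main(String[] args) throws IOException {
    List<String> newValues = List.of("alpha", "beta", "gamma", "delta");
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    for (String v : newValues) {
      int previousSize = out.size();
      out.write(v.getBytes(StandardCharsets.UTF_8));
      // If encoding this element pushed us over the soft limit, flush the
      // previously batched elements first so each request stays near the limit.
      if (out.size() > SOFT_LIMIT && previousSize > 0) {
        flush(out.toByteArray(), previousSize);
        // Keep only the bytes of the current element, emulating
        // consumePrefixToByteString(previousSize).
        byte[] remainder = out.toByteArray();
        out.reset();
        out.write(remainder, previousSize, remainder.length - previousSize);
      }
      // A single element larger than the soft limit is sent on its own;
      // the soft limit batches RPCs but never splits an element.
      if (out.size() > SOFT_LIMIT) {
        flush(out.toByteArray(), out.size());
        out.reset();
      }
    }
    if (out.size() > 0) {
      flush(out.toByteArray(), out.size()); // trailing partial batch
    }
  }

  // Stand-in for beamFnStateClient.handle(request with StateAppendRequest).
  private static void flush(byte[] data, int length) {
    System.out.println("append request with " + length + " bytes");
  }
}
```

The second check mirrors the point above: an element may itself approach the backend's 100MB hard limit, so an oversized element is flushed alone rather than batched together with others.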