scwhittle commented on code in PR #26370:
URL: https://github.com/apache/beam/pull/26370#discussion_r1182792600
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java:
##########
@@ -130,13 +132,31 @@ public void asyncClose() throws Exception {
     if (!newValues.isEmpty()) {
       ByteStringOutputStream out = new ByteStringOutputStream();
       for (T newValue : newValues) {
-        // TODO: Replace with chunking output stream
+        int previousSize = out.size();
         valueCoder.encode(newValue, out);
+        if (out.size() > BAG_APPEND_BATCHING_LIMIT && previousSize > 0) {
+          // Respect the limit by outputting the previous batch of elements.
+          beamFnStateClient.handle(
+              request
+                  .toBuilder()
+                  .setAppend(
+                      StateAppendRequest.newBuilder()
+                          .setData(out.consumePrefixToByteString(previousSize))));
+        }
+        if (out.size() > BAG_APPEND_BATCHING_LIMIT) {
Review Comment:
Can you clarify? We don't know the encoded sizes until we actually encode,
and for efficiency it is better to encode everything into a single
ByteStringOutputStream than to create a separate output stream for each value
and concatenate them to respect the limit. Pulling off the prefix seemed like
a reasonable way to keep the performance of ByteStringOutputStream. Were you
thinking the performance hit of separate buffers was worth the simpler code?
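
To make the trade-off concrete, here is a minimal, self-contained sketch of the "encode into one shared buffer, then pull off the prefix" idea. It is not the code from the PR: `PrefixBatchingSketch`, `consumePrefix`, and `batch` are hypothetical names, `java.io.ByteArrayOutputStream` stands in for `ByteStringOutputStream`, UTF-8 string encoding stands in for `valueCoder.encode`, and collecting batches in a list stands in for `beamFnStateClient.handle`. The body of the second size check is cut off in the hunk above, so flushing an oversized element on its own is an assumption.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PrefixBatchingSketch {

  // Hypothetical stand-in for consumePrefixToByteString: returns the first n
  // bytes of the buffer and leaves only the remaining bytes in it.
  static byte[] consumePrefix(ByteArrayOutputStream out, int n) {
    byte[] all = out.toByteArray();
    out.reset();
    out.write(all, n, all.length - n);
    return Arrays.copyOfRange(all, 0, n);
  }

  // Encodes every value into one shared buffer and emits a batch whenever
  // appending a value pushes the buffer past the limit.
  static List<byte[]> batch(List<String> values, int limit) {
    List<byte[]> batches = new ArrayList<>();
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    for (String value : values) {
      int previousSize = out.size();
      // The encoded size is only known after encoding into the buffer.
      byte[] encoded = value.getBytes(StandardCharsets.UTF_8);
      out.write(encoded, 0, encoded.length);
      if (out.size() > limit && previousSize > 0) {
        // Respect the limit by emitting everything before the new value.
        batches.add(consumePrefix(out, previousSize));
      }
      if (out.size() > limit) {
        // Assumption: a single value larger than the limit is sent alone.
        batches.add(consumePrefix(out, out.size()));
      }
    }
    if (out.size() > 0) {
      // Emit whatever is left after the last value.
      batches.add(out.toByteArray());
    }
    return batches;
  }
}
```

The alternative raised in the question would encode each value into its own buffer, check its size, and then concatenate. That is simpler to read but allocates one buffer per value, which is the performance hit mentioned above.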