gemini-code-assist[bot] commented on code in PR #37691:
URL: https://github.com/apache/beam/pull/37691#discussion_r2845463297
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java:
##########
@@ -438,7 +445,7 @@ private void startStateApiWrites() {
}
// Persist pending key-values
- if (!pendingAdds.isEmpty()) {
+ if (!pendingAdds.isEmpty() && !onlyBundleForKeys) {
Review Comment:

The `StateClearRequest`s sent when `isCleared` is true or `pendingRemoves`
is not empty are not guarded by `!onlyBundleForKeys`. To be consistent with
skipping appends, should these clear operations also be skipped when
`onlyBundleForKeys` is true?
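
To make the suggestion concrete, here is a minimal, self-contained sketch of one guard covering both kinds of request. It assumes, as this comment suggests, that `onlyBundleForKeys` means no state modification at all should reach the runner. `HypotheticalMultimapState` is an illustrative stand-in, not the real `MultimapUserState`. It records requests as strings in `sentRequests` rather than sending protos through the state client.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical, simplified model of a multimap state's write path. Each "request"
 * is recorded as a string instead of being sent to a runner, so the control flow
 * around onlyBundleForKeys can be checked in isolation.
 */
class HypotheticalMultimapState {
  private final boolean onlyBundleForKeys;
  private boolean isCleared = false;
  private final Set<String> pendingRemoves = new LinkedHashSet<>();
  private final Map<String, List<String>> pendingAdds = new LinkedHashMap<>();
  final List<String> sentRequests = new ArrayList<>();

  HypotheticalMultimapState(boolean onlyBundleForKeys) {
    this.onlyBundleForKeys = onlyBundleForKeys;
  }

  void clear() {
    isCleared = true;
    pendingRemoves.clear();
    pendingAdds.clear();
  }

  void remove(String key) {
    pendingAdds.remove(key);
    if (!isCleared) {
      pendingRemoves.add(key); // becomes a per-key clear request on flush
    }
  }

  void put(String key, String value) {
    pendingAdds.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
  }

  /** Flushes pending writes; issues no requests at all when onlyBundleForKeys is set. */
  void startStateApiWrites() {
    if (onlyBundleForKeys) {
      // Single guard: drop clears and appends alike, then return early.
      isCleared = false;
      pendingRemoves.clear();
      pendingAdds.clear();
      return;
    }
    if (isCleared) {
      sentRequests.add("clear:all");
    } else {
      for (String key : pendingRemoves) {
        sentRequests.add("clear:" + key);
      }
    }
    for (Map.Entry<String, List<String>> e : pendingAdds.entrySet()) {
      sentRequests.add("append:" + e.getKey() + "=" + e.getValue());
    }
    isCleared = false;
    pendingRemoves.clear();
    pendingAdds.clear();
  }
}
```

Putting the check at the top of the flush means clears and appends are skipped together, so the two paths cannot drift apart as the method changes.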
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/OrderedListUserState.java:
##########
@@ -303,6 +309,11 @@ public void asyncClose() throws Exception {
pendingRemoves.clear();
}
+ if (onlyBundleForKeys) {
+ pendingAdds.clear();
+ return;
+ }
Review Comment:

Similar to my other comments: the `pendingRemoves` handling, which issues
`StateClearRequest`s, runs before this new `onlyBundleForKeys` check, so those
requests are still sent. To be consistent, we should probably skip them as well
when `onlyBundleForKeys` is true; the current implementation only skips
`pendingAdds`.
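
A sketch of the ordering I have in mind, assuming the guard should drop every pending modification: the early return moves above the `pendingRemoves` loop. `HypotheticalOrderedListState` and its string-recorded requests are illustrative only, and the removed timestamp ranges are modeled as half-open `long[]` pairs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical, simplified model of an ordered-list state's close path. Range
 * removals stand in for clear requests and timestamped adds for append requests;
 * both are recorded as strings instead of being sent to a runner.
 */
class HypotheticalOrderedListState {
  private final boolean onlyBundleForKeys;
  // Each element is a half-open timestamp range [start, end) to clear.
  private final List<long[]> pendingRemoves = new ArrayList<>();
  // Buffered values keyed by timestamp, kept in timestamp order.
  private final TreeMap<Long, List<String>> pendingAdds = new TreeMap<>();
  final List<String> sentRequests = new ArrayList<>();

  HypotheticalOrderedListState(boolean onlyBundleForKeys) {
    this.onlyBundleForKeys = onlyBundleForKeys;
  }

  void add(long timestamp, String value) {
    pendingAdds.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(value);
  }

  void clearRange(long start, long end) {
    // Drop buffered adds inside the removed range, then remember the range.
    pendingAdds.subMap(start, end).clear();
    pendingRemoves.add(new long[] {start, end});
  }

  void asyncClose() {
    // Guard placed BEFORE the removal loop, so neither clears nor appends are sent.
    if (onlyBundleForKeys) {
      pendingRemoves.clear();
      pendingAdds.clear();
      return;
    }
    for (long[] range : pendingRemoves) {
      sentRequests.add("clear:[" + range[0] + "," + range[1] + ")");
    }
    pendingRemoves.clear();
    for (Map.Entry<Long, List<String>> e : pendingAdds.entrySet()) {
      for (String v : e.getValue()) {
        sentRequests.add("append:" + e.getKey() + "=" + v);
      }
    }
    pendingAdds.clear();
  }
}
```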
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java:
##########
@@ -135,7 +141,7 @@ public void asyncClose() throws Exception {
for (T newValue : newValues) {
int previousSize = out.size();
valueCoder.encode(newValue, out);
- if (out.size() > BAG_APPEND_BATCHING_LIMIT && previousSize > 0) {
+ if (out.size() > BAG_APPEND_BATCHING_LIMIT && previousSize > 0 && !onlyBundleForKeys) {
Review Comment:

There's a small performance improvement opportunity here. When
`onlyBundleForKeys` is true, the code still iterates over `newValues` and
encodes them, even though they are never sent to the runner.
To avoid this unnecessary work, you could wrap the entire block that
processes `newValues` in an `if (!onlyBundleForKeys)` check. This is similar to
the pattern used in `OrderedListUserState.java`.
For example:
```java
if (!newValues.isEmpty() && !onlyBundleForKeys) {
// Batch values up to an arbitrary limit ...
ByteStringOutputStream out = new ByteStringOutputStream();
for (T newValue : newValues) {
// ... existing encoding and sending logic ...
}
if (out.size() > 0) {
// ... existing sending logic ...
}
}
```
This would prevent the encoding from happening when the data isn't going to
be sent.
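
For illustration, a runnable sketch of the combined guard, assuming `onlyBundleForKeys` should also suppress the clear (raised in a separate comment on this hunk). `HypotheticalBagState` is a stand-in: UTF-8 strings and `ByteArrayOutputStream` replace `valueCoder` and `ByteStringOutputStream`, and `BATCHING_LIMIT` is an arbitrary small value, not the real `BAG_APPEND_BATCHING_LIMIT`. The first batching condition matches the one in the diff; the second branch (sending an oversized single value on its own) is an assumption about the surrounding code. The `encodeCalls` counter shows that no encoding happens when the flag is set.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical, simplified model of a bag state's close path; not the real BagUserState. */
class HypotheticalBagState {
  // Assumed small limit for illustration only.
  static final int BATCHING_LIMIT = 8;

  private final boolean onlyBundleForKeys;
  private boolean isCleared = false;
  private final List<String> newValues = new ArrayList<>();
  final List<String> sentRequests = new ArrayList<>();
  int encodeCalls = 0;

  HypotheticalBagState(boolean onlyBundleForKeys) {
    this.onlyBundleForKeys = onlyBundleForKeys;
  }

  void clear() {
    isCleared = true;
    newValues.clear();
  }

  void append(String value) {
    newValues.add(value);
  }

  void asyncClose() {
    if (onlyBundleForKeys) {
      // Nothing reaches the runner, so skip the clear and all encoding work.
      isCleared = false;
      newValues.clear();
      return;
    }
    if (isCleared) {
      sentRequests.add("clear");
    }
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    for (String value : newValues) {
      int previousSize = out.size();
      byte[] encoded = value.getBytes(StandardCharsets.UTF_8); // stands in for valueCoder.encode
      out.write(encoded, 0, encoded.length);
      encodeCalls++;
      if (out.size() > BATCHING_LIMIT && previousSize > 0) {
        // Send the batch buffered before this value; keep this value's bytes.
        byte[] all = out.toByteArray();
        sendAppend(Arrays.copyOfRange(all, 0, previousSize));
        out.reset();
        out.write(all, previousSize, all.length - previousSize);
      }
      if (out.size() > BATCHING_LIMIT) {
        // A single value over the limit is sent on its own.
        sendAppend(out.toByteArray());
        out.reset();
      }
    }
    if (out.size() > 0) {
      sendAppend(out.toByteArray());
    }
    isCleared = false;
    newValues.clear();
  }

  private void sendAppend(byte[] data) {
    sentRequests.add("append:" + new String(data, StandardCharsets.UTF_8));
  }
}
```

With `BATCHING_LIMIT = 8`, appending `"aaaa"`, `"bbbb"` and `"cccc"` with the flag unset yields one batch for the first two values and a second batch for the third.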
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java:
##########
@@ -135,7 +141,7 @@ public void asyncClose() throws Exception {
for (T newValue : newValues) {
int previousSize = out.size();
valueCoder.encode(newValue, out);
- if (out.size() > BAG_APPEND_BATCHING_LIMIT && previousSize > 0) {
+ if (out.size() > BAG_APPEND_BATCHING_LIMIT && previousSize > 0 && !onlyBundleForKeys) {
Review Comment:

While you've correctly guarded the `StateAppendRequest`s with
`!onlyBundleForKeys`, it seems the `StateClearRequest` is not. If
`onlyBundleForKeys` is true, we should probably avoid sending any state
modifications to the runner, including clears.
The `clear` operation at line 133 seems to be missing a check for
`onlyBundleForKeys`:
```java
if (isCleared) { // This should probably be: if (isCleared && !onlyBundleForKeys)
beamFnStateClient.handle(
request.toBuilder().setClear(StateClearRequest.getDefaultInstance()));
}
```
This would make the behavior consistent with not sending appends.