[
https://issues.apache.org/jira/browse/BEAM-13015?focusedWorklogId=685559&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-685559
]
ASF GitHub Bot logged work on BEAM-13015:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 24/Nov/21 00:40
Start Date: 24/Nov/21 00:40
Worklog Time Spent: 10m
Work Description: y1chi commented on a change in pull request #16057:
URL: https://github.com/apache/beam/pull/16057#discussion_r755611072
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java
##########
@@ -265,48 +258,39 @@ public void prefetch() {
@Override
public boolean hasNext() {
- switch (currentState) {
- case EOF:
- return false;
- case READ_REQUIRED:
- prefetch();
- StateResponse stateResponse;
- try {
- stateResponse = prefetchedResponse.get();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IllegalStateException(e);
- } catch (ExecutionException e) {
- if (e.getCause() == null) {
- throw new IllegalStateException(e);
- }
- Throwables.throwIfUnchecked(e.getCause());
- throw new IllegalStateException(e.getCause());
- }
- prefetchedResponse = null;
- continuationToken = stateResponse.getGet().getContinuationToken();
- next = stateResponse.getGet().getData();
- currentState = State.HAS_NEXT;
- return true;
- case HAS_NEXT:
- return true;
- }
- throw new IllegalStateException(String.format("Unknown state %s",
currentState));
+ return moreToRead;
}
@Override
public ByteString next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
+
+ prefetch();
+ StateResponse stateResponse;
+ try {
+ stateResponse = prefetchedResponse.get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IllegalStateException(e);
+ } catch (ExecutionException e) {
+ if (e.getCause() == null) {
+ throw new IllegalStateException(e);
+ }
+ Throwables.throwIfUnchecked(e.getCause());
+ throw new IllegalStateException(e.getCause());
+ }
+ prefetchedResponse = null;
+ continuationToken = stateResponse.getGet().getContinuationToken();
+
// If the continuation token is empty, that means we have reached EOF.
if (ByteString.EMPTY.equals(continuationToken)) {
- currentState = State.EOF;
+ moreToRead = false;
} else {
- currentState = State.READ_REQUIRED;
prefetch();
Review comment:
do we still need this prefetch?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 685559)
Time Spent: 13.5h (was: 13h 20m)
> Optimize Java SDK harness
> -------------------------
>
> Key: BEAM-13015
> URL: https://issues.apache.org/jira/browse/BEAM-13015
> Project: Beam
> Issue Type: Improvement
> Components: sdk-java-harness
> Reporter: Luke Cwik
> Assignee: Luke Cwik
> Priority: P2
> Time Spent: 13.5h
> Remaining Estimate: 0h
>
> Use profiling tools to remove bundle processing overhead in the SDK harness.
--
This message was sent by Atlassian Jira
(v8.20.1#820001)