shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1492822517
##########
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java:
##########
@@ -174,6 +215,124 @@ public CompletableFuture<StateResponse>
handle(StateRequest.Builder requestBuild
response =
StateResponse.newBuilder().setAppend(StateAppendResponse.getDefaultInstance());
break;
+ case ORDERED_LIST_GET:
+ {
+ long start = request.getOrderedListGet().getRange().getStart();
+ long end = request.getOrderedListGet().getRange().getEnd();
+
+ KvCoder<Long, Integer> coder = KvCoder.of(VarLongCoder.of(),
VarIntCoder.of());
+ long sortKey = start;
+ int index = 0;
+ if (request.getOrderedListGet().getContinuationToken().size() > 0) {
+ try {
+ // The continuation format here is the sort key (long) followed
by an index (int)
+ KV<Long, Integer> cursor =
+
coder.decode(request.getOrderedListGet().getContinuationToken().newInput());
+ sortKey = cursor.getKey();
+ index = cursor.getValue();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ ByteString continuationToken;
+ ByteString returnBlock = ByteString.EMPTY;
+ ;
+ try {
+ if (sortKey < start || sortKey >= end) {
+ throw new IndexOutOfBoundsException("sort key out of range");
Review Comment:
See my comment in
https://github.com/apache/beam/pull/30317#discussion_r1492631642
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]