shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1492762927


##########
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java:
##########
@@ -174,6 +215,124 @@ public CompletableFuture<StateResponse> handle(StateRequest.Builder requestBuild
         response = StateResponse.newBuilder().setAppend(StateAppendResponse.getDefaultInstance());
         break;
 
+      case ORDERED_LIST_GET:
+        {
+          long start = request.getOrderedListGet().getRange().getStart();
+          long end = request.getOrderedListGet().getRange().getEnd();
+
+          KvCoder<Long, Integer> coder = KvCoder.of(VarLongCoder.of(), VarIntCoder.of());
+          long sortKey = start;
+          int index = 0;
+          if (request.getOrderedListGet().getContinuationToken().size() > 0) {
+            try {
+              // The continuation format here is the sort key (long) followed by an index (int)
+              KV<Long, Integer> cursor =
+                  coder.decode(request.getOrderedListGet().getContinuationToken().newInput());
+              sortKey = cursor.getKey();
+              index = cursor.getValue();
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          }
+
+          ByteString continuationToken;
+          ByteString returnBlock = ByteString.EMPTY;
+          ;
+          try {
+            if (sortKey < start || sortKey >= end) {
+              throw new IndexOutOfBoundsException("sort key out of range");

Review Comment:
   See my comment in a previous thread: 
https://github.com/apache/beam/pull/30317#discussion_r1492631642


