shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1492924566


##########
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java:
##########
@@ -174,6 +215,124 @@ public CompletableFuture<StateResponse> 
handle(StateRequest.Builder requestBuild
         response = 
StateResponse.newBuilder().setAppend(StateAppendResponse.getDefaultInstance());
         break;
 
+      case ORDERED_LIST_GET:
+        {
+          long start = request.getOrderedListGet().getRange().getStart();
+          long end = request.getOrderedListGet().getRange().getEnd();
+
+          KvCoder<Long, Integer> coder = KvCoder.of(VarLongCoder.of(), 
VarIntCoder.of());
+          long sortKey = start;
+          int index = 0;
+          if (request.getOrderedListGet().getContinuationToken().size() > 0) {
+            try {
+              // The continuation format here is the sort key (long) followed 
by an index (int)
+              KV<Long, Integer> cursor =
+                  
coder.decode(request.getOrderedListGet().getContinuationToken().newInput());
+              sortKey = cursor.getKey();
+              index = cursor.getValue();
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          }
+
+          ByteString continuationToken;
+          ByteString returnBlock = ByteString.EMPTY;
+          ;
+          try {
+            if (sortKey < start || sortKey >= end) {
+              throw new IndexOutOfBoundsException("sort key out of range");
+            }
+
+            NavigableSet<Long> subset =
+                orderedListKeys
+                    .getOrDefault(request.getStateKey(), new TreeSet<>())
+                    .subSet(sortKey, true, end, false);
+
+            // get the effective sort key currently, can throw 
NoSuchElementException
+            Long nextSortKey = subset.first();
+
+            StateKey.Builder keyBuilder = request.getStateKey().toBuilder();
+            
keyBuilder.getOrderedListUserStateBuilder().setSortKey(nextSortKey);
+            List<ByteString> byteStrings =
+                data.getOrDefault(keyBuilder.build(), 
Collections.singletonList(ByteString.EMPTY));
+
+            // get the block specified in continuation token, can throw 
IndexOutOfBoundsException
+            returnBlock = byteStrings.get(index);
+
+            if (byteStrings.size() > index + 1) {
+              // more blocks from this sort key
+              index += 1;
+            } else {
+              // finish navigating the current sort key and need to find the 
next one,
+              // can throw NoSuchElementException
+              nextSortKey = subset.tailSet(nextSortKey, false).first();
+              index = 0;
+            }
+
+            ByteStringOutputStream outputStream = new ByteStringOutputStream();
+            try {
+              KV<Long, Integer> cursor = KV.of(nextSortKey, index);
+              coder.encode(cursor, outputStream);
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+            continuationToken = outputStream.toByteString();
+          } catch (NoSuchElementException | IndexOutOfBoundsException e) {
+            continuationToken = ByteString.EMPTY;
+          }
+          response =
+              StateResponse.newBuilder()
+                  .setOrderedListGet(
+                      OrderedListStateGetResponse.newBuilder()
+                          .setData(returnBlock)
+                          .setContinuationToken(continuationToken));
+        }
+        break;
+
+      case ORDERED_LIST_UPDATE:
+        for (OrderedListRange r : 
request.getOrderedListUpdate().getDeletesList()) {
+          List<Long> keysToRemove =
+              new ArrayList<>(
+                  orderedListKeys
+                      .getOrDefault(request.getStateKey(), new TreeSet<>())
+                      .subSet(r.getStart(), true, r.getEnd(), false));
+
+          for (Long l : keysToRemove) {
+            StateKey.Builder keyBuilder = request.getStateKey().toBuilder();
+            keyBuilder.getOrderedListUserStateBuilder().setSortKey(l);
+            data.remove(keyBuilder.build());
+            orderedListKeys.get(request.getStateKey()).remove(l);
+          }
+        }
+
+        for (OrderedListEntry e : 
request.getOrderedListUpdate().getInsertsList()) {
+          StateKey.Builder keyBuilder = request.getStateKey().toBuilder();
+          
keyBuilder.getOrderedListUserStateBuilder().setSortKey(e.getSortKey());
+
+          ByteStringOutputStream outStream = new ByteStringOutputStream();
+
+          try {
+            InstantCoder.of().encode(Instant.ofEpochMilli(e.getSortKey()), 
outStream);
+          } catch (IOException ex) {
+            throw new RuntimeException(ex);
+          }
+          // In the response, the value encoded bytes are placed before the 
timestamp encoded bytes.

Review Comment:
   Timestamp is the sort key here. As our OrderedListState interface is based 
on TimestampedValue<T>, the sort key is actually an Instant, but I agree that I 
should be consistent here by using "sort key".
   
   The reason why I put the value encoded bytes before the sort key encoded 
bytes is that the same order is used in the coder of TimestampedValue<T>. 
https://github.com/apache/beam/blob/0d46e304f176847e897eef244c50cb29af8c6451/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java#L110
   
   Here, we pack the data into the output stream as if we are calling 
`TimestampedValue.encode()`. However, we cannot call encode() directly, because 
we don't want to decode the value and encode it again here.
   
   (In my previous attempt, I did try to decode the value and encode it again. 
It seems to introduce unnecessary overhead. 
https://github.com/apache/beam/pull/30317/commits/d06e0a5ebe8405334112a2d797bb5c9571b46b9c)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to