robertwb commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1491730995


##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -1021,6 +1033,29 @@ message StateKey {
     bytes map_key = 5;
   }
 
+  // Represents a request for an ordered list of values associated with a
+  // specified user key and window for a PTransform. See
+  // https://s.apache.org/beam-fn-state-api-and-bundle-processing for further
+  // details.
+  //
+  // The response data stream will be a concatenation of all entries of sort key
+  // and V's associated with the specified user key and window.
+  // See https://s.apache.org/beam-fn-api-send-and-receive-data for further
+  // details.
+  message OrderedListUserState {
+    // (Required) The id of the PTransform containing user state.
+    string transform_id = 1;
+    // (Required) The id of the user state.
+    string user_state_id = 2;
+    // (Required) The window encoded in a nested context.
+    bytes window = 3;
+    // (Required) The key of the currently executing element encoded in a
+    // nested context.
+    bytes key = 4;
+    // (optional) The sort key encoded in a nested context.
+    int64 sort_key = 5;

Review Comment:
   What are the semantics of providing a sort_key? Should we allow a range instead?
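If `sort_key` here is meant as a point lookup, a half-open range covers that case too: a single key `k` is the range `[k, k + 1)`. A minimal sketch, assuming each sort key maps to a list of values held in an in-memory `TreeMap`; `OrderedListRangeSketch`, `readRange` and `readPoint` are hypothetical names for illustration, not Beam or proto APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical in-memory model of one ordered-list state cell; not a Beam API. */
final class OrderedListRangeSketch {
  // Sort key -> values stored under that key, iterated in ascending key order.
  private final NavigableMap<Long, List<String>> entries = new TreeMap<>();

  void add(long sortKey, String value) {
    entries.computeIfAbsent(sortKey, k -> new ArrayList<>()).add(value);
  }

  /** Returns the values whose sort key lies in the half-open range [start, end). */
  List<String> readRange(long start, long end) {
    List<String> out = new ArrayList<>();
    if (start >= end) {
      return out; // empty range; subMap would reject start > end
    }
    for (Map.Entry<Long, List<String>> e : entries.subMap(start, true, end, false).entrySet()) {
      out.addAll(e.getValue());
    }
    return out;
  }

  /** A point lookup is the range [k, k + 1); k == Long.MAX_VALUE would overflow, so look it up directly. */
  List<String> readPoint(long sortKey) {
    if (sortKey == Long.MAX_VALUE) {
      return new ArrayList<>(entries.getOrDefault(sortKey, List.of()));
    }
    return readRange(sortKey, sortKey + 1);
  }
}
```

One consequence of half-open ranges: the default `[MIN_INT64, MAX_INT64)` from the proposed `OrderedListStateGetRequest` comment cannot reach an entry whose sort key is exactly `MAX_INT64`, which the spec may want to address explicitly.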



##########
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java:
##########
@@ -174,6 +215,124 @@ public CompletableFuture<StateResponse> handle(StateRequest.Builder requestBuild
         response = StateResponse.newBuilder().setAppend(StateAppendResponse.getDefaultInstance());
         break;
 
+      case ORDERED_LIST_GET:
+        {
+          long start = request.getOrderedListGet().getRange().getStart();
+          long end = request.getOrderedListGet().getRange().getEnd();
+
+          KvCoder<Long, Integer> coder = KvCoder.of(VarLongCoder.of(), VarIntCoder.of());
+          long sortKey = start;
+          int index = 0;
+          if (request.getOrderedListGet().getContinuationToken().size() > 0) {
+            try {
+              // The continuation format here is the sort key (long) followed by an index (int)
+              KV<Long, Integer> cursor =
+                  coder.decode(request.getOrderedListGet().getContinuationToken().newInput());
+              sortKey = cursor.getKey();
+              index = cursor.getValue();
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          }
+
+          ByteString continuationToken;
+          ByteString returnBlock = ByteString.EMPTY;
+          ;
+          try {
+            if (sortKey < start || sortKey >= end) {
+              throw new IndexOutOfBoundsException("sort key out of range");
+            }
+
+            NavigableSet<Long> subset =
+                orderedListKeys
+                    .getOrDefault(request.getStateKey(), new TreeSet<>())
+                    .subSet(sortKey, true, end, false);
+
+            // get the effective sort key currently, can throw NoSuchElementException
+            Long nextSortKey = subset.first();
+
+            StateKey.Builder keyBuilder = request.getStateKey().toBuilder();
+            keyBuilder.getOrderedListUserStateBuilder().setSortKey(nextSortKey);
+            List<ByteString> byteStrings =
+                data.getOrDefault(keyBuilder.build(), Collections.singletonList(ByteString.EMPTY));
+
+            // get the block specified in continuation token, can throw IndexOutOfBoundsException
+            returnBlock = byteStrings.get(index);
+
+            if (byteStrings.size() > index + 1) {
+              // more blocks from this sort key
+              index += 1;
+            } else {
+              // finish navigating the current sort key and need to find the next one,
+              // can throw NoSuchElementException
+              nextSortKey = subset.tailSet(nextSortKey, false).first();
+              index = 0;
+            }
+
+            ByteStringOutputStream outputStream = new ByteStringOutputStream();
+            try {
+              KV<Long, Integer> cursor = KV.of(nextSortKey, index);
+              coder.encode(cursor, outputStream);
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+            continuationToken = outputStream.toByteString();
+          } catch (NoSuchElementException | IndexOutOfBoundsException e) {
+            continuationToken = ByteString.EMPTY;
+          }
+          response =
+              StateResponse.newBuilder()
+                  .setOrderedListGet(
+                      OrderedListStateGetResponse.newBuilder()
+                          .setData(returnBlock)
+                          .setContinuationToken(continuationToken));
+        }
+        break;
+
+      case ORDERED_LIST_UPDATE:
+        for (OrderedListRange r : request.getOrderedListUpdate().getDeletesList()) {
+          List<Long> keysToRemove =
+              new ArrayList<>(
+                  orderedListKeys
+                      .getOrDefault(request.getStateKey(), new TreeSet<>())
+                      .subSet(r.getStart(), true, r.getEnd(), false));
+
+          for (Long l : keysToRemove) {
+            StateKey.Builder keyBuilder = request.getStateKey().toBuilder();
+            keyBuilder.getOrderedListUserStateBuilder().setSortKey(l);
+            data.remove(keyBuilder.build());
+            orderedListKeys.get(request.getStateKey()).remove(l);
+          }
+        }
+
+        for (OrderedListEntry e : request.getOrderedListUpdate().getInsertsList()) {
+          StateKey.Builder keyBuilder = request.getStateKey().toBuilder();
+          keyBuilder.getOrderedListUserStateBuilder().setSortKey(e.getSortKey());
+
+          ByteStringOutputStream outStream = new ByteStringOutputStream();
+
+          try {
+            InstantCoder.of().encode(Instant.ofEpochMilli(e.getSortKey()), outStream);
+          } catch (IOException ex) {
+            throw new RuntimeException(ex);
+          }
+          // In the response, the value encoded bytes are placed before the timestamp encoded bytes.

Review Comment:
   Where is this documented? Is timestamp being used here as another term for 
sort key?



##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -1075,6 +1111,42 @@ message StateClearRequest {}
 // A response to clear state.
 message StateClearResponse {}
 
+// A message describes a sort key range [start, end).
+message OrderedListRange {
+  int64 start = 1;
+  int64 end = 2;
+}
+
+// A data entry which is tagged with a sort key.
+message OrderedListEntry {
+  int64 sort_key = 1;
+  bytes data = 2;
+}
+
+// This request will fetch an ordered list with a sort key range. If the
+// timestamp range is not specified, the runner should use [MIN_INT64,
+// MAX_INT64) by default.
+message OrderedListStateGetRequest {
+  bytes continuation_token = 1;
+  OrderedListRange range = 2;
+}
+
+// A response to the get state request for an ordered list.
+message OrderedListStateGetResponse {
+  bytes continuation_token = 1;
+  bytes data = 2;
+}
+
+// A request to update an ordered list
+message OrderedListStateUpdateRequest {
+  // when the request is processed, deletes should always happen before inserts.

Review Comment:
   Should we instead split this up into two separate requests, e.g. 
OrderedListStateInsertRequest and OrderedListStateDeleteRequest?
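The proposed comment says deletes run before inserts within one update. The ordering matters whenever an insert falls inside a deleted range: a "replace this range" update keeps the new entry only if the delete runs first. A minimal sketch using hypothetical `Range` and `Entry` records as stand-ins for `OrderedListRange` and `OrderedListEntry` (not the generated proto classes):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical model of applying an ordered-list update; not the generated proto classes. */
final class OrderedListUpdateSketch {
  record Range(long start, long end) {} // half-open [start, end)

  record Entry(long sortKey, String data) {}

  /** Applies one update to a copy of {@code state}, in the requested order. */
  static NavigableMap<Long, List<String>> apply(
      NavigableMap<Long, List<String>> state,
      List<Range> deletes,
      List<Entry> inserts,
      boolean deletesFirst) {
    NavigableMap<Long, List<String>> out = new TreeMap<>();
    state.forEach((k, v) -> out.put(k, new ArrayList<>(v))); // copy so the input is untouched
    if (deletesFirst) {
      delete(out, deletes);
      insert(out, inserts);
    } else {
      insert(out, inserts);
      delete(out, deletes);
    }
    return out;
  }

  private static void delete(NavigableMap<Long, List<String>> m, List<Range> deletes) {
    for (Range r : deletes) {
      if (r.start() < r.end()) {
        m.subMap(r.start(), true, r.end(), false).clear(); // clearing the view removes from m
      }
    }
  }

  private static void insert(NavigableMap<Long, List<String>> m, List<Entry> inserts) {
    for (Entry e : inserts) {
      m.computeIfAbsent(e.sortKey(), k -> new ArrayList<>()).add(e.data());
    }
  }
}
```

With separate insert and delete requests, the same guarantee would presumably follow from the order in which the SDK issues them, so no intra-request ordering rule would need to be specified.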



##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -1075,6 +1111,42 @@ message StateClearRequest {}
 // A response to clear state.
 message StateClearResponse {}

Review Comment:
   For symmetry, it seems we should be reusing the Get/Append/Clear messages, just as with all the other state types, and put a range in the state key (for get and clear at least; for append, it would be the sort keys provided in the data). Otherwise, ordered list state is structured differently from all the rest.
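To make the symmetry suggestion concrete, here is a minimal in-memory sketch in which the state key itself carries the `[start, end)` range and the three existing operations keep their usual meaning. `OrderedListKey`, `Entry` and `cellId` are hypothetical; the window and user key that a real `StateKey` carries are omitted for brevity.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical: ordered list state reusing Get/Append/Clear, with the range in the state key. */
final class OrderedListSymmetrySketch {
  /** Stand-in for a StateKey; window and user key are omitted for brevity. */
  record OrderedListKey(String transformId, String userStateId, long start, long end) {
    String cellId() {
      return transformId + "/" + userStateId; // identifies the cell, independent of the range
    }
  }

  record Entry(long sortKey, String data) {}

  private final Map<String, NavigableMap<Long, List<String>>> cells = new HashMap<>();

  /** Get: entries whose sort key lies in the key's [start, end), in sort-key order. */
  List<Entry> get(OrderedListKey key) {
    List<Entry> out = new ArrayList<>();
    NavigableMap<Long, List<String>> cell = cells.get(key.cellId());
    if (cell == null || key.start() >= key.end()) {
      return out;
    }
    cell.subMap(key.start(), true, key.end(), false)
        .forEach((sortKey, values) -> values.forEach(v -> out.add(new Entry(sortKey, v))));
    return out;
  }

  /** Append: the key's range is ignored; each entry carries its own sort key. */
  void append(OrderedListKey key, List<Entry> entries) {
    NavigableMap<Long, List<String>> cell =
        cells.computeIfAbsent(key.cellId(), id -> new TreeMap<>());
    for (Entry e : entries) {
      cell.computeIfAbsent(e.sortKey(), k -> new ArrayList<>()).add(e.data());
    }
  }

  /** Clear: removes only the entries inside the key's range. */
  void clear(OrderedListKey key) {
    NavigableMap<Long, List<String>> cell = cells.get(key.cellId());
    if (cell != null && key.start() < key.end()) {
      cell.subMap(key.start(), true, key.end(), false).clear();
    }
  }
}
```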



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java:
##########
@@ -169,6 +172,7 @@ public ResultT get() {
   }
 
   @Override
+  @SuppressWarnings("unchecked")

Review Comment:
   Why did all of these get added?



##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -1075,6 +1111,42 @@ message StateClearRequest {}
 // A response to clear state.
 message StateClearResponse {}
 
+// A message describes a sort key range [start, end).
+message OrderedListRange {
+  int64 start = 1;
+  int64 end = 2;
+}
+
+// A data entry which is tagged with a sort key.
+message OrderedListEntry {
+  int64 sort_key = 1;
+  bytes data = 2;
+}
+
+// This request will fetch an ordered list with a sort key range. If the
+// timestamp range is not specified, the runner should use [MIN_INT64,
+// MAX_INT64) by default.
+message OrderedListStateGetRequest {
+  bytes continuation_token = 1;
+  OrderedListRange range = 2;
+}
+
+// A response to the get state request for an ordered list.
+message OrderedListStateGetResponse {
+  bytes continuation_token = 1;
+  bytes data = 2;
+}
+
+// A request to update an ordered list
+message OrderedListStateUpdateRequest {
+  // when the request is processed, deletes should always happen before inserts.
+  repeated OrderedListRange deletes = 1;
+  repeated OrderedListEntry inserts = 2;

Review Comment:
   Should there be symmetry between the written and read data? E.g. one writes 
bytes that are encoded KV<sort_key, data> and then reads them?



##########
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java:
##########
@@ -174,6 +215,124 @@ public CompletableFuture<StateResponse> handle(StateRequest.Builder requestBuild
         response = StateResponse.newBuilder().setAppend(StateAppendResponse.getDefaultInstance());
         break;
 
+      case ORDERED_LIST_GET:
+        {
+          long start = request.getOrderedListGet().getRange().getStart();
+          long end = request.getOrderedListGet().getRange().getEnd();
+
+          KvCoder<Long, Integer> coder = KvCoder.of(VarLongCoder.of(), VarIntCoder.of());
+          long sortKey = start;
+          int index = 0;
+          if (request.getOrderedListGet().getContinuationToken().size() > 0) {
+            try {
+              // The continuation format here is the sort key (long) followed by an index (int)
+              KV<Long, Integer> cursor =
+                  coder.decode(request.getOrderedListGet().getContinuationToken().newInput());
+              sortKey = cursor.getKey();
+              index = cursor.getValue();
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          }
+
+          ByteString continuationToken;
+          ByteString returnBlock = ByteString.EMPTY;
+          ;

Review Comment:
   stray semicolon



##########
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java:
##########
@@ -174,6 +215,124 @@ public CompletableFuture<StateResponse> handle(StateRequest.Builder requestBuild
         response = StateResponse.newBuilder().setAppend(StateAppendResponse.getDefaultInstance());
         break;
 
+      case ORDERED_LIST_GET:
+        {
+          long start = request.getOrderedListGet().getRange().getStart();
+          long end = request.getOrderedListGet().getRange().getEnd();
+
+          KvCoder<Long, Integer> coder = KvCoder.of(VarLongCoder.of(), VarIntCoder.of());
+          long sortKey = start;
+          int index = 0;
+          if (request.getOrderedListGet().getContinuationToken().size() > 0) {

Review Comment:
   Use isEmpty?



##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -1075,6 +1111,42 @@ message StateClearRequest {}
 // A response to clear state.
 message StateClearResponse {}
 
+// A message describes a sort key range [start, end).
+message OrderedListRange {
+  int64 start = 1;
+  int64 end = 2;
+}
+
+// A data entry which is tagged with a sort key.
+message OrderedListEntry {
+  int64 sort_key = 1;
+  bytes data = 2;
+}
+
+// This request will fetch an ordered list with a sort key range. If the
+// timestamp range is not specified, the runner should use [MIN_INT64,
+// MAX_INT64) by default.
+message OrderedListStateGetRequest {
+  bytes continuation_token = 1;
+  OrderedListRange range = 2;
+}
+
+// A response to the get state request for an ordered list.
+message OrderedListStateGetResponse {
+  bytes continuation_token = 1;
+  bytes data = 2;

Review Comment:
   For efficiency reasons, we have generally not split elements into individual proto fields, but have instead provided them as contiguous bytes of data. It makes sense to do that here too. But the encoding should be specified (e.g., is this a concatenation of several encoded KV<sort-key, value>s?).
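One possible answer to the encoding question, sketched with only the JDK: each entry is written as an 8-byte big-endian sort key, a 4-byte value length, then the value bytes, and the response `data` is the plain concatenation of such entries. This layout is an assumption for illustration (in Beam terms it would resemble a `KvCoder` over a long coder and a length-prefixed value coder), not what the PR specifies; `OrderedListDataCodecSketch` is a hypothetical name. Using the same layout for the bytes written in an update would also give the read/write symmetry asked about earlier in this review.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical wire layout for OrderedListStateGetResponse.data; not the format Beam specifies. */
final class OrderedListDataCodecSketch {
  record Entry(long sortKey, byte[] value) {}

  /** Concatenates entries as [8-byte big-endian sort key][4-byte length][value bytes]... */
  static byte[] encode(List<Entry> entries) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) { // DataOutputStream is big-endian
      for (Entry e : entries) {
        out.writeLong(e.sortKey());
        out.writeInt(e.value().length);
        out.write(e.value());
      }
    } catch (IOException ex) {
      throw new UncheckedIOException(ex);
    }
    return bytes.toByteArray();
  }

  /** Splits a concatenated block back into entries, in the order they were written. */
  static List<Entry> decode(byte[] data) {
    ByteBuffer buf = ByteBuffer.wrap(data); // ByteBuffer defaults to big-endian
    List<Entry> out = new ArrayList<>();
    while (buf.hasRemaining()) {
      long sortKey = buf.getLong();
      byte[] value = new byte[buf.getInt()];
      buf.get(value);
      out.add(new Entry(sortKey, value));
    }
    return out;
  }
}
```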



##########
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java:
##########
@@ -174,6 +215,124 @@ public CompletableFuture<StateResponse> handle(StateRequest.Builder requestBuild
         response = StateResponse.newBuilder().setAppend(StateAppendResponse.getDefaultInstance());
         break;
 
+      case ORDERED_LIST_GET:
+        {
+          long start = request.getOrderedListGet().getRange().getStart();
+          long end = request.getOrderedListGet().getRange().getEnd();
+
+          KvCoder<Long, Integer> coder = KvCoder.of(VarLongCoder.of(), VarIntCoder.of());
+          long sortKey = start;
+          int index = 0;
+          if (request.getOrderedListGet().getContinuationToken().size() > 0) {
+            try {
+              // The continuation format here is the sort key (long) followed by an index (int)
+              KV<Long, Integer> cursor =
+                  coder.decode(request.getOrderedListGet().getContinuationToken().newInput());
+              sortKey = cursor.getKey();
+              index = cursor.getValue();
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          }
+
+          ByteString continuationToken;
+          ByteString returnBlock = ByteString.EMPTY;
+          ;
+          try {
+            if (sortKey < start || sortKey >= end) {
+              throw new IndexOutOfBoundsException("sort key out of range");

Review Comment:
   OK, this goes back to the idea that range and (opaque) continuation token 
should be mutually exclusive.
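One way to make the range and the continuation token mutually exclusive is to keep the end bound inside the opaque token, so a follow-up request carries only the token and the fake's `sortKey < start || sortKey >= end` check on continuations is no longer needed. A hypothetical sketch; the 20-byte token layout (next sort key, index within that key's values, end bound) is an assumption that extends the fake's `KV<Long, Integer>` cursor with the end bound, and `OrderedListPagingSketch` is an illustrative name.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical paging scheme where the range and the continuation token are mutually exclusive. */
final class OrderedListPagingSketch {
  record Page(List<String> values, byte[] continuationToken) {}

  private final NavigableMap<Long, List<String>> entries = new TreeMap<>();

  void add(long sortKey, String value) {
    entries.computeIfAbsent(sortKey, k -> new ArrayList<>()).add(value);
  }

  /** First request: the caller supplies a range [start, end) and no token. */
  Page start(long start, long end, int pageSize) {
    return read(start, 0, end, pageSize);
  }

  /** Follow-up request: the caller supplies only the opaque token; the end bound travels inside it. */
  Page resume(byte[] token, int pageSize) {
    ByteBuffer buf = ByteBuffer.wrap(token);
    long sortKey = buf.getLong();
    int index = buf.getInt();
    long end = buf.getLong();
    return read(sortKey, index, end, pageSize);
  }

  private Page read(long sortKey, int index, long end, int pageSize) {
    if (pageSize <= 0) {
      throw new IllegalArgumentException("pageSize must be positive");
    }
    List<String> page = new ArrayList<>();
    if (sortKey >= end) {
      return new Page(page, new byte[0]);
    }
    for (Map.Entry<Long, List<String>> e : entries.subMap(sortKey, true, end, false).entrySet()) {
      List<String> values = e.getValue();
      // Only the first key visited resumes mid-list; later keys start at index 0.
      for (int i = e.getKey() == sortKey ? index : 0; i < values.size(); i++) {
        if (page.size() == pageSize) {
          // Page is full: encode exactly where to resume, including the end bound.
          byte[] token = ByteBuffer.allocate(20).putLong(e.getKey()).putInt(i).putLong(end).array();
          return new Page(page, token);
        }
        page.add(values.get(i));
      }
    }
    return new Page(page, new byte[0]); // empty token: no more data in the range
  }
}
```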



##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -1075,6 +1111,42 @@ message StateClearRequest {}
 // A response to clear state.
 message StateClearResponse {}
 
+// A message describes a sort key range [start, end).
+message OrderedListRange {
+  int64 start = 1;
+  int64 end = 2;
+}
+
+// A data entry which is tagged with a sort key.
+message OrderedListEntry {
+  int64 sort_key = 1;
+  bytes data = 2;
+}
+
+// This request will fetch an ordered list with a sort key range. If the
+// timestamp range is not specified, the runner should use [MIN_INT64,
+// MAX_INT64) by default.
+message OrderedListStateGetRequest {
+  bytes continuation_token = 1;
+  OrderedListRange range = 2;
+}
+
+// A response to the get state request for an ordered list.
+message OrderedListStateGetResponse {
+  bytes continuation_token = 1;
+  bytes data = 2;

Review Comment:
   We should also consider the encoding of the sort key. If sort keys are (typically) timestamps, big-endian might be preferable to varint.
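A quick illustration of why the encoding matters if a runner stores sort keys as raw bytes and relies on byte order: a base-128 varint does not preserve numeric order, while a fixed-width big-endian long with the sign bit flipped does. The varint here is the plain unsigned form without zigzag, assumed to be representative of a varint long coder; `SortKeyEncodingSketch` and its methods are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Compares two hypothetical sort-key encodings by whether byte order matches numeric order. */
final class SortKeyEncodingSketch {
  /** Unsigned base-128 varint of the raw 64-bit value (no zigzag, so negatives take 10 bytes). */
  static byte[] varint(long v) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    while ((v & ~0x7FL) != 0) {
      out.write((int) ((v & 0x7F) | 0x80)); // low 7 bits with the continuation bit set
      v >>>= 7;
    }
    out.write((int) v);
    return out.toByteArray();
  }

  /** 8-byte big-endian with the sign bit flipped, so negative keys sort before positive ones. */
  static byte[] orderedBigEndian(long v) {
    return ByteBuffer.allocate(8).putLong(v ^ Long.MIN_VALUE).array();
  }

  /** Lexicographic comparison of unsigned bytes, as a byte-keyed sorted store would compare keys. */
  static int compareBytes(byte[] a, byte[] b) {
    return Arrays.compareUnsigned(a, b);
  }
}
```

For present-day epoch-millisecond timestamps the varint saves only two bytes (6 versus 8), and any negative value costs 10 bytes, so the size argument for varint is weak in this case.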



##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -1075,6 +1111,42 @@ message StateClearRequest {}
 // A response to clear state.
 message StateClearResponse {}
 
+// A message describes a sort key range [start, end).
+message OrderedListRange {
+  int64 start = 1;
+  int64 end = 2;
+}
+
+// A data entry which is tagged with a sort key.
+message OrderedListEntry {
+  int64 sort_key = 1;
+  bytes data = 2;
+}
+
+// This request will fetch an ordered list with a sort key range. If the
+// timestamp range is not specified, the runner should use [MIN_INT64,
+// MAX_INT64) by default.
+message OrderedListStateGetRequest {
+  bytes continuation_token = 1;

Review Comment:
   Do we want to require returning the range if there's a continuation token 
involved? Or should they be mutually exclusive? 



-- 
This is an automated message from the Apache Git Service.