kennknowles commented on code in PR #33212:
URL: https://github.com/apache/beam/pull/33212#discussion_r1894416064


##########
runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java:
##########
@@ -622,4 +627,82 @@ public Boolean read() {
       };
     }
   }
+
+  private final class SparkOrderedListState<T> extends AbstractState<List<TimestampedValue<T>>>
+      implements OrderedListState<T> {
+
+    private SparkOrderedListState(StateNamespace namespace, String id, Coder<T> coder) {
+      super(namespace, id, ListCoder.of(TimestampedValue.TimestampedValueCoder.of(coder)));
+    }
+
+    private SortedMap<Instant, TimestampedValue<T>> readAsMap() {
+      final List<TimestampedValue<T>> listValues =

Review Comment:
   I see. If the Flink runner already implements it this way, then I am OK
   with following that precedent. My point was that this does not add any
   capability beyond what `ValueState` already provides. It is a minor API
   wrapper adjustment: still useful, but not the main purpose of
   `OrderedListState`.
   
   So we can merge with this design. If you are considering a follow-up, here
   is how we would ideally like this to behave:
   
    - `add` should call some native Spark API that writes the element without reading the list
    - `readRange` should read only the requested range, ideally seeking in near-constant time (i.e. without a scan or sort)
    - `clearRange` should also seek in near-constant time
    - `isEmpty` should not read the list
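
   To make the four points above concrete, here is a minimal in-memory sketch.
   It is not Spark or Beam code: `SortedListStateSketch` is a hypothetical
   class, timestamps are plain `long` millis, and a `TreeMap` stands in for
   whatever sorted native state a runner might offer. The half-open range
   `[minInclusive, limitExclusive)` is also an assumption. A `TreeMap` seeks
   in logarithmic rather than constant time, so this shows only the shape of
   the access pattern.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical in-memory stand-in; not Beam's or Spark's API.
final class SortedListStateSketch<T> {
  // Timestamp (millis) -> values recorded at that timestamp, kept sorted by key.
  private final NavigableMap<Long, List<T>> byTimestamp = new TreeMap<>();

  // Writes one element without reading or rewriting the existing elements.
  void add(long timestampMillis, T value) {
    byTimestamp.computeIfAbsent(timestampMillis, k -> new ArrayList<>()).add(value);
  }

  // Reads only [minInclusive, limitExclusive): seeks to the first key
  // instead of scanning or sorting the whole collection.
  List<T> readRange(long minInclusive, long limitExclusive) {
    List<T> out = new ArrayList<>();
    for (List<T> values :
        byTimestamp.subMap(minInclusive, true, limitExclusive, false).values()) {
      out.addAll(values);
    }
    return out;
  }

  // Removes only the entries inside the range, again via a seek.
  void clearRange(long minInclusive, long limitExclusive) {
    byTimestamp.subMap(minInclusive, true, limitExclusive, false).clear();
  }

  // Answers without materializing any elements.
  boolean isEmpty() {
    return byTimestamp.isEmpty();
  }
}
```

   The contrast with the `ValueState`-style design is that no method here
   deserializes the full list before doing its work.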
   



##########
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java:
##########
@@ -571,6 +573,27 @@ public void clear() {
         throw new RuntimeException("Error clearing state.", e);
       }
     }
+
+    @Override
+    public boolean equals(@Nullable Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      final FlinkOrderedListState<?> that = (FlinkOrderedListState<?>) o;
+
+      return namespace.equals(that.namespace) && stateId.equals(that.stateId);
+    }
+
+    @Override
+    public int hashCode() {
+      int result = namespace.hashCode();

Review Comment:
   `equals` and `hashCode` should always be overridden together, so overriding
   `hashCode` here is necessary.
   
   I was only suggesting this pattern in place of the hand-written hash arithmetic:
https://github.com/apache/beam/blob/7c86bf3103fec1d3a6a3a3cbe6ace8a14a85b723/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SuccessOrFailure.java#L88
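
   As a rough illustration of the idea, the sketch below delegates hashing to
   `java.util.Objects.hash` over the same fields that `equals` compares.
   `StateKeySketch` and its two `String` fields are hypothetical stand-ins for
   the namespace and state id, and whether the linked file uses this exact
   helper is an assumption.

```java
import java.util.Objects;

// Hypothetical stand-in for a state class identified by namespace and id.
final class StateKeySketch {
  private final String namespace;
  private final String stateId;

  StateKeySketch(String namespace, String stateId) {
    this.namespace = namespace;
    this.stateId = stateId;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    StateKeySketch that = (StateKeySketch) o;
    return namespace.equals(that.namespace) && stateId.equals(that.stateId);
  }

  @Override
  public int hashCode() {
    // Hash exactly the fields used in equals; no manual multiply-and-add.
    return Objects.hash(namespace, stateId);
  }
}
```

   Keeping the field list in `hashCode` identical to the one in `equals` is
   what preserves the contract that equal objects have equal hash codes.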



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.