kennknowles commented on code in PR #33212:
URL: https://github.com/apache/beam/pull/33212#discussion_r1892115490
##########
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java:
##########
@@ -571,6 +573,27 @@ public void clear() {
throw new RuntimeException("Error clearing state.", e);
}
}
+
+ @Override
+ public boolean equals(@Nullable Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ final FlinkOrderedListState<?> that = (FlinkOrderedListState<?>) o;
+
+ return namespace.equals(that.namespace) && stateId.equals(that.stateId);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = namespace.hashCode();
Review Comment:
Can use `Objects.hashCode`
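For illustration, a minimal sketch of what this could look like. It assumes the comment means a varargs helper such as Guava's `Objects.hashCode(Object...)`; to stay dependency-free the sketch uses the JDK's `java.util.Objects.hash` instead. `StateKey` is a hypothetical stand-in for `FlinkOrderedListState`, and the `String` namespace is an assumption (the real field is not a `String`).

```java
import java.util.Objects;

// Hypothetical stand-in for FlinkOrderedListState; only the two fields
// compared in the diff (namespace, stateId) are modeled here.
final class StateKey {
  private final String namespace; // assumption: simplified to String for the sketch
  private final String stateId;

  StateKey(String namespace, String stateId) {
    this.namespace = namespace;
    this.stateId = stateId;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    final StateKey that = (StateKey) o;
    return namespace.equals(that.namespace) && stateId.equals(that.stateId);
  }

  @Override
  public int hashCode() {
    // Combines exactly the fields used in equals, replacing the manual
    // "result = 31 * result + ..." arithmetic.
    return Objects.hash(namespace, stateId);
  }
}
```

Equal keys then hash identically, which is what hash-based collections require.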
##########
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java:
##########
@@ -571,6 +573,27 @@ public void clear() {
throw new RuntimeException("Error clearing state.", e);
}
}
+
+ @Override
Review Comment:
This change is nice to have, but it should be in a separate commit. Could you
please adjust the commit history so that each independent change is in its own
clearly labeled commit? I will then merge without squashing. In other words,
fold the spotless chore and other non-meaningful commits into the commits they
belong to, so that each commit in the PR is meaningful on its own.
##########
runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java:
##########
@@ -622,4 +627,82 @@ public Boolean read() {
};
}
}
+
+ private final class SparkOrderedListState<T> extends AbstractState<List<TimestampedValue<T>>>
+ implements OrderedListState<T> {
+
+ private SparkOrderedListState(StateNamespace namespace, String id, Coder<T> coder) {
+ super(namespace, id, ListCoder.of(TimestampedValue.TimestampedValueCoder.of(coder)));
+ }
+
+ private SortedMap<Instant, TimestampedValue<T>> readAsMap() {
+ final List<TimestampedValue<T>> listValues =
Review Comment:
The reason for each additional kind of state is to efficiently offer a novel
form of state access. The state access here has the same performance
characteristics as `ValueState`. It is actually better for a runner to reject
a pipeline than to run it with performance characteristics that don't match the
expected performance contract.
Is there some underlying mechanism in Spark that could implement
`OrderedListState` efficiently and scalably?
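To make the performance point concrete, here is a rough sketch in plain Java, not Beam or Spark API. The class and method names are hypothetical. It contrasts value-style access, where the whole list is read and scanned, with a sorted index that visits only the requested timestamp range. Timestamps are modeled as `long`, and the sorted index holds one value per timestamp, which is a simplification.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;

// Hypothetical illustration only; none of these names exist in Beam.
final class OrderedListAccessSketch {

  // Value-style access: every element is materialized and inspected, so the
  // cost of a range read grows with the size of the whole list.
  static List<String> rangeByFullScan(
      List<Map.Entry<Long, String>> all, long min, long limit) {
    List<String> out = new ArrayList<>();
    for (Map.Entry<Long, String> e : all) {
      if (e.getKey() >= min && e.getKey() < limit) {
        out.add(e.getValue());
      }
    }
    return out;
  }

  // Sorted-index access: only entries inside [min, limit) are visited, so
  // the cost depends on the size of the requested slice.
  static List<String> rangeBySortedIndex(
      NavigableMap<Long, String> index, long min, long limit) {
    return new ArrayList<>(index.subMap(min, true, limit, false).values());
  }
}
```

Both methods return the same elements for timestamp-sorted input; the difference is how much state has to be touched to answer the query.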
##########
CHANGES.md:
##########
@@ -71,11 +71,15 @@
## New Features / Improvements
+* Improved batch performance of SparkRunner's GroupByKey ([#20943](https://github.com/apache/beam/pull/20943)).
+* Support OnWindowExpiration in Prism ([#32211](https://github.com/apache/beam/issues/32211)).
Review Comment:
Some of the changes listed here don't seem to be part of this PR