scwhittle commented on code in PR #36743:
URL: https://github.com/apache/beam/pull/36743#discussion_r2502211647
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/CachingStateTable.java:
##########
@@ -65,20 +68,49 @@ private CachingStateTable(Builder builder) {
}
}
- static CachingStateTable.Builder builder(
+ static Builder builder(
String stateFamily,
WindmillStateReader reader,
ForKeyAndFamily cache,
boolean isNewKey,
Supplier<Closeable> scopedReadStateSupplier,
WindmillStateTagUtil windmillStateTagUtil) {
- return new CachingStateTable.Builder(
+ return new Builder(
stateFamily, reader, cache, scopedReadStateSupplier, isNewKey, windmillStateTagUtil);
}
- @Override
+ /**
+ * Gets the {@link State} in the specified {@link StateNamespace} with the specified {@link
+ * StateTag}, binding it using the {@link #binderForNamespace} if it is not already present in
+ * this {@link CachingStateTable}.
+ */
+ public <StateT extends State> StateT get(
+ StateNamespace namespace, StateTag<StateT> tag, StateContext<?> c) {
+
+ StateTableKey stateTableKey = StateTableKey.create(namespace, tag);
+ @org.checkerframework.checker.nullness.qual.Nullable
Review Comment:
Use the javax `@Nullable` annotation instead of the fully qualified Checker Framework one?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternals.java:
##########
@@ -82,7 +81,8 @@ public WindmillStateInternals(
return key;
}
- private void persist(List<Future<WorkItemCommitRequest>> commitsToMerge, StateTable stateTable) {
+ private void persist(
+ List<Future<WorkItemCommitRequest>> commitsToMerge, CachingStateTable stateTable) {
for (State location : stateTable.values()) {
if (!(location instanceof WindmillState)) {
Review Comment:
Since we have our own class now, I think you could enforce this with typing
instead of having to check `instanceof` and cast here.
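
A rough sketch of the idea, assuming the table can declare its value type as the narrower Windmill-specific type. `SketchWindmillState`, `SketchValueState`, `TypedStateTable` and `PersistSketch` are hypothetical stand-ins, not the actual Beam classes:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the WindmillState base type.
abstract class SketchWindmillState {
  abstract String persist();
}

// Hypothetical concrete state used only for illustration.
final class SketchValueState extends SketchWindmillState {
  private final String name;

  SketchValueState(String name) {
    this.name = name;
  }

  @Override
  String persist() {
    return "commit:" + name;
  }
}

// Hypothetical table whose values are already typed as SketchWindmillState.
final class TypedStateTable {
  private final Map<String, SketchWindmillState> states = new HashMap<>();

  void put(String key, SketchWindmillState state) {
    states.put(key, state);
  }

  Collection<SketchWindmillState> values() {
    return states.values();
  }
}

final class PersistSketch {
  static List<String> persist(TypedStateTable table) {
    List<String> commits = new ArrayList<>();
    // No instanceof check or cast: the element type is guaranteed by the table.
    for (SketchWindmillState location : table.values()) {
      commits.add(location.persist());
    }
    return commits;
  }
}
```

With this shape, inserting a non-Windmill state becomes a compile error rather than a runtime check in `persist`.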
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/CachingStateTable.java:
##########
@@ -65,20 +68,49 @@ private CachingStateTable(Builder builder) {
}
}
- static CachingStateTable.Builder builder(
+ static Builder builder(
String stateFamily,
WindmillStateReader reader,
ForKeyAndFamily cache,
boolean isNewKey,
Supplier<Closeable> scopedReadStateSupplier,
WindmillStateTagUtil windmillStateTagUtil) {
- return new CachingStateTable.Builder(
+ return new Builder(
stateFamily, reader, cache, scopedReadStateSupplier, isNewKey, windmillStateTagUtil);
}
- @Override
+ /**
+ * Gets the {@link State} in the specified {@link StateNamespace} with the specified {@link
+ * StateTag}, binding it using the {@link #binderForNamespace} if it is not already present in
+ * this {@link CachingStateTable}.
+ */
+ public <StateT extends State> StateT get(
+ StateNamespace namespace, StateTag<StateT> tag, StateContext<?> c) {
+
+ StateTableKey stateTableKey = StateTableKey.create(namespace, tag);
+ @org.checkerframework.checker.nullness.qual.Nullable
+ State storage = stateTable.get(stateTableKey);
Review Comment:
How about using `computeIfAbsent` here instead of a separate `get` and null check?
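
A minimal sketch of that pattern, assuming the table is backed by a plain `HashMap`. `SketchState`, `SketchStateTable` and the `bindCount` counter are hypothetical and only stand in for the real binding logic:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a bound State object.
final class SketchState {
  final String id;

  SketchState(String id) {
    this.id = id;
  }
}

// Hypothetical stand-in for CachingStateTable; the real class has a different shape.
final class SketchStateTable {
  private final Map<String, SketchState> stateTable = new HashMap<>();
  int bindCount = 0; // counts how often the binding function runs

  SketchState get(String key) {
    // The mapping function runs only when the key has no mapping yet.
    return stateTable.computeIfAbsent(
        key,
        k -> {
          bindCount++;
          return new SketchState(k);
        });
  }
}
```

One caveat: `HashMap.computeIfAbsent` may throw `ConcurrentModificationException` if the mapping function itself modifies the map, so this only works if binding a state never re-enters the table.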
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/CachingStateTable.java:
##########
@@ -239,6 +271,18 @@ private <T extends State> StateTag<T> addressOrInternalTag(StateTag<T> address)
};
}
+ @AutoValue
+ abstract static class StateTableKey {
+
+ public abstract StateNamespace getStateNameSpace();
+
+ public abstract String getId();
Review Comment:
Add a TODO noting that this matches the StateTable behavior, but that the
stateTag prefix should be included to avoid possible collisions?
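
To illustrate the concern, here is a sketch under the assumption that two tags can share an id while differing only in a prefix. `IdOnlyKey`, `PrefixedKey` and the prefix values `"user"` and `"system"` are made up for the example and are not taken from Beam:

```java
import java.util.Objects;

// Hypothetical key mirroring the reviewed shape: namespace plus tag id only.
final class IdOnlyKey {
  final String namespace;
  final String id;

  IdOnlyKey(String namespace, String id) {
    this.namespace = namespace;
    this.id = id;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof IdOnlyKey)) {
      return false;
    }
    IdOnlyKey other = (IdOnlyKey) o;
    return namespace.equals(other.namespace) && id.equals(other.id);
  }

  @Override
  public int hashCode() {
    return Objects.hash(namespace, id);
  }
}

// Hypothetical variant that also includes the tag prefix in equality.
final class PrefixedKey {
  final String namespace;
  final String prefix;
  final String id;

  PrefixedKey(String namespace, String prefix, String id) {
    this.namespace = namespace;
    this.prefix = prefix;
    this.id = id;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof PrefixedKey)) {
      return false;
    }
    PrefixedKey other = (PrefixedKey) o;
    return namespace.equals(other.namespace)
        && prefix.equals(other.prefix)
        && id.equals(other.id);
  }

  @Override
  public int hashCode() {
    return Objects.hash(namespace, prefix, id);
  }
}
```

With `IdOnlyKey`, two tags named `count` in the same namespace map to the same entry regardless of prefix; with `PrefixedKey` they stay distinct.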
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/CachingStateTable.java:
##########
@@ -239,6 +271,18 @@ private <T extends State> StateTag<T> addressOrInternalTag(StateTag<T> address)
};
}
+ @AutoValue
+ abstract static class StateTableKey {
+
+ public abstract StateNamespace getStateNameSpace();
Review Comment:
nit: getStateNamespace