lukecwik commented on a change in pull request #15238:
URL: https://github.com/apache/beam/pull/15238#discussion_r688068062
##########
File path: model/fn-execution/src/main/proto/beam_fn_api.proto
##########
@@ -721,15 +731,39 @@ message StateKey {
bytes key = 4;
}
+ // Represents a request for the keys of a multimap associated with a specified
+ // user key and window in a PCollection. See
+ // https://s.apache.org/beam-fn-state-api-and-bundle-processing for further
+ // details.
+ //
+ // Can only be used to perform StateGetRequests and StateClearRequests on the
+ // user state.
+ //
+ // For a PCollection<KV<K, V>>, the response data stream will be a
+ // concatenation of all K's associated with the specified user key and window.
+ // See https://s.apache.org/beam-fn-api-send-and-receive-data for further
+ // details.
message MultimapKeysUserState {
// (Required) The id of the PTransform containing user state.
string transform_id = 1;
// (Required) The id of the user state.
string user_state_id = 2;
// (Required) The window encoded in a nested context.
bytes window = 3;
+ // (Required) The key of the currently executing element encoded in a
+ // nested context.
+ bytes key = 4;
}
+ // Represents a request for the values of the lookup key associated with a
+ // specified user key and window in a PCollection. See
+ // https://s.apache.org/beam-fn-state-api-and-bundle-processing for further
Review comment:
Ditto: this is not a PCollection but data that is associated with the
transform itself.
See https://beam.apache.org/blog/stateful-processing/
##########
File path: model/fn-execution/src/main/proto/beam_fn_api.proto
##########
@@ -740,6 +774,8 @@ message StateKey {
// (Required) The key of the currently executing element encoded in a
// nested context.
bytes key = 4;
+ // (Required) The encoded look up key for the map.
+ bytes map_key = 5;
Review comment:
```suggestion
// (Required) The encoded lookup key for the map encoded in a nested context.
bytes map_key = 5;
```
##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java
##########
@@ -325,7 +329,94 @@ public void clear() {
@Override
public <T> SetState<T> bindSet(String id, StateSpec<SetState<T>> spec,
Coder<T> elemCoder) {
- throw new UnsupportedOperationException("TODO: Add support for a map state to the Fn API.");
+ return (SetState<T>)
+ stateKeyObjectCache.computeIfAbsent(
+ createMultimapUserStateKey(id),
+ new Function<StateKey, Object>() {
+ @Override
+ public Object apply(StateKey key) {
+ return new SetState<T>() {
+ private final MultimapUserState<T, Boolean> impl =
+ createMultimapUserState(id, elemCoder, BooleanCoder.of());
+
+ @Override
+ public void clear() {
+ impl.clear();
+ }
+
+ @Override
+ public ReadableState<Boolean> contains(T t) {
+ return new ReadableState<Boolean>() {
+ @Override
+ public @Nullable Boolean read() {
+ return !Iterables.isEmpty(impl.get(t));
+ }
+
+ @Override
+ public ReadableState<Boolean> readLater() {
+ return this;
+ }
+ };
+ }
+
+ @Override
+ public ReadableState<Boolean> addIfAbsent(T t) {
+ return new ReadableState<Boolean>() {
+ @Override
+ public @Nullable Boolean read() {
Review comment:
The WindmillStateInternals does add it to the windmillmap as a side
effect of the construction of the returned ReadableState
[here](https://github.com/apache/beam/blob/bd9e4d3f63d06f8156203173ab643530ea16ee52/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java#L1154)
so it doesn't require the user to call read to have it added. Calling read
just tells the user whether it was added or not.
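
To illustrate the behavior described in this comment, here is a minimal sketch in which the add happens when `addIfAbsent` is called and the returned object only reports the outcome. `SimpleSetState` and the use of `Supplier<Boolean>` are hypothetical stand-ins for illustration; they are not Beam's `SetState` or `ReadableState`, and this is not the WindmillStateInternals implementation.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Supplier;

public class EagerAddIfAbsentSketch {
  /** Hypothetical stand-in for a set-state cell; not a Beam class. */
  static final class SimpleSetState<T> {
    private final Set<T> elements = new HashSet<>();

    /**
     * Adds the element immediately, as a side effect of this call.
     * The returned supplier only reports whether the element was newly added.
     */
    Supplier<Boolean> addIfAbsent(T t) {
      boolean added = elements.add(t);
      return () -> added;
    }

    boolean contains(T t) {
      return elements.contains(t);
    }
  }

  public static void main(String[] args) {
    SimpleSetState<String> state = new SimpleSetState<>();

    // The caller ignores the returned object, yet the element is stored.
    state.addIfAbsent("a");
    System.out.println("contains a: " + state.contains("a"));

    // Reading the result of a second add reports that nothing was added.
    Supplier<Boolean> second = state.addIfAbsent("a");
    System.out.println("second add was new: " + second.get());
  }
}
```

With a lazy variant, where the add only happens inside `read()`, a caller that discards the returned object would silently lose the element, which is the difference this comment points out.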
##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
##########
@@ -2172,22 +2170,6 @@ static void verifyDoFnSupported(DoFn<?, ?> fn, boolean streaming, boolean stream
"%s does not currently support @RequiresTimeSortedInput in streaming mode.",
DataflowRunner.class.getSimpleName()));
}
- if (DoFnSignatures.usesSetState(fn)) {
Review comment:
Yes, unless you plan to add support to the current production Java worker.
##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
##########
@@ -2236,13 +2238,33 @@ static boolean useStreamingEngine(DataflowPipelineOptions options) {
|| hasExperiment(options, GcpOptions.WINDMILL_SERVICE_EXPERIMENT);
}
- static void verifyDoFnSupported(DoFn<?, ?> fn, boolean streaming, boolean streamingEngine) {
+ static void verifyDoFnSupported(
+ DoFn<?, ?> fn, boolean streaming, DataflowPipelineOptions options) {
if (streaming && DoFnSignatures.requiresTimeSortedInput(fn)) {
throw new UnsupportedOperationException(
String.format(
"%s does not currently support @RequiresTimeSortedInput in streaming mode.",
DataflowRunner.class.getSimpleName()));
}
+ if (!useUnifiedWorker(options)) {
Review comment:
Just making sure that UW will support map state and set state regardless
of whether windmill appliance or streaming engine is used, based upon the code
you're developing within UW.
##########
File path: model/fn-execution/src/main/proto/beam_fn_api.proto
##########
@@ -721,14 +721,36 @@ message StateKey {
bytes key = 4;
}
Review comment:
lgtm
##########
File path: model/fn-execution/src/main/proto/beam_fn_api.proto
##########
@@ -721,14 +721,36 @@ message StateKey {
bytes key = 4;
}
+ message MultimapKeysUserState {
Review comment:
User state isn't stored in a PCollection; it is state associated with
the transform itself, keyed by the currently processing window and key, which is
how it differs from side inputs.
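
As a rough illustration of this scoping, the sketch below models user state as cells addressed by the four fields of the `MultimapKeysUserState` message (`transform_id`, `user_state_id`, `window`, `key`). The class `UserStateScopeSketch`, its methods, and the use of plain strings in place of encoded bytes are assumptions made for readability; this is not how a runner stores state.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class UserStateScopeSketch {
  // Each cell is addressed by (transform_id, user_state_id, window, key).
  // Strings stand in for the encoded bytes used in the real protocol.
  private final Map<List<String>, List<String>> cells = new HashMap<>();

  void append(String transformId, String userStateId, String window, String key, String value) {
    cells
        .computeIfAbsent(Arrays.asList(transformId, userStateId, window, key), k -> new ArrayList<>())
        .add(value);
  }

  List<String> read(String transformId, String userStateId, String window, String key) {
    return cells.getOrDefault(
        Arrays.asList(transformId, userStateId, window, key), new ArrayList<>());
  }

  public static void main(String[] args) {
    UserStateScopeSketch state = new UserStateScopeSketch();
    state.append("transformA", "seen", "window1", "user1", "x");

    // Same transform, state id, and window, but a different user key: a separate cell.
    System.out.println(state.read("transformA", "seen", "window1", "user1"));
    System.out.println(state.read("transformA", "seen", "window1", "user2"));
  }
}
```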
##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java
##########
@@ -325,7 +329,94 @@ public void clear() {
@Override
public <T> SetState<T> bindSet(String id, StateSpec<SetState<T>> spec,
Coder<T> elemCoder) {
- throw new UnsupportedOperationException("TODO: Add support for a map state to the Fn API.");
+ return (SetState<T>)
+ stateKeyObjectCache.computeIfAbsent(
+ createMultimapUserStateKey(id),
+ new Function<StateKey, Object>() {
+ @Override
+ public Object apply(StateKey key) {
+ return new SetState<T>() {
+ private final MultimapUserState<T, Boolean> impl =
+ createMultimapUserState(id, elemCoder, BooleanCoder.of());
Review comment:
No, to the runner it is just a byte[]; for void it's 0 length and for
boolean it's 1 length, so it's a very minor perf optimization, but for very large
sets it could matter a little.
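
A small sketch of the size difference mentioned above, assuming one stored value per set element. The helper methods are hypothetical and only mimic the encoded lengths stated in this comment (1 byte for a boolean, 0 bytes for void); they do not call Beam's coders.

```java
import java.io.ByteArrayOutputStream;

public class ValueEncodingSizeSketch {
  /** Mimics a one-byte boolean encoding; not Beam's BooleanCoder. */
  static byte[] encodeBoolean(boolean value) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    out.write(value ? 1 : 0);
    return out.toByteArray();
  }

  /** Mimics a zero-length void encoding; not Beam's VoidCoder. */
  static byte[] encodeVoid() {
    return new byte[0];
  }

  /** Total value payload for a set with one stored value per element. */
  static long totalValueBytes(long setSize, int bytesPerValue) {
    return setSize * bytesPerValue;
  }

  public static void main(String[] args) {
    long setSize = 10_000_000L; // illustrative size only
    System.out.println(
        "boolean values: " + totalValueBytes(setSize, encodeBoolean(true).length) + " bytes");
    System.out.println(
        "void values: " + totalValueBytes(setSize, encodeVoid().length) + " bytes");
  }
}
```

The saving is one byte per element, so it only becomes noticeable for very large sets, which matches the "very minor perf optimization" framing.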