shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1635648194


##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/OrderedListUserState.java:
##########
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.concurrent.CompletableFuture;
+import org.apache.beam.fn.harness.Cache;
+import org.apache.beam.fn.harness.Caches;
+import 
org.apache.beam.fn.harness.state.StateFetchingIterators.CachingStateIterable;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateClearRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.LengthPrefixCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.fn.stream.PrefetchableIterable;
+import org.apache.beam.sdk.fn.stream.PrefetchableIterables;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeParameter;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BoundType;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Range;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.TreeRangeSet;
+import org.joda.time.Instant;
+
+/**
+ * An implementation of an ordered list user state that utilizes the Beam Fn 
State API to fetch,
+ * clear and persist values.
+ *
+ * <p>Calling {@link #asyncClose()} schedules any required persistence 
changes. This object should
+ * no longer be used after it is closed.
+ *
+ * <p>TODO: Move to an async persist model where persistence is signalled 
based upon cache memory
+ * pressure and its need to flush.
+ */
+public class OrderedListUserState<T> {
+  private final BeamFnStateClient beamFnStateClient;
+  private final StateRequest request;
+  private final TimestampedValueCoder<T> timestampedValueCoder;
+  // Pending updates to persistent storage
+  private NavigableMap<Instant, Collection<T>> pendingAdds = Maps.newTreeMap();
+  private TreeRangeSet<Instant> pendingRemoves = TreeRangeSet.create();
+
+  private boolean isCleared = false;
+  private boolean isClosed = false;
+
+  public static class TimestampedValueCoder<T> extends 
StructuredCoder<TimestampedValue<T>> {
+
+    private final Coder<T> valueCoder;
+
+    // Internally, a TimestampedValue is encoded with a KvCoder, where the key 
is encoded with
+    // a VarLongCoder and the value is encoded with a LengthPrefixCoder.
+    // Refer to the comment in StateAppendRequest
+    // (org/apache/beam/model/fn_execution/v1/beam_fn_api.proto) for more 
detail.
+    private final KvCoder<Long, T> internalKvCoder;
+
+    public static <T> OrderedListUserState.TimestampedValueCoder<T> 
of(Coder<T> valueCoder) {
+      return new OrderedListUserState.TimestampedValueCoder<>(valueCoder);
+    }
+
+    @Override
+    public Object structuralValue(TimestampedValue<T> value) {
+      Object structuralValue = valueCoder.structuralValue(value.getValue());
+      return TimestampedValue.of(structuralValue, value.getTimestamp());
+    }
+
+    @SuppressWarnings("unchecked")
+    TimestampedValueCoder(Coder<T> valueCoder) {
+      this.valueCoder = checkNotNull(valueCoder);
+      this.internalKvCoder = KvCoder.of(VarLongCoder.of(), 
LengthPrefixCoder.of(valueCoder));
+    }
+
+    @Override
+    public void encode(TimestampedValue<T> windowedElem, OutputStream 
outStream)
+        throws IOException {
+      internalKvCoder.encode(
+          KV.of(windowedElem.getTimestamp().getMillis(), 
windowedElem.getValue()), outStream);
+    }
+
+    @Override
+    public TimestampedValue<T> decode(InputStream inStream) throws IOException 
{
+      KV<Long, T> kv = internalKvCoder.decode(inStream);
+      return TimestampedValue.of(kv.getValue(), 
Instant.ofEpochMilli(kv.getKey()));
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+      verifyDeterministic(
+          this, "TimestampedValueCoder requires a deterministic valueCoder", 
valueCoder);
+    }
+
+    @Override
+    public List<? extends Coder<?>> getCoderArguments() {
+      return Arrays.<Coder<?>>asList(valueCoder);
+    }
+
+    public Coder<T> getValueCoder() {
+      return valueCoder;
+    }
+
+    @Override
+    public TypeDescriptor<TimestampedValue<T>> getEncodedTypeDescriptor() {
+      return new TypeDescriptor<TimestampedValue<T>>() {}.where(
+          new TypeParameter<T>() {}, valueCoder.getEncodedTypeDescriptor());
+    }
+
+    @Override
+    public List<? extends Coder<?>> getComponents() {
+      return Collections.singletonList(valueCoder);
+    }
+  }
+
+  public OrderedListUserState(
+      Cache<?, ?> cache,
+      BeamFnStateClient beamFnStateClient,
+      String instructionId,
+      StateKey stateKey,
+      Coder<T> valueCoder) {
+    checkArgument(
+        stateKey.hasOrderedListUserState(),
+        "Expected OrderedListUserState StateKey but received %s.",
+        stateKey);
+    this.beamFnStateClient = beamFnStateClient;
+    this.timestampedValueCoder = TimestampedValueCoder.of(valueCoder);
+    this.request =
+        
StateRequest.newBuilder().setInstructionId(instructionId).setStateKey(stateKey).build();
+  }
+
+  public void add(TimestampedValue<T> value) {
+    checkState(
+        !isClosed,

Review Comment:
   The way I implement the closed state in OrderedListUserState is consistent 
with the other states.  The flag `isClosed` is set when the class method 
asyncClose() is called. 
   
https://github.com/apache/beam/blob/65aef1a3b48f36058cecf3479a17b4b129118e81/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/OrderedListUserState.java#L276
   
   This class method is registered in stateFinalizers at 
`FnApiStateAccessor.java`
   
https://github.com/apache/beam/blob/65aef1a3b48f36058cecf3479a17b4b129118e81/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java#L929
   
   It is then called after each processElement in `FnApiDoFnRunner.java`
   
https://github.com/apache/beam/blob/65aef1a3b48f36058cecf3479a17b4b129118e81/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L836-L842.



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/OrderedListUserState.java:
##########
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.concurrent.CompletableFuture;
+import org.apache.beam.fn.harness.Cache;
+import org.apache.beam.fn.harness.Caches;
+import 
org.apache.beam.fn.harness.state.StateFetchingIterators.CachingStateIterable;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateClearRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.LengthPrefixCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.fn.stream.PrefetchableIterable;
+import org.apache.beam.sdk.fn.stream.PrefetchableIterables;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeParameter;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BoundType;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Range;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.TreeRangeSet;
+import org.joda.time.Instant;
+
+/**
+ * An implementation of an ordered list user state that utilizes the Beam Fn 
State API to fetch,
+ * clear and persist values.
+ *
+ * <p>Calling {@link #asyncClose()} schedules any required persistence 
changes. This object should
+ * no longer be used after it is closed.
+ *
+ * <p>TODO: Move to an async persist model where persistence is signalled 
based upon cache memory
+ * pressure and its need to flush.
+ */
+public class OrderedListUserState<T> {
+  private final BeamFnStateClient beamFnStateClient;
+  private final StateRequest request;
+  private final TimestampedValueCoder<T> timestampedValueCoder;
+  // Pending updates to persistent storage
+  private NavigableMap<Instant, Collection<T>> pendingAdds = Maps.newTreeMap();
+  private TreeRangeSet<Instant> pendingRemoves = TreeRangeSet.create();
+
+  private boolean isCleared = false;
+  private boolean isClosed = false;
+
+  public static class TimestampedValueCoder<T> extends 
StructuredCoder<TimestampedValue<T>> {
+
+    private final Coder<T> valueCoder;
+
+    // Internally, a TimestampedValue is encoded with a KvCoder, where the key 
is encoded with
+    // a VarLongCoder and the value is encoded with a LengthPrefixCoder.
+    // Refer to the comment in StateAppendRequest
+    // (org/apache/beam/model/fn_execution/v1/beam_fn_api.proto) for more 
detail.
+    private final KvCoder<Long, T> internalKvCoder;
+
+    public static <T> OrderedListUserState.TimestampedValueCoder<T> 
of(Coder<T> valueCoder) {
+      return new OrderedListUserState.TimestampedValueCoder<>(valueCoder);
+    }
+
+    @Override
+    public Object structuralValue(TimestampedValue<T> value) {
+      Object structuralValue = valueCoder.structuralValue(value.getValue());
+      return TimestampedValue.of(structuralValue, value.getTimestamp());
+    }
+
+    @SuppressWarnings("unchecked")
+    TimestampedValueCoder(Coder<T> valueCoder) {
+      this.valueCoder = checkNotNull(valueCoder);
+      this.internalKvCoder = KvCoder.of(VarLongCoder.of(), 
LengthPrefixCoder.of(valueCoder));
+    }
+
+    @Override
+    public void encode(TimestampedValue<T> windowedElem, OutputStream 
outStream)
+        throws IOException {
+      internalKvCoder.encode(
+          KV.of(windowedElem.getTimestamp().getMillis(), 
windowedElem.getValue()), outStream);
+    }
+
+    @Override
+    public TimestampedValue<T> decode(InputStream inStream) throws IOException 
{
+      KV<Long, T> kv = internalKvCoder.decode(inStream);
+      return TimestampedValue.of(kv.getValue(), 
Instant.ofEpochMilli(kv.getKey()));
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+      verifyDeterministic(
+          this, "TimestampedValueCoder requires a deterministic valueCoder", 
valueCoder);
+    }
+
+    @Override
+    public List<? extends Coder<?>> getCoderArguments() {
+      return Arrays.<Coder<?>>asList(valueCoder);
+    }
+
+    public Coder<T> getValueCoder() {
+      return valueCoder;
+    }
+
+    @Override
+    public TypeDescriptor<TimestampedValue<T>> getEncodedTypeDescriptor() {
+      return new TypeDescriptor<TimestampedValue<T>>() {}.where(
+          new TypeParameter<T>() {}, valueCoder.getEncodedTypeDescriptor());
+    }
+
+    @Override
+    public List<? extends Coder<?>> getComponents() {
+      return Collections.singletonList(valueCoder);
+    }
+  }
+
+  public OrderedListUserState(
+      Cache<?, ?> cache,
+      BeamFnStateClient beamFnStateClient,
+      String instructionId,
+      StateKey stateKey,
+      Coder<T> valueCoder) {
+    checkArgument(
+        stateKey.hasOrderedListUserState(),
+        "Expected OrderedListUserState StateKey but received %s.",
+        stateKey);
+    this.beamFnStateClient = beamFnStateClient;
+    this.timestampedValueCoder = TimestampedValueCoder.of(valueCoder);
+    this.request =
+        
StateRequest.newBuilder().setInstructionId(instructionId).setStateKey(stateKey).build();
+  }
+
+  public void add(TimestampedValue<T> value) {
+    checkState(
+        !isClosed,

Review Comment:
   The way I implement the closed state in OrderedListUserState is consistent 
with the other states.  The flag `isClosed` is set when the class method 
`asyncClose()` is called. 
   
https://github.com/apache/beam/blob/65aef1a3b48f36058cecf3479a17b4b129118e81/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/OrderedListUserState.java#L276
   
   This class method is registered in stateFinalizers at 
`FnApiStateAccessor.java`
   
https://github.com/apache/beam/blob/65aef1a3b48f36058cecf3479a17b4b129118e81/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java#L929
   
   It is then called after each processElement in `FnApiDoFnRunner.java`
   
https://github.com/apache/beam/blob/65aef1a3b48f36058cecf3479a17b4b129118e81/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L836-L842.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to