sijie commented on a change in pull request #1587: [WIP] Add key/value 
operations in StateContext
URL: https://github.com/apache/incubator-pulsar/pull/1587#discussion_r181885174
 
 

 ##########
 File path: 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContext.java
 ##########
 @@ -18,15 +18,61 @@
  */
 package org.apache.pulsar.functions.instance.state;
 
+import java.nio.ByteBuffer;
 import java.util.concurrent.CompletableFuture;
 
 /**
  * A state context per function.
  */
 public interface StateContext {
 
+    /**
+     * Increment the given <i>key</i> by the given <i>amount</i>.
+     *
+     * @param key key to increment
+     * @param amount the amount incremented
+     */
     void incr(String key, long amount);
 
+    /**
+     * Update the given <i>key</i> to the provide <i>value</i>.
+     *
+     * <p>NOTE: the put operation might or might not be applied directly to 
the global state until
+     * the state is flushed via {@link #flush()} at the completion of function 
execution.
+     *
+     * <p>The behavior of `PUT` is non-deterministic, if two function 
instances attempt to update
+     * same key around the same time, there is no guarantee which update will 
be the final result.
+     * That says, if you attempt to get amount via {@link #getAmount(String)}, 
increment the amount
+     * based on the function computation logic, and update the computed amount 
back. one update will
+     * overwrite the other update. For this case, you are encouraged to use 
{@link #incr(String, long)}
+     * instead.
+     *
+     * @param key key to update.
+     * @param value value to update
+     */
+    void put(String key, ByteBuffer value);
+
+    /**
+     * Get the value of a given <i>key</i>.
+     *
+     * @param key key to retrieve
+     * @return a completable future representing the retrieve result.
+     */
+    CompletableFuture<ByteBuffer> getValue(String key);
+
+    /**
+     * Get the amount of a given <i>key</i>.
+     *
+     * @param key key to retrieve
+     * @return a completable future representing the retrieve result.
+     */
+    CompletableFuture<ByteBuffer> getAmount(String key);
+
+    /**
+     * Flush all the modification to the global state.
+     *
+     * @return a completable future representing the flush results.
+     */
     CompletableFuture<Void> flush();
 
 Review comment:
   `flush` is not exposed to users. 
   
   `flush` is used by java instance runnable, user only deal with 
`incrCounter`. the idea is in one function invocation, there might be multiple 
mutations to state, these mutations can be inflights. but java instance needs 
to know when all these inflight updates are completed, because it only can 
acknowledge after state updates are done to honor to at-least-once and 
effectively-once. that says `flush` here is not actually `flushing`, it means 
waiting for all inflight state updates to complete. I named it as `flush` to 
allow us to any optimization in future.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to