Ethan-Merrill opened a new issue #13633:
URL: https://github.com/apache/pulsar/issues/13633


   ## Motivation
   
   Certain uses of Pulsar functions could benefit from the ability to access 
the states of other functions. Currently functions can only access their own 
states, and so sharing information between functions requires other solutions 
such as writing to a separate database.
   
   ## Goal
   
   The goal is to let a function access another function's state. Given the 
other function's tenant, namespace, and name, any function should be able to 
read and write that function's state. This PIP is not concerned with expanding 
the capabilities of function states; it only deals with expanding access to 
function states.
   
   ## API Changes
   
   The Pulsar function API would be modified to have the function context 
implement the following interface for accessing function states using a tenant, 
namespace, and name. 
   
   ```
   public interface SharedContext {
       /**
        * Update the state value for the key.
        *
        * @param key   name of the key
        * @param value state value of the key
        * @param tenant the state tenant name
        * @param ns the state namespace name
        * @param name the state store name
        */
       void putState(String key, ByteBuffer value, String tenant, String ns, String name);
   
       /**
        * Update the state value for the key, but don't wait for the operation
        * to be completed.
        *
        * @param key   name of the key
        * @param value state value of the key
        * @param tenant the state tenant name
        * @param ns the state namespace name
        * @param name the state store name
        */
       CompletableFuture<Void> putStateAsync(String key, ByteBuffer value, String tenant, String ns, String name);
   
       /**
        * Retrieve the state value for the key.
        *
        * @param key name of the key
        * @param tenant the state tenant name
        * @param ns the state namespace name
        * @param name the state store name
        * @return the state value for the key.
        */
       ByteBuffer getState(String key, String tenant, String ns, String name);
   
       /**
        * Retrieve the state value for the key, but don't wait for the
        * operation to be completed.
        *
        * @param key name of the key
        * @param tenant the state tenant name
        * @param ns the state namespace name
        * @param name the state store name
        * @return the state value for the key.
        */
       CompletableFuture<ByteBuffer> getStateAsync(String key, String tenant, String ns, String name);
   
       /**
        * Delete the state value for the key.
        *
        * @param key   name of the key
        * @param tenant the state tenant name
        * @param ns the state namespace name
        * @param name the state store name
        */
       void deleteState(String key, String tenant, String ns, String name);
   
       /**
        * Delete the state value for the key, but don't wait for the operation
        * to be completed.
        *
        * @param key   name of the key
        * @param tenant the state tenant name
        * @param ns the state namespace name
        * @param name the state store name
        */
       CompletableFuture<Void> deleteStateAsync(String key, String tenant, String ns, String name);
   
       /**
        * Increment the builtin distributed counter referred by key.
        *
        * @param key    The name of the key
        * @param amount The amount to be incremented
        * @param tenant the state tenant name
        * @param ns the state namespace name
        * @param name the state store name
        */
       void incrCounter(String key, long amount, String tenant, String ns, String name);
   
       /**
        * Increment the builtin distributed counter referred by key
        * but don't wait for the completion of the increment operation
        *
        * @param key    The name of the key
        * @param amount The amount to be incremented
        * @param tenant the state tenant name
        * @param ns the state namespace name
        * @param name the state store name
        */
       CompletableFuture<Void> incrCounterAsync(String key, long amount, String tenant, String ns, String name);
   
       /**
        * Retrieve the counter value for the key.
        *
        * @param key name of the key
        * @param tenant the state tenant name
        * @param ns the state namespace name
        * @param name the state store name
        * @return the amount of the counter value for this key
        */
       long getCounter(String key, String tenant, String ns, String name);
   
       /**
        * Retrieve the counter value for the key, but don't wait
        * for the operation to be completed
        *
        * @param key name of the key
        * @param tenant the state tenant name
        * @param ns the state namespace name
        * @param name the state store name
        * @return the amount of the counter value for this key
        */
       CompletableFuture<Long> getCounterAsync(String key, String tenant, String ns, String name);
   }
   ```
   
   The Python context would gain the following methods:
   ```
   class Context(object):
     @abstractmethod
     def incr_counter(self, key, amount, tenant, ns, name):
       """incr the counter of a given key in the managed state"""
       pass
   
     @abstractmethod
     def get_counter(self, key, tenant, ns, name):
       """get the counter of a given key in the managed state"""
       pass
   
     @abstractmethod
     def del_counter(self, key, tenant, ns, name):
       """delete the counter of a given key in the managed state"""
       pass
   
     @abstractmethod
     def put_state(self, key, value, tenant, ns, name):
       """update the value of a given key in the managed state"""
       pass
   
     @abstractmethod
     def get_state(self, key, tenant, ns, name):
       """get the value of a given key in the managed state"""
       pass
   ```
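
To illustrate how two functions might share state through the proposed methods, here is a minimal sketch. `InMemoryContext` is a hypothetical stand-in that keeps counters in a dictionary keyed by (tenant, ns, name); it is not part of Pulsar and only mimics the proposed signatures. The tenant, namespace, and function names are made up for the example.

```python
from collections import defaultdict


class InMemoryContext:
    """Hypothetical test double mimicking the proposed counter methods."""

    def __init__(self):
        # One counter table per (tenant, ns, name) state store.
        self._counters = defaultdict(lambda: defaultdict(int))

    def incr_counter(self, key, amount, tenant, ns, name):
        self._counters[(tenant, ns, name)][key] += amount

    def get_counter(self, key, tenant, ns, name):
        return self._counters[(tenant, ns, name)][key]


def count_words(text, context):
    """Writer function: adds word counts to the 'word-count' state store."""
    for word in text.split():
        context.incr_counter(word, 1, "public", "default", "word-count")


def report_word(word, context):
    """Reader function: reads a count from the writer's state store."""
    return context.get_counter(word, "public", "default", "word-count")


context = InMemoryContext()
count_words("to be or not to be", context)
print(report_word("to", context))
```

The reader never touches a state store of its own; it only needs the writer's tenant, namespace, and name, which is the access pattern this PIP aims to enable.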
   
   ## Implementation
   
   The implementations of the API functions are simple. For example:
   ```
   @Override
   public void incrCounter(String key, long amount, String tenant, String ns, String name) {
       DefaultStateStore stateStore = (DefaultStateStore) getCreateStateStore(tenant, ns, name);
       ensureStateEnabled(stateStore, tenant, ns, name);
       stateStore.incrCounter(key, amount);
   }
   ```
   
   This implementation would require a small change to the ensureStateEnabled() 
function to allow for checking that states other than the function's own state 
are enabled. 
   
   Additionally, a new function, getCreateStateStore(), would need to be added, 
or the existing getStateStore() function would need to be modified, so that a 
new state store is created if the requested one doesn't currently exist in the 
StateManager. The corresponding new function in the StateManager might look like:
   ```
   @Override
   public StateStore getCreateStore(String tenant, String namespace, String name) {
       String storeName = FunctionCommon.getFullyQualifiedName(tenant, namespace, name);
       StateStore store = stores.get(storeName);
   
       try {
           if (store == null) {
               store = createStore(tenant, namespace, name);
               registerStore(store);
           }
       } catch (Exception e) {
           store = null;
       }
       
       return store;
   }
   
   private StateStore createStore(String tenant, String namespace, String name) throws Exception {
       StateStore store = stateStoreProvider.getStateStore(tenant, namespace, name);
       StateStoreContext context = new StateStoreContextImpl();
       store.init(context);
       return store;
   }
   ```
   
   This implementation would just require that the JavaInstanceRunnable class 
pass the stateStoreProvider into the StateManager when it is created, so the 
StateManager can create new state stores when needed.
   
   For the Python implementation, a state manager class doesn't currently 
exist, so one would need to be created. It could be as simple as:
   ```
   class StateManager(object):
       def __init__(self, state_storage_serviceurl):
           self.state_storage_serviceurl = state_storage_serviceurl
           # Per-instance cache; a class-level dict would be shared by all instances.
           self.states = {}
   
       def get_state(self, tenant, ns, name):
           table_ns = "%s_%s" % (str(tenant), str(ns))
           table_ns = table_ns.replace("-", "_")
           table_name = str(name)
           # A tuple key avoids the collisions that plain concatenation allows.
           state_id = (table_ns, table_name)
   
           state = self.states.get(state_id)
           if state is None:
               state = state_context.create_state_context(
                   self.state_storage_serviceurl, table_ns, table_name)
               self.states[state_id] = state
   
           return state
   ```
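
The get-or-create lookup can be exercised without a running state storage service. The sketch below restates the logic with an injected factory: `create_state` is a hypothetical constructor parameter standing in for `state_context.create_state_context`, and the class name, service URL, and function names are placeholders chosen for the example.

```python
class CachingStateManager:
    """Sketch of the get-or-create lookup; `create_state` is injected."""

    def __init__(self, state_storage_serviceurl, create_state):
        self.state_storage_serviceurl = state_storage_serviceurl
        self._create_state = create_state
        self._states = {}

    def get_state(self, tenant, ns, name):
        table_ns = ("%s_%s" % (tenant, ns)).replace("-", "_")
        table_name = str(name)
        state_id = (table_ns, table_name)
        if state_id not in self._states:
            # First request for this store: create it and remember it.
            self._states[state_id] = self._create_state(
                self.state_storage_serviceurl, table_ns, table_name)
        return self._states[state_id]


created = []


def fake_create_state(url, table_ns, table_name):
    # Record each creation so the caching is observable.
    created.append((url, table_ns, table_name))
    return {"table_ns": table_ns, "table_name": table_name}


manager = CachingStateManager("bk://localhost:4181", fake_create_state)
first = manager.get_state("my-tenant", "my-ns", "counter-fn")
second = manager.get_state("my-tenant", "my-ns", "counter-fn")
print(first is second, len(created))
```

Repeated lookups for the same tenant, namespace, and name return the cached object, so the factory runs only once per state store.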
   
   The PythonInstance class that creates the function context would also create 
a StateManager object and set it on the context object, so that the Python API 
functions could use it as follows:
   ```
   def incr_counter(self, key, amount, tenant=None, ns=None, name=None):
     if tenant is None or ns is None or name is None:
       return self.state_context.incr(key, amount)
     else:
       state_context = self.state_manager.get_state(tenant, ns, name)
       return state_context.incr(key, amount)
   ```
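
The same fallback to the function's own state would apply to every state method, not just incr_counter. One way to avoid repeating the check is a small resolver helper. In the sketch below, `_resolve_state`, `SketchContext`, and the stub classes are hypothetical names used only for illustration; the stubs keep state in memory so the example runs on its own.

```python
class StubState:
    """Hypothetical in-memory stand-in for a state context."""

    def __init__(self):
        self.counters = {}

    def incr(self, key, amount):
        self.counters[key] = self.counters.get(key, 0) + amount


class StubStateManager:
    """Hypothetical stand-in for the StateManager described above."""

    def __init__(self):
        self.states = {}

    def get_state(self, tenant, ns, name):
        return self.states.setdefault((tenant, ns, name), StubState())


class SketchContext:
    def __init__(self):
        self.state_context = StubState()  # the function's own state
        self.state_manager = StubStateManager()

    def _resolve_state(self, tenant, ns, name):
        # Use the function's own state unless all three parts are given.
        if tenant is None or ns is None or name is None:
            return self.state_context
        return self.state_manager.get_state(tenant, ns, name)

    def incr_counter(self, key, amount, tenant=None, ns=None, name=None):
        return self._resolve_state(tenant, ns, name).incr(key, amount)


ctx = SketchContext()
ctx.incr_counter("hits", 1)
ctx.incr_counter("hits", 5, "public", "default", "other-fn")
```

As in the snippet above, a partially specified target falls back silently to the function's own state; whether that case should raise an error instead is a design choice left open here.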
   
   All of this should enable easy access to other functions' states in a way 
that has minimal impact on the existing functionality of Pulsar.
   
   ## Rejected Alternatives
   
   None
   

