dybyte opened a new issue, #10209:
URL: https://github.com/apache/seatunnel/issues/10209

   ### Search before asking
   
   - [x] I had searched in the 
[feature](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22Feature%22)
 and found no similar feature requirement.
   
   
   ### Description
   
   ## Background
   
   Currently, SeaTunnel relies directly on Hazelcast IMap for internal state 
management. This tight coupling makes it difficult to:
   
   - Replace the underlying storage engine (e.g., switching from Hazelcast IMap 
to RocksDB)
   - Clearly define the responsibilities of state management components
   
   ## Proposal
   
   Introduce a **`StateStore` abstraction** to encapsulate all operations 
currently performed on Hazelcast IMap, along with a **`StateStoreFactory`** to 
create instances of `StateStore`.
   
   ### Components
   
   ![image.png](attachment:c7bfde71-ad4f-4364-ae57-824cbe9f98bd:image.png)
   
   - **StateStore**: Abstracts all methods of Hazelcast IMap used internally in 
SeaTunnel.
   - **StateStoreFactory**: Responsible for creating `StateStore` instances.
   - **SeaTunnelServer**: During initialization, it uses `StateStoreFactory` to 
create a `StateStore` and passes it as a dependency to the services that 
require it.
   
   With this design, replacing Hazelcast IMap with RocksDB or another storage
   engine only requires implementing `StateStore` and `StateStoreFactory` for
   the new engine. The services that receive a `StateStore` from
   `SeaTunnelServer` do not need to change.
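
To make the wiring described above concrete, here is a minimal sketch of how a server could create a store through the factory and hand it to a service. Everything in it is an assumption made for illustration: the method set on `StateStore`, the `getStore` method name, the in-memory backend, and the `JobStateService` class are hypothetical and are not taken from the SeaTunnel code base.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class StateStoreSketch {

    /** Hypothetical minimal contract; a real one would mirror the IMap methods SeaTunnel uses. */
    interface StateStore<K, V> {
        V get(K key);
        void put(K key, V value);
        V putIfAbsent(K key, V value);
        void remove(K key);
    }

    /** Hypothetical factory contract: one implementation per storage engine. */
    interface StateStoreFactory {
        <K, V> StateStore<K, V> getStore(String name);
    }

    /** In-memory stand-in for an engine-specific implementation (IMap, RocksDB, ...). */
    static final class InMemoryStateStore<K, V> implements StateStore<K, V> {
        private final Map<K, V> map = new ConcurrentHashMap<>();

        @Override public V get(K key) { return map.get(key); }
        @Override public void put(K key, V value) { map.put(key, value); }
        @Override public V putIfAbsent(K key, V value) { return map.putIfAbsent(key, value); }
        @Override public void remove(K key) { map.remove(key); }
    }

    /** Returns the same store instance for the same name, like a named map lookup. */
    static final class InMemoryStateStoreFactory implements StateStoreFactory {
        private final Map<String, StateStore<?, ?>> stores = new ConcurrentHashMap<>();

        @Override
        @SuppressWarnings("unchecked")
        public <K, V> StateStore<K, V> getStore(String name) {
            return (StateStore<K, V>)
                    stores.computeIfAbsent(name, n -> new InMemoryStateStore<Object, Object>());
        }
    }

    /** Hypothetical service: it depends on the abstraction only, never on the engine. */
    static final class JobStateService {
        private final StateStore<Long, String> jobStates;

        JobStateService(StateStore<Long, String> jobStates) {
            this.jobStates = jobStates;
        }

        void markRunning(long jobId) { jobStates.put(jobId, "RUNNING"); }

        String stateOf(long jobId) { return jobStates.get(jobId); }
    }

    public static void main(String[] args) {
        // Server-side initialization: choose the engine once, here.
        StateStoreFactory factory = new InMemoryStateStoreFactory();
        StateStore<Long, String> store = factory.getStore("jobStates");

        // Services receive the store as a constructor dependency.
        JobStateService service = new JobStateService(store);
        service.markRunning(1L);
        System.out.println(service.stateOf(1L)); // prints RUNNING
    }
}
```

Under these assumptions, switching engines means changing only the line that constructs the factory. `JobStateService` compiles and runs unchanged.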
   
   ### Example
   
   ```java
   import com.hazelcast.map.IMap;
   import com.hazelcast.spi.impl.NodeEngineImpl;
   
   public class IMapFactory implements MapFactory {
       private final NodeEngineImpl nodeEngine;
   
       public IMapFactory(NodeEngineImpl nodeEngine) {
           this.nodeEngine = nodeEngine;
       }
   
       @Override
       public <K, V> MapStorage<K, V> getMap(String mapName) {
           IMap<K, V> iMap = nodeEngine.getHazelcastInstance().getMap(mapName);
           return new IMapStorage<>(iMap);
       }
   }
   ```
   
   ```java
   import com.hazelcast.map.IMap;
   import com.hazelcast.map.listener.MapListener;
   
   import javax.annotation.Nonnull;
   
   import java.util.Collection;
   import java.util.EventListener;
   import java.util.Map;
   import java.util.Set;
   import java.util.UUID;
   import java.util.concurrent.TimeUnit;
   import java.util.function.BiConsumer;
   import java.util.function.BiFunction;
   import java.util.function.Function;
   
   public class IMapStorage<K, V> implements MapStorage<K, V> {
       private final IMap<K, V> iMap;
   
       public IMapStorage(IMap<K, V> iMap) {
           this.iMap = iMap;
       }
   
       @Override
       public V putIfAbsent(K key, V value) {
           return iMap.putIfAbsent(key, value);
       }
   
       @Override
       public V compute(K key, BiFunction<K, V, V> remappingFunction) {
           return iMap.compute(key, remappingFunction);
       }
   
       @Override
       public void remove(Object key) {
           iMap.remove(key);
       }
   
       @Override
       public V get(Object key) {
           return iMap.get(key);
       }
   
       @Override
       public void put(K key, V value) {
           iMap.put(key, value);
       }
   
       @Override
       public void set(K key, V value) {
           // Delegate to IMap.set, which stores the value without returning the old one.
           iMap.set(key, value);
       }
   
       @Override
       public Set<Map.Entry<K, V>> entrySet() {
           return iMap.entrySet();
       }
   
       @Override
       public void forEach(BiConsumer<? super K, ? super V> action) {
           iMap.forEach(action);
       }
   
       @Override
       public UUID addEntryListener(@Nonnull EventListener listener, boolean includeValue) {
           return iMap.addEntryListener((MapListener) listener, includeValue);
       }
   
       @Override
       public Collection<V> values() {
           return iMap.values();
       }
   
       @Override
       public V getOrDefault(Object key, V defaultValue) {
           return iMap.getOrDefault(key, defaultValue);
       }
   
       @Override
       public V computeIfAbsent(@Nonnull K key, @Nonnull Function<? super K, ? extends V> func) {
           return iMap.computeIfAbsent(key, func);
       }
   
       @Override
       public V put(@Nonnull K key, @Nonnull V value, long ttl, @Nonnull TimeUnit timeUnit) {
           return iMap.put(key, value, ttl, timeUnit);
       }
   
       @Override
       public boolean isEmpty() {
           return iMap.isEmpty();
       }
   
       @Override
       public boolean containsKey(@Nonnull Object key) {
           return iMap.containsKey(key);
       }
   
       @Override
       public int size() {
           return iMap.size();
       }
   }
   
   ```
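
The two classes above implement `MapStorage` and `MapFactory`, which are not shown here. They presumably play the `StateStore` and `StateStoreFactory` roles from the proposal. As a rough sketch, assuming a trimmed-down `MapStorage` whose signatures are inferred from the `@Override` methods above, a second backend and a caller that works against the interface could look like this. The `InMemoryMapStorage` class and the `incrementCounter` helper are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

public class MapStorageSketch {

    /** Assumed subset of the MapStorage contract, inferred from the overrides above. */
    interface MapStorage<K, V> {
        V get(Object key);
        V putIfAbsent(K key, V value);
        V compute(K key, BiFunction<K, V, V> remappingFunction);
        int size();
    }

    /** Hypothetical second backend, e.g. for unit tests without a Hazelcast cluster. */
    static final class InMemoryMapStorage<K, V> implements MapStorage<K, V> {
        private final Map<K, V> map = new ConcurrentHashMap<>();

        @Override public V get(Object key) { return map.get(key); }
        @Override public V putIfAbsent(K key, V value) { return map.putIfAbsent(key, value); }
        @Override public V compute(K key, BiFunction<K, V, V> f) { return map.compute(key, f); }
        @Override public int size() { return map.size(); }
    }

    /** Caller code sees only MapStorage, so it does not care which backend it gets. */
    static long incrementCounter(MapStorage<String, Long> counters, String name) {
        return counters.compute(name, (k, v) -> v == null ? 1L : v + 1L);
    }

    public static void main(String[] args) {
        MapStorage<String, Long> counters = new InMemoryMapStorage<>();
        incrementCounter(counters, "checkpoints");
        incrementCounter(counters, "checkpoints");
        System.out.println(counters.get("checkpoints")); // prints 2
    }
}
```

The same `incrementCounter` call would work against `IMapStorage`, provided `IMapStorage` implements the same interface. That is the property the abstraction is meant to guarantee.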
   Feedback is welcome, as there might be parts that need improvement. Thanks!
   
   ### Usage Scenario
   
   _No response_
   
   ### Related issues
   
   #10181 
   
   ### Are you willing to submit a PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
