dybyte opened a new issue, #10209: URL: https://github.com/apache/seatunnel/issues/10209
### Search before asking

- [x] I had searched in the [feature](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22Feature%22) and found no similar feature requirement.

### Description

## Background

Currently, SeaTunnel relies directly on Hazelcast IMap for internal state management. This tight coupling makes it difficult to:

- Replace the underlying storage engine (e.g., switching from Hazelcast IMap to RocksDB)
- Clearly define the responsibilities of state management components

## Proposal

Introduce a **`StateStore` abstraction** to encapsulate all operations currently performed on Hazelcast IMap, along with a **`StateStoreFactory`** to create instances of `StateStore`.

### Components

- **StateStore**: Abstracts all methods of Hazelcast IMap used internally in SeaTunnel.
- **StateStoreFactory**: Responsible for creating `StateStore` instances.
- **SeaTunnelServer**: During initialization, it uses `StateStoreFactory` to create a `StateStore` and passes it as a dependency to the services that require it.

This design allows future replacement of Hazelcast IMap with RocksDB or any other storage engine by simply implementing `StateStore` and `StateStoreFactory` for the new engine. Additionally, creating a storage implementation using RocksDB (or others) becomes clear and straightforward.
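
To make the wiring described above more concrete, here is a minimal sketch of how a second backend could plug in. Everything in it is an assumption for illustration: the trimmed-down `StateStore` method set, the `getStore` factory method, `InMemoryStateStore`, and `JobCounterService` are hypothetical names and not existing SeaTunnel code. A `ConcurrentHashMap` stands in for an alternative engine such as RocksDB.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

// Hypothetical, trimmed-down version of the proposed abstraction.
interface StateStore<K, V> {
    V get(K key);
    void put(K key, V value);
    V putIfAbsent(K key, V value);
    V compute(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction);
    void remove(K key);
    int size();
}

// Hypothetical factory: one named store per logical map.
interface StateStoreFactory {
    <K, V> StateStore<K, V> getStore(String name);
}

// Stand-in backend. Note that ConcurrentHashMap rejects null keys and values.
final class InMemoryStateStore<K, V> implements StateStore<K, V> {
    private final Map<K, V> map = new ConcurrentHashMap<>();

    @Override public V get(K key) { return map.get(key); }
    @Override public void put(K key, V value) { map.put(key, value); }
    @Override public V putIfAbsent(K key, V value) { return map.putIfAbsent(key, value); }
    @Override public V compute(K key, BiFunction<? super K, ? super V, ? extends V> fn) {
        return map.compute(key, fn);
    }
    @Override public void remove(K key) { map.remove(key); }
    @Override public int size() { return map.size(); }
}

final class InMemoryStateStoreFactory implements StateStoreFactory {
    private final Map<String, StateStore<?, ?>> stores = new ConcurrentHashMap<>();

    @Override
    @SuppressWarnings("unchecked")
    public <K, V> StateStore<K, V> getStore(String name) {
        // The same name always returns the same store instance.
        return (StateStore<K, V>)
                stores.computeIfAbsent(name, n -> new InMemoryStateStore<Object, Object>());
    }
}

// Illustrative service: it depends only on the abstraction, never on the backend.
final class JobCounterService {
    private final StateStore<String, Long> counters;

    JobCounterService(StateStoreFactory factory) {
        this.counters = factory.getStore("job-counters");
    }

    long increment(String jobId) {
        return counters.compute(jobId, (k, v) -> v == null ? 1L : v + 1L);
    }
}

public class StateStoreSketch {
    public static void main(String[] args) {
        // The server would choose the factory once, during initialization.
        StateStoreFactory factory = new InMemoryStateStoreFactory();
        JobCounterService service = new JobCounterService(factory);
        service.increment("job-1");
        System.out.println(service.increment("job-1"));
    }
}
```

In this sketch, switching engines only changes the line that constructs the factory; `JobCounterService` is untouched.
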
### Example

```java
import com.hazelcast.map.IMap;
import com.hazelcast.spi.impl.NodeEngineImpl;

public class IMapFactory implements MapFactory {

    private final NodeEngineImpl nodeEngine;

    public IMapFactory(NodeEngineImpl nodeEngine) {
        this.nodeEngine = nodeEngine;
    }

    @Override
    public <K, V> MapStorage<K, V> getMap(String mapName) {
        // Look up the Hazelcast IMap and hide it behind the storage abstraction.
        IMap<K, V> iMap = nodeEngine.getHazelcastInstance().getMap(mapName);
        return new IMapStorage<>(iMap);
    }
}
```

```java
import com.hazelcast.map.IMap;
import com.hazelcast.map.listener.MapListener;

import javax.annotation.Nonnull;

import java.util.Collection;
import java.util.EventListener;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;

/** MapStorage implementation that delegates every call to a Hazelcast IMap. */
public class IMapStorage<K, V> implements MapStorage<K, V> {

    private final IMap<K, V> iMap;

    public IMapStorage(IMap<K, V> iMap) {
        this.iMap = iMap;
    }

    @Override
    public V putIfAbsent(K key, V value) {
        return iMap.putIfAbsent(key, value);
    }

    @Override
    public V compute(K key, BiFunction<K, V, V> remappingFunction) {
        return iMap.compute(key, remappingFunction);
    }

    @Override
    public void remove(Object key) {
        iMap.remove(key);
    }

    @Override
    public V get(Object key) {
        return iMap.get(key);
    }

    @Override
    public void put(K key, V value) {
        iMap.put(key, value);
    }

    @Override
    public void set(K key, V value) {
        // Delegate to IMap.set, which does not return the previous value.
        iMap.set(key, value);
    }

    @Override
    public Set<Map.Entry<K, V>> entrySet() {
        return iMap.entrySet();
    }

    @Override
    public void forEach(BiConsumer<? super K, ? super V> action) {
        iMap.forEach(action);
    }

    @Override
    public UUID addEntryListener(@Nonnull EventListener listener, boolean includeValue) {
        // The listener must be a Hazelcast MapListener; otherwise this cast fails.
        return iMap.addEntryListener((MapListener) listener, includeValue);
    }

    @Override
    public Collection<V> values() {
        return iMap.values();
    }

    @Override
    public V getOrDefault(Object key, V defaultValue) {
        return iMap.getOrDefault(key, defaultValue);
    }

    @Override
    public V computeIfAbsent(@Nonnull K key, @Nonnull Function<? super K, ? extends V> func) {
        return iMap.computeIfAbsent(key, func);
    }

    @Override
    public V put(@Nonnull K key, @Nonnull V value, long ttl, @Nonnull TimeUnit timeUnit) {
        return iMap.put(key, value, ttl, timeUnit);
    }

    @Override
    public boolean isEmpty() {
        return iMap.isEmpty();
    }

    @Override
    public boolean containsKey(@Nonnull Object key) {
        return iMap.containsKey(key);
    }

    @Override
    public int size() {
        return iMap.size();
    }
}
```

Feedback is welcome, as there might be parts that need improvement. Thanks!

### Usage Scenario

_No response_

### Related issues

#10181

### Are you willing to submit a PR?

- [x] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
