Copilot commented on code in PR #9988:
URL: https://github.com/apache/seatunnel/pull/9988#discussion_r2584514290


##########
seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/reader/DefaultReader.java:
##########
@@ -82,7 +82,7 @@ private List<String> getFileNames(Path parentPath) {
             }
             return fileNames;
         } catch (IOException e) {
-            throw new IMapStorageException(e, "get file names error,path is 
s%", parentPath);
+            throw new RocksDBStorageException(e, "get file names error,path is 
s%", parentPath);

Review Comment:
   The error message format specifier `s%` is reversed; it should be `%s` for 
proper string formatting. Note that the typo is present in both the removed and 
the added line, so the new `RocksDBStorageException` message will still fail to 
interpolate `parentPath` and display incorrectly.



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rocksdb/RocksDBValueState.java:
##########
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.seatunnel.engine.server.rocksdb;
+
+import org.apache.seatunnel.shade.com.google.common.collect.MapMaker;
+
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.engine.serializer.api.Serializer;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteOptions;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.AbstractMap;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.UnaryOperator;
+
+@Slf4j
+public class RocksDBValueState<K, V> implements ValueState<K, V> {
+    private final RocksDB db;
+    private final ColumnFamilyHandle columnFamilyHandle;
+    private final Serializer serializer;
+    private final WriteOptions writeOptions = new 
WriteOptions().setSync(false);
+
+    private final Map<K, ReentrantLock> lockMap =
+            new MapMaker().concurrencyLevel(16).weakKeys().makeMap();
+
+    RocksDBValueState(RocksDB db, ColumnFamilyHandle columnFamilyHandle, 
Serializer serializer) {
+        this.db = db;
+        this.columnFamilyHandle = columnFamilyHandle;
+        this.serializer = serializer;
+    }
+
+    private byte[] encode(Object object) throws IOException {
+        if (object == null) throw CommonError.illegalArgument("object is 
null", "encode object");
+        byte[] payload;
+        if (serializer != null) {
+            payload = serializer.serialize(object);
+        } else {
+            payload = encodeObject(object);
+        }
+        String className = object.getClass().getName();
+        return encodeWithClassName(payload, className);
+    }
+
+    private static byte[] encodeObject(Object key) throws IOException {
+        if (key == null) throw CommonError.illegalArgument("object is null", 
"encode object");
+        try (ByteArrayOutputStream byteArrayOutputStream = new 
ByteArrayOutputStream();
+                ObjectOutputStream objectOutputStream =
+                        new ObjectOutputStream(byteArrayOutputStream)) {
+            objectOutputStream.writeObject(key);
+            objectOutputStream.flush();
+            return byteArrayOutputStream.toByteArray();
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private K decodeKey(byte[] bytes) throws IOException, 
ClassNotFoundException {
+        if (bytes == null) return null;
+        Decoded decoded = decodeWithClassName(bytes);
+        Class<?> actualClass = Class.forName(decoded.className);
+        if (serializer != null) {
+            try {
+                return (K) serializer.deserialize(decoded.payload, 
actualClass);
+            } catch (Exception ex) {
+                log.warn(
+                        "serializer.deserialize failed for key class {}, 
fallback to Java deserialization",
+                        decoded.className,
+                        ex);
+            }
+        }
+        return (K) javaDeserialize(decoded.payload);
+    }
+
+    @SuppressWarnings("unchecked")
+    private V decodeValue(byte[] bytes) throws IOException, 
ClassNotFoundException {
+        if (bytes == null) return null;
+        Decoded decoded = decodeWithClassName(bytes);
+        Class<?> actualClass = Class.forName(decoded.className);
+        if (serializer != null) {
+            try {
+                return (V) serializer.deserialize(decoded.payload, 
actualClass);
+            } catch (Exception e) {
+                log.warn(
+                        "serializer.deserialize failed for value class {}, 
fallback to Java deserialization",
+                        decoded.className,
+                        e);
+            }
+        }
+        return (V) javaDeserialize(decoded.payload);
+    }
+
+    private Object javaDeserialize(byte[] payload) throws IOException, 
ClassNotFoundException {
+        if (payload == null)
+            throw CommonError.illegalArgument("payload is null", "deserialize 
payload");
+        try (ByteArrayInputStream byteArrayInputStream = new 
ByteArrayInputStream(payload);
+                ObjectInputStream objectInputStream = new 
ObjectInputStream(byteArrayInputStream)) {
+            return objectInputStream.readObject();
+        }
+    }
+
+    private ReentrantLock getLock(K key) {
+        return lockMap.computeIfAbsent(key, k -> new ReentrantLock());
+    }
+
+    public V get(K key) {
+        if (key == null) throw new NullPointerException("key is null");
+        ReentrantLock lock = getLock(key);
+        lock.lock();
+        try {
+            byte[] rawKey = encode(key);
+            byte[] value = db.get(columnFamilyHandle, rawKey);
+            return decodeValue(value);
+        } catch (Exception e) {
+            throw new RocksDBRuntimeException("Failed to get value from 
RocksDB. key: " + key, e);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public void put(K key, V value) {
+        if (key == null) throw new NullPointerException("key is null");
+        ReentrantLock lock = getLock(key);
+        lock.lock();
+        try {
+            byte[] rawKey = encode(key);
+            if (value == null) {
+                db.delete(columnFamilyHandle, writeOptions, rawKey);
+            } else {
+                byte[] valueBytes = encode(value);
+                db.put(columnFamilyHandle, writeOptions, rawKey, valueBytes);
+            }
+        } catch (Exception e) {
+            throw new RocksDBRuntimeException(
+                    String.format(
+                            "Failed to put key-value into RocksDB. key: %s, 
value: %s", key, value),
+                    e);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public void remove(K key) {
+        if (key == null) throw new NullPointerException("key is null");
+        ReentrantLock lock = getLock(key);
+        lock.lock();
+        try {
+            byte[] rawKey = encode(key);
+            db.delete(columnFamilyHandle, writeOptions, rawKey);
+        } catch (Exception e) {
+            throw new RocksDBRuntimeException("Failed to remove key from 
RocksDB. key: " + key, e);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    public boolean contains(K key) throws IOException, RocksDBException {
+        if (key == null) throw new NullPointerException("key is null");
+        ReentrantLock lock = getLock(key);
+        lock.lock();
+        try {
+            byte[] rawKey = encode(key);
+            byte[] value = db.get(columnFamilyHandle, rawKey);
+            return value != null;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    public Iterable<Map.Entry<K, V>> entries() {
+        return () -> {
+            try {
+                return iterator();
+            } catch (Exception e) {
+                throw new RocksDBRuntimeException("Failed to create iterator 
for entries", e);
+            }
+        };
+    }
+
+    @Override
+    public Iterable<K> keys() {
+        return () -> new KeyIterator(iterator());
+    }
+
+    @Override
+    public Iterable<V> values() throws Exception {
+        return () -> new ValueIterator(iterator());
+    }
+
+    @Override
+    public Iterator<Map.Entry<K, V>> iterator() {
+        RocksIterator rocksIter;
+        try {
+            rocksIter = db.newIterator(columnFamilyHandle);
+        } catch (Exception e) {
+            throw new RocksDBRuntimeException("Failed to create 
RocksIterator", e);
+        }
+        rocksIter.seekToFirst();
+
+        return new AutoCloseableIterator<Map.Entry<K, V>>() {
+            private boolean closed = false;
+
+            private void closeIfNeeded() {
+                if (!closed) {
+                    try {
+                        rocksIter.close();
+                    } catch (Exception ignored) {
+                        log.warn("Failed to close RocksIterator", ignored);
+                    } finally {
+                        closed = true;
+                    }
+                }
+            }
+
+            @Override
+            public boolean hasNext() {
+                boolean valid = rocksIter.isValid();
+                if (!valid) {
+                    closeIfNeeded();
+                }
+                return valid;
+            }
+
+            @Override
+            public Map.Entry<K, V> next() {
+                if (!hasNext()) {
+                    throw new NoSuchElementException();
+                }
+                try {
+                    byte[] keyBytes = rocksIter.key();
+                    byte[] valueBytes = rocksIter.value();
+                    K k = decodeKey(keyBytes);
+                    V v = decodeValue(valueBytes);
+                    rocksIter.next();
+                    return new AbstractMap.SimpleEntry<>(k, v);
+                } catch (Exception e) {
+                    closeIfNeeded();
+                    throw new RocksDBRuntimeException("Failed to deserialize 
entry", e);
+                }
+            }
+
+            @Override
+            public void close() {
+                closeIfNeeded();
+            }
+
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException();
+            }
+        };
+    }
+
+    @Override
+    public boolean isEmpty() {
+        RocksIterator it = db.newIterator(columnFamilyHandle);
+        try {
+            it.seekToFirst();
+            return !it.isValid();
+        } finally {
+            try {
+                it.close();
+            } catch (Exception ignored) {
+                log.warn("Failed to close RocksIterator", ignored);
+            }
+        }
+    }
+
+    @Override
+    public void close() {
+        try {
+            columnFamilyHandle.close();
+            writeOptions.close();
+        } catch (Exception ignored) {
+            log.warn("Failed to close ColumnFamilyHandle", ignored);
+        }
+    }
+
+    public void compute(K key, UnaryOperator<V> remappingFunction) {
+        if (key == null) throw new NullPointerException("key");
+        ReentrantLock lock = getLock(key);
+        lock.lock();
+        try {
+            byte[] rawKey = encode(key);
+            byte[] rawValue = db.get(columnFamilyHandle, rawKey);
+            V oldValue = decodeValue(rawValue);
+            V newValue = remappingFunction.apply(oldValue);
+
+            if (newValue == null) {
+                db.delete(columnFamilyHandle, rawKey);

Review Comment:
   The `compute` method calls `db.delete` without write options, while other 
methods such as `put` and `remove` pass `writeOptions`. This inconsistency can 
lead to different sync/durability behavior for deletes performed through 
`compute`. Consider using `db.delete(columnFamilyHandle, writeOptions, rawKey)` 
for consistency.
   ```suggestion
                   db.delete(columnFamilyHandle, writeOptions, rawKey);
   ```



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java:
##########
@@ -192,6 +197,23 @@ private void startMaster() {
                 0,
                 
seaTunnelConfig.getEngineConfig().getPrintExecutionInfoInterval(),
                 TimeUnit.SECONDS);
+        if (rocksDBService == null) {
+            try {
+                String safeAddress =
+                        
nodeEngine.getThisAddress().toString().replaceAll("[^A-Za-z0-9._-]", "_");
+                String dbPath = DB_PATH + "_" + safeAddress;
+
+                this.rocksDBService =
+                        new RocksDBService(
+                                dbPath,
+                                MapStoreConfigFactory.createMapStoreConfig(
+                                        
seaTunnelConfig.getEngineConfig().getMapStoreConfig(),
+                                        nodeEngine.getHazelcastInstance()));
+            } catch (Exception e) {
+                LOGGER.severe("Failed to initialize RocksDB state backend: " + 
e.getMessage());
+                throw new SeaTunnelEngineException("Failed to init 
RocksDBStateBackend", e);
+            }
+        }

Review Comment:
   The RocksDBService is initialized only in `startMaster()` but not in 
`startWorker()`. However, the getter `getRocksDBService()` can be called from 
any node. If a worker node calls `getRocksDBService()`, it will return null, 
which could cause NullPointerException. Consider either initializing 
RocksDBService for all node types or documenting that it's only available on 
master nodes and adding null checks in the getter.



##########
seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rocksdb/RocksDBServiceTest.java:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.seatunnel.engine.server.rocksdb;
+
+import org.apache.seatunnel.engine.common.config.server.MapStoreConfig;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+@Slf4j
+class RocksDBServiceTest {
+    private static final String DB_PATH = "rocksdb_test";
+    private static final String STATE_NAME = "default";
+    private RocksDBService rocksDBService;
+
+    @BeforeEach
+    void setUp() {
+        rocksDBService = new RocksDBService(DB_PATH, new MapStoreConfig());
+    }
+
+    @AfterEach
+    void tearDown() {
+        rocksDBService.close(true);
+    }
+
+    @Test
+    void testPutAndGetData() {
+        String key = "testKey";
+        String value = "testValue";
+
+        rocksDBService.putData(STATE_NAME, Collections.singletonMap(key, 
value));
+        String retrievedValue = rocksDBService.getData(STATE_NAME, key);
+
+        Assertions.assertEquals(value, retrievedValue);
+    }
+
+    @Test
+    void testGetAllData() {
+        Map<String, String> initialData = new HashMap<>();
+        initialData.put("testKey1", "testValue1");
+        initialData.put("testKey2", "testValue2");
+        initialData.put("testKey3", "testValue3");
+        rocksDBService.putData(STATE_NAME, initialData);
+
+        Map<String, String> allData = rocksDBService.getAllData(STATE_NAME);
+        for (Map.Entry<String, String> entry : initialData.entrySet()) {
+            Assertions.assertEquals(entry.getValue(), 
allData.get(entry.getKey()));
+        }
+    }
+
+    @Test
+    void testRemoveData() {
+        Map<String, String> initialData = new HashMap<>();
+        initialData.put("testKey1", "testValue1");
+        initialData.put("testKey2", "testValue2");
+        initialData.put("testKey3", "testValue3");
+        rocksDBService.putData(STATE_NAME, initialData);
+
+        rocksDBService.removeData(STATE_NAME, "testKey2");
+
+        String testKey2 = rocksDBService.getData(STATE_NAME, "testKey2");
+        Assertions.assertNull(testKey2);
+
+        Map<String, String> allData = rocksDBService.getAllData(STATE_NAME);
+        Assertions.assertEquals(2, allData.size());
+    }
+
+    @Test
+    void testCloseAndReopen() {
+        String key = "testKey";
+        String value = "testValue";
+
+        rocksDBService.putData(STATE_NAME, Collections.singletonMap(key, 
value));
+        rocksDBService.close(false);
+
+        rocksDBService = new RocksDBService(DB_PATH, new MapStoreConfig());
+        String retrievedValue = rocksDBService.getData(STATE_NAME, key);
+
+        Assertions.assertEquals(value, retrievedValue);
+    }

Review Comment:
   [nitpick] In the `testCloseAndReopen` test, the service created in `setUp()` 
is closed with `close(false)` and then replaced by a manually created instance. 
`tearDown()` will subsequently call `close(true)` on that replacement, so the 
test mixes a lifecycle-managed instance with a manually created one, which is 
fragile. Consider tracking closed state with a cleanup flag, or explicitly 
resetting/recreating the service in `tearDown()`, so teardown handles both 
cases correctly.



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/persistence/rocksdb/FileMapStore.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.persistence.rocksdb;
+
+import org.apache.seatunnel.engine.imap.storage.api.RocksDBStorage;
+import org.apache.seatunnel.engine.imap.storage.api.RocksDBStorageFactory;
+
+import lombok.SneakyThrows;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class FileMapStore {
+    private RocksDBStorage mapStorage;
+
+    public FileMapStore(RocksDBStorageFactory factory, Map<String, Object> 
configuration) {
+        this.mapStorage = factory.create(configuration);
+    }
+
+    public void destroy() {
+        mapStorage.destroy(false);
+    }
+
+    public void store(Object key, Object value) {
+        mapStorage.store(key, value);
+    }
+
+    public void storeAll(Map<Object, Object> map) {
+        mapStorage.storeAll(map);
+    }
+
+    public void delete(Object key) {
+        mapStorage.delete(key);
+    }
+
+    public void deleteAll(Collection<Object> keys) {
+        mapStorage.deleteAll(keys);
+    }
+
+    @SneakyThrows
+    public Map<Object, Object> loadAll(Collection<Object> keys) {

Review Comment:
   The `@SneakyThrows` annotation suppresses checked exceptions, which can make 
debugging harder. The `loadAll()` method is declared to throw exceptions in the 
`RocksDBStorage` interface, so these should be explicitly handled or declared 
in the method signature rather than being suppressed.
   ```suggestion
   
       public Map<Object, Object> loadAll(Collection<Object> keys) throws 
Exception {
   ```



##########
seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/common/WALReader.java:
##########
@@ -120,12 +120,12 @@ private Object deserializeData(byte[] data, String 
className) {
             } catch (IOException e) {
                 // log.error("deserialize data error, data is {}, className is 
{}", data, className,
                 // e);
-                throw new IMapStorageException(
+                throw new RocksDBStorageException(
                         e, "deserialize data error: data is s%, className is 
s%", data, className);

Review Comment:
   The error message format specifiers `s%` are reversed; they should be `%s` 
for proper string formatting. Also note that formatting the raw `byte[] data` 
with `%s` would only print the array's identity (e.g. `[B@1a2b3c`), not its 
contents; consider encoding the bytes or omitting them from the message.



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/rocksdb/GetDataOperation.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.seatunnel.engine.server.task.operation.rocksdb;
+
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
+import org.apache.seatunnel.engine.server.task.operation.TracingOperation;
+
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+
+import java.io.IOException;
+
+public class GetDataOperation<K, V> extends TracingOperation implements 
IdentifiedDataSerializable {
+    private String stateName;
+    private K key;
+    private V value;
+
+    public GetDataOperation() {}
+
+    public GetDataOperation(String stateName, K key) {
+        this.stateName = stateName;
+        this.key = key;
+    }
+
+    @Override
+    public void runInternal() throws Exception {
+        SeaTunnelServer seaTunnelServer = getService();
+        value = seaTunnelServer.getRocksDBService().getData(stateName, key);
+    }
+
+    @Override
+    protected void writeInternal(ObjectDataOutput out) throws IOException {
+        super.writeInternal(out);
+        out.writeString(stateName);
+        out.writeObject(key);
+    }
+
+    @Override
+    protected void readInternal(ObjectDataInput in) throws IOException {
+        super.readInternal(in);
+        this.stateName = in.readString();
+        this.key = in.readObject();
+    }
+
+    @Override
+    public int getFactoryId() {
+        return TaskDataSerializerHook.FACTORY_ID;
+    }
+
+    @Override
+    public int getClassId() {
+        return TaskDataSerializerHook.GET_DATA_OPERATION;
+    }
+
+    public V getValue() {
+        return value;
+    }

Review Comment:
   The `GetDataOperation` class stores the result in the `value` field and has 
a `getValue()` getter, but it doesn't override the `getResponse()` method. 
Hazelcast's operation framework expects operations to override `getResponse()` 
to return the result to the caller. Without this, the operation will not 
properly return the retrieved value. Add `@Override public Object getResponse() 
{ return value; }` to fix this.
   ```suggestion
       }
   
       @Override
       public Object getResponse() {
           return value;
       }
   ```



##########
seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java:
##########
@@ -97,6 +97,77 @@ public static class MasterServerConfigOptions {
                         .defaultValue(1)
                         .withDescription("Number of partitions for storing job 
metrics in IMap.");
         /////////////////////////////////////////////////
+        // The options about RocksDB persistence start
+        public static final Option<Boolean> MAP_STORE_ENABLED =
+                Options.key("map-store-enabled")
+                        .booleanType()
+                        .defaultValue(false)
+                        .withDescription("Enable external map-store 
persistence for RocksDB");
+
+        // common properties
+        public static final Option<String> MAP_STORE_TYPE =
+                Options.key("type")
+                        .stringType()
+                        .defaultValue("hdfs")
+                        .withDescription("Type marker for map-store usage 
(hdfs/s3/oss)");
+
+        public static final Option<String> MAP_STORE_NAMESPACE =
+                Options.key("namespace")
+                        .stringType()
+                        .defaultValue("/tmp/seatunnel/imap")

Review Comment:
   The default namespace path `/tmp/seatunnel/imap` still references "imap" but 
this PR is replacing IMap with RocksDB. Consider updating the default path to 
`/tmp/seatunnel/rocksdb` or `/tmp/seatunnel/storage` to reflect the new 
implementation.
   ```suggestion
                           .defaultValue("/tmp/seatunnel/rocksdb")
   ```



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/rocksdb/GetAllDataOperation.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.seatunnel.engine.server.task.operation.rocksdb;
+
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
+import org.apache.seatunnel.engine.server.task.operation.TracingOperation;
+
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class GetAllDataOperation<K, V> extends TracingOperation
+        implements IdentifiedDataSerializable {
+    private String stateName;
+    private Map<K, V> map;
+
+    public GetAllDataOperation() {}
+
+    public GetAllDataOperation(String stateName) {
+        this.stateName = stateName;
+    }
+
+    @Override
+    public void runInternal() throws Exception {
+        SeaTunnelServer seaTunnelServer = getService();
+        map = seaTunnelServer.getRocksDBService().getAllData(stateName);
+    }
+
+    @Override
+    protected void writeInternal(ObjectDataOutput out) throws IOException {
+        super.writeInternal(out);
+        out.writeString(stateName);
+    }
+
+    @Override
+    protected void readInternal(ObjectDataInput in) throws IOException {
+        super.readInternal(in);
+        this.stateName = in.readString();
+    }
+
+    @Override
+    public int getFactoryId() {
+        return TaskDataSerializerHook.FACTORY_ID;
+    }
+
+    @Override
+    public int getClassId() {
+        return TaskDataSerializerHook.GET_ALL_DATA_OPERATION;
+    }
+
+    public Map<K, V> getMap() {
+        return map;
+    }

Review Comment:
   The `GetAllDataOperation` class stores the result in the `map` field and has 
a `getMap()` getter, but it doesn't override the `getResponse()` method. 
Hazelcast's operation framework expects operations to override `getResponse()` 
to return the result to the caller. Add `@Override public Object getResponse() 
{ return map; }` to fix this.
   ```suggestion
       }
   
       @Override
       public Object getResponse() {
           return map;
       }
   ```



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rocksdb/RocksDBStateBackend.java:
##########
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.seatunnel.engine.server.rocksdb;
+
+import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;
+
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.common.config.server.MapStoreConfig;
+import org.apache.seatunnel.engine.imap.storage.api.RocksDBStorageFactory;
+import org.apache.seatunnel.engine.serializer.protobuf.ProtoStuffSerializer;
+import 
org.apache.seatunnel.engine.server.persistence.rocksdb.FileMapStoreManager;
+
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Slf4j
+public class RocksDBStateBackend {
+    public static final String DB_PATH = "rocksdb";
+    public static final String DEFAULT_NAME = "default";
+
+    private final RocksDB db;
+    private final DBOptions dbOptions;
+    private final List<ColumnFamilyOptions> columnFamilyOptions = new 
ArrayList<>();
+    private final List<ColumnFamilyHandle> columnFamilyHandles = new 
ArrayList<>();
+    private final Map<String, ColumnFamilyHandle> columnFamilyMap = new 
HashMap<>();
+    private final Map<String, RocksDBValueState<Object, Object>> valueStateMap 
= new HashMap<>();
+    private final List<String> initialStateNames =
+            new ArrayList<>(Arrays.asList(DEFAULT_NAME, 
Constant.IMAP_RUNNING_JOB_METRICS));
+
+    private FileMapStoreManager fileMapStoreManager;
+
+    public RocksDBStateBackend(
+            String dbPath, RocksDBStorageFactory factory, MapStoreConfig 
mapStoreConfig)
+            throws RocksDBException {
+        RocksDB.loadLibrary();
+        try {
+            List<ColumnFamilyDescriptor> descriptors = 
getColumnFamilyDescriptors(dbPath);
+
+            this.dbOptions =
+                    new 
DBOptions().setCreateIfMissing(true).setCreateMissingColumnFamilies(true);
+            this.db = RocksDB.open(dbOptions, dbPath, descriptors, 
this.columnFamilyHandles);
+
+            initializeColumnFamilyMapAndValueStateMap();
+
+            if (mapStoreConfig != null && mapStoreConfig.isMapStoreEnabled()) {
+                this.fileMapStoreManager =
+                        new FileMapStoreManager(initialStateNames, factory, 
mapStoreConfig.toMap());
+            }
+        } catch (RocksDBException e) {
+            log.error("Failed to open RocksDB at {}: {}", dbPath, 
e.getMessage(), e);
+            close(dbPath);

Review Comment:
   In the constructor's catch block (line 84), `close(dbPath)` is called, but 
resources may not have been fully initialized yet at that point. If `db` or 
`dbOptions` is still null, `close` may throw a `NullPointerException`, which 
would mask the original `RocksDBException`. Additionally, if the exception 
occurs during resource creation, some resources (such as the 
`ColumnFamilyOptions` instances already added to the `columnFamilyOptions` 
list) might never be released. Consider wrapping the close call in a try-catch 
that logs any secondary failure, or checking which resources were actually 
initialized before attempting to close them.
   ```suggestion
               try {
                   close(dbPath);
               } catch (Exception closeException) {
                   log.warn("Exception occurred while cleaning up resources 
after RocksDB open failure: {}", closeException.getMessage(), closeException);
               }
   ```



##########
seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java:
##########
@@ -97,6 +97,77 @@ public static class MasterServerConfigOptions {
                         .defaultValue(1)
                         .withDescription("Number of partitions for storing job 
metrics in IMap.");

Review Comment:
   The documentation description "Number of partitions for storing job metrics 
in IMap." still references IMap. Since this PR is replacing IMap with RocksDB, 
the description should be updated to reflect RocksDB usage.
   ```suggestion
                           .withDescription("Number of partitions for storing 
job metrics in RocksDB.");
   ```



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rocksdb/RocksDBStateBackend.java:
##########
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.seatunnel.engine.server.rocksdb;
+
+import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;
+
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.common.config.server.MapStoreConfig;
+import org.apache.seatunnel.engine.imap.storage.api.RocksDBStorageFactory;
+import org.apache.seatunnel.engine.serializer.protobuf.ProtoStuffSerializer;
+import 
org.apache.seatunnel.engine.server.persistence.rocksdb.FileMapStoreManager;
+
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Slf4j
+public class RocksDBStateBackend {
+    public static final String DB_PATH = "rocksdb";
+    public static final String DEFAULT_NAME = "default";
+
+    private final RocksDB db;
+    private final DBOptions dbOptions;
+    private final List<ColumnFamilyOptions> columnFamilyOptions = new 
ArrayList<>();
+    private final List<ColumnFamilyHandle> columnFamilyHandles = new 
ArrayList<>();
+    private final Map<String, ColumnFamilyHandle> columnFamilyMap = new 
HashMap<>();
+    private final Map<String, RocksDBValueState<Object, Object>> valueStateMap 
= new HashMap<>();
+    private final List<String> initialStateNames =
+            new ArrayList<>(Arrays.asList(DEFAULT_NAME, 
Constant.IMAP_RUNNING_JOB_METRICS));
+
+    private FileMapStoreManager fileMapStoreManager;
+
+    public RocksDBStateBackend(
+            String dbPath, RocksDBStorageFactory factory, MapStoreConfig 
mapStoreConfig)
+            throws RocksDBException {
+        RocksDB.loadLibrary();
+        try {
+            List<ColumnFamilyDescriptor> descriptors = 
getColumnFamilyDescriptors(dbPath);
+
+            this.dbOptions =
+                    new 
DBOptions().setCreateIfMissing(true).setCreateMissingColumnFamilies(true);
+            this.db = RocksDB.open(dbOptions, dbPath, descriptors, 
this.columnFamilyHandles);
+
+            initializeColumnFamilyMapAndValueStateMap();
+
+            if (mapStoreConfig != null && mapStoreConfig.isMapStoreEnabled()) {
+                this.fileMapStoreManager =
+                        new FileMapStoreManager(initialStateNames, factory, 
mapStoreConfig.toMap());
+            }
+        } catch (RocksDBException e) {
+            log.error("Failed to open RocksDB at {}: {}", dbPath, 
e.getMessage(), e);
+            close(dbPath);
+            throw e;
+        }
+    }
+
+    private List<ColumnFamilyDescriptor> getColumnFamilyDescriptors(String 
dbPath) {
+        addExistingColumnFamilies(dbPath);
+
+        List<ColumnFamilyDescriptor> descriptors = new ArrayList<>();
+        for (String name : initialStateNames) {
+            ColumnFamilyOptions options = new ColumnFamilyOptions();
+            columnFamilyOptions.add(options);
+            if (DEFAULT_NAME.equals(name)) {
+                descriptors.add(new 
ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, options));
+            } else {
+                descriptors.add(
+                        new 
ColumnFamilyDescriptor(name.getBytes(StandardCharsets.UTF_8), options));
+            }
+        }
+        return descriptors;
+    }
+
+    private void addExistingColumnFamilies(String dbPath) {
+        try (Options options = new Options()) {
+            List<byte[]> existing = RocksDB.listColumnFamilies(options, 
dbPath);
+
+            List<String> existingNames = new ArrayList<>();
+            for (byte[] bytes : existing) {
+                if (Arrays.equals(bytes, RocksDB.DEFAULT_COLUMN_FAMILY)) {
+                    existingNames.add(DEFAULT_NAME);
+                } else {
+                    existingNames.add(new String(bytes, 
StandardCharsets.UTF_8));
+                }
+            }
+
+            for (String name : existingNames) {
+                if (!initialStateNames.contains(name)) {
+                    this.initialStateNames.add(name);
+                }
+            }
+        } catch (RocksDBException ignored) {
+            log.info("RocksDB at {} does not exist. It will be created.", 
dbPath);
+        }
+    }
+
+    private void initializeColumnFamilyMapAndValueStateMap() {
+        int idx = 0;
+        for (String name : initialStateNames) {
+            if (StringUtils.isBlank(name)) continue;
+            if (idx < columnFamilyHandles.size()) {
+                columnFamilyMap.put(name, columnFamilyHandles.get(idx));
+            }
+            idx++;
+        }
+
+        for (String name : initialStateNames) {
+            if (StringUtils.isBlank(name)) continue;
+            RocksDBValueState<Object, Object> valueState =
+                    new RocksDBValueState<>(
+                            db, columnFamilyMap.get(name), new 
ProtoStuffSerializer());
+            valueStateMap.put(name, valueState);
+        }
+    }
+
+    public void init() {
+        if (fileMapStoreManager == null) return;
+        for (String name : initialStateNames) {
+            Map<Object, Object> loaded = fileMapStoreManager.loadAll(name);
+            RocksDBValueState<Object, Object> valueState = getValueState(name);
+            for (Map.Entry<Object, Object> entry : loaded.entrySet()) {
+                valueState.put(entry.getKey(), entry.getValue());
+            }
+        }
+    }
+
+    public <K, V> RocksDBValueState<K, V> getValueState(String stateName) {
+        @SuppressWarnings("unchecked")
+        RocksDBValueState<K, V> rocksDBValueState =
+                (RocksDBValueState<K, V>) valueStateMap.get(stateName);
+        if (rocksDBValueState == null) {
+            throw CommonError.illegalArgument(stateName, 
"getRocksDBValueState");
+        }
+        return rocksDBValueState;
+    }
+
+    public <K, V> void put(String stateName, K key, V value) {
+        RocksDBValueState<K, V> valueState = getValueState(stateName);
+        valueState.compute(key, oldVal -> mergeValues(oldVal, value));
+        if (fileMapStoreManager != null) {
+            V merged = valueState.get(key);
+            fileMapStoreManager.put(stateName, key, merged);
+        }
+    }
+
+    public <K, V> void putAll(String stateName, Map<K, V> map) {
+        final RocksDBValueState<K, V> valueState = getValueState(stateName);
+        for (Map.Entry<K, V> e : map.entrySet()) {
+            K key = e.getKey();
+            valueState.compute(key, oldVal -> mergeValues(oldVal, 
e.getValue()));
+            if (fileMapStoreManager != null) {
+                V merged = valueState.get(key);
+                fileMapStoreManager.put(stateName, key, merged);
+            }
+        }

Review Comment:
   In `put()` and `putAll()` methods, after calling `valueState.compute()`, the 
code calls `valueState.get()` again to retrieve the merged value for file 
persistence. This results in an extra RocksDB read operation for each write 
when file persistence is enabled. Consider modifying the `compute()` method to 
return the new value or storing it during the compute operation to avoid this 
redundant read.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to