This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 33054c990 [Improve][Zeta][Imap] Set the write data timeout to be 
configurable (#4059)
33054c990 is described below

commit 33054c99017c32fef8022d0c8fa5c767b6a3e8dd
Author: Calvin Kirs <[email protected]>
AuthorDate: Mon Feb 6 16:35:56 2023 +0800

    [Improve][Zeta][Imap] Set the write data timeout to be configurable (#4059)
    
    * [Improve][Zeta-IMAP]Set the write data timeout to be configurable
    
    * fix check style
    
    * when request id is removed, we don't set status
---
 .../seatunnel/engine/imap/storage/api/IMapStorage.java      | 13 +++++++++----
 .../seatunnel/engine/imap/storage/file/IMapFileStorage.java |  8 ++++++--
 .../engine/imap/storage/file/common/FileConstants.java      |  5 +++++
 .../engine/imap/storage/file/disruptor/WALWorkHandler.java  |  4 ++++
 .../engine/imap/storage/file/IMapFileStorageTest.java       |  2 ++
 5 files changed, 26 insertions(+), 6 deletions(-)

diff --git 
a/seatunnel-engine/seatunnel-engine-storage/imap-storage-api/src/main/java/org/apache/seatunnel/engine/imap/storage/api/IMapStorage.java
 
b/seatunnel-engine/seatunnel-engine-storage/imap-storage-api/src/main/java/org/apache/seatunnel/engine/imap/storage/api/IMapStorage.java
index 54c61d282..a0cfe526f 100644
--- 
a/seatunnel-engine/seatunnel-engine-storage/imap-storage-api/src/main/java/org/apache/seatunnel/engine/imap/storage/api/IMapStorage.java
+++ 
b/seatunnel-engine/seatunnel-engine-storage/imap-storage-api/src/main/java/org/apache/seatunnel/engine/imap/storage/api/IMapStorage.java
@@ -31,7 +31,9 @@ public interface IMapStorage {
 
     /**
      * Store a key-value pair in the map.
-     * @param key storage key
+     * TODO: it would be better to add a timeout parameter.
+     *
+     * @param key   storage key
      * @param value storage value
      * @return storage status, true is success, false is fail
      */
@@ -39,14 +41,16 @@ public interface IMapStorage {
 
     /**
      * Store a key-value pair in the map storage.
+     *
      * @param map storage key-value pair
      * @return if some key-value pair is not stored, return this keys;
-     *         if all key-value pair is stored, return empty set.
+     * if all key-value pair is stored, return empty set.
      */
     public Set<Object> storeAll(Map<Object, Object> map);
 
     /**
      * Delete a key  in the map storage.
+     *
      * @param key storage key
      * @return storage status, true is success, false is fail
      */
@@ -54,9 +58,10 @@ public interface IMapStorage {
 
     /**
      * Delete a collection of keys from the map storage.
-     * @param  keys delete keys
+     *
+     * @param keys delete keys
      * @return if some keys delete fail, will return this keys
-     *         if all keys delete success, will return empty set
+     * if all keys delete success, will return empty set
      */
     public Set<Object> deleteAll(Collection<Object> keys);
 
diff --git 
a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorage.java
 
b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorage.java
index 819ee3c3a..3461deda5 100644
--- 
a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorage.java
+++ 
b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorage.java
@@ -26,6 +26,7 @@ import static 
org.apache.seatunnel.engine.imap.storage.file.common.FileConstants
 import static 
org.apache.seatunnel.engine.imap.storage.file.common.FileConstants.FileInitProperties.CLUSTER_NAME;
 import static 
org.apache.seatunnel.engine.imap.storage.file.common.FileConstants.FileInitProperties.HDFS_CONFIG_KEY;
 import static 
org.apache.seatunnel.engine.imap.storage.file.common.FileConstants.FileInitProperties.NAMESPACE_KEY;
+import static 
org.apache.seatunnel.engine.imap.storage.file.common.FileConstants.FileInitProperties.WRITE_DATA_TIMEOUT_MILLISECONDS_KEY;
 
 import org.apache.seatunnel.engine.imap.storage.api.IMapStorage;
 import 
org.apache.seatunnel.engine.imap.storage.api.exception.IMapStorageException;
@@ -93,6 +94,8 @@ public class IMapFileStorage implements IMapStorage {
      */
     public String clusterName;
 
+    public long writDataTimeoutMilliseconds;
+
     /**
      * We used disruptor to implement the asynchronous write.
      */
@@ -109,7 +112,7 @@ public class IMapFileStorage implements IMapStorage {
 
     public static final int DEFAULT_QUERY_LIST_SIZE = 256;
 
-    public static final int DEFAULT_QUERY_DATA_TIMEOUT_MILLISECONDS = 100;
+    public static final long DEFAULT_WRITE_DATA_TIMEOUT_MILLISECONDS = 1000 * 
60;
 
     private Configuration conf;
 
@@ -126,6 +129,7 @@ public class IMapFileStorage implements IMapStorage {
         this.businessName = (String) configuration.get(BUSINESS_KEY);
 
         this.clusterName = (String) configuration.get(CLUSTER_NAME);
+        this.writDataTimeoutMilliseconds = (long) 
configuration.getOrDefault(WRITE_DATA_TIMEOUT_MILLISECONDS_KEY, 
DEFAULT_WRITE_DATA_TIMEOUT_MILLISECONDS);
 
         this.region = String.valueOf(System.nanoTime());
         this.businessRootPath = namespace + DEFAULT_IMAP_FILE_PATH_SPLIT + 
clusterName + DEFAULT_IMAP_FILE_PATH_SPLIT + businessName + 
DEFAULT_IMAP_FILE_PATH_SPLIT;
@@ -275,7 +279,7 @@ public class IMapFileStorage implements IMapStorage {
     }
 
     private boolean queryExecuteStatus(long requestId) {
-        return queryExecuteStatus(requestId, 
DEFAULT_QUERY_DATA_TIMEOUT_MILLISECONDS);
+        return queryExecuteStatus(requestId, this.writDataTimeoutMilliseconds);
     }
 
     private boolean queryExecuteStatus(long requestId, long timeout) {
diff --git 
a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/common/FileConstants.java
 
b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/common/FileConstants.java
index 92708d24f..4ff4e914a 100644
--- 
a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/common/FileConstants.java
+++ 
b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/common/FileConstants.java
@@ -65,5 +65,10 @@ public class FileConstants {
          */
         String HDFS_CONFIG_KEY = "hdfsConfig";
 
+        /**
+         * The maximum time, in milliseconds, to wait for a write operation to complete
+         */
+        String WRITE_DATA_TIMEOUT_MILLISECONDS_KEY = 
"writeDataTimeoutMilliseconds";
+
     }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALWorkHandler.java
 
b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALWorkHandler.java
index b2987ec44..34c176221 100644
--- 
a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALWorkHandler.java
+++ 
b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALWorkHandler.java
@@ -78,6 +78,10 @@ public class WALWorkHandler implements 
WorkHandler<FileWALEvent> {
     }
 
     private void executeResponse(long requestId, boolean success) {
+        if (null == RequestFutureCache.get(requestId)) {
+            log.warn("requestId is {} not found in RequestFutureCache", 
requestId);
+            return;
+        }
         try {
             RequestFutureCache.get(requestId).done(success);
         } catch (RuntimeException e) {
diff --git 
a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorageTest.java
 
b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorageTest.java
index e91756e65..b3310dd18 100644
--- 
a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorageTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorageTest.java
@@ -20,6 +20,7 @@
 
 package org.apache.seatunnel.engine.imap.storage.file;
 
+import static 
org.apache.seatunnel.engine.imap.storage.file.common.FileConstants.FileInitProperties.WRITE_DATA_TIMEOUT_MILLISECONDS_KEY;
 import static org.awaitility.Awaitility.await;
 import static org.junit.jupiter.api.condition.OS.LINUX;
 import static org.junit.jupiter.api.condition.OS.MAC;
@@ -59,6 +60,7 @@ public class IMapFileStorageTest {
         properties.put(FileConstants.FileInitProperties.NAMESPACE_KEY, 
"/tmp/imap-kris-test/2");
         properties.put(FileConstants.FileInitProperties.CLUSTER_NAME, 
"test-one");
         properties.put(FileConstants.FileInitProperties.HDFS_CONFIG_KEY, CONF);
+        properties.put(WRITE_DATA_TIMEOUT_MILLISECONDS_KEY, 60L);
 
         STORAGE.initialize(properties);
     }

Reply via email to