This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 33054c990 [Improve][Zeta][Imap] Set the write data timeout to be
configurable (#4059)
33054c990 is described below
commit 33054c99017c32fef8022d0c8fa5c767b6a3e8dd
Author: Calvin Kirs <[email protected]>
AuthorDate: Mon Feb 6 16:35:56 2023 +0800
[Improve][Zeta][Imap] Set the write data timeout to be configurable (#4059)
* [Improve][Zeta-IMAP]Set the write data timeout to be configurable
* fix check style
* when request id is removed, we don't set status
---
.../seatunnel/engine/imap/storage/api/IMapStorage.java | 13 +++++++++----
.../seatunnel/engine/imap/storage/file/IMapFileStorage.java | 8 ++++++--
.../engine/imap/storage/file/common/FileConstants.java | 5 +++++
.../engine/imap/storage/file/disruptor/WALWorkHandler.java | 4 ++++
.../engine/imap/storage/file/IMapFileStorageTest.java | 2 ++
5 files changed, 26 insertions(+), 6 deletions(-)
diff --git
a/seatunnel-engine/seatunnel-engine-storage/imap-storage-api/src/main/java/org/apache/seatunnel/engine/imap/storage/api/IMapStorage.java
b/seatunnel-engine/seatunnel-engine-storage/imap-storage-api/src/main/java/org/apache/seatunnel/engine/imap/storage/api/IMapStorage.java
index 54c61d282..a0cfe526f 100644
---
a/seatunnel-engine/seatunnel-engine-storage/imap-storage-api/src/main/java/org/apache/seatunnel/engine/imap/storage/api/IMapStorage.java
+++
b/seatunnel-engine/seatunnel-engine-storage/imap-storage-api/src/main/java/org/apache/seatunnel/engine/imap/storage/api/IMapStorage.java
@@ -31,7 +31,9 @@ public interface IMapStorage {
/**
* Store a key-value pair in the map.
- * @param key storage key
+ * todo: it's better add timeout parameter
+ *
+ * @param key storage key
* @param value storage value
* @return storage status, true is success, false is fail
*/
@@ -39,14 +41,16 @@ public interface IMapStorage {
/**
* Store a key-value pair in the map storage.
+ *
* @param map storage key-value pair
* @return if some key-value pair is not stored, return this keys;
- * if all key-value pair is stored, return empty set.
+ * if all key-value pair is stored, return empty set.
*/
public Set<Object> storeAll(Map<Object, Object> map);
/**
* Delete a key in the map storage.
+ *
* @param key storage key
* @return storage status, true is success, false is fail
*/
@@ -54,9 +58,10 @@ public interface IMapStorage {
/**
* Delete a collection of keys from the map storage.
- * @param keys delete keys
+ *
+ * @param keys delete keys
* @return if some keys delete fail, will return this keys
- * if all keys delete success, will return empty set
+ * if all keys delete success, will return empty set
*/
public Set<Object> deleteAll(Collection<Object> keys);
diff --git
a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorage.java
b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorage.java
index 819ee3c3a..3461deda5 100644
---
a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorage.java
+++
b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorage.java
@@ -26,6 +26,7 @@ import static
org.apache.seatunnel.engine.imap.storage.file.common.FileConstants
import static
org.apache.seatunnel.engine.imap.storage.file.common.FileConstants.FileInitProperties.CLUSTER_NAME;
import static
org.apache.seatunnel.engine.imap.storage.file.common.FileConstants.FileInitProperties.HDFS_CONFIG_KEY;
import static
org.apache.seatunnel.engine.imap.storage.file.common.FileConstants.FileInitProperties.NAMESPACE_KEY;
+import static
org.apache.seatunnel.engine.imap.storage.file.common.FileConstants.FileInitProperties.WRITE_DATA_TIMEOUT_MILLISECONDS_KEY;
import org.apache.seatunnel.engine.imap.storage.api.IMapStorage;
import
org.apache.seatunnel.engine.imap.storage.api.exception.IMapStorageException;
@@ -93,6 +94,8 @@ public class IMapFileStorage implements IMapStorage {
*/
public String clusterName;
+ public long writDataTimeoutMilliseconds;
+
/**
* We used disruptor to implement the asynchronous write.
*/
@@ -109,7 +112,7 @@ public class IMapFileStorage implements IMapStorage {
public static final int DEFAULT_QUERY_LIST_SIZE = 256;
- public static final int DEFAULT_QUERY_DATA_TIMEOUT_MILLISECONDS = 100;
+ public static final long DEFAULT_WRITE_DATA_TIMEOUT_MILLISECONDS = 1000 *
60;
private Configuration conf;
@@ -126,6 +129,7 @@ public class IMapFileStorage implements IMapStorage {
this.businessName = (String) configuration.get(BUSINESS_KEY);
this.clusterName = (String) configuration.get(CLUSTER_NAME);
+ this.writDataTimeoutMilliseconds = (long)
configuration.getOrDefault(WRITE_DATA_TIMEOUT_MILLISECONDS_KEY,
DEFAULT_WRITE_DATA_TIMEOUT_MILLISECONDS);
this.region = String.valueOf(System.nanoTime());
this.businessRootPath = namespace + DEFAULT_IMAP_FILE_PATH_SPLIT +
clusterName + DEFAULT_IMAP_FILE_PATH_SPLIT + businessName +
DEFAULT_IMAP_FILE_PATH_SPLIT;
@@ -275,7 +279,7 @@ public class IMapFileStorage implements IMapStorage {
}
private boolean queryExecuteStatus(long requestId) {
- return queryExecuteStatus(requestId,
DEFAULT_QUERY_DATA_TIMEOUT_MILLISECONDS);
+ return queryExecuteStatus(requestId, this.writDataTimeoutMilliseconds);
}
private boolean queryExecuteStatus(long requestId, long timeout) {
diff --git
a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/common/FileConstants.java
b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/common/FileConstants.java
index 92708d24f..4ff4e914a 100644
---
a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/common/FileConstants.java
+++
b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/common/FileConstants.java
@@ -65,5 +65,10 @@ public class FileConstants {
*/
String HDFS_CONFIG_KEY = "hdfsConfig";
+ /**
+ * The maximum waiting time of write operations
+ */
+ String WRITE_DATA_TIMEOUT_MILLISECONDS_KEY =
"writeDataTimeoutMilliseconds";
+
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALWorkHandler.java
b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALWorkHandler.java
index b2987ec44..34c176221 100644
---
a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALWorkHandler.java
+++
b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALWorkHandler.java
@@ -78,6 +78,10 @@ public class WALWorkHandler implements
WorkHandler<FileWALEvent> {
}
private void executeResponse(long requestId, boolean success) {
+ if (null == RequestFutureCache.get(requestId)) {
+ log.warn("requestId is {} not found in RequestFutureCache",
requestId);
+ return;
+ }
try {
RequestFutureCache.get(requestId).done(success);
} catch (RuntimeException e) {
diff --git
a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorageTest.java
b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorageTest.java
index e91756e65..b3310dd18 100644
---
a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorageTest.java
+++
b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorageTest.java
@@ -20,6 +20,7 @@
package org.apache.seatunnel.engine.imap.storage.file;
+import static
org.apache.seatunnel.engine.imap.storage.file.common.FileConstants.FileInitProperties.WRITE_DATA_TIMEOUT_MILLISECONDS_KEY;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.condition.OS.LINUX;
import static org.junit.jupiter.api.condition.OS.MAC;
@@ -59,6 +60,7 @@ public class IMapFileStorageTest {
properties.put(FileConstants.FileInitProperties.NAMESPACE_KEY,
"/tmp/imap-kris-test/2");
properties.put(FileConstants.FileInitProperties.CLUSTER_NAME,
"test-one");
properties.put(FileConstants.FileInitProperties.HDFS_CONFIG_KEY, CONF);
+ properties.put(WRITE_DATA_TIMEOUT_MILLISECONDS_KEY, 60L);
STORAGE.initialize(properties);
}