This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 719834aea89 Pipe: refine procedure timeout message (#12402)
719834aea89 is described below
commit 719834aea89af308900b27272ef580adab6538ab
Author: Zikun Ma <[email protected]>
AuthorDate: Wed Apr 24 21:38:00 2024 +0800
Pipe: refine procedure timeout message (#12402)
---
.../iotdb/confignode/manager/ProcedureManager.java | 43 +++++++++++++---------
1 file changed, 26 insertions(+), 17 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 86c74e3e98d..4dc7b46b6fa 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -141,6 +141,8 @@ public class ProcedureManager {
public static final long PROCEDURE_WAIT_TIME_OUT =
COMMON_CONFIG.getConnectionTimeoutInMS();
private static final int PROCEDURE_WAIT_RETRY_TIMEOUT = 10;
+ private static final String PROCEDURE_TIMEOUT_MESSAGE =
+ "Timed out to wait for procedure return. The procedure is still
running.";
private final ConfigManager configManager;
private ProcedureExecutor<ConfigNodeProcedureEnv> executor;
@@ -831,7 +833,7 @@ public class ProcedureManager {
return statusList.get(0);
} else {
return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
- .setMessage(statusList.get(0).getMessage());
+
.setMessage(wrapTimeoutMessageForPipeProcedure(statusList.get(0).getMessage()));
}
} catch (Exception e) {
return new
TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
@@ -848,7 +850,7 @@ public class ProcedureManager {
return statusList.get(0);
} else {
return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
- .setMessage(statusList.get(0).getMessage());
+
.setMessage(wrapTimeoutMessageForPipeProcedure(statusList.get(0).getMessage()));
}
} catch (Exception e) {
return new
TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
@@ -865,7 +867,7 @@ public class ProcedureManager {
return statusList.get(0);
} else {
return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
- .setMessage(statusList.get(0).getMessage());
+
.setMessage(wrapTimeoutMessageForPipeProcedure(statusList.get(0).getMessage()));
}
} catch (Exception e) {
return new
TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
@@ -882,7 +884,7 @@ public class ProcedureManager {
return statusList.get(0);
} else {
return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
- .setMessage(statusList.get(0).getMessage());
+
.setMessage(wrapTimeoutMessageForPipeProcedure(statusList.get(0).getMessage()));
}
} catch (Exception e) {
return new
TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
@@ -899,7 +901,7 @@ public class ProcedureManager {
return statusList.get(0);
} else {
return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
- .setMessage(statusList.get(0).getMessage());
+
.setMessage(wrapTimeoutMessageForPipeProcedure(statusList.get(0).getMessage()));
}
} catch (Exception e) {
return new
TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
@@ -945,7 +947,7 @@ public class ProcedureManager {
return RpcUtils.SUCCESS_STATUS;
} else {
return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
- .setMessage(statusList.get(0).getMessage());
+
.setMessage(wrapTimeoutMessageForPipeProcedure(statusList.get(0).getMessage()));
}
} catch (Exception e) {
return new
TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
@@ -962,7 +964,7 @@ public class ProcedureManager {
return RpcUtils.SUCCESS_STATUS;
} else {
return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
- .setMessage(statusList.get(0).getMessage());
+
.setMessage(wrapTimeoutMessageForPipeProcedure(statusList.get(0).getMessage()));
}
} catch (Exception e) {
return new
TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
@@ -979,7 +981,7 @@ public class ProcedureManager {
return statusList.get(0);
} else {
return new TSStatus(TSStatusCode.CREATE_TOPIC_ERROR.getStatusCode())
- .setMessage(statusList.get(0).getMessage());
+
.setMessage(wrapTimeoutMessageForPipeProcedure(statusList.get(0).getMessage()));
}
} catch (Exception e) {
return new TSStatus(TSStatusCode.CREATE_TOPIC_ERROR.getStatusCode())
@@ -997,7 +999,7 @@ public class ProcedureManager {
return statusList.get(0);
} else {
return new TSStatus(TSStatusCode.DROP_TOPIC_ERROR.getStatusCode())
- .setMessage(statusList.get(0).getMessage());
+
.setMessage(wrapTimeoutMessageForPipeProcedure(statusList.get(0).getMessage()));
}
} catch (Exception e) {
return new
TSStatus(TSStatusCode.DROP_TOPIC_ERROR.getStatusCode()).setMessage(e.getMessage());
@@ -1014,7 +1016,7 @@ public class ProcedureManager {
return RpcUtils.SUCCESS_STATUS;
} else {
return new TSStatus(TSStatusCode.TOPIC_PUSH_META_ERROR.getStatusCode())
- .setMessage(statusList.get(0).getMessage());
+
.setMessage(wrapTimeoutMessageForPipeProcedure(statusList.get(0).getMessage()));
}
} catch (Exception e) {
return new TSStatus(TSStatusCode.TOPIC_PUSH_META_ERROR.getStatusCode())
@@ -1032,7 +1034,7 @@ public class ProcedureManager {
return statusList.get(0);
} else {
return new TSStatus(TSStatusCode.CREATE_CONSUMER_ERROR.getStatusCode())
- .setMessage(statusList.get(0).getMessage());
+
.setMessage(wrapTimeoutMessageForPipeProcedure(statusList.get(0).getMessage()));
}
} catch (Exception e) {
return new TSStatus(TSStatusCode.CREATE_CONSUMER_ERROR.getStatusCode())
@@ -1050,7 +1052,7 @@ public class ProcedureManager {
return statusList.get(0);
} else {
return new TSStatus(TSStatusCode.DROP_CONSUMER_ERROR.getStatusCode())
- .setMessage(statusList.get(0).getMessage());
+
.setMessage(wrapTimeoutMessageForPipeProcedure(statusList.get(0).getMessage()));
}
} catch (Exception e) {
return new TSStatus(TSStatusCode.DROP_CONSUMER_ERROR.getStatusCode())
@@ -1068,7 +1070,7 @@ public class ProcedureManager {
return RpcUtils.SUCCESS_STATUS;
} else {
return new
TSStatus(TSStatusCode.CONSUMER_PUSH_META_ERROR.getStatusCode())
- .setMessage(statusList.get(0).getMessage());
+
.setMessage(wrapTimeoutMessageForPipeProcedure(statusList.get(0).getMessage()));
}
} catch (Exception e) {
return new
TSStatus(TSStatusCode.CONSUMER_PUSH_META_ERROR.getStatusCode())
@@ -1086,7 +1088,7 @@ public class ProcedureManager {
return statusList.get(0);
} else {
return new
TSStatus(TSStatusCode.SUBSCRIPTION_SUBSCRIBE_ERROR.getStatusCode())
- .setMessage(statusList.get(0).getMessage());
+
.setMessage(wrapTimeoutMessageForPipeProcedure(statusList.get(0).getMessage()));
}
} catch (Exception e) {
return new
TSStatus(TSStatusCode.SUBSCRIPTION_SUBSCRIBE_ERROR.getStatusCode())
@@ -1104,7 +1106,7 @@ public class ProcedureManager {
return statusList.get(0);
} else {
return new
TSStatus(TSStatusCode.SUBSCRIPTION_UNSUBSCRIBE_ERROR.getStatusCode())
- .setMessage(statusList.get(0).getMessage());
+
.setMessage(wrapTimeoutMessageForPipeProcedure(statusList.get(0).getMessage()));
}
} catch (Exception e) {
return new
TSStatus(TSStatusCode.SUBSCRIPTION_UNSUBSCRIBE_ERROR.getStatusCode())
@@ -1152,8 +1154,7 @@ public class ProcedureManager {
if (!finishedProcedure.isFinished()) {
// the procedure is still executing
statusList.add(
- RpcUtils.getStatus(
- TSStatusCode.OVERLAP_WITH_EXISTING_TASK, "Procedure execution
timed out."));
+ RpcUtils.getStatus(TSStatusCode.OVERLAP_WITH_EXISTING_TASK,
PROCEDURE_TIMEOUT_MESSAGE));
isSucceed = false;
continue;
}
@@ -1187,6 +1188,14 @@ public class ProcedureManager {
return isSucceed;
}
+ private static String wrapTimeoutMessageForPipeProcedure(String message) {
+ if (message.equals(PROCEDURE_TIMEOUT_MESSAGE)) {
+ return message
+ + " Please manually check later whether the procedure is executed
successfully.";
+ }
+ return message;
+ }
+
public static void sleepWithoutInterrupt(final long timeToSleep) {
long currentTime = System.currentTimeMillis();
final long endTime = timeToSleep + currentTime;