This is an automated email from the ASF dual-hosted git repository. caogaofei pushed a commit to branch beyyes/fix_storage_group_not_ready in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 154b54df94f565201f3fbe033c9cca62f2294f58 Author: Beyyes <[email protected]> AuthorDate: Tue Nov 15 14:46:34 2022 +0800 fix error for storage group not ready --- .../iotdb/db/doublelive/OperationSyncConsumer.java | 24 ++++++++++++++++++++-- .../db/doublelive/OperationSyncDDLProtector.java | 24 +++++++++++++++++++++- .../db/doublelive/OperationSyncDMLProtector.java | 24 +++++++++++++++++++++- 3 files changed, 68 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncConsumer.java b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncConsumer.java index cdb23d24eb..07040a488b 100644 --- a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncConsumer.java +++ b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncConsumer.java @@ -19,7 +19,9 @@ package org.apache.iotdb.db.doublelive; import org.apache.iotdb.db.engine.StorageEngine; +import org.apache.iotdb.rpc.BatchExecutionException; import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.iotdb.session.pool.SessionPool; import org.apache.iotdb.tsfile.utils.Pair; @@ -30,6 +32,8 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.concurrent.BlockingQueue; +import static org.apache.iotdb.rpc.TSStatusCode.STORAGE_GROUP_NOT_READY; + public class OperationSyncConsumer implements Runnable { private static final Logger LOGGER = LoggerFactory.getLogger(OperationSyncConsumer.class); @@ -68,10 +72,26 @@ public class OperationSyncConsumer implements Runnable { } catch (IoTDBConnectionException connectionException) { // warn IoTDBConnectionException and do serialization LOGGER.warn( - "OperationSyncConsumer can't transmit because network failure", connectionException); + "OperationSyncConsumer can't transmit for connection error", connectionException); + } catch (BatchExecutionException batchExecutionException) { + LOGGER.error( + "OperationSyncConsumer can't transmit for batchExecutionException", + batchExecutionException); + if (batchExecutionException.getStatusList().stream() + .noneMatch(s -> s.getCode() == STORAGE_GROUP_NOT_READY.getStatusCode())) { + continue; + } + } catch (StatementExecutionException statementExecutionException) { + LOGGER.error( + "OperationSyncConsumer can't transmit for statementExecutionException", + statementExecutionException); + if (statementExecutionException.getStatusCode() + != STORAGE_GROUP_NOT_READY.getStatusCode()) { + continue; + } } catch (Exception e) { // The PhysicalPlan has internal error, reject transmit - LOGGER.error("OperationSyncConsumer can't transmit", e); + LOGGER.error("OperationSyncConsumer can't transmit, discard it", e); continue; } } diff --git a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDDLProtector.java b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDDLProtector.java index 1870145980..dea54020cf 100644 --- a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDDLProtector.java +++ b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDDLProtector.java @@ -20,7 +20,9 @@ package org.apache.iotdb.db.doublelive; import org.apache.iotdb.db.engine.StorageEngine; import org.apache.iotdb.db.qp.physical.PhysicalPlan; +import org.apache.iotdb.rpc.BatchExecutionException; import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.iotdb.session.pool.SessionPool; import org.slf4j.Logger; @@ -29,6 +31,8 @@ import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; import java.util.concurrent.TimeUnit; +import static org.apache.iotdb.rpc.TSStatusCode.STORAGE_GROUP_NOT_READY; + public class OperationSyncDDLProtector extends OperationSyncProtector { private static final Logger LOGGER = LoggerFactory.getLogger(OperationSyncDDLProtector.class);; @@ -57,7 +61,25 @@ public class OperationSyncDDLProtector extends OperationSyncProtector { transmitStatus = operationSyncSessionPool.operationSyncTransmit(planBuffer); } catch (IoTDBConnectionException connectionException) { // warn IoTDBConnectionException and retry - LOGGER.warn("OperationSyncDDLProtector can't transmit, retrying...", connectionException); + LOGGER.warn( + "OperationSyncDDLProtector can't transmit for connection error, retrying...", + connectionException); + } catch (BatchExecutionException batchExecutionException) { + LOGGER.error( + "OperationSyncDDLProtector can't transmit for batchExecutionException", + batchExecutionException); + if (batchExecutionException.getStatusList().stream() + .noneMatch(s -> s.getCode() == STORAGE_GROUP_NOT_READY.getStatusCode())) { + break; + } + } catch (StatementExecutionException statementExecutionException) { + LOGGER.error( + "OperationSyncDDLProtector can't transmit for statementExecutionException", + statementExecutionException); + if (statementExecutionException.getStatusCode() + != STORAGE_GROUP_NOT_READY.getStatusCode()) { + break; + } } catch (Exception e) { // error exception and break LOGGER.error("OperationSyncDDLProtector can't transmit", e); diff --git a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDMLProtector.java b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDMLProtector.java index 2c2c89ef94..2b5585c9dd 100644 --- a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDMLProtector.java +++ b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDMLProtector.java @@ -20,12 +20,16 @@ package org.apache.iotdb.db.doublelive; import org.apache.iotdb.db.engine.StorageEngine; import org.apache.iotdb.db.qp.physical.PhysicalPlan; +import org.apache.iotdb.rpc.BatchExecutionException; import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.iotdb.session.pool.SessionPool; import java.nio.ByteBuffer; import java.util.concurrent.TimeUnit; +import static org.apache.iotdb.rpc.TSStatusCode.STORAGE_GROUP_NOT_READY; + public class OperationSyncDMLProtector extends OperationSyncProtector { private final OperationSyncDDLProtector ddlProtector; @@ -61,7 +65,25 @@ public class OperationSyncDMLProtector extends OperationSyncProtector { transmitStatus = operationSyncSessionPool.operationSyncTransmit(planBuffer); } catch (IoTDBConnectionException connectionException) { // warn IoTDBConnectionException and retry - LOGGER.warn("OperationSyncDMLProtector can't transmit, retrying...", connectionException); + LOGGER.warn( + "OperationSyncDMLProtector can't transmit for connection error, retrying...", + connectionException); + } catch (BatchExecutionException batchExecutionException) { + LOGGER.error( + "OperationSyncDMLProtector can't transmit for batchExecutionException", + batchExecutionException); + if (batchExecutionException.getStatusList().stream() + .noneMatch(s -> s.getCode() == STORAGE_GROUP_NOT_READY.getStatusCode())) { + break; + } + } catch (StatementExecutionException statementExecutionException) { + LOGGER.error( + "OperationSyncDMLProtector can't transmit for statementExecutionException", + statementExecutionException); + if (statementExecutionException.getStatusCode() + != STORAGE_GROUP_NOT_READY.getStatusCode()) { + break; + } } catch (Exception e) { // error exception and break LOGGER.error("OperationSyncDMLProtector can't transmit", e);
