This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ec193bf2f1 Throw exception when wal entry is too large
5ec193bf2f1 is described below

commit 5ec193bf2f17a751391dc21332e484226abbf566
Author: Haonan <[email protected]>
AuthorDate: Tue Apr 15 16:53:07 2025 +0800

    Throw exception when wal entry is too large
---
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |  1 +
 .../dataregion/memtable/TsFileProcessor.java       | 38 +++++++++++++++-------
 .../wal/utils/MemoryControlledWALEntryQueue.java   | 14 ++++++++
 3 files changed, 42 insertions(+), 11 deletions(-)

diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 6fb9cdcf7a3..0c732282bcf 100644
--- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -111,6 +111,7 @@ public enum TSStatusCode {
   DATA_TYPE_MISMATCH(614),
   COLUMN_CATEGORY_MISMATCH(615),
   COLUMN_NOT_EXISTS(616),
+  WAL_ENTRY_TOO_LARGE(620),
 
   // Query Engine
   SQL_PARSE_ERROR(700),
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index d56868671ad..861b72c923a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.storageengine.dataregion.memtable;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.AlignedPath;
 import org.apache.iotdb.commons.path.IFullPath;
@@ -303,11 +304,16 @@ public class TsFileProcessor {
     } catch (Exception e) {
       rollbackMemoryInfo(memIncrements);
       logger.warn("Exception during wal flush", e);
-      throw new WriteProcessException(
-          String.format(
-              "%s: %s write WAL failed: %s",
-              dataRegionName, tsFileResource.getTsFile().getAbsolutePath(), e.getMessage()),
-          e);
+      if (e instanceof IoTDBRuntimeException) {
+        throw new WriteProcessException(
+            e.getMessage(), ((IoTDBRuntimeException) e).getErrorCode(), true);
+      } else {
+        throw new WriteProcessException(
+            String.format(
+                "%s: %s write WAL failed: %s",
+                dataRegionName, tsFileResource.getTsFile().getAbsolutePath(), e.getMessage()),
+            e);
+      }
     } finally {
       // recordScheduleWalCost
       infoForMetrics[2] += System.nanoTime() - startTime;
@@ -394,11 +400,16 @@ public class TsFileProcessor {
     } catch (Exception e) {
       rollbackMemoryInfo(memIncrements);
       logger.warn("Exception during wal flush", e);
-      throw new WriteProcessException(
-          String.format(
-              "%s: %s write WAL failed: %s",
-              dataRegionName, tsFileResource.getTsFile().getAbsolutePath(), e.getMessage()),
-          e);
+      if (e instanceof IoTDBRuntimeException) {
+        throw new WriteProcessException(
+            e.getMessage(), ((IoTDBRuntimeException) e).getErrorCode(), true);
+      } else {
+        throw new WriteProcessException(
+            String.format(
+                "%s: %s write WAL failed: %s",
+                dataRegionName, tsFileResource.getTsFile().getAbsolutePath(), e.getMessage()),
+            e);
+      }
     } finally {
       // recordScheduleWalCost
       infoForMetrics[2] += System.nanoTime() - startTime;
@@ -561,7 +572,12 @@ public class TsFileProcessor {
         int start = rangePair[0];
         int end = rangePair[1];
         for (int i = start; i < end; i++) {
-          results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
+          if (e instanceof IoTDBRuntimeException) {
+            results[i] =
+                RpcUtils.getStatus(((IoTDBRuntimeException) e).getErrorCode(), e.getMessage());
+          } else {
+            results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
+          }
         }
       }
       rollbackMemoryInfo(memIncrements);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/MemoryControlledWALEntryQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/MemoryControlledWALEntryQueue.java
index 961d1dd97c3..2bd56a1a4fe 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/MemoryControlledWALEntryQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/MemoryControlledWALEntryQueue.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.wal.utils;
 
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
 import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
 
@@ -26,6 +27,8 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.iotdb.rpc.TSStatusCode.WAL_ENTRY_TOO_LARGE;
+
 public class MemoryControlledWALEntryQueue {
   private final BlockingQueue<WALEntry> queue;
   private static final Object nonFullCondition = new Object();
@@ -49,6 +52,17 @@ public class MemoryControlledWALEntryQueue {
     long elementSize = getElementSize(e);
     synchronized (nonFullCondition) {
       while (!SystemInfo.getInstance().getWalBufferQueueMemoryBlock().allocate(elementSize)) {
+        if (elementSize
+            > SystemInfo.getInstance().getWalBufferQueueMemoryBlock().getTotalMemorySizeInBytes()) {
+          throw new IoTDBRuntimeException(
+              "The element size of WALEntry "
+                  + elementSize
+                  + " is larger than the total memory size of wal buffer queue "
+                  + SystemInfo.getInstance()
+                      .getWalBufferQueueMemoryBlock()
+                      .getTotalMemorySizeInBytes(),
+              WAL_ENTRY_TOO_LARGE.getStatusCode());
+        }
         nonFullCondition.wait();
       }
     }

Reply via email to