This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 459f5e84a4f [To dev/1.3] Throw exception when wal entry is too large
459f5e84a4f is described below

commit 459f5e84a4fd3dd8a02d72f3145becb97d88db34
Author: Haonan <[email protected]>
AuthorDate: Tue Apr 15 18:27:53 2025 +0800

    [To dev/1.3] Throw exception when wal entry is too large
---
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |  1 +
 .../dataregion/memtable/TsFileProcessor.java       | 38 +++++++++++++++-------
 .../wal/utils/MemoryControlledWALEntryQueue.java   | 11 +++++++
 .../db/storageengine/rescon/memory/SystemInfo.java |  4 +++
 4 files changed, 43 insertions(+), 11 deletions(-)

diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 7507a1f953c..7cde275ce02 100644
--- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -97,6 +97,7 @@ public enum TSStatusCode {
   DISK_SPACE_INSUFFICIENT(611),
   OVERSIZE_TTL(612),
   TTL_CONFIG_ERROR(613),
+  WAL_ENTRY_TOO_LARGE(620),
 
   // Query Engine
   SQL_PARSE_ERROR(700),
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index bd4c11e969b..c91cb7838e3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.storageengine.dataregion.memtable;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.AlignedPath;
 import org.apache.iotdb.commons.path.PartialPath;
@@ -297,11 +298,16 @@ public class TsFileProcessor {
     } catch (Exception e) {
       rollbackMemoryInfo(memIncrements);
       logger.warn("Exception during wal flush", e);
-      throw new WriteProcessException(
-          String.format(
-              "%s: %s write WAL failed: %s",
-              storageGroupName, tsFileResource.getTsFile().getAbsolutePath(), e.getMessage()),
-          e);
+      if (e instanceof IoTDBRuntimeException) {
+        throw new WriteProcessException(
+            e.getMessage(), ((IoTDBRuntimeException) e).getErrorCode(), true);
+      } else {
+        throw new WriteProcessException(
+            String.format(
+                "%s: %s write WAL failed: %s",
+                storageGroupName, tsFileResource.getTsFile().getAbsolutePath(), e.getMessage()),
+            e);
+      }
     } finally {
       // recordScheduleWalCost
       costsForMetrics[2] += System.nanoTime() - startTime;
@@ -393,11 +399,16 @@ public class TsFileProcessor {
     } catch (Exception e) {
       rollbackMemoryInfo(memIncrements);
       logger.warn("Exception during wal flush", e);
-      throw new WriteProcessException(
-          String.format(
-              "%s: %s write WAL failed: %s",
-              storageGroupName, tsFileResource.getTsFile().getAbsolutePath(), e.getMessage()),
-          e);
+      if (e instanceof IoTDBRuntimeException) {
+        throw new WriteProcessException(
+            e.getMessage(), ((IoTDBRuntimeException) e).getErrorCode(), true);
+      } else {
+        throw new WriteProcessException(
+            String.format(
+                "%s: %s write WAL failed: %s",
+                storageGroupName, tsFileResource.getTsFile().getAbsolutePath(), e.getMessage()),
+            e);
+      }
     } finally {
       // recordScheduleWalCost
       costsForMetrics[2] += System.nanoTime() - startTime;
@@ -508,7 +519,12 @@ public class TsFileProcessor {
       }
     } catch (Exception e) {
       for (int i = start; i < end; i++) {
-        results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
+        if (e instanceof IoTDBRuntimeException) {
+          results[i] =
+              RpcUtils.getStatus(((IoTDBRuntimeException) e).getErrorCode(), e.getMessage());
+        } else {
+          results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
+        }
       }
       rollbackMemoryInfo(memIncrements);
       throw new WriteProcessException(e);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/MemoryControlledWALEntryQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/MemoryControlledWALEntryQueue.java
index ddb1310564b..e3b62bd0768 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/MemoryControlledWALEntryQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/MemoryControlledWALEntryQueue.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.wal.utils;
 
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
 import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
 
@@ -26,6 +27,8 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.iotdb.rpc.TSStatusCode.WAL_ENTRY_TOO_LARGE;
+
 public class MemoryControlledWALEntryQueue {
 
   private final BlockingQueue<WALEntry> queue;
@@ -50,6 +53,14 @@ public class MemoryControlledWALEntryQueue {
     long elementSize = getElementSize(e);
     synchronized (nonFullCondition) {
       while (SystemInfo.getInstance().cannotReserveMemoryForWalEntry(elementSize)) {
+        if (elementSize > SystemInfo.getInstance().getMemorySizeForWalBufferQueue()) {
+          throw new IoTDBRuntimeException(
+              "The element size of WALEntry "
+                  + elementSize
+                  + " is larger than the total memory size of wal buffer queue "
+                  + SystemInfo.getInstance().getMemorySizeForWalBufferQueue(),
+              WAL_ENTRY_TOO_LARGE.getStatusCode());
+        }
         nonFullCondition.wait();
       }
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java
index 56e2ef4dae6..1e692697f44 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java
@@ -567,4 +567,8 @@ public class SystemInfo {
   public boolean cannotReserveMemoryForWalEntry(long walEntrySize) {
     return walBufferQueueMemoryCost.get() + walEntrySize > memorySizeForWalBufferQueue;
   }
+
+  public long getMemorySizeForWalBufferQueue() {
+    return memorySizeForWalBufferQueue;
+  }
 }

Reply via email to