This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 459f5e84a4f [To dev/1.3] Throw exception when wal entry is too large
459f5e84a4f is described below
commit 459f5e84a4fd3dd8a02d72f3145becb97d88db34
Author: Haonan <[email protected]>
AuthorDate: Tue Apr 15 18:27:53 2025 +0800
[To dev/1.3] Throw exception when wal entry is too large
---
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 +
.../dataregion/memtable/TsFileProcessor.java | 38 +++++++++++++++-------
.../wal/utils/MemoryControlledWALEntryQueue.java | 11 +++++++
.../db/storageengine/rescon/memory/SystemInfo.java | 4 +++
4 files changed, 43 insertions(+), 11 deletions(-)
diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 7507a1f953c..7cde275ce02 100644
--- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -97,6 +97,7 @@ public enum TSStatusCode {
DISK_SPACE_INSUFFICIENT(611),
OVERSIZE_TTL(612),
TTL_CONFIG_ERROR(613),
+ WAL_ENTRY_TOO_LARGE(620),
// Query Engine
SQL_PARSE_ERROR(700),
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index bd4c11e969b..c91cb7838e3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.storageengine.dataregion.memtable;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.AlignedPath;
import org.apache.iotdb.commons.path.PartialPath;
@@ -297,11 +298,16 @@ public class TsFileProcessor {
} catch (Exception e) {
rollbackMemoryInfo(memIncrements);
logger.warn("Exception during wal flush", e);
- throw new WriteProcessException(
- String.format(
- "%s: %s write WAL failed: %s",
- storageGroupName, tsFileResource.getTsFile().getAbsolutePath(), e.getMessage()),
- e);
+ if (e instanceof IoTDBRuntimeException) {
+ throw new WriteProcessException(
+ e.getMessage(), ((IoTDBRuntimeException) e).getErrorCode(), true);
+ } else {
+ throw new WriteProcessException(
+ String.format(
+ "%s: %s write WAL failed: %s",
+ storageGroupName, tsFileResource.getTsFile().getAbsolutePath(), e.getMessage()),
+ e);
+ }
} finally {
// recordScheduleWalCost
costsForMetrics[2] += System.nanoTime() - startTime;
@@ -393,11 +399,16 @@ public class TsFileProcessor {
} catch (Exception e) {
rollbackMemoryInfo(memIncrements);
logger.warn("Exception during wal flush", e);
- throw new WriteProcessException(
- String.format(
- "%s: %s write WAL failed: %s",
- storageGroupName, tsFileResource.getTsFile().getAbsolutePath(), e.getMessage()),
- e);
+ if (e instanceof IoTDBRuntimeException) {
+ throw new WriteProcessException(
+ e.getMessage(), ((IoTDBRuntimeException) e).getErrorCode(), true);
+ } else {
+ throw new WriteProcessException(
+ String.format(
+ "%s: %s write WAL failed: %s",
+ storageGroupName, tsFileResource.getTsFile().getAbsolutePath(), e.getMessage()),
+ e);
+ }
} finally {
// recordScheduleWalCost
costsForMetrics[2] += System.nanoTime() - startTime;
@@ -508,7 +519,12 @@ public class TsFileProcessor {
}
} catch (Exception e) {
for (int i = start; i < end; i++) {
- results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
+ if (e instanceof IoTDBRuntimeException) {
+ results[i] =
+ RpcUtils.getStatus(((IoTDBRuntimeException) e).getErrorCode(), e.getMessage());
+ } else {
+ results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
+ }
}
rollbackMemoryInfo(memIncrements);
throw new WriteProcessException(e);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/MemoryControlledWALEntryQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/MemoryControlledWALEntryQueue.java
index ddb1310564b..e3b62bd0768 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/MemoryControlledWALEntryQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/MemoryControlledWALEntryQueue.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.storageengine.dataregion.wal.utils;
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
@@ -26,6 +27,8 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import static org.apache.iotdb.rpc.TSStatusCode.WAL_ENTRY_TOO_LARGE;
+
public class MemoryControlledWALEntryQueue {
private final BlockingQueue<WALEntry> queue;
@@ -50,6 +53,14 @@ public class MemoryControlledWALEntryQueue {
long elementSize = getElementSize(e);
synchronized (nonFullCondition) {
while (SystemInfo.getInstance().cannotReserveMemoryForWalEntry(elementSize)) {
+ if (elementSize > SystemInfo.getInstance().getMemorySizeForWalBufferQueue()) {
+ throw new IoTDBRuntimeException(
+ "The element size of WALEntry "
+ + elementSize
+ + " is larger than the total memory size of wal buffer queue "
+ + SystemInfo.getInstance().getMemorySizeForWalBufferQueue(),
+ WAL_ENTRY_TOO_LARGE.getStatusCode());
+ }
nonFullCondition.wait();
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java
index 56e2ef4dae6..1e692697f44 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java
@@ -567,4 +567,8 @@ public class SystemInfo {
public boolean cannotReserveMemoryForWalEntry(long walEntrySize) {
return walBufferQueueMemoryCost.get() + walEntrySize > memorySizeForWalBufferQueue;
}
+
+ public long getMemorySizeForWalBufferQueue() {
+ return memorySizeForWalBufferQueue;
+ }
}