This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 5ec193bf2f1 Throw exception when wal entry is too large
5ec193bf2f1 is described below
commit 5ec193bf2f17a751391dc21332e484226abbf566
Author: Haonan <[email protected]>
AuthorDate: Tue Apr 15 16:53:07 2025 +0800
Throw exception when wal entry is too large
---
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 +
.../dataregion/memtable/TsFileProcessor.java | 38 +++++++++++++++-------
.../wal/utils/MemoryControlledWALEntryQueue.java | 14 ++++++++
3 files changed, 42 insertions(+), 11 deletions(-)
diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 6fb9cdcf7a3..0c732282bcf 100644
--- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -111,6 +111,7 @@ public enum TSStatusCode {
DATA_TYPE_MISMATCH(614),
COLUMN_CATEGORY_MISMATCH(615),
COLUMN_NOT_EXISTS(616),
+ WAL_ENTRY_TOO_LARGE(620),
// Query Engine
SQL_PARSE_ERROR(700),
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index d56868671ad..861b72c923a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.storageengine.dataregion.memtable;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.AlignedPath;
import org.apache.iotdb.commons.path.IFullPath;
@@ -303,11 +304,16 @@ public class TsFileProcessor {
} catch (Exception e) {
rollbackMemoryInfo(memIncrements);
logger.warn("Exception during wal flush", e);
- throw new WriteProcessException(
- String.format(
- "%s: %s write WAL failed: %s",
- dataRegionName, tsFileResource.getTsFile().getAbsolutePath(), e.getMessage()),
- e);
+ if (e instanceof IoTDBRuntimeException) {
+ throw new WriteProcessException(
+ e.getMessage(), ((IoTDBRuntimeException) e).getErrorCode(), true);
+ } else {
+ throw new WriteProcessException(
+ String.format(
+ "%s: %s write WAL failed: %s",
+ dataRegionName, tsFileResource.getTsFile().getAbsolutePath(), e.getMessage()),
+ e);
+ }
} finally {
// recordScheduleWalCost
infoForMetrics[2] += System.nanoTime() - startTime;
@@ -394,11 +400,16 @@ public class TsFileProcessor {
} catch (Exception e) {
rollbackMemoryInfo(memIncrements);
logger.warn("Exception during wal flush", e);
- throw new WriteProcessException(
- String.format(
- "%s: %s write WAL failed: %s",
- dataRegionName, tsFileResource.getTsFile().getAbsolutePath(), e.getMessage()),
- e);
+ if (e instanceof IoTDBRuntimeException) {
+ throw new WriteProcessException(
+ e.getMessage(), ((IoTDBRuntimeException) e).getErrorCode(), true);
+ } else {
+ throw new WriteProcessException(
+ String.format(
+ "%s: %s write WAL failed: %s",
+ dataRegionName, tsFileResource.getTsFile().getAbsolutePath(), e.getMessage()),
+ e);
+ }
} finally {
// recordScheduleWalCost
infoForMetrics[2] += System.nanoTime() - startTime;
@@ -561,7 +572,12 @@ public class TsFileProcessor {
int start = rangePair[0];
int end = rangePair[1];
for (int i = start; i < end; i++) {
- results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
+ if (e instanceof IoTDBRuntimeException) {
+ results[i] =
+ RpcUtils.getStatus(((IoTDBRuntimeException) e).getErrorCode(), e.getMessage());
+ } else {
+ results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
+ }
}
}
rollbackMemoryInfo(memIncrements);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/MemoryControlledWALEntryQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/MemoryControlledWALEntryQueue.java
index 961d1dd97c3..2bd56a1a4fe 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/MemoryControlledWALEntryQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/MemoryControlledWALEntryQueue.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.storageengine.dataregion.wal.utils;
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
@@ -26,6 +27,8 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import static org.apache.iotdb.rpc.TSStatusCode.WAL_ENTRY_TOO_LARGE;
+
public class MemoryControlledWALEntryQueue {
private final BlockingQueue<WALEntry> queue;
private static final Object nonFullCondition = new Object();
@@ -49,6 +52,17 @@ public class MemoryControlledWALEntryQueue {
long elementSize = getElementSize(e);
synchronized (nonFullCondition) {
while (!SystemInfo.getInstance().getWalBufferQueueMemoryBlock().allocate(elementSize)) {
+ if (elementSize
+ > SystemInfo.getInstance().getWalBufferQueueMemoryBlock().getTotalMemorySizeInBytes()) {
+ throw new IoTDBRuntimeException(
+ "The element size of WALEntry "
+ + elementSize
+ + " is larger than the total memory size of wal buffer queue "
+ + SystemInfo.getInstance()
+ .getWalBufferQueueMemoryBlock()
+ .getTotalMemorySizeInBytes(),
+ WAL_ENTRY_TOO_LARGE.getStatusCode());
+ }
nonFullCondition.wait();
}
}