This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch wal-compress
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4d4cbae9f07b0cc6131b2f468f189410e745c5dd
Author: LiuXuxin <[email protected]>
AuthorDate: Thu Mar 28 22:46:06 2024 +0800

    enable wal compression
    
    remove metrics in mem table flush task, cache hash code in partial path,
    use gzip to compress wal
    
    batch update metrics
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |   9 ++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   5 +
 .../planner/plan/node/write/InsertRowNode.java     |   2 +
 .../planner/plan/node/write/InsertRowsNode.java    |   5 +
 .../scheduler/FragmentInstanceDispatcherImpl.java  |  19 ++++
 .../db/storageengine/dataregion/DataRegion.java    |  31 ++++++-
 .../dataregion/flush/MemTableFlushTask.java        |   3 -
 .../dataregion/memtable/AbstractMemTable.java      |  13 +--
 .../dataregion/wal/io/CheckpointReader.java        |   3 +-
 .../storageengine/dataregion/wal/io/LogWriter.java |  36 +++++++
 .../dataregion/wal/io/WALByteBufReader.java        |   7 +-
 .../dataregion/wal/io/WALInputStream.java          | 103 +++++++++++++++++++++
 .../storageengine/dataregion/wal/io/WALReader.java |   6 +-
 .../storageengine/dataregion/wal/io/WALWriter.java |   2 +
 .../dataregion/wal/utils/WALWriteUtils.java        |  10 +-
 .../org/apache/iotdb/commons/path/PartialPath.java |  16 +++-
 16 files changed, 234 insertions(+), 36 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 1380a167505..ad180cc7668 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -94,6 +94,7 @@ public class IoTDBConfig {
       "([" + PATH_SEPARATOR + "])?" + NODE_NAME_MATCHER + "(" + 
PARTIAL_NODE_MATCHER + ")*";
 
   public static final Pattern NODE_PATTERN = Pattern.compile(NODE_MATCHER);
+  boolean enableWALCompression = true;
 
   /** Whether to enable the mqtt service. */
   private boolean enableMQTTService = false;
@@ -3829,4 +3830,12 @@ public class IoTDBConfig {
       double innerCompactionTaskSelectionDiskRedundancy) {
     this.innerCompactionTaskSelectionDiskRedundancy = 
innerCompactionTaskSelectionDiskRedundancy;
   }
+
+  public boolean isEnableWALCompression() {
+    return enableWALCompression;
+  }
+
+  public void setEnableWALCompression(boolean enableWALCompression) {
+    this.enableWALCompression = enableWALCompression;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 139a1374b44..3b71dc7d27c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -412,6 +412,11 @@ public class IoTDBDescriptor {
                 "io_task_queue_size_for_flushing",
                 Integer.toString(conf.getIoTaskQueueSizeForFlushing()))));
 
+    conf.setEnableWALCompression(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "enable_wal_compression", 
Boolean.toString(conf.isEnableWALCompression()))));
+
     conf.setCompactionScheduleIntervalInMs(
         Long.parseLong(
             properties.getProperty(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
index b0d8aad94c3..2bc63523010 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
@@ -52,6 +52,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class InsertRowNode extends InsertNode implements WALEntryValue {
 
@@ -67,6 +68,7 @@ public class InsertRowNode extends InsertNode implements 
WALEntryValue {
   private Object[] values;
 
   private boolean isNeedInferType = false;
+  public AtomicInteger insertCount;
 
   public InsertRowNode(PlanNodeId id) {
     super(id);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
index 12e229470be..a2950e55894 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
@@ -42,6 +42,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class InsertRowsNode extends InsertNode {
 
@@ -60,6 +62,9 @@ public class InsertRowsNode extends InsertNode {
   /** the InsertRowsNode list */
   private List<InsertRowNode> insertRowNodeList;
 
+  public AtomicInteger insertCount = new AtomicInteger(0);
+  public AtomicLong[] metrics;
+
   public InsertRowsNode(PlanNodeId id) {
     super(id);
     insertRowNodeList = new ArrayList<>();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index f671a87a1f9..2d573c00ee0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -38,6 +38,7 @@ import 
org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet;
 import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
 import org.apache.iotdb.db.utils.SetThreadName;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance;
 import org.apache.iotdb.mpp.rpc.thrift.TPlanNode;
@@ -60,6 +61,7 @@ import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static com.google.common.util.concurrent.Futures.immediateFuture;
 import static 
org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.DISPATCH_READ;
@@ -419,6 +421,13 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
         break;
       case WRITE:
         PlanNode planNode = instance.getFragment().getPlanNodeTree();
+        if (planNode instanceof InsertRowsNode) {
+          InsertRowsNode insertRowsNode = (InsertRowsNode) planNode;
+          insertRowsNode.metrics = new AtomicLong[4];
+          for (int i = 0; i < 4; i++) {
+            insertRowsNode.metrics[i] = new AtomicLong(0);
+          }
+        }
         RegionWriteExecutor writeExecutor = new RegionWriteExecutor();
         RegionExecutionResult writeResult = writeExecutor.execute(groupId, 
planNode);
         if (!writeResult.isAccepted()) {
@@ -438,6 +447,16 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
         } else {
           // some expected and accepted status except SUCCESS_STATUS need to 
be returned
           TSStatus status = writeResult.getStatus();
+          if (planNode instanceof InsertRowsNode) {
+            InsertRowsNode insertRowsNode = (InsertRowsNode) planNode;
+            PERFORMANCE_OVERVIEW_METRICS.recordCreateMemtableBlockCost(
+                insertRowsNode.metrics[0].get());
+            PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(
+                insertRowsNode.metrics[1].get());
+            
PERFORMANCE_OVERVIEW_METRICS.recordScheduleWalCost(insertRowsNode.metrics[2].get());
+            PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost(
+                insertRowsNode.metrics[3].get());
+          }
           if (status != null && status.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
             throw new FragmentInstanceDispatchException(status);
           }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 469eeae7ad6..81a35ce869f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -30,6 +30,8 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.schema.SchemaConstant;
 import org.apache.iotdb.commons.service.metric.MetricService;
 import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
 import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.commons.utils.TimePartitionUtils;
@@ -104,6 +106,7 @@ import 
org.apache.iotdb.db.storageengine.rescon.memory.TsFileResourceManager;
 import 
org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeSpaceQuotaManager;
 import org.apache.iotdb.db.tools.settle.TsFileAndModSettleTool;
 import org.apache.iotdb.db.utils.DateTimeUtils;
+import org.apache.iotdb.metrics.utils.MetricLevel;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
@@ -1176,6 +1179,7 @@ public class DataRegion implements IDataRegionForQuery {
       if (insertRowNode.allMeasurementFailed()) {
         continue;
       }
+      insertRowNode.insertCount = insertRowsNode.insertCount;
       TsFileProcessor tsFileProcessor =
           getOrCreateTsFileProcessor(timePartitionIds[i], areSequence[i]);
       if (tsFileProcessor == null) {
@@ -1197,10 +1201,29 @@ public class DataRegion implements IDataRegionForQuery {
       }
     }
 
-    
PERFORMANCE_OVERVIEW_METRICS.recordCreateMemtableBlockCost(costsForMetrics[0]);
-    
PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(costsForMetrics[1]);
-    PERFORMANCE_OVERVIEW_METRICS.recordScheduleWalCost(costsForMetrics[2]);
-    
PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost(costsForMetrics[3]);
+    MetricService.getInstance()
+        .count(
+            insertRowsNode.insertCount.get(),
+            Metric.QUANTITY.toString(),
+            MetricLevel.CORE,
+            Tag.NAME.toString(),
+            Metric.POINTS_IN.toString(),
+            Tag.DATABASE.toString(),
+            databaseName,
+            Tag.REGION.toString(),
+            dataRegionId);
+
+    if (insertRowsNode.metrics != null) {
+      insertRowsNode.metrics[0].addAndGet(costsForMetrics[0]);
+      insertRowsNode.metrics[1].addAndGet(costsForMetrics[1]);
+      insertRowsNode.metrics[2].addAndGet(costsForMetrics[2]);
+      insertRowsNode.metrics[3].addAndGet(costsForMetrics[3]);
+    } else {
+      
PERFORMANCE_OVERVIEW_METRICS.recordCreateMemtableBlockCost(costsForMetrics[0]);
+      
PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(costsForMetrics[1]);
+      PERFORMANCE_OVERVIEW_METRICS.recordScheduleWalCost(costsForMetrics[2]);
+      
PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost(costsForMetrics[3]);
+    }
 
     if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
       if 
((config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
index fe4fe1eec04..ce8e2929f38 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
@@ -156,7 +156,6 @@ public class MemTableFlushTask {
         series.sortTvListForFlush();
         long subTaskTime = System.currentTimeMillis() - startTime;
         sortTime += subTaskTime;
-        WRITING_METRICS.recordFlushSubTaskCost(WritingMetrics.SORT_TASK, 
subTaskTime);
         encodingTaskQueue.put(series);
       }
 
@@ -258,7 +257,6 @@ public class MemTableFlushTask {
                 Thread.currentThread().interrupt();
               }
               long subTaskTime = System.currentTimeMillis() - starTime;
-              
WRITING_METRICS.recordFlushSubTaskCost(WritingMetrics.ENCODING_TASK, 
subTaskTime);
               memSerializeTime += subTaskTime;
             }
           }
@@ -344,7 +342,6 @@ public class MemTableFlushTask {
           }
           long subTaskTime = System.currentTimeMillis() - starTime;
           ioTime += subTaskTime;
-          WRITING_METRICS.recordFlushSubTaskCost(WritingMetrics.IO_TASK, 
subTaskTime);
         }
         LOGGER.debug(
             "flushing a memtable to file {} in database {}, io cost {}ms",
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
index f8d7f6a53d7..58004349ce6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
@@ -204,18 +204,7 @@ public abstract class AbstractMemTable implements 
IMemTable {
             - nullPointsNumber;
 
     totalPointsNum += pointsInserted;
-
-    MetricService.getInstance()
-        .count(
-            pointsInserted,
-            Metric.QUANTITY.toString(),
-            MetricLevel.CORE,
-            Tag.NAME.toString(),
-            METRIC_POINT_IN,
-            Tag.DATABASE.toString(),
-            database,
-            Tag.REGION.toString(),
-            dataRegionId);
+    insertRowNode.insertCount.addAndGet(pointsInserted);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/CheckpointReader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/CheckpointReader.java
index 5d2bad0a874..081b3ed4a4f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/CheckpointReader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/CheckpointReader.java
@@ -27,7 +27,6 @@ import org.slf4j.LoggerFactory;
 import java.io.BufferedInputStream;
 import java.io.DataInputStream;
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -48,7 +47,7 @@ public class CheckpointReader {
   private void init() {
     checkpoints = new ArrayList<>();
     try (DataInputStream logStream =
-        new DataInputStream(new BufferedInputStream(new 
FileInputStream(logFile)))) {
+        new DataInputStream(new BufferedInputStream(new 
WALInputStream(logFile)))) {
       maxMemTableId = logStream.readLong();
       while (logStream.available() > 0) {
         Checkpoint checkpoint = Checkpoint.deserialize(logStream);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java
index 68f4deae318..c3fe218fb40 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java
@@ -19,8 +19,11 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.wal.io;
 
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
 import org.apache.iotdb.db.storageengine.dataregion.wal.checkpoint.Checkpoint;
+import org.apache.iotdb.tsfile.compress.ICompressor;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,18 +47,51 @@ public abstract class LogWriter implements ILogWriter {
   protected final FileOutputStream logStream;
   protected final FileChannel logChannel;
   protected long size;
+  protected boolean isEndFile = false;
+  private final ByteBuffer headerBuffer = ByteBuffer.allocate(Integer.BYTES * 
2 + 1);
+  private final ICompressor compressor = 
ICompressor.getCompressor(CompressionType.GZIP);
+  private final ByteBuffer compressedByteBuffer;
 
   protected LogWriter(File logFile) throws FileNotFoundException {
     this.logFile = logFile;
     this.logStream = new FileOutputStream(logFile, true);
     this.logChannel = this.logStream.getChannel();
+    if (IoTDBDescriptor.getInstance().getConfig().isEnableWALCompression()) {
+      compressedByteBuffer =
+          ByteBuffer.allocate(
+              compressor.getMaxBytesForCompression(
+                  
IoTDBDescriptor.getInstance().getConfig().getWalBufferSize()));
+    } else {
+      compressedByteBuffer = null;
+    }
   }
 
   @Override
   public void write(ByteBuffer buffer) throws IOException {
+    int bufferSize = buffer.position();
     size += buffer.position();
     buffer.flip();
+    boolean compressed = false;
+    int uncompressedSize = bufferSize;
+    if (!isEndFile && 
IoTDBDescriptor.getInstance().getConfig().isEnableWALCompression()
+    /* && bufferSize > 1024 * 512 Do not compress buffer that is less than 
512KB */ ) {
+      compressedByteBuffer.clear();
+      compressor.compress(buffer, compressedByteBuffer);
+      buffer = compressedByteBuffer;
+      bufferSize = buffer.position();
+      buffer.flip();
+      compressed = true;
+    }
+    size += bufferSize;
+    headerBuffer.clear();
+    headerBuffer.putInt(bufferSize);
+    headerBuffer.put((byte) (compressed ? 1 : 0));
     try {
+      if (compressed) {
+        headerBuffer.putInt(uncompressedSize);
+      }
+      headerBuffer.flip();
+      logChannel.write(headerBuffer);
       logChannel.write(buffer);
     } catch (ClosedChannelException e) {
       logger.warn("Cannot write to {}", logFile, e);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALByteBufReader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALByteBufReader.java
index f101eaf3647..ad3b7479de9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALByteBufReader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALByteBufReader.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.storageengine.dataregion.wal.io;
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
 
 import java.io.Closeable;
+import java.io.DataInputStream;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -37,6 +38,7 @@ public class WALByteBufReader implements Closeable {
   private final File logFile;
   private final FileChannel channel;
   private final WALMetaData metaData;
+  private final DataInputStream logStream;
   private final Iterator<Integer> sizeIterator;
 
   public WALByteBufReader(File logFile) throws IOException {
@@ -46,6 +48,7 @@ public class WALByteBufReader implements Closeable {
   public WALByteBufReader(File logFile, FileChannel channel) throws 
IOException {
     this.logFile = logFile;
     this.channel = channel;
+    this.logStream = new DataInputStream(new WALInputStream(logFile));
     this.metaData = WALMetaData.readFromWALFile(logFile, channel);
     this.sizeIterator = metaData.getBuffersSize().iterator();
     channel.position(0);
@@ -64,8 +67,8 @@ public class WALByteBufReader implements Closeable {
   public ByteBuffer next() throws IOException {
     int size = sizeIterator.next();
     ByteBuffer buffer = ByteBuffer.allocate(size);
-    channel.read(buffer);
-    buffer.clear();
+    logStream.readFully(buffer.array(), 0, size);
+    buffer.flip();
     return buffer;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
new file mode 100644
index 00000000000..8e742b3cb1b
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.storageengine.dataregion.wal.io;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.tsfile.compress.IUnCompressor;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Objects;
+
+public class WALInputStream extends InputStream implements AutoCloseable {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(WALInputStream.class);
+  private final FileChannel channel;
+  private final ByteBuffer headerBuffer = ByteBuffer.allocate(Integer.BYTES + 
1);
+  private final ByteBuffer compressedHeader = 
ByteBuffer.allocate(Integer.BYTES);
+  private ByteBuffer dataBuffer =
+      ByteBuffer.allocate(
+          IoTDBDescriptor.getInstance().getConfig().getWalBufferSize()); // 
uncompressed data buffer
+
+  public WALInputStream(File logFile) throws IOException {
+    channel = FileChannel.open(logFile.toPath());
+  }
+
+  @Override
+  public int read() throws IOException {
+    if (Objects.isNull(dataBuffer) || dataBuffer.position() == 
dataBuffer.limit()) {
+      loadNextSegment();
+    }
+    return dataBuffer.get() & 0xFF;
+  }
+
+  @Override
+  public void close() throws IOException {
+    channel.close();
+    dataBuffer = null;
+  }
+
+  @Override
+  public int available() throws IOException {
+    return (int) (channel.size() - channel.position());
+  }
+
+  private void loadNextSegment() throws IOException {
+    headerBuffer.clear();
+    if (channel.read(headerBuffer) != Integer.BYTES + 1) {
+      throw new IOException("Unexpected end of file");
+    }
+    headerBuffer.flip();
+    int dataSize = headerBuffer.getInt();
+    boolean isCompressed = headerBuffer.get() == 1;
+    if (isCompressed) {
+      compressedHeader.clear();
+      if (channel.read(compressedHeader) != Integer.BYTES) {
+        throw new IOException("Unexpected end of file");
+      }
+      compressedHeader.flip();
+      int uncompressedSize = compressedHeader.getInt();
+      if (uncompressedSize > dataBuffer.capacity()) {
+        // enlarge buffer
+        dataBuffer = ByteBuffer.allocateDirect(uncompressedSize);
+      }
+      ByteBuffer compressedData = ByteBuffer.allocateDirect(dataSize);
+      if (channel.read(compressedData) != dataSize) {
+        throw new IOException("Unexpected end of file");
+      }
+      compressedData.flip();
+      IUnCompressor unCompressor = 
IUnCompressor.getUnCompressor(CompressionType.LZ4);
+      dataBuffer.clear();
+      unCompressor.uncompress(compressedData, dataBuffer);
+    } else {
+      dataBuffer = ByteBuffer.allocateDirect(dataSize);
+      if (channel.read(dataBuffer) != dataSize) {
+        throw new IOException("Unexpected end of file");
+      }
+    }
+    dataBuffer.flip();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALReader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALReader.java
index ee50c73df97..475ea2b0b2d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALReader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALReader.java
@@ -26,12 +26,10 @@ import 
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedInputStream;
 import java.io.Closeable;
 import java.io.DataInputStream;
 import java.io.File;
 import java.io.IOException;
-import java.nio.file.Files;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 
@@ -57,9 +55,7 @@ public class WALReader implements Closeable {
   public WALReader(File logFile, boolean fileMayCorrupt) throws IOException {
     this.logFile = logFile;
     this.fileMayCorrupt = fileMayCorrupt;
-    this.logStream =
-        new DataInputStream(
-            new BufferedInputStream(Files.newInputStream(logFile.toPath()), 
STREAM_BUFFER_SIZE));
+    this.logStream = new DataInputStream(new WALInputStream(logFile));
   }
 
   /** Like {@link Iterator#hasNext()}. */
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java
index 425fc676fad..20ae9975450 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java
@@ -59,6 +59,7 @@ public class WALWriter extends LogWriter {
   }
 
   private void endFile() throws IOException {
+    this.isEndFile = true;
     WALSignalEntry endMarker = new 
WALSignalEntry(WALEntryType.WAL_FILE_INFO_END_MARKER);
     int metaDataSize = metaData.serializedSize();
     ByteBuffer buffer =
@@ -72,6 +73,7 @@ public class WALWriter extends LogWriter {
     // add magic string
     buffer.put(MAGIC_STRING.getBytes());
     write(buffer);
+    this.isEndFile = false;
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALWriteUtils.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALWriteUtils.java
index d5702e7004a..633a8153b66 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALWriteUtils.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALWriteUtils.java
@@ -126,10 +126,12 @@ public class WALWriteUtils {
       return write(NO_BYTE_TO_READ, buffer);
     }
     int len = 0;
-    byte[] bytes = s.getBytes();
-    len += write(bytes.length, buffer);
-    buffer.put(bytes);
-    len += bytes.length;
+    len += write(s.length(), buffer);
+    for (int i = 0; i < s.length(); i++) {
+      char c = s.charAt(i);
+      buffer.put((byte) c); // ascii only
+    }
+    len += s.length();
     return len;
   }
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
index 217fd3ed1aa..60845c77fb9 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
@@ -58,6 +58,8 @@ public class PartialPath extends Path implements 
Comparable<Path>, Cloneable {
   private static final PartialPath ALL_MATCH_PATTERN = new PartialPath(new 
String[] {"root", "**"});
 
   protected String[] nodes;
+  protected int hashCache;
+  protected boolean cacheHashCache = false;
 
   public PartialPath() {}
 
@@ -713,11 +715,17 @@ public class PartialPath extends Path implements 
Comparable<Path>, Cloneable {
 
   @Override
   public int hashCode() {
-    int h = 0;
-    for (String node : nodes) {
-      h += 31 * h + node.hashCode();
+    if (cacheHashCache) {
+      return hashCache;
+    } else {
+      int h = 0;
+      for (String node : nodes) {
+        h += 31 * h + node.hashCode();
+      }
+      hashCache = h;
+      cacheHashCache = true;
+      return h;
     }
-    return h;
   }
 
   @Override

Reply via email to