This is an automated email from the ASF dual-hosted git repository. marklau99 pushed a commit to branch wal-compress in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4d4cbae9f07b0cc6131b2f468f189410e745c5dd Author: LiuXuxin <[email protected]> AuthorDate: Thu Mar 28 22:46:06 2024 +0800 enable wal compression remove metrics in mem table flush task, cache hash code in partial path, use gzip to compress wal batch update metrics --- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 9 ++ .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 5 + .../planner/plan/node/write/InsertRowNode.java | 2 + .../planner/plan/node/write/InsertRowsNode.java | 5 + .../scheduler/FragmentInstanceDispatcherImpl.java | 19 ++++ .../db/storageengine/dataregion/DataRegion.java | 31 ++++++- .../dataregion/flush/MemTableFlushTask.java | 3 - .../dataregion/memtable/AbstractMemTable.java | 13 +-- .../dataregion/wal/io/CheckpointReader.java | 3 +- .../storageengine/dataregion/wal/io/LogWriter.java | 36 +++++++ .../dataregion/wal/io/WALByteBufReader.java | 7 +- .../dataregion/wal/io/WALInputStream.java | 103 +++++++++++++++++++++ .../storageengine/dataregion/wal/io/WALReader.java | 6 +- .../storageengine/dataregion/wal/io/WALWriter.java | 2 + .../dataregion/wal/utils/WALWriteUtils.java | 10 +- .../org/apache/iotdb/commons/path/PartialPath.java | 16 +++- 16 files changed, 234 insertions(+), 36 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 1380a167505..ad180cc7668 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -94,6 +94,7 @@ public class IoTDBConfig { "([" + PATH_SEPARATOR + "])?" + NODE_NAME_MATCHER + "(" + PARTIAL_NODE_MATCHER + ")*"; public static final Pattern NODE_PATTERN = Pattern.compile(NODE_MATCHER); + boolean enableWALCompression = true; /** Whether to enable the mqtt service. */ private boolean enableMQTTService = false; @@ -3829,4 +3830,12 @@ public class IoTDBConfig { double innerCompactionTaskSelectionDiskRedundancy) { this.innerCompactionTaskSelectionDiskRedundancy = innerCompactionTaskSelectionDiskRedundancy; } + + public boolean isEnableWALCompression() { + return enableWALCompression; + } + + public void setEnableWALCompression(boolean enableWALCompression) { + this.enableWALCompression = enableWALCompression; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 139a1374b44..3b71dc7d27c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -412,6 +412,11 @@ public class IoTDBDescriptor { "io_task_queue_size_for_flushing", Integer.toString(conf.getIoTaskQueueSizeForFlushing())))); + conf.setEnableWALCompression( + Boolean.parseBoolean( + properties.getProperty( + "enable_wal_compression", Boolean.toString(conf.isEnableWALCompression())))); + conf.setCompactionScheduleIntervalInMs( Long.parseLong( properties.getProperty( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java index b0d8aad94c3..2bc63523010 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java @@ -52,6 +52,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Objects; +import java.util.concurrent.atomic.AtomicInteger; public class InsertRowNode extends InsertNode implements WALEntryValue { @@ -67,6 +68,7 @@ public class InsertRowNode extends InsertNode implements WALEntryValue { private Object[] values; private boolean isNeedInferType = false; + public AtomicInteger insertCount; public InsertRowNode(PlanNodeId id) { super(id); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java index 12e229470be..a2950e55894 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java @@ -42,6 +42,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; public class InsertRowsNode extends InsertNode { @@ -60,6 +62,9 @@ public class InsertRowsNode extends InsertNode { /** the InsertRowsNode list */ private List<InsertRowNode> insertRowNodeList; + public AtomicInteger insertCount = new AtomicInteger(0); + public AtomicLong[] metrics; + public InsertRowsNode(PlanNodeId id) { super(id); insertRowNodeList = new ArrayList<>(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java index f671a87a1f9..2d573c00ee0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java @@ -38,6 +38,7 @@ import org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet; import org.apache.iotdb.db.queryengine.plan.analyze.QueryType; import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode; import org.apache.iotdb.db.utils.SetThreadName; import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance; import org.apache.iotdb.mpp.rpc.thrift.TPlanNode; @@ -60,6 +61,7 @@ import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import static com.google.common.util.concurrent.Futures.immediateFuture; import static org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.DISPATCH_READ; @@ -419,6 +421,13 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { break; case WRITE: PlanNode planNode = instance.getFragment().getPlanNodeTree(); + if (planNode instanceof InsertRowsNode) { + InsertRowsNode insertRowsNode = (InsertRowsNode) planNode; + insertRowsNode.metrics = new AtomicLong[4]; + for (int i = 0; i < 4; i++) { + insertRowsNode.metrics[i] = new AtomicLong(0); + } + } RegionWriteExecutor writeExecutor = new RegionWriteExecutor(); RegionExecutionResult writeResult = writeExecutor.execute(groupId, planNode); if (!writeResult.isAccepted()) { @@ -438,6 +447,16 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { } else { // some expected and accepted status except SUCCESS_STATUS need to be returned TSStatus status = writeResult.getStatus(); + if (planNode instanceof InsertRowsNode) { + InsertRowsNode insertRowsNode = (InsertRowsNode) planNode; + PERFORMANCE_OVERVIEW_METRICS.recordCreateMemtableBlockCost( + insertRowsNode.metrics[0].get()); + PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost( + insertRowsNode.metrics[1].get()); + PERFORMANCE_OVERVIEW_METRICS.recordScheduleWalCost(insertRowsNode.metrics[2].get()); + PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost( + insertRowsNode.metrics[3].get()); + } if (status != null && status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { throw new FragmentInstanceDispatchException(status); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 469eeae7ad6..81a35ce869f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -30,6 +30,8 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.schema.SchemaConstant; import org.apache.iotdb.commons.service.metric.MetricService; import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics; +import org.apache.iotdb.commons.service.metric.enums.Metric; +import org.apache.iotdb.commons.service.metric.enums.Tag; import org.apache.iotdb.commons.utils.CommonDateTimeUtils; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.commons.utils.TimePartitionUtils; @@ -104,6 +106,7 @@ import org.apache.iotdb.db.storageengine.rescon.memory.TsFileResourceManager; import org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeSpaceQuotaManager; import org.apache.iotdb.db.tools.settle.TsFileAndModSettleTool; import org.apache.iotdb.db.utils.DateTimeUtils; +import org.apache.iotdb.metrics.utils.MetricLevel; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; @@ -1176,6 +1179,7 @@ public class DataRegion implements IDataRegionForQuery { if (insertRowNode.allMeasurementFailed()) { continue; } + insertRowNode.insertCount = insertRowsNode.insertCount; TsFileProcessor tsFileProcessor = getOrCreateTsFileProcessor(timePartitionIds[i], areSequence[i]); if (tsFileProcessor == null) { @@ -1197,10 +1201,29 @@ public class DataRegion implements IDataRegionForQuery { } } - PERFORMANCE_OVERVIEW_METRICS.recordCreateMemtableBlockCost(costsForMetrics[0]); - PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(costsForMetrics[1]); - PERFORMANCE_OVERVIEW_METRICS.recordScheduleWalCost(costsForMetrics[2]); - PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost(costsForMetrics[3]); + MetricService.getInstance() + .count( + insertRowsNode.insertCount.get(), + Metric.QUANTITY.toString(), + MetricLevel.CORE, + Tag.NAME.toString(), + Metric.POINTS_IN.toString(), + Tag.DATABASE.toString(), + databaseName, + Tag.REGION.toString(), + dataRegionId); + + if (insertRowsNode.metrics != null) { + insertRowsNode.metrics[0].addAndGet(costsForMetrics[0]); + insertRowsNode.metrics[1].addAndGet(costsForMetrics[1]); + insertRowsNode.metrics[2].addAndGet(costsForMetrics[2]); + insertRowsNode.metrics[3].addAndGet(costsForMetrics[3]); + } else { + PERFORMANCE_OVERVIEW_METRICS.recordCreateMemtableBlockCost(costsForMetrics[0]); + PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(costsForMetrics[1]); + PERFORMANCE_OVERVIEW_METRICS.recordScheduleWalCost(costsForMetrics[2]); + PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost(costsForMetrics[3]); + } if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) { if ((config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java index fe4fe1eec04..ce8e2929f38 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java @@ -156,7 +156,6 @@ public class MemTableFlushTask { series.sortTvListForFlush(); long subTaskTime = System.currentTimeMillis() - startTime; sortTime += subTaskTime; - WRITING_METRICS.recordFlushSubTaskCost(WritingMetrics.SORT_TASK, subTaskTime); encodingTaskQueue.put(series); } @@ -258,7 +257,6 @@ public class MemTableFlushTask { Thread.currentThread().interrupt(); } long subTaskTime = System.currentTimeMillis() - starTime; - WRITING_METRICS.recordFlushSubTaskCost(WritingMetrics.ENCODING_TASK, subTaskTime); memSerializeTime += subTaskTime; } } @@ -344,7 +342,6 @@ public class MemTableFlushTask { } long subTaskTime = System.currentTimeMillis() - starTime; ioTime += subTaskTime; - WRITING_METRICS.recordFlushSubTaskCost(WritingMetrics.IO_TASK, subTaskTime); } LOGGER.debug( "flushing a memtable to file {} in database {}, io cost {}ms", diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java index f8d7f6a53d7..58004349ce6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java @@ -204,18 +204,7 @@ public abstract class AbstractMemTable implements IMemTable { - nullPointsNumber; totalPointsNum += pointsInserted; - - MetricService.getInstance() - .count( - pointsInserted, - Metric.QUANTITY.toString(), - MetricLevel.CORE, - Tag.NAME.toString(), - METRIC_POINT_IN, - Tag.DATABASE.toString(), - database, - Tag.REGION.toString(), - dataRegionId); + insertRowNode.insertCount.addAndGet(pointsInserted); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/CheckpointReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/CheckpointReader.java index 5d2bad0a874..081b3ed4a4f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/CheckpointReader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/CheckpointReader.java @@ -27,7 +27,6 @@ import org.slf4j.LoggerFactory; import java.io.BufferedInputStream; import java.io.DataInputStream; import java.io.File; -import java.io.FileInputStream; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -48,7 +47,7 @@ public class CheckpointReader { private void init() { checkpoints = new ArrayList<>(); try (DataInputStream logStream = - new DataInputStream(new BufferedInputStream(new FileInputStream(logFile)))) { + new DataInputStream(new BufferedInputStream(new WALInputStream(logFile)))) { maxMemTableId = logStream.readLong(); while (logStream.available() > 0) { Checkpoint checkpoint = Checkpoint.deserialize(logStream); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java index 68f4deae318..c3fe218fb40 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java @@ -19,8 +19,11 @@ package org.apache.iotdb.db.storageengine.dataregion.wal.io; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry; import org.apache.iotdb.db.storageengine.dataregion.wal.checkpoint.Checkpoint; +import org.apache.iotdb.tsfile.compress.ICompressor; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,18 +47,51 @@ public abstract class LogWriter implements ILogWriter { protected final FileOutputStream logStream; protected final FileChannel logChannel; protected long size; + protected boolean isEndFile = false; + private final ByteBuffer headerBuffer = ByteBuffer.allocate(Integer.BYTES * 2 + 1); + private final ICompressor compressor = ICompressor.getCompressor(CompressionType.GZIP); + private final ByteBuffer compressedByteBuffer; protected LogWriter(File logFile) throws FileNotFoundException { this.logFile = logFile; this.logStream = new FileOutputStream(logFile, true); this.logChannel = this.logStream.getChannel(); + if (IoTDBDescriptor.getInstance().getConfig().isEnableWALCompression()) { + compressedByteBuffer = + ByteBuffer.allocate( + compressor.getMaxBytesForCompression( + IoTDBDescriptor.getInstance().getConfig().getWalBufferSize())); + } else { + compressedByteBuffer = null; + } } @Override public void write(ByteBuffer buffer) throws IOException { + int bufferSize = buffer.position(); size += buffer.position(); buffer.flip(); + boolean compressed = false; + int uncompressedSize = bufferSize; + if (!isEndFile && IoTDBDescriptor.getInstance().getConfig().isEnableWALCompression() + /* && bufferSize > 1024 * 512 Do not compress buffer that is less than 512KB */ ) { + compressedByteBuffer.clear(); + compressor.compress(buffer, compressedByteBuffer); + buffer = compressedByteBuffer; + bufferSize = buffer.position(); + buffer.flip(); + compressed = true; + } + size += bufferSize; + headerBuffer.clear(); + headerBuffer.putInt(bufferSize); + headerBuffer.put((byte) (compressed ? 1 : 0)); try { + if (compressed) { + headerBuffer.putInt(uncompressedSize); + } + headerBuffer.flip(); + logChannel.write(headerBuffer); logChannel.write(buffer); } catch (ClosedChannelException e) { logger.warn("Cannot write to {}", logFile, e); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALByteBufReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALByteBufReader.java index f101eaf3647..ad3b7479de9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALByteBufReader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALByteBufReader.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.storageengine.dataregion.wal.io; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry; import java.io.Closeable; +import java.io.DataInputStream; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; @@ -37,6 +38,7 @@ public class WALByteBufReader implements Closeable { private final File logFile; private final FileChannel channel; private final WALMetaData metaData; + private final DataInputStream logStream; private final Iterator<Integer> sizeIterator; public WALByteBufReader(File logFile) throws IOException { @@ -46,6 +48,7 @@ public class WALByteBufReader implements Closeable { public WALByteBufReader(File logFile, FileChannel channel) throws IOException { this.logFile = logFile; this.channel = channel; + this.logStream = new DataInputStream(new WALInputStream(logFile)); this.metaData = WALMetaData.readFromWALFile(logFile, channel); this.sizeIterator = metaData.getBuffersSize().iterator(); channel.position(0); @@ -64,8 +67,8 @@ public class WALByteBufReader implements Closeable { public ByteBuffer next() throws IOException { int size = sizeIterator.next(); ByteBuffer buffer = ByteBuffer.allocate(size); - channel.read(buffer); - buffer.clear(); + logStream.readFully(buffer.array(), 0, size); + buffer.flip(); return buffer; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java new file mode 100644 index 00000000000..8e742b3cb1b --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.storageengine.dataregion.wal.io; + +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.tsfile.compress.IUnCompressor; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.util.Objects; + +public class WALInputStream extends InputStream implements AutoCloseable { + + private static final Logger logger = LoggerFactory.getLogger(WALInputStream.class); + private final FileChannel channel; + private final ByteBuffer headerBuffer = ByteBuffer.allocate(Integer.BYTES + 1); + private final ByteBuffer compressedHeader = ByteBuffer.allocate(Integer.BYTES); + private ByteBuffer dataBuffer = + ByteBuffer.allocate( + IoTDBDescriptor.getInstance().getConfig().getWalBufferSize()); // uncompressed data buffer + + public WALInputStream(File logFile) throws IOException { + channel = FileChannel.open(logFile.toPath()); + } + + @Override + public int read() throws IOException { + if (Objects.isNull(dataBuffer) || dataBuffer.position() == dataBuffer.limit()) { + loadNextSegment(); + } + return dataBuffer.get() & 0xFF; + } + + @Override + public void close() throws IOException { + channel.close(); + dataBuffer = null; + } + + @Override + public int available() throws IOException { + return (int) (channel.size() - channel.position()); + } + + private void loadNextSegment() throws IOException { + headerBuffer.clear(); + if (channel.read(headerBuffer) != Integer.BYTES + 1) { + throw new IOException("Unexpected end of file"); + } + headerBuffer.flip(); + int dataSize = headerBuffer.getInt(); + boolean isCompressed = headerBuffer.get() == 1; + if (isCompressed) { + compressedHeader.clear(); + if (channel.read(compressedHeader) != Integer.BYTES) { + throw new IOException("Unexpected end of file"); + } + compressedHeader.flip(); + int uncompressedSize = compressedHeader.getInt(); + if (uncompressedSize > dataBuffer.capacity()) { + // enlarge buffer + dataBuffer = ByteBuffer.allocateDirect(uncompressedSize); + } + ByteBuffer compressedData = ByteBuffer.allocateDirect(dataSize); + if (channel.read(compressedData) != dataSize) { + throw new IOException("Unexpected end of file"); + } + compressedData.flip(); + IUnCompressor unCompressor = IUnCompressor.getUnCompressor(CompressionType.LZ4); + dataBuffer.clear(); + unCompressor.uncompress(compressedData, dataBuffer); + } else { + dataBuffer = ByteBuffer.allocateDirect(dataSize); + if (channel.read(dataBuffer) != dataSize) { + throw new IOException("Unexpected end of file"); + } + } + dataBuffer.flip(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALReader.java index ee50c73df97..475ea2b0b2d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALReader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALReader.java @@ -26,12 +26,10 @@ import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedInputStream; import java.io.Closeable; import java.io.DataInputStream; import java.io.File; import java.io.IOException; -import java.nio.file.Files; import java.util.Iterator; import java.util.NoSuchElementException; @@ -57,9 +55,7 @@ public class WALReader implements Closeable { public WALReader(File logFile, boolean fileMayCorrupt) throws IOException { this.logFile = logFile; this.fileMayCorrupt = fileMayCorrupt; - this.logStream = - new DataInputStream( - new BufferedInputStream(Files.newInputStream(logFile.toPath()), STREAM_BUFFER_SIZE)); + this.logStream = new DataInputStream(new WALInputStream(logFile)); } /** Like {@link Iterator#hasNext()}. */ diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java index 425fc676fad..20ae9975450 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java @@ -59,6 +59,7 @@ public class WALWriter extends LogWriter { } private void endFile() throws IOException { + this.isEndFile = true; WALSignalEntry endMarker = new WALSignalEntry(WALEntryType.WAL_FILE_INFO_END_MARKER); int metaDataSize = metaData.serializedSize(); ByteBuffer buffer = @@ -72,6 +73,7 @@ public class WALWriter extends LogWriter { // add magic string buffer.put(MAGIC_STRING.getBytes()); write(buffer); + this.isEndFile = false; } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALWriteUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALWriteUtils.java index d5702e7004a..633a8153b66 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALWriteUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALWriteUtils.java @@ -126,10 +126,12 @@ public class WALWriteUtils { return write(NO_BYTE_TO_READ, buffer); } int len = 0; - byte[] bytes = s.getBytes(); - len += write(bytes.length, buffer); - buffer.put(bytes); - len += bytes.length; + len += write(s.length(), buffer); + for (int i = 0; i < s.length(); i++) { + char c = s.charAt(i); + buffer.put((byte) c); // ascii only + } + len += s.length(); return len; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java index 217fd3ed1aa..60845c77fb9 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java @@ -58,6 +58,8 @@ public class PartialPath extends Path implements Comparable<Path>, Cloneable { private static final PartialPath ALL_MATCH_PATTERN = new PartialPath(new String[] {"root", "**"}); protected String[] nodes; + protected int hashCache; + protected boolean cacheHashCache = false; public PartialPath() {} @@ -713,11 +715,17 @@ public class PartialPath extends Path implements Comparable<Path>, Cloneable { @Override public int hashCode() { - int h = 0; - for (String node : nodes) { - h += 31 * h + node.hashCode(); + if (cacheHashCache) { + return hashCache; + } else { + int h = 0; + for (String node : nodes) { + h += 31 * h + node.hashCode(); + } + hashCache = h; + cacheHashCache = true; + return h; } - return h; } @Override
