This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 101ca41e037 fix error log caused by ClosedByInterruptException in
compaction module
101ca41e037 is described below
commit 101ca41e03753c21be6ad0c86856ed1d47796944
Author: shuwenwei <[email protected]>
AuthorDate: Wed Mar 13 14:41:43 2024 +0800
fix error log caused by ClosedByInterruptException in compaction module
---
.../execute/task/InnerSpaceCompactionTask.java | 6 +-
.../compaction/io/CompactionTsFileInput.java | 104 +++++++++++++++++++++
.../compaction/io/CompactionTsFileReader.java | 1 +
.../compaction/repair/RepairDataFileScanUtil.java | 8 +-
.../compaction/schedule/CompactionScheduler.java | 6 +-
.../estimator/AbstractCompactionEstimator.java | 5 +-
.../estimator/AbstractCrossSpaceEstimator.java | 8 ++
.../estimator/AbstractInnerSpaceEstimator.java | 13 +++
.../impl/RewriteCrossSpaceCompactionSelector.java | 6 +-
.../StopReadTsFileByInterruptException.java | 24 +++++
.../iotdb/tsfile/read/TsFileSequenceReader.java | 37 ++++++++
.../iotdb/tsfile/read/reader/LocalTsFileInput.java | 6 +-
12 files changed, 208 insertions(+), 16 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
index d5d709a4df5..d780dbfae48 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
@@ -40,12 +40,12 @@ import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.exception.StopReadTsFileByInterruptException;
import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException;
import org.apache.iotdb.tsfile.utils.TsFileUtils;
import java.io.File;
import java.io.IOException;
-import java.nio.channels.ClosedByInterruptException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collections;
@@ -486,8 +486,8 @@ public class InnerSpaceCompactionTask extends
AbstractCompactionTask {
if (innerSpaceEstimator != null && memoryCost == 0L) {
try {
memoryCost =
innerSpaceEstimator.estimateInnerCompactionMemory(selectedTsFileResourceList);
- } catch (IOException e) {
- if (e instanceof ClosedByInterruptException || Thread.interrupted()) {
+ } catch (Exception e) {
+ if (e instanceof StopReadTsFileByInterruptException ||
Thread.interrupted()) {
Thread.currentThread().interrupt();
return -1;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileInput.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileInput.java
new file mode 100644
index 00000000000..800e8facf30
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileInput.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.compaction.io;
+
+import org.apache.iotdb.tsfile.exception.StopReadTsFileByInterruptException;
+import org.apache.iotdb.tsfile.read.reader.TsFileInput;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+public class CompactionTsFileInput implements TsFileInput { // TsFileInput decorator: forwards to a wrapped input and signals thread interruption
+ private final TsFileInput tsFileInput; // delegate that every method below forwards to
+ // Stores the given input as the delegate (no null check is performed).
+ public CompactionTsFileInput(TsFileInput tsFileInput) {
+ this.tsFileInput = tsFileInput;
+ }
+ // Returns the delegate's size; a failure while the thread is interrupted becomes StopReadTsFileByInterruptException.
+ @Override
+ public long size() throws IOException {
+ try {
+ return tsFileInput.size();
+ } catch (Exception e) {
+ if (Thread.currentThread().isInterrupted()) { // isInterrupted() only inspects the flag; it stays set for the caller
+ throw new StopReadTsFileByInterruptException(); // the original exception e is not attached as a cause
+ }
+ throw e; // not interrupted: propagate the delegate's failure unchanged
+ }
+ }
+ // Returns the delegate's current position; same interrupt translation as size().
+ @Override
+ public long position() throws IOException {
+ try {
+ return tsFileInput.position();
+ } catch (Exception e) {
+ if (Thread.currentThread().isInterrupted()) {
+ throw new StopReadTsFileByInterruptException();
+ }
+ throw e;
+ }
+ }
+ // Moves the delegate to newPosition and returns whatever the delegate returns (not this wrapper); same interrupt translation as size().
+ @Override
+ public TsFileInput position(long newPosition) throws IOException {
+ try {
+ return tsFileInput.position(newPosition);
+ } catch (Exception e) {
+ if (Thread.currentThread().isInterrupted()) {
+ throw new StopReadTsFileByInterruptException();
+ }
+ throw e;
+ }
+ }
+ // Reads through the delegate, then throws StopReadTsFileByInterruptException if the thread is interrupted, even when the read itself returned normally.
+ @Override
+ public int read(ByteBuffer dst) throws IOException {
+ int readSize = tsFileInput.read(dst); // NOTE(review): unlike size()/position(), an exception thrown here is not translated - confirm this is intended
+ if (Thread.currentThread().isInterrupted()) {
+ throw new StopReadTsFileByInterruptException();
+ }
+ return readSize;
+ }
+ // Positional variant of read(ByteBuffer): delegate first, then the same post-read interrupt check.
+ @Override
+ public int read(ByteBuffer dst, long position) throws IOException {
+ int readSize = tsFileInput.read(dst, position); // NOTE(review): delegate exceptions propagate unchanged here as well - confirm
+ if (Thread.currentThread().isInterrupted()) {
+ throw new StopReadTsFileByInterruptException();
+ }
+ return readSize;
+ }
+ // Returns the delegate's stream directly. NOTE(review): no interrupt check is added on this path - confirm reads through the stream are covered elsewhere.
+ @Override
+ public InputStream wrapAsInputStream() throws IOException {
+ return tsFileInput.wrapAsInputStream();
+ }
+ // Closes the delegate; no interrupt handling is applied.
+ @Override
+ public void close() throws IOException {
+ tsFileInput.close();
+ }
+ // Returns the delegate's file path.
+ @Override
+ public String getFilePath() {
+ return tsFileInput.getFilePath();
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java
index 3795904ba57..7f580b6f135 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java
@@ -65,6 +65,7 @@ public class CompactionTsFileReader extends
TsFileSequenceReader {
*/
public CompactionTsFileReader(String file, CompactionType compactionType)
throws IOException {
super(file); // runs the TsFileSequenceReader(String) constructor first
+ this.tsFileInput = new CompactionTsFileInput(tsFileInput); // replace the tsFileInput field with a CompactionTsFileInput wrapping its current value
this.compactionType = compactionType; // remember which compaction type this reader was created for
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java
index 4ce1552412d..29c961095c7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java
@@ -83,15 +83,15 @@ public class RepairDataFileScanUtil {
checkNonAlignedDeviceSeries(reader, device);
}
}
- } catch (IOException ioException) {
+ } catch (CompactionLastTimeCheckFailedException
lastTimeCheckFailedException) {
+ this.hasUnsortedData = true;
+ } catch (Exception e) {
// ignored the exception caused by thread interrupt
if (Thread.currentThread().isInterrupted()) {
return;
}
- logger.warn("Meet error when read tsfile {}", tsfile.getAbsolutePath(),
ioException);
+ logger.warn("Meet error when read tsfile {}", tsfile.getAbsolutePath(),
e);
isBrokenFile = true;
- } catch (CompactionLastTimeCheckFailedException
lastTimeCheckFailedException) {
- this.hasUnsortedData = true;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java
index fccbe81d5c2..707f251ded7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java
@@ -184,7 +184,11 @@ public class CompactionScheduler {
return trySubmitCount;
}
- private static boolean canAddTaskToWaitingQueue(AbstractCompactionTask task)
{
+ private static boolean canAddTaskToWaitingQueue(AbstractCompactionTask task)
+ throws InterruptedException {
+ if (Thread.interrupted()) {
+ throw new InterruptedException();
+ }
// check file num
long fileNumLimitForCompaction =
SystemInfo.getInstance().getTotalFileLimitForCompaction();
if (task.getProcessedFileNum() > fileNumLimitForCompaction) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCompactionEstimator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCompactionEstimator.java
index dd3e3a26309..62546208378 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCompactionEstimator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCompactionEstimator.java
@@ -64,6 +64,8 @@ public abstract class AbstractCompactionEstimator {
protected abstract long calculatingDataMemoryCost(CompactionTaskInfo
taskInfo) throws IOException;
+ protected abstract TsFileSequenceReader getReader(String filePath) throws
IOException; // factory hook: each subclass creates the TsFileSequenceReader for the given file path
+
protected boolean isAllSourceFileExist(List<TsFileResource> resources) {
for (TsFileResource resource : resources) {
if (resource.getStatus() == TsFileResourceStatus.DELETED) {
@@ -95,8 +97,7 @@ public abstract class AbstractCompactionEstimator {
return fileInfo;
}
}
- try (TsFileSequenceReader reader =
- new TsFileSequenceReader(resource.getTsFilePath(), true, false)) {
+ try (TsFileSequenceReader reader = getReader(resource.getTsFilePath())) {
FileInfo fileInfo = CompactionEstimateUtils.calculateFileInfo(reader);
fileInfoCache.put(resource, fileInfo);
synchronized (globalFileInfoCacheForFailedCompaction) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCrossSpaceEstimator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCrossSpaceEstimator.java
index 2207fd7f51a..5eb19d2a325 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCrossSpaceEstimator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCrossSpaceEstimator.java
@@ -19,7 +19,10 @@
package
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileReader;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import java.io.IOException;
import java.util.ArrayList;
@@ -31,6 +34,11 @@ import java.util.List;
*/
public abstract class AbstractCrossSpaceEstimator extends
AbstractCompactionEstimator {
+ @Override // opens filePath with a CompactionTsFileReader, always tagged as cross-space compaction
+ protected TsFileSequenceReader getReader(String filePath) throws IOException
{
+ return new CompactionTsFileReader(filePath,
CompactionType.CROSS_COMPACTION);
+ }
+
public long estimateCrossCompactionMemory(
List<TsFileResource> seqResources, List<TsFileResource> unseqResources)
throws IOException {
List<TsFileResource> resources = new ArrayList<>(seqResources.size() +
unseqResources.size());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractInnerSpaceEstimator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractInnerSpaceEstimator.java
index 50cf624a5c1..d9427c64395 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractInnerSpaceEstimator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractInnerSpaceEstimator.java
@@ -19,7 +19,11 @@
package
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileReader;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import java.io.IOException;
import java.util.List;
@@ -30,6 +34,15 @@ import java.util.List;
*/
public abstract class AbstractInnerSpaceEstimator extends
AbstractCompactionEstimator {
+ @Override // opens filePath with a CompactionTsFileReader whose compaction type is derived from the path text
+ protected TsFileSequenceReader getReader(String filePath) throws IOException
{
+ if (filePath.contains(IoTDBConstant.UNSEQUENCE_FOLDER_NAME)) { // NOTE(review): substring match on the whole path, so any path segment containing the constant selects this branch - confirm paths cannot collide
+ return new CompactionTsFileReader(filePath,
CompactionType.INNER_UNSEQ_COMPACTION);
+ } else { // every other path is treated as a sequence-space file
+ return new CompactionTsFileReader(filePath,
CompactionType.INNER_SEQ_COMPACTION);
+ }
+ }
+
public long estimateInnerCompactionMemory(List<TsFileResource> resources)
throws IOException {
if (!CompactionEstimateUtils.addReadLock(resources)) {
return -1L;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
index 56efdfbf500..fdc1ea8fbf4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
@@ -38,12 +38,12 @@ import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
+import org.apache.iotdb.tsfile.exception.StopReadTsFileByInterruptException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.nio.channels.ClosedByInterruptException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -128,8 +128,8 @@ public class RewriteCrossSpaceCompactionSelector implements
ICrossSpaceSelector
candidate.getUnseqFiles().size());
return executeTaskResourceSelection(candidate);
- } catch (IOException e) {
- if (e instanceof ClosedByInterruptException || Thread.interrupted()) {
+ } catch (Exception e) {
+ if (e instanceof StopReadTsFileByInterruptException ||
Thread.interrupted()) {
Thread.currentThread().interrupt();
return new CrossCompactionTaskResource();
}
diff --git
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/exception/StopReadTsFileByInterruptException.java
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/exception/StopReadTsFileByInterruptException.java
new file mode 100644
index 00000000000..2a2872502c4
--- /dev/null
+++
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/exception/StopReadTsFileByInterruptException.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.exception;
+
+import java.io.IOException;
+
+public class StopReadTsFileByInterruptException extends IOException {} // IOException subtype marking a TsFile read abandoned because the reading thread was interrupted; declares no message or cause constructors
diff --git
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index 396c0a9eead..c164675978d 100644
---
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.compress.IUnCompressor;
import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
+import org.apache.iotdb.tsfile.exception.StopReadTsFileByInterruptException;
import org.apache.iotdb.tsfile.exception.TsFileRuntimeException;
import org.apache.iotdb.tsfile.exception.TsFileStatisticsMistakesException;
import org.apache.iotdb.tsfile.file.MetaMarker;
@@ -298,6 +299,8 @@ public class TsFileSequenceReader implements AutoCloseable {
}
}
}
+ } catch (StopReadTsFileByInterruptException e) {
+ throw e;
} catch (Exception e) {
logger.error("Something error happened while reading file metadata of
file {}", file);
throw e;
@@ -523,6 +526,8 @@ public class TsFileSequenceReader implements AutoCloseable {
TimeseriesMetadata timeseriesMetadata;
try {
timeseriesMetadata =
TimeseriesMetadata.deserializeFrom(tsFileInput, true);
+ } catch (StopReadTsFileByInterruptException e) {
+ throw e;
} catch (Exception e1) {
logger.error(
"Something error happened while deserializing
TimeseriesMetadata of file {}", file);
@@ -728,6 +733,8 @@ public class TsFileSequenceReader implements AutoCloseable {
ByteBuffer nextBuffer = readData(startOffset, endOffset);
MetadataIndexNode deviceLeafNode =
MetadataIndexNode.deserializeFrom(nextBuffer);
getDevicesOfLeafNode(deviceLeafNode, measurementNodeOffsetQueue);
+ } catch (StopReadTsFileByInterruptException e) {
+ throw e;
} catch (Exception e) {
logger.error("Something error happened while getting all devices of file
{}", file);
throw e;
@@ -795,6 +802,8 @@ public class TsFileSequenceReader implements AutoCloseable {
getAllDeviceLeafNodeOffset(
MetadataIndexNode.deserializeFrom(nextBuffer),
leafDeviceNodeOffsets);
}
+ } catch (StopReadTsFileByInterruptException e) {
+ throw e;
} catch (Exception e) {
logger.error("Something error happened while getting all devices of file
{}", file);
throw e;
@@ -926,6 +935,8 @@ public class TsFileSequenceReader implements AutoCloseable {
metadataIndexNode.getNodeType(),
queue);
}
+ } catch (StopReadTsFileByInterruptException e) {
+ throw e;
} catch (Exception e) {
logger.error("Something error happened while getting all paths of file
{}", file);
throw e;
@@ -1144,6 +1155,8 @@ public class TsFileSequenceReader implements
AutoCloseable {
}
}
}
+ } catch (StopReadTsFileByInterruptException e) {
+ throw e;
} catch (Exception e) {
logger.error("Something error happened while generating MetadataIndex of
file {}", file);
throw e;
@@ -1193,6 +1206,8 @@ public class TsFileSequenceReader implements
AutoCloseable {
needChunkMetadata);
}
}
+ } catch (StopReadTsFileByInterruptException e) {
+ throw e;
} catch (Exception e) {
logger.error("Something error happened while generating MetadataIndex of
file {}", file);
throw e;
@@ -1314,6 +1329,8 @@ public class TsFileSequenceReader implements
AutoCloseable {
return getMetadataAndEndOffset(
MetadataIndexNode.deserializeFrom(buffer), name, isDeviceLevel,
exactSearch);
}
+ } catch (StopReadTsFileByInterruptException e) {
+ throw e;
} catch (Exception e) {
logger.error("Something error happened while deserializing MetadataIndex
of file {}", file);
throw e;
@@ -1369,6 +1386,8 @@ public class TsFileSequenceReader implements
AutoCloseable {
public ChunkHeader readChunkHeader(byte chunkType) throws IOException {
try {
return ChunkHeader.deserializeFrom(tsFileInput.wrapAsInputStream(),
chunkType);
+ } catch (StopReadTsFileByInterruptException e) {
+ throw e;
} catch (Throwable t) {
logger.warn("Exception {} happened while reading chunk header of {}",
t.getMessage(), file);
throw t;
@@ -1383,6 +1402,8 @@ public class TsFileSequenceReader implements
AutoCloseable {
private ChunkHeader readChunkHeader(long position) throws IOException {
try {
return ChunkHeader.deserializeFrom(tsFileInput, position);
+ } catch (StopReadTsFileByInterruptException e) {
+ throw e;
} catch (Throwable t) {
logger.warn("Exception {} happened while reading chunk header of {}",
t.getMessage(), file);
throw t;
@@ -1399,6 +1420,8 @@ public class TsFileSequenceReader implements
AutoCloseable {
public ByteBuffer readChunk(long position, int dataSize) throws IOException {
try {
return readData(position, dataSize);
+ } catch (StopReadTsFileByInterruptException e) {
+ throw e;
} catch (Throwable t) {
logger.warn("Exception {} happened while reading chunk of {}",
t.getMessage(), file);
throw t;
@@ -1415,6 +1438,8 @@ public class TsFileSequenceReader implements
AutoCloseable {
ChunkHeader header = readChunkHeader(offset);
ByteBuffer buffer = readChunk(offset + header.getSerializedSize(),
header.getDataSize());
return new Chunk(header, buffer);
+ } catch (StopReadTsFileByInterruptException e) {
+ throw e;
} catch (Throwable t) {
logger.warn("Exception {} happened while reading chunk of {}",
t.getMessage(), file);
throw t;
@@ -1434,6 +1459,8 @@ public class TsFileSequenceReader implements
AutoCloseable {
readChunk(
metaData.getOffsetOfChunkHeader() + header.getSerializedSize(),
header.getDataSize());
return new Chunk(header, buffer, metaData.getDeleteIntervalList(),
metaData.getStatistics());
+ } catch (StopReadTsFileByInterruptException e) {
+ throw e;
} catch (Throwable t) {
logger.warn("Exception {} happened while reading chunk of {}",
t.getMessage(), file);
throw t;
@@ -1500,6 +1527,8 @@ public class TsFileSequenceReader implements
AutoCloseable {
public PageHeader readPageHeader(TSDataType type, boolean hasStatistic)
throws IOException {
try {
return PageHeader.deserializeFrom(tsFileInput.wrapAsInputStream(), type,
hasStatistic);
+ } catch (StopReadTsFileByInterruptException e) {
+ throw e;
} catch (Throwable t) {
logger.warn("Exception {} happened while reading page header of {}",
t.getMessage(), file);
throw t;
@@ -1618,6 +1647,8 @@ public class TsFileSequenceReader implements
AutoCloseable {
protected ByteBuffer readData(long start, long end) throws IOException {
try {
return readData(start, (int) (end - start));
+ } catch (StopReadTsFileByInterruptException e) {
+ throw e;
} catch (Throwable t) {
logger.warn("Exception {} happened while reading data of {}",
t.getMessage(), file);
throw t;
@@ -1947,6 +1978,8 @@ public class TsFileSequenceReader implements
AutoCloseable {
return TsFileCheckStatus.COMPLETE_FILE;
}
}
+ } catch (StopReadTsFileByInterruptException e) {
+ throw e;
} catch (IOException e) {
logger.error("Error occurred while fast checking TsFile.");
throw e;
@@ -1960,6 +1993,8 @@ public class TsFileSequenceReader implements
AutoCloseable {
long tscheckStatus = TsFileCheckStatus.COMPLETE_FILE;
try {
tscheckStatus = checkChunkAndPagesStatistics(chunkMetadata);
+ } catch (StopReadTsFileByInterruptException e) {
+ throw e;
} catch (IOException e) {
logger.error("Error occurred while checking the statistics of chunk
and its pages");
throw e;
@@ -2392,6 +2427,8 @@ public class TsFileSequenceReader implements
AutoCloseable {
}
collectEachLeafMeasurementNodeOffsetRange(readData(startOffset,
endOffset), queue);
}
+ } catch (StopReadTsFileByInterruptException e) {
+ throw e;
} catch (Exception e) {
logger.error(
"Error occurred while collecting offset ranges of measurement nodes
of file {}", file);
diff --git
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/LocalTsFileInput.java
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/LocalTsFileInput.java
index 168fdd59fb4..872230d7fc1 100644
---
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/LocalTsFileInput.java
+++
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/LocalTsFileInput.java
@@ -48,7 +48,7 @@ public class LocalTsFileInput implements TsFileInput {
try {
return channel.size();
} catch (IOException e) {
- logger.error("Error happened while getting {} size", filePath);
+ logger.warn("Error happened while getting {} size", filePath);
throw e;
}
}
@@ -58,7 +58,7 @@ public class LocalTsFileInput implements TsFileInput {
try {
return channel.position();
} catch (IOException e) {
- logger.error("Error happened while getting {} current position",
filePath);
+ logger.warn("Error happened while getting {} current position",
filePath);
throw e;
}
}
@@ -69,7 +69,7 @@ public class LocalTsFileInput implements TsFileInput {
channel.position(newPosition);
return this;
} catch (IOException e) {
- logger.error("Error happened while changing {} position to {}",
filePath, newPosition);
+ logger.warn("Error happened while changing {} position to {}", filePath,
newPosition);
throw e;
}
}