This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new a7c7e54cb7f [To dev/1.3] Load: Optimize fallback logic for corrupted
TsFile (#18004)
a7c7e54cb7f is described below
commit a7c7e54cb7f5590b6900535c19e035fba7428e97
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jun 25 16:07:48 2026 +0800
[To dev/1.3] Load: Optimize fallback logic for corrupted TsFile (#18004)
* Load: Optimized the fallback logic for TsFile loading so that more data can be
inserted when a TsFile is corrupted (#17674)
* down
* Update LoadTreeStatementDataTypeConvertExecutionVisitorTest.java
* Fix query parser deferred tablet failure handling
* Fix aligned load fallback corruption test
* Address load fallback review comments
(cherry picked from commit cb97fe44ca663fd142a0b5a78e4410f18ca724ae)
* Update LoadTreeStatementDataTypeConvertExecutionVisitorTest.java
* Update LoadTreeStatementDataTypeConvertExecutionVisitorTest.java
---
.../query/TsFileInsertionQueryDataContainer.java | 102 +++-
.../scan/TsFileInsertionScanDataContainer.java | 60 ++-
...eeStatementDataTypeConvertExecutionVisitor.java | 16 +-
.../converter/LoadTreeTsFileTabletIterator.java | 560 +++++++++++++++++++++
...atementDataTypeConvertExecutionVisitorTest.java | 396 +++++++++++++++
5 files changed, 1106 insertions(+), 28 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java
index e1fa58f5a0f..ad1f458c95d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java
@@ -51,9 +51,9 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
@@ -72,6 +72,7 @@ public class TsFileInsertionQueryDataContainer extends
TsFileInsertionDataContai
private final Map<IDeviceID, Boolean> deviceIsAlignedMap;
private final Map<String, TSDataType> measurementDataTypeMap;
private final TabletStringInternPool tabletStringInternPool = new
TabletStringInternPool();
+ private RuntimeException deferredException;
@TestOnly
public TsFileInsertionQueryDataContainer(
@@ -101,6 +102,7 @@ public class TsFileInsertionQueryDataContainer extends
TsFileInsertionDataContai
pipeTaskMeta,
sourceEvent,
null,
+ null,
isWithMod);
}
@@ -116,6 +118,33 @@ public class TsFileInsertionQueryDataContainer extends
TsFileInsertionDataContai
final Map<IDeviceID, Boolean> deviceIsAlignedMap,
final boolean isWithMod)
throws IOException {
+ this(
+ pipeName,
+ creationTime,
+ tsFile,
+ pattern,
+ startTime,
+ endTime,
+ pipeTaskMeta,
+ sourceEvent,
+ deviceIsAlignedMap,
+ null,
+ isWithMod);
+ }
+
+ public TsFileInsertionQueryDataContainer(
+ final String pipeName,
+ final long creationTime,
+ final File tsFile,
+ final PipePattern pattern,
+ final long startTime,
+ final long endTime,
+ final PipeTaskMeta pipeTaskMeta,
+ final EnrichedEvent sourceEvent,
+ final Map<IDeviceID, Boolean> deviceIsAlignedMap,
+ final Map<IDeviceID, List<String>> deviceMeasurementsMapOverride,
+ final boolean isWithMod)
+ throws IOException {
super(
tsFile,
pipeName,
@@ -145,7 +174,25 @@ public class TsFileInsertionQueryDataContainer extends
TsFileInsertionDataContai
tsFileSequenceReader = new TsFileSequenceReader(tsFile.getPath(), true,
true);
tsFileReader = new TsFileReader(tsFileSequenceReader);
- if (tsFileResourceManager.cacheObjectsIfAbsent(tsFile)) {
+ if (Objects.nonNull(deviceMeasurementsMapOverride)) {
+ this.deviceIsAlignedMap =
+ Objects.nonNull(deviceIsAlignedMap)
+ ? new LinkedHashMap<>(deviceIsAlignedMap)
+ : readDeviceIsAlignedMap();
+ memoryRequiredInBytes +=
+ Objects.nonNull(deviceIsAlignedMap)
+ ? 0
+ :
PipeMemoryWeightUtil.memoryOfIDeviceId2Bool(this.deviceIsAlignedMap);
+
+ measurementDataTypeMap =
+
readFilteredFullPathDataTypeMap(deviceMeasurementsMapOverride.keySet());
+ memoryRequiredInBytes +=
+
PipeMemoryWeightUtil.memoryOfStr2TSDataType(measurementDataTypeMap);
+
+ deviceMeasurementsMap = new
LinkedHashMap<>(deviceMeasurementsMapOverride);
+ memoryRequiredInBytes +=
+
PipeMemoryWeightUtil.memoryOfIDeviceID2StrList(deviceMeasurementsMap);
+ } else if (tsFileResourceManager.cacheObjectsIfAbsent(tsFile)) {
// These read-only objects can be found in cache.
this.deviceIsAlignedMap =
Objects.nonNull(deviceIsAlignedMap)
@@ -246,9 +293,31 @@ public class TsFileInsertionQueryDataContainer extends
TsFileInsertionDataContai
}
}
+ public TsFileInsertionQueryDataContainer(
+ final File tsFile,
+ final PipePattern pattern,
+ final long startTime,
+ final long endTime,
+ final Map<IDeviceID, List<String>> deviceMeasurementsMapOverride,
+ final boolean isWithMod)
+ throws IOException {
+ this(
+ null,
+ 0,
+ tsFile,
+ pattern,
+ startTime,
+ endTime,
+ null,
+ null,
+ null,
+ deviceMeasurementsMapOverride,
+ isWithMod);
+ }
+
private Map<IDeviceID, List<String>> filterDeviceMeasurementsMapByPattern(
final Map<IDeviceID, List<String>> originalDeviceMeasurementsMap) {
- final Map<IDeviceID, List<String>> filteredDeviceMeasurementsMap = new
HashMap<>();
+ final Map<IDeviceID, List<String>> filteredDeviceMeasurementsMap = new
LinkedHashMap<>();
for (final Map.Entry<IDeviceID, List<String>> entry :
originalDeviceMeasurementsMap.entrySet()) {
final String deviceId = ((PlainDeviceID) entry.getKey()).toStringID();
@@ -282,7 +351,7 @@ public class TsFileInsertionQueryDataContainer extends
TsFileInsertionDataContai
}
private Map<IDeviceID, Boolean> readDeviceIsAlignedMap() throws IOException {
- final Map<IDeviceID, Boolean> deviceIsAlignedResultMap = new HashMap<>();
+ final Map<IDeviceID, Boolean> deviceIsAlignedResultMap = new
LinkedHashMap<>();
final TsFileDeviceIterator deviceIsAlignedIterator =
tsFileSequenceReader.getAllDevicesIteratorWithIsAligned();
while (deviceIsAlignedIterator.hasNext()) {
@@ -313,7 +382,7 @@ public class TsFileInsertionQueryDataContainer extends
TsFileInsertionDataContai
*/
private Map<String, TSDataType> readFilteredFullPathDataTypeMap(final
Set<IDeviceID> devices)
throws IOException {
- final Map<String, TSDataType> result = new HashMap<>();
+ final Map<String, TSDataType> result = new LinkedHashMap<>();
for (final IDeviceID device : devices) {
tsFileSequenceReader
@@ -337,7 +406,7 @@ public class TsFileInsertionQueryDataContainer extends
TsFileInsertionDataContai
*/
private Map<IDeviceID, List<String>> readFilteredDeviceMeasurementsMap(
final Set<IDeviceID> devices) throws IOException {
- final Map<IDeviceID, List<String>> result = new HashMap<>();
+ final Map<IDeviceID, List<String>> result = new LinkedHashMap<>();
for (final IDeviceID device : devices) {
tsFileSequenceReader
@@ -364,6 +433,7 @@ public class TsFileInsertionQueryDataContainer extends
TsFileInsertionDataContai
@Override
public boolean hasNext() {
+ throwIfDeferredException();
boolean hasNext = false;
while (tabletIterator == null || !tabletIterator.hasNext()) {
if (!deviceMeasurementsMapIterator.hasNext()) {
@@ -416,9 +486,16 @@ public class TsFileInsertionQueryDataContainer extends
TsFileInsertionDataContai
recordTabletMetrics(tablet);
final boolean isAligned =
deviceIsAlignedMap.getOrDefault(new
PlainDeviceID(tablet.deviceId), false);
+ boolean isLast;
+ try {
+ isLast = !hasNext();
+ } catch (final RuntimeException e) {
+ deferredException = e;
+ isLast = false;
+ }
final TabletInsertionEvent next;
- if (!hasNext()) {
+ if (isLast) {
next =
new PipeRawTabletInsertionEvent(
tablet,
@@ -448,8 +525,19 @@ public class TsFileInsertionQueryDataContainer extends
TsFileInsertionDataContai
return tabletInsertionIterable;
}
+ private void throwIfDeferredException() {
+ if (Objects.isNull(deferredException)) {
+ return;
+ }
+
+ final RuntimeException exception = deferredException;
+ deferredException = null;
+ throw exception;
+ }
+
@Override
public void close() {
+ deferredException = null;
try {
if (tsFileReader != null) {
tsFileReader.close();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
index ac1fdb94db7..6c3d341f774 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
@@ -45,6 +45,7 @@ import org.apache.tsfile.file.MetaMarker;
import org.apache.tsfile.file.header.ChunkHeader;
import org.apache.tsfile.file.metadata.AlignedChunkMetadata;
import org.apache.tsfile.file.metadata.IChunkMetadata;
+import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.PlainDeviceID;
import org.apache.tsfile.file.metadata.statistics.Statistics;
import org.apache.tsfile.read.TsFileSequenceReader;
@@ -96,6 +97,7 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
private boolean currentIsAligned;
private final List<MeasurementSchema> currentMeasurements = new
ArrayList<>();
private final TabletStringInternPool tabletStringInternPool = new
TabletStringInternPool();
+ private Exception deferredException;
private final List<ModsOperationUtil.ModsInfo> modsInfos = new ArrayList<>();
// Cached time chunk
@@ -187,6 +189,7 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
@Override
public boolean hasNext() {
+ throwIfDeferredException();
final boolean hasNext = Objects.nonNull(chunkReader);
if (hasNext && !parseStartTimeRecorded) {
// Record start time on first hasNext() that returns true
@@ -205,7 +208,7 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
throw new NoSuchElementException();
}
- // currentIsAligned is initialized when
TsFileInsertionEventScanParser is
+ // currentIsAligned is initialized when
TsFileInsertionScanDataContainer is
// constructed.
// When the getNextTablet function is called,
currentIsAligned may be updated,
// causing
@@ -215,7 +218,7 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
final Tablet tablet = getNextTablet();
// Record tablet metrics
recordTabletMetrics(tablet);
- final boolean hasNext = hasNext();
+ final boolean isLast =
isLastTabletWithoutDeferredException();
try {
return new PipeRawTabletInsertionEvent(
tablet,
@@ -224,9 +227,10 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
sourceEvent != null ? sourceEvent.getCreationTime() :
0,
pipeTaskMeta,
sourceEvent,
- !hasNext);
+ isLast);
} finally {
- if (!hasNext) {
+ if (isLast) {
+ recordParseEndTimeIfNecessary();
close();
}
}
@@ -241,6 +245,7 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
new Iterator<Pair<Tablet, Boolean>>() {
@Override
public boolean hasNext() {
+ throwIfDeferredException();
return Objects.nonNull(chunkReader);
}
@@ -251,17 +256,16 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
throw new NoSuchElementException();
}
- // currentIsAligned is initialized when
TsFileInsertionEventScanParser is constructed.
+ // currentIsAligned is initialized when
TsFileInsertionScanDataContainer is constructed.
// When the getNextTablet function is called, currentIsAligned may
be updated, causing
// the currentIsAligned information to be inconsistent with the
current Tablet
// information.
final boolean isAligned = currentIsAligned;
final Tablet tablet = getNextTablet();
- final boolean hasNext = hasNext();
try {
return new Pair<>(tablet, isAligned);
} finally {
- if (!hasNext) {
+ if (isLastTabletWithoutDeferredException()) {
close();
}
}
@@ -269,6 +273,22 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
};
}
+ public IDeviceID getCurrentDevice() {
+ return Objects.nonNull(currentDevice) ? new PlainDeviceID(currentDevice) :
null;
+ }
+
+ public boolean isCurrentAligned() {
+ return currentIsAligned;
+ }
+
+ public List<String> getCurrentMeasurements() {
+ final List<String> measurementIds = new
ArrayList<>(currentMeasurements.size());
+ for (final MeasurementSchema schema : currentMeasurements) {
+ measurementIds.add(schema.getMeasurementId());
+ }
+ return measurementIds;
+ }
+
private Tablet getNextTablet() {
try {
Tablet tablet = null;
@@ -321,7 +341,11 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
// Switch chunk reader iff current chunk is all consumed
if (!data.hasCurrent()) {
- prepareData();
+ try {
+ prepareData();
+ } catch (final Exception e) {
+ deferredException = e;
+ }
}
PipeTabletUtils.compactBitMaps(tablet);
return tablet;
@@ -331,6 +355,26 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
}
}
+ private void throwIfDeferredException() {
+ if (Objects.isNull(deferredException)) {
+ return;
+ }
+
+ final Exception exception = deferredException;
+ deferredException = null;
+ throw new PipeException("Failed to prepare next tablet insertion event.",
exception);
+ }
+
+ private boolean isLastTabletWithoutDeferredException() {
+ return Objects.isNull(deferredException) && Objects.isNull(chunkReader);
+ }
+
+ private void recordParseEndTimeIfNecessary() {
+ if (parseStartTimeRecorded && !parseEndTimeRecorded) {
+ recordParseEndTime();
+ }
+ }
+
private void prepareData() throws IOException {
do {
do {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
index e6f8ffb73f6..a1da7095246 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
@@ -20,9 +20,7 @@
package org.apache.iotdb.db.storageengine.load.converter;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import
org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.TsFileInsertionScanDataContainer;
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletRawReq;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
@@ -90,17 +88,9 @@ public class LoadTreeStatementDataTypeConvertExecutionVisitor
try {
for (final File file : loadTsFileStatement.getTsFiles()) {
- try (final TsFileInsertionScanDataContainer container =
- new TsFileInsertionScanDataContainer(
- file,
- new IoTDBPipePattern(null),
- Long.MIN_VALUE,
- Long.MAX_VALUE,
- null,
- null,
- true)) {
- for (final Pair<Tablet, Boolean> tabletWithIsAligned :
- container.toTabletWithIsAligneds()) {
+ try (final LoadTreeTsFileTabletIterator tabletIterator =
+ new LoadTreeTsFileTabletIterator(file, true)) {
+ for (final Pair<Tablet, Boolean> tabletWithIsAligned :
tabletIterator) {
final PipeTransferTabletRawReq tabletRawReq =
PipeTransferTabletRawReq.toTPipeTransferRawReq(
tabletWithIsAligned.getLeft(),
tabletWithIsAligned.getRight());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeTsFileTabletIterator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeTsFileTabletIterator.java
new file mode 100644
index 00000000000..70ea903baf5
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeTsFileTabletIterator.java
@@ -0,0 +1,560 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.converter;
+
+import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException;
+import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern;
+import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
+import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import
org.apache.iotdb.db.pipe.event.common.tsfile.container.query.TsFileInsertionQueryDataContainer;
+import
org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.TsFileInsertionScanDataContainer;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.PlainDeviceID;
+import org.apache.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.tsfile.read.TsFileSequenceReader;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Load uses scan parsing first for throughput. If scan parsing hits
corruption, fall back to query
+ * parsing for the remaining measurements and devices so later data can still
be loaded.
+ */
+class LoadTreeTsFileTabletIterator
+ implements Iterable<Pair<Tablet, Boolean>>, Iterator<Pair<Tablet,
Boolean>>, AutoCloseable {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(LoadTreeTsFileTabletIterator.class);
+
+ private static final PipePattern LOAD_TREE_PATTERN = new
IoTDBPipePattern(null);
+
+ private final File file;
+ private final boolean isWithMod;
+ private final ArrayDeque<QueryTask> pendingQueryTasks = new ArrayDeque<>();
+
+ private TsFileInsertionScanDataContainer scanParser;
+ private QueryTask activeQueryTask;
+ private TsFileInsertionQueryDataContainer activeQueryParser;
+ private Iterator<Pair<Tablet, Boolean>> activeIterator;
+ private boolean scanInitialized;
+ private boolean fallbackTriggered;
+
+ private IDeviceID lastEmittedDevice;
+ private List<String> lastEmittedMeasurements = Collections.emptyList();
+ private long lastEmittedTimestamp = Long.MIN_VALUE;
+
+ private IDeviceID lastScanTabletDevice;
+ private List<String> lastScanTabletMeasurements = Collections.emptyList();
+ private final Map<IDeviceID, Set<String>> fullyEmittedMeasurementsByDevice =
+ new LinkedHashMap<>();
+
+ LoadTreeTsFileTabletIterator(final File file, final boolean isWithMod) {
+ this.file = file;
+ this.isWithMod = isWithMod;
+ }
+
+ @Override
+ public Iterator<Pair<Tablet, Boolean>> iterator() {
+ return this;
+ }
+
+ @Override
+ public boolean hasNext() {
+ while (true) {
+ try {
+ ensureActiveIterator();
+ if (Objects.isNull(activeIterator)) {
+ close();
+ return false;
+ }
+
+ if (activeIterator.hasNext()) {
+ return true;
+ }
+
+ if (!switchToNextIterator()) {
+ close();
+ return false;
+ }
+ } catch (final Exception e) {
+ if (recoverFromIteratorFailure(e)) {
+ continue;
+ }
+ close();
+ throw toRuntimeException(e);
+ }
+ }
+ }
+
+ @Override
+ public Pair<Tablet, Boolean> next() {
+ while (true) {
+ if (!hasNext()) {
+ close();
+ throw new NoSuchElementException();
+ }
+
+ try {
+ final Pair<Tablet, Boolean> next = activeIterator.next();
+ recordProgress(next);
+ return next;
+ } catch (final Exception e) {
+ if (recoverFromIteratorFailure(e)) {
+ continue;
+ }
+ close();
+ throw toRuntimeException(e);
+ }
+ }
+ }
+
+ private void ensureActiveIterator() throws Exception {
+ if (Objects.nonNull(activeIterator)) {
+ return;
+ }
+
+ if (!scanInitialized && !fallbackTriggered) {
+ scanInitialized = true;
+ try {
+ scanParser =
+ new TsFileInsertionScanDataContainer(
+ file, LOAD_TREE_PATTERN, Long.MIN_VALUE, Long.MAX_VALUE, null,
null, isWithMod);
+ activeIterator = scanParser.toTabletWithIsAligneds().iterator();
+ return;
+ } catch (final Exception e) {
+ if (!switchFromScanToQuery(e)) {
+ throw toRuntimeException(e);
+ }
+ }
+ }
+
+ activateNextQueryParser();
+ }
+
+ private boolean switchToNextIterator() {
+ if (Objects.nonNull(activeQueryParser)) {
+ closeActiveQueryParser();
+ return activateNextQueryParser();
+ }
+
+ closeScanParser();
+ return activateNextQueryParser();
+ }
+
+ private boolean recoverFromIteratorFailure(final Exception e) {
+ if (shouldRethrow(e)) {
+ return false;
+ }
+
+ if (Objects.nonNull(activeQueryTask)) {
+ LOGGER.warn(
+ "Load: Query fallback failed for device {} measurements {} in TsFile
{}. "
+ + "Split or skip this query task and continue.",
+ activeQueryTask.device,
+ activeQueryTask.measurements,
+ file.getAbsolutePath(),
+ e);
+ splitOrSkipActiveQueryTask();
+ return true;
+ }
+
+ return switchFromScanToQuery(e);
+ }
+
+ private boolean switchFromScanToQuery(final Exception e) {
+ if (fallbackTriggered) {
+ return false;
+ }
+
+ fallbackTriggered = true;
+ final IDeviceID currentDevice =
+ Objects.nonNull(scanParser) ? scanParser.getCurrentDevice() : null;
+ final List<String> currentMeasurements =
+ Objects.nonNull(scanParser) ? scanParser.getCurrentMeasurements() :
Collections.emptyList();
+
+ markLastScanMeasurementsAsCompletedIfNeeded(currentDevice,
currentMeasurements);
+
+ closeScanParser();
+
+ try {
+ pendingQueryTasks.addAll(buildQueryTasks(currentDevice,
currentMeasurements));
+ } catch (final Exception queryInitException) {
+ LOGGER.warn(
+ "Load: Failed to initialize query fallback for TsFile {} after scan
parser failure.",
+ file.getAbsolutePath(),
+ queryInitException);
+ return false;
+ }
+
+ LOGGER.warn(
+ "Load: Scan parser detected a corrupted section in TsFile {} at device
{}. "
+ + "Switch to query parsing for remaining devices.",
+ file.getAbsolutePath(),
+ currentDevice,
+ e);
+ return true;
+ }
+
+ private ArrayDeque<QueryTask> buildQueryTasks(
+ final IDeviceID currentDevice, final List<String> currentMeasurements)
throws IOException {
+ final LinkedHashMap<IDeviceID, List<String>> deviceMeasurementsMap =
+ readDeviceMeasurementsInOrder();
+ if (deviceMeasurementsMap.isEmpty()) {
+ return new ArrayDeque<>();
+ }
+
+ final ArrayDeque<QueryTask> tasks = new ArrayDeque<>();
+ boolean includeCurrentAndFollowingDevices =
+ Objects.isNull(currentDevice) ||
!deviceMeasurementsMap.containsKey(currentDevice);
+
+ for (final Map.Entry<IDeviceID, List<String>> entry :
deviceMeasurementsMap.entrySet()) {
+ final IDeviceID device = entry.getKey();
+ if (!includeCurrentAndFollowingDevices && device.equals(currentDevice)) {
+ includeCurrentAndFollowingDevices = true;
+ }
+ if (!includeCurrentAndFollowingDevices) {
+ continue;
+ }
+
+ if (device.equals(currentDevice)) {
+ addCurrentDeviceQueryTasks(tasks, device, entry.getValue(),
currentMeasurements);
+ } else {
+ addQueryTaskIfNecessary(tasks, device, entry.getValue(),
Long.MIN_VALUE, Long.MAX_VALUE);
+ }
+ }
+
+ return tasks;
+ }
+
+ private LinkedHashMap<IDeviceID, List<String>>
readDeviceMeasurementsInOrder()
+ throws IOException {
+ final LinkedHashMap<IDeviceID, List<String>> deviceMeasurementsMap = new
LinkedHashMap<>();
+ try (final TsFileSequenceReader reader = new
TsFileSequenceReader(file.getAbsolutePath())) {
+ final Iterator<Pair<IDeviceID, List<TimeseriesMetadata>>>
metadataIterator =
+ reader.iterAllTimeseriesMetadata(false, false);
+ while (metadataIterator.hasNext()) {
+ final Pair<IDeviceID, List<TimeseriesMetadata>> deviceMetadata =
metadataIterator.next();
+ deviceMeasurementsMap.put(
+ deviceMetadata.getLeft(),
+ deviceMetadata.getRight().stream()
+ .map(TimeseriesMetadata::getMeasurementId)
+ .collect(Collectors.toList()));
+ }
+ }
+ return deviceMeasurementsMap;
+ }
+
+ private void addCurrentDeviceQueryTasks(
+ final ArrayDeque<QueryTask> tasks,
+ final IDeviceID device,
+ final List<String> allMeasurements,
+ final List<String> currentMeasurements) {
+ final Set<String> completedMeasurements =
+ fullyEmittedMeasurementsByDevice.getOrDefault(device,
Collections.emptySet());
+ final Set<String> currentMeasurementSet = new
LinkedHashSet<>(currentMeasurements);
+
+ final List<String> currentMeasurementsToResume = new ArrayList<>();
+ final List<String> remainingMeasurements = new ArrayList<>();
+ for (final String measurement : allMeasurements) {
+ if (completedMeasurements.contains(measurement)) {
+ continue;
+ }
+ if (currentMeasurementSet.contains(measurement)) {
+ currentMeasurementsToResume.add(measurement);
+ } else {
+ remainingMeasurements.add(measurement);
+ }
+ }
+
+ addQueryTaskIfNecessary(
+ tasks,
+ device,
+ currentMeasurementsToResume,
+ determineTaskResumeStartTime(device, currentMeasurementsToResume,
Long.MIN_VALUE),
+ Long.MAX_VALUE);
+ addQueryTaskIfNecessary(tasks, device, remainingMeasurements,
Long.MIN_VALUE, Long.MAX_VALUE);
+ }
+
+ private boolean activateNextQueryParser() {
+ closeActiveQueryParser();
+
+ while (!pendingQueryTasks.isEmpty()) {
+ activeQueryTask = pendingQueryTasks.removeFirst();
+ try {
+ activeQueryParser =
+ new TsFileInsertionQueryDataContainer(
+ file,
+ LOAD_TREE_PATTERN,
+ activeQueryTask.startTime,
+ activeQueryTask.endTime,
+ activeQueryTask.toDeviceMeasurementsMap(),
+ isWithMod);
+ final Iterator<TabletInsertionEvent> tabletIterator =
+ activeQueryParser.toTabletInsertionEvents().iterator();
+ activeIterator =
+ new Iterator<Pair<Tablet, Boolean>>() {
+ @Override
+ public boolean hasNext() {
+ return tabletIterator.hasNext();
+ }
+
+ @Override
+ public Pair<Tablet, Boolean> next() {
+ final TabletInsertionEvent event = tabletIterator.next();
+ if (!(event instanceof PipeRawTabletInsertionEvent)) {
+ throw new IllegalStateException(
+ "Expected PipeRawTabletInsertionEvent but got " +
event.getClass().getName());
+ }
+
+ final PipeRawTabletInsertionEvent rawTabletInsertionEvent =
+ (PipeRawTabletInsertionEvent) event;
+ return new Pair<>(
+ rawTabletInsertionEvent.convertToTablet(),
rawTabletInsertionEvent.isAligned());
+ }
+ };
+ return true;
+ } catch (final Exception e) {
+ LOGGER.warn(
+ "Load: Failed to initialize query fallback for device {}
measurements {} in TsFile {}. "
+ + "Split or skip this query task and continue.",
+ activeQueryTask.device,
+ activeQueryTask.measurements,
+ file.getAbsolutePath(),
+ e);
+ splitOrSkipActiveQueryTask();
+ }
+ }
+
+ activeIterator = null;
+ return false;
+ }
+
+ private void recordProgress(final Pair<Tablet, Boolean> tabletWithIsAligned)
{
+ final Tablet tablet = tabletWithIsAligned.getLeft();
+ if (Objects.isNull(tablet) || tablet.rowSize == 0) {
+ return;
+ }
+
+ final IDeviceID device = new PlainDeviceID(tablet.deviceId);
+ final List<String> measurements = extractMeasurementNames(tablet);
+
+ if (Objects.isNull(activeQueryParser)) {
+ recordScanProgress(device, measurements);
+ }
+
+ lastEmittedDevice = device;
+ lastEmittedMeasurements = measurements;
+ lastEmittedTimestamp = tablet.timestamps[tablet.rowSize - 1];
+ }
+
+ private boolean shouldRethrow(final Exception e) {
+ Throwable current = e;
+ while (Objects.nonNull(current)) {
+ if (current instanceof InterruptedException
+ || current instanceof PipeRuntimeOutOfMemoryCriticalException) {
+ return true;
+ }
+ current = current.getCause();
+ }
+ return false;
+ }
+
+ private RuntimeException toRuntimeException(final Exception e) {
+ return e instanceof RuntimeException
+ ? (RuntimeException) e
+ : new IllegalStateException("Failed to iterate tablets while loading
TsFile.", e);
+ }
+
+ private void closeScanParser() {
+ activeIterator = null;
+ if (Objects.nonNull(scanParser)) {
+ scanParser.close();
+ scanParser = null;
+ }
+ }
+
+ private void closeActiveQueryParser() {
+ activeIterator = null;
+ activeQueryTask = null;
+ if (Objects.isNull(activeQueryParser)) {
+ return;
+ }
+
+ activeQueryParser.close();
+ activeQueryParser = null;
+ }
+
+ @Override
+ public void close() {
+ activeIterator = null;
+ closeScanParser();
+ closeActiveQueryParser();
+ pendingQueryTasks.clear();
+ }
+
+ private void recordScanProgress(final IDeviceID device, final List<String>
measurements) {
+ if (Objects.nonNull(lastScanTabletDevice)
+ && (!lastScanTabletDevice.equals(device)
+ || !measurementsEqual(lastScanTabletMeasurements, measurements))) {
+ markMeasurementsFullyEmitted(lastScanTabletDevice,
lastScanTabletMeasurements);
+ }
+
+ lastScanTabletDevice = device;
+ lastScanTabletMeasurements = measurements;
+ }
+
+ private void markLastScanMeasurementsAsCompletedIfNeeded(
+ final IDeviceID currentDevice, final List<String> currentMeasurements) {
+ if (Objects.isNull(lastScanTabletDevice) ||
lastScanTabletMeasurements.isEmpty()) {
+ return;
+ }
+
+ if (!lastScanTabletDevice.equals(currentDevice)
+ || !currentMeasurements.isEmpty()
+ && !measurementsEqual(lastScanTabletMeasurements,
currentMeasurements)) {
+ markMeasurementsFullyEmitted(lastScanTabletDevice,
lastScanTabletMeasurements);
+ }
+ }
+
+ private void markMeasurementsFullyEmitted(
+ final IDeviceID device, final List<String> measurements) {
+ if (Objects.isNull(device) || measurements.isEmpty()) {
+ return;
+ }
+
+ fullyEmittedMeasurementsByDevice
+ .computeIfAbsent(device, key -> new LinkedHashSet<>())
+ .addAll(measurements);
+ }
+
+ private long determineTaskResumeStartTime(
+ final IDeviceID device, final List<String> measurements, final long
defaultStartTime) {
+ if (measurements.isEmpty()
+ || !device.equals(lastEmittedDevice)
+ || lastEmittedTimestamp == Long.MIN_VALUE
+ || !measurementsEqual(measurements, lastEmittedMeasurements)) {
+ return defaultStartTime;
+ }
+
+ return lastEmittedTimestamp == Long.MAX_VALUE ? Long.MAX_VALUE :
lastEmittedTimestamp + 1;
+ }
+
+ private void addQueryTaskIfNecessary(
+ final ArrayDeque<QueryTask> tasks,
+ final IDeviceID device,
+ final List<String> measurements,
+ final long startTime,
+ final long endTime) {
+ if (measurements.isEmpty() || startTime == Long.MAX_VALUE) {
+ return;
+ }
+
+ tasks.addLast(new QueryTask(device, measurements, startTime, endTime));
+ }
+
+ private void splitOrSkipActiveQueryTask() {
+ final QueryTask failedTask = activeQueryTask;
+ closeActiveQueryParser();
+ if (Objects.isNull(failedTask)) {
+ return;
+ }
+
+ if (failedTask.measurements.size() <= 1) {
+ return;
+ }
+
+ final long resumeStartTime =
+ determineTaskResumeStartTime(
+ failedTask.device, failedTask.measurements, failedTask.startTime);
+ final List<QueryTask> splitTasks = failedTask.split(resumeStartTime);
+ for (int i = splitTasks.size() - 1; i >= 0; --i) {
+ pendingQueryTasks.addFirst(splitTasks.get(i));
+ }
+ }
+
+ private List<String> extractMeasurementNames(final Tablet tablet) {
+ return tablet.getSchemas().stream()
+ .map(MeasurementSchema::getMeasurementId)
+ .collect(Collectors.toList());
+ }
+
+ private boolean measurementsEqual(
+ final List<String> leftMeasurements, final List<String>
rightMeasurements) {
+ return leftMeasurements.size() == rightMeasurements.size()
+ && new LinkedHashSet<>(leftMeasurements).equals(new
LinkedHashSet<>(rightMeasurements));
+ }
+
+ private static class QueryTask {
+ private final IDeviceID device;
+ private final List<String> measurements;
+ private final long startTime;
+ private final long endTime;
+
+ private QueryTask(
+ final IDeviceID device,
+ final List<String> measurements,
+ final long startTime,
+ final long endTime) {
+ this.device = device;
+ this.measurements = new ArrayList<>(measurements);
+ this.startTime = startTime;
+ this.endTime = endTime;
+ }
+
+ private LinkedHashMap<IDeviceID, List<String>> toDeviceMeasurementsMap() {
+ final LinkedHashMap<IDeviceID, List<String>> deviceMeasurementsMap = new
LinkedHashMap<>();
+ deviceMeasurementsMap.put(device, measurements);
+ return deviceMeasurementsMap;
+ }
+
+ private List<QueryTask> split(final long resumeStartTime) {
+ final int middle = measurements.size() / 2;
+ if (middle <= 0) {
+ return Collections.emptyList();
+ }
+
+ final List<QueryTask> splitTasks = new ArrayList<>(2);
+ splitTasks.add(
+ new QueryTask(device, measurements.subList(0, middle),
resumeStartTime, endTime));
+ splitTasks.add(
+ new QueryTask(
+ device, measurements.subList(middle, measurements.size()),
resumeStartTime, endTime));
+ return splitTasks;
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitorTest.java
new file mode 100644
index 00000000000..150c950fce3
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitorTest.java
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.converter;
+
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern;
import org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.TsFileInsertionScanDataContainer;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
import org.apache.iotdb.rpc.TSStatusCode;

import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.AlignedChunkMetadata;
import org.apache.tsfile.file.metadata.IChunkMetadata;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.PlainDeviceID;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.read.TsFileSequenceReader;
import org.apache.tsfile.read.common.Path;
import org.apache.tsfile.utils.BitMap;
import org.apache.tsfile.write.TsFileWriter;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.io.File;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
+
+public class LoadTreeStatementDataTypeConvertExecutionVisitorTest {
+
+ private static final String DEVICE_0 = "root.sg.d0";
+ private static final String DEVICE_1 = "root.sg.d1";
+ private static final String DEVICE_2 = "root.sg.d2";
+ private static final String ALIGNED_DEVICE = "root.sg.ad0";
+ private static final int ROW_COUNT_PER_DEVICE = 2048;
+ private File tsFile;
+ private boolean isPipeMemoryManagementEnabled;
+ private long pipeMaxReaderChunkSize;
+
+ @Before
+ public void setUp() {
+ isPipeMemoryManagementEnabled =
PipeConfigAccessor.getPipeMemoryManagementEnabled();
+ PipeConfigAccessor.setPipeMemoryManagementEnabled(false);
+ pipeMaxReaderChunkSize =
CommonDescriptor.getInstance().getConfig().getPipeMaxReaderChunkSize();
+ }
+
+ @After
+ public void tearDown() {
+
PipeConfigAccessor.setPipeMemoryManagementEnabled(isPipeMemoryManagementEnabled);
+
CommonDescriptor.getInstance().getConfig().setPipeMaxReaderChunkSize(pipeMaxReaderChunkSize);
+ if (tsFile != null && tsFile.exists()) {
+ Assert.assertTrue(tsFile.delete());
+ }
+ }
+
+ @Test
+ public void
testFallbackToQueryForRemainingDevicesWhenScanParserHitsCorruption()
+ throws Exception {
+ tsFile = new File("load-tree-query-fallback-corrupted.tsfile");
+ writeTsFile(tsFile);
+ corruptMeasurementChunk(tsFile, DEVICE_1, "s0");
+
+ Assert.assertTrue("Expected scan parser to fail after corruption.",
scanParserFails(tsFile));
+
+ final Map<String, Integer> pointCountByDevice = new HashMap<>();
+ final LoadTreeStatementDataTypeConvertExecutionVisitor visitor =
+ new LoadTreeStatementDataTypeConvertExecutionVisitor(
+ statement -> {
+ collectLoadedPoints(statement, pointCountByDevice);
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ });
+
+ final Optional<TSStatus> status =
+
visitor.visitLoadFile(LoadTsFileStatement.createUnchecked(tsFile.getAbsolutePath()),
null);
+
+ Assert.assertTrue(status.isPresent());
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.get().getCode());
+ final int loadedPointCountBeforeCorruption =
pointCountByDevice.getOrDefault(DEVICE_0, 0);
+ final int loadedPointCountAfterFallback =
pointCountByDevice.getOrDefault(DEVICE_2, 0);
+ Assert.assertTrue(loadedPointCountBeforeCorruption > 0);
+ Assert.assertEquals(loadedPointCountBeforeCorruption,
loadedPointCountAfterFallback);
+ }
+
+ @Test
+ public void testFallbackToQueryWhenFirstNonAlignedDeviceIsCorrupted() throws
Exception {
+ tsFile = new
File("load-tree-query-fallback-corrupted-first-non-aligned-device.tsfile");
+ writeTsFile(tsFile);
+ corruptMeasurementChunk(tsFile, DEVICE_0, "s0");
+
+ Assert.assertTrue("Expected scan parser to fail after corruption.",
scanParserFails(tsFile));
+
+ final Map<String, Integer> pointCountByTimeseries = new HashMap<>();
+ final LoadTreeStatementDataTypeConvertExecutionVisitor visitor =
+ new LoadTreeStatementDataTypeConvertExecutionVisitor(
+ statement -> {
+ collectLoadedPointsByTimeseries(statement,
pointCountByTimeseries);
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ });
+
+ final Optional<TSStatus> status =
+
visitor.visitLoadFile(LoadTsFileStatement.createUnchecked(tsFile.getAbsolutePath()),
null);
+
+ Assert.assertTrue(status.isPresent());
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.get().getCode());
+
+ Assert.assertTrue(
+ pointCountByTimeseries.getOrDefault(DEVICE_0 + ".s0", 0) <
ROW_COUNT_PER_DEVICE);
+ assertMeasurementLoadedCompletely(pointCountByTimeseries, DEVICE_0, 1);
+ assertMeasurementLoadedCompletely(pointCountByTimeseries, DEVICE_1, 0);
+ assertMeasurementLoadedCompletely(pointCountByTimeseries, DEVICE_1, 1);
+ assertMeasurementLoadedCompletely(pointCountByTimeseries, DEVICE_2, 0);
+ assertMeasurementLoadedCompletely(pointCountByTimeseries, DEVICE_2, 1);
+ }
+
+ @Test
+ public void
testFallbackDoesNotReloadCompletedMeasurementsOfCurrentNonAlignedDevice()
+ throws Exception {
+ tsFile = new
File("load-tree-query-fallback-corrupted-current-non-aligned-device.tsfile");
+ writeTsFile(tsFile);
+ corruptMeasurementChunk(tsFile, DEVICE_1, "s1");
+
+ Assert.assertTrue("Expected scan parser to fail after corruption.",
scanParserFails(tsFile));
+
+ final Map<String, Integer> pointCountByTimeseries = new HashMap<>();
+ final LoadTreeStatementDataTypeConvertExecutionVisitor visitor =
+ new LoadTreeStatementDataTypeConvertExecutionVisitor(
+ statement -> {
+ collectLoadedPointsByTimeseries(statement,
pointCountByTimeseries);
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ });
+
+ final Optional<TSStatus> status =
+
visitor.visitLoadFile(LoadTsFileStatement.createUnchecked(tsFile.getAbsolutePath()),
null);
+
+ Assert.assertTrue(status.isPresent());
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.get().getCode());
+
+ assertMeasurementLoadedCompletely(pointCountByTimeseries, DEVICE_1, 0);
+ Assert.assertTrue(
+ pointCountByTimeseries.getOrDefault(DEVICE_1 + ".s1", 0) <
ROW_COUNT_PER_DEVICE);
+ assertMeasurementLoadedCompletely(pointCountByTimeseries, DEVICE_2, 0);
+ assertMeasurementLoadedCompletely(pointCountByTimeseries, DEVICE_2, 1);
+ }
+
+ @Test
+ public void
testFallbackToQueryForRemainingMeasurementsOfCurrentAlignedDevice() throws
Exception {
+ CommonDescriptor.getInstance().getConfig().setPipeMaxReaderChunkSize(0);
+
+ tsFile = new
File("load-tree-query-fallback-corrupted-aligned-current-device.tsfile");
+ writeWideAlignedTsFile(tsFile, ALIGNED_DEVICE, 16);
+ corruptMeasurementChunk(tsFile, ALIGNED_DEVICE, "s8");
+ corruptMeasurementChunk(tsFile, ALIGNED_DEVICE, "s12");
+
+ Assert.assertTrue("Expected scan parser to fail after corruption.",
scanParserFails(tsFile));
+
+ final Map<String, Integer> pointCountByTimeseries = new HashMap<>();
+ final LoadTreeStatementDataTypeConvertExecutionVisitor visitor =
+ new LoadTreeStatementDataTypeConvertExecutionVisitor(
+ statement -> {
+ collectLoadedPointsByTimeseries(statement,
pointCountByTimeseries);
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ });
+
+ final Optional<TSStatus> status =
+
visitor.visitLoadFile(LoadTsFileStatement.createUnchecked(tsFile.getAbsolutePath()),
null);
+
+ Assert.assertTrue(status.isPresent());
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.get().getCode());
+
+ for (int measurementIndex = 0; measurementIndex < 8; ++measurementIndex) {
+ assertMeasurementLoadedCompletely(pointCountByTimeseries,
ALIGNED_DEVICE, measurementIndex);
+ }
+ for (int measurementIndex : Arrays.asList(9, 10, 11, 13, 14, 15)) {
+ assertMeasurementLoadedCompletely(pointCountByTimeseries,
ALIGNED_DEVICE, measurementIndex);
+ }
+
+ Assert.assertTrue(
+ pointCountByTimeseries.getOrDefault(ALIGNED_DEVICE + ".s8", 0) <
ROW_COUNT_PER_DEVICE);
+ Assert.assertTrue(
+ pointCountByTimeseries.getOrDefault(ALIGNED_DEVICE + ".s12", 0) <
ROW_COUNT_PER_DEVICE);
+ }
+
+ private void writeTsFile(final File file) throws Exception {
+ if (file.exists()) {
+ Assert.assertTrue(file.delete());
+ }
+
+ final List<MeasurementSchema> schemaList =
+ Arrays.asList(
+ new MeasurementSchema("s0", TSDataType.INT64, TSEncoding.PLAIN),
+ new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN));
+
+ try (final TsFileWriter writer = new TsFileWriter(file)) {
+ writeDevice(writer, schemaList, DEVICE_0, 0);
+ writeDevice(writer, schemaList, DEVICE_1, 10_000);
+ writeDevice(writer, schemaList, DEVICE_2, 20_000);
+ }
+ }
+
+ private void writeWideAlignedTsFile(
+ final File file, final String device, final int measurementCount) throws
Exception {
+ if (file.exists()) {
+ Assert.assertTrue(file.delete());
+ }
+
+ final List<MeasurementSchema> schemaList = new java.util.ArrayList<>();
+ for (int measurementIndex = 0; measurementIndex < measurementCount;
++measurementIndex) {
+ schemaList.add(
+ new MeasurementSchema("s" + measurementIndex, TSDataType.INT64,
TSEncoding.PLAIN));
+ }
+
+ try (final TsFileWriter writer = new TsFileWriter(file)) {
+ writer.registerAlignedTimeseries(new Path(device), schemaList);
+
+ final Tablet tablet = new Tablet(device, schemaList,
ROW_COUNT_PER_DEVICE);
+ for (int row = 0; row < ROW_COUNT_PER_DEVICE; ++row) {
+ tablet.addTimestamp(row, row);
+ for (int measurementIndex = 0; measurementIndex < measurementCount;
++measurementIndex) {
+ tablet.addValue("s" + measurementIndex, row, (long) measurementIndex
* 10_000 + row);
+ }
+ }
+ tablet.rowSize = ROW_COUNT_PER_DEVICE;
+ writer.writeAligned(tablet);
+ }
+ }
+
+ private void writeDevice(
+ final TsFileWriter writer,
+ final List<MeasurementSchema> schemaList,
+ final String device,
+ final long valueBase)
+ throws Exception {
+ writer.registerTimeseries(new Path(device), schemaList);
+
+ final Tablet tablet = new Tablet(device, schemaList, ROW_COUNT_PER_DEVICE);
+ for (int row = 0; row < ROW_COUNT_PER_DEVICE; ++row) {
+ tablet.addTimestamp(row, row);
+ tablet.addValue("s0", row, valueBase + row);
+ tablet.addValue("s1", row, valueBase + ROW_COUNT_PER_DEVICE + row);
+ }
+ tablet.rowSize = ROW_COUNT_PER_DEVICE;
+ writer.write(tablet);
+ }
+
+ private void corruptMeasurementChunk(
+ final File file, final String device, final String measurement) throws
Exception {
+ try (final TsFileSequenceReader reader = new
TsFileSequenceReader(file.getAbsolutePath())) {
+ final IDeviceID deviceId = new PlainDeviceID(device);
+ final List<IChunkMetadata> chunkMetadataList =
+ reader.getIChunkMetadataList(new Path(deviceId, measurement, false));
+ Assert.assertFalse(chunkMetadataList.isEmpty());
+
+ final long chunkHeaderOffset =
+ getTargetChunkMetadata(chunkMetadataList.get(0),
measurement).getOffsetOfChunkHeader();
+ try (final RandomAccessFile randomAccessFile = new
RandomAccessFile(file, "rw")) {
+ randomAccessFile.seek(chunkHeaderOffset + 64);
+ randomAccessFile.write(new byte[] {1, 2, 3, 4, 5, 6, 7, 8});
+ }
+ }
+ }
+
+ private IChunkMetadata getTargetChunkMetadata(
+ final IChunkMetadata chunkMetadata, final String measurement) {
+ if (!(chunkMetadata instanceof AlignedChunkMetadata)) {
+ return chunkMetadata;
+ }
+
+ final IChunkMetadata valueChunkMetadata =
+ ((AlignedChunkMetadata) chunkMetadata)
+ .getValueChunkMetadataList().stream()
+ .filter(Objects::nonNull)
+ .filter(metadata ->
measurement.equals(metadata.getMeasurementUid()))
+ .findFirst()
+ .orElse(null);
+ Assert.assertNotNull(valueChunkMetadata);
+ return valueChunkMetadata;
+ }
+
+ private void assertMeasurementLoadedCompletely(
+ final Map<String, Integer> pointCountByTimeseries,
+ final String device,
+ final int measurementIndex) {
+ Assert.assertEquals(
+ ROW_COUNT_PER_DEVICE,
+ pointCountByTimeseries.getOrDefault(device + ".s" + measurementIndex,
0).intValue());
+ }
+
+ private boolean scanParserFails(final File file) throws Exception {
+ try (final TsFileInsertionScanDataContainer parser =
+ new TsFileInsertionScanDataContainer(
+ file, new IoTDBPipePattern(null), Long.MIN_VALUE, Long.MAX_VALUE,
null, null, true)) {
+ parser.toTabletWithIsAligneds().forEach(tabletWithIsAligned -> {});
+ return false;
+ } catch (final Exception e) {
+ return true;
+ }
+ }
+
+ private void collectLoadedPointsByTimeseries(
+ final Statement statement, final Map<String, Integer>
pointCountByTimeseries) {
+ Assert.assertTrue(statement instanceof InsertMultiTabletsStatement);
+ for (final InsertTabletStatement insertTabletStatement :
+ ((InsertMultiTabletsStatement)
statement).getInsertTabletStatementList()) {
+ for (int row = 0; row < insertTabletStatement.getRowCount(); ++row) {
+ for (int column = 0; column <
insertTabletStatement.getMeasurements().length; ++column) {
+ final String measurement =
insertTabletStatement.getMeasurements()[column];
+ if (measurement == null || isNull(insertTabletStatement, row,
column)) {
+ continue;
+ }
+ pointCountByTimeseries.merge(
+ insertTabletStatement.getDevicePath().getFullPath() + "." +
measurement,
+ 1,
+ Integer::sum);
+ }
+ }
+ }
+ }
+
+ private void collectLoadedPoints(
+ final Statement statement, final Map<String, Integer>
pointCountByDevice) {
+ Assert.assertTrue(statement instanceof InsertMultiTabletsStatement);
+ for (final InsertTabletStatement insertTabletStatement :
+ ((InsertMultiTabletsStatement)
statement).getInsertTabletStatementList()) {
+ pointCountByDevice.merge(
+ insertTabletStatement.getDevicePath().getFullPath(),
+ countNonNullPoints(insertTabletStatement),
+ Integer::sum);
+ }
+ }
+
+ private int countNonNullPoints(final InsertTabletStatement
insertTabletStatement) {
+ int pointCount = 0;
+ for (int row = 0; row < insertTabletStatement.getRowCount(); ++row) {
+ for (int column = 0; column <
insertTabletStatement.getMeasurements().length; ++column) {
+ if (insertTabletStatement.getMeasurements()[column] != null
+ && !isNull(insertTabletStatement, row, column)) {
+ ++pointCount;
+ }
+ }
+ }
+ return pointCount;
+ }
+
+ private boolean isNull(
+ final InsertTabletStatement insertTabletStatement, final int row, final
int column) {
+ final Object[] columns = insertTabletStatement.getColumns();
+ if (columns == null || column >= columns.length || columns[column] ==
null) {
+ return true;
+ }
+
+ final BitMap[] bitMaps = insertTabletStatement.getBitMaps();
+ return bitMaps != null
+ && column < bitMaps.length
+ && bitMaps[column] != null
+ && bitMaps[column].isMarked(row);
+ }
+
+ private static class PipeConfigAccessor {
+ private static boolean getPipeMemoryManagementEnabled() {
+ return
CommonDescriptor.getInstance().getConfig().getPipeMemoryManagementEnabled();
+ }
+
+ private static void setPipeMemoryManagementEnabled(final boolean enabled) {
+
CommonDescriptor.getInstance().getConfig().setPipeMemoryManagementEnabled(enabled);
+ }
+ }
+}