This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 2e1ebf46e93 Pipe: Introduce TsFileInsertionScanDataContainer to read
data from tsfile sequentially to improve pattern parse performance when filter
rate is high (#12781)
2e1ebf46e93 is described below
commit 2e1ebf46e930f49c47fa1ff54da902d97cfc65b1
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jun 27 18:57:27 2024 +0800
Pipe: Introduce TsFileInsertionScanDataContainer to read data from tsfile
sequentially to improve pattern parse performance when filter rate is high
(#12781)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../common/tsfile/PipeTsFileInsertionEvent.java | 12 +-
.../container/TsFileInsertionDataContainer.java | 78 +++++
.../TsFileInsertionDataContainerProvider.java | 123 +++++++
.../query/TsFileInsertionQueryDataContainer.java} | 107 +++---
.../TsFileInsertionQueryDataTabletIterator.java} | 22 +-
.../scan/AlignedSinglePageWholeChunkReader.java | 170 +++++++++
.../container/scan/SinglePageWholeChunkReader.java | 116 ++++++
.../scan/TsFileInsertionScanDataContainer.java | 388 +++++++++++++++++++++
.../PipeHistoricalDataRegionTsFileExtractor.java | 3 +-
.../pipe/resource/tsfile/PipeTsFileResource.java | 87 ++++-
.../resource/tsfile/PipeTsFileResourceManager.java | 6 +-
.../event/TsFileInsertionDataContainerTest.java | 232 +++++++-----
.../apache/iotdb/commons/conf/CommonConfig.java | 9 +
.../iotdb/commons/conf/CommonDescriptor.java | 5 +
.../iotdb/commons/pipe/config/PipeConfig.java | 5 +
.../commons/pipe/pattern/IoTDBPipePattern.java | 4 +
16 files changed, 1189 insertions(+), 178 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 05a5a6ff0f9..9d4a013b069 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import
org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer;
+import
org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainerProvider;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
@@ -179,7 +181,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
implements TsFileIns
// If the previous "isWithMod" is false, the modFile has been set to "null",
then the isWithMod
// can't be set to true
- public void disableMod4NonTransferPipes(boolean isWithMod) {
+ public void disableMod4NonTransferPipes(final boolean isWithMod) {
this.isWithMod = isWithMod && this.isWithMod;
}
@@ -293,7 +295,8 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
implements TsFileIns
final Map<IDeviceID, Boolean> deviceIsAlignedMap =
PipeDataNodeResourceManager.tsfile()
.getDeviceIsAlignedMapFromCache(
-
PipeTsFileResourceManager.getHardlinkOrCopiedFileInPipeDir(resource.getTsFile()));
+
PipeTsFileResourceManager.getHardlinkOrCopiedFileInPipeDir(resource.getTsFile()),
+ false);
final Set<IDeviceID> deviceSet =
Objects.nonNull(deviceIsAlignedMap) ? deviceIsAlignedMap.keySet() :
resource.getDevices();
return deviceSet.stream()
@@ -338,8 +341,9 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
implements TsFileIns
try {
if (dataContainer == null) {
dataContainer =
- new TsFileInsertionDataContainer(
- tsFile, pipePattern, startTime, endTime, pipeTaskMeta, this);
+ new TsFileInsertionDataContainerProvider(
+ tsFile, pipePattern, startTime, endTime, pipeTaskMeta,
this)
+ .provide();
}
return dataContainer;
} catch (final IOException e) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainer.java
new file mode 100644
index 00000000000..2e8a7ec6efa
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainer.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.event.common.tsfile.container;
+
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.pattern.PipePattern;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+
+import org.apache.tsfile.read.TsFileSequenceReader;
+import org.apache.tsfile.read.expression.impl.GlobalTimeExpression;
+import org.apache.tsfile.read.filter.factory.TimeFilterApi;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public abstract class TsFileInsertionDataContainer implements AutoCloseable {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(TsFileInsertionDataContainer.class);
+
+ protected final PipePattern pattern; // used to filter data
+ protected final GlobalTimeExpression timeFilterExpression; // used to filter
data
+
+ protected final PipeTaskMeta pipeTaskMeta; // used to report progress
+ protected final EnrichedEvent sourceEvent; // used to report progress
+
+ protected TsFileSequenceReader tsFileSequenceReader;
+
+ protected TsFileInsertionDataContainer(
+ final PipePattern pattern,
+ final long startTime,
+ final long endTime,
+ final PipeTaskMeta pipeTaskMeta,
+ final EnrichedEvent sourceEvent) {
+ this.pattern = pattern;
+ timeFilterExpression =
+ (startTime == Long.MIN_VALUE && endTime == Long.MAX_VALUE)
+ ? null
+ : new GlobalTimeExpression(TimeFilterApi.between(startTime,
endTime));
+
+ this.pipeTaskMeta = pipeTaskMeta;
+ this.sourceEvent = sourceEvent;
+ }
+
+ /**
+ * @return {@link TabletInsertionEvent} in a streaming way
+ */
+ public abstract Iterable<TabletInsertionEvent> toTabletInsertionEvents();
+
+ @Override
+ public void close() {
+ try {
+ if (tsFileSequenceReader != null) {
+ tsFileSequenceReader.close();
+ }
+ } catch (final IOException e) {
+ LOGGER.warn("Failed to close TsFileSequenceReader", e);
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainerProvider.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainerProvider.java
new file mode 100644
index 00000000000..9ce61d70eab
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainerProvider.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.event.common.tsfile.container;
+
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.pattern.IoTDBPipePattern;
+import org.apache.iotdb.commons.pipe.pattern.PipePattern;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import
org.apache.iotdb.db.pipe.event.common.tsfile.container.query.TsFileInsertionQueryDataContainer;
+import
org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.TsFileInsertionScanDataContainer;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
+
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.PlainDeviceID;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+public class TsFileInsertionDataContainerProvider {
+
+ private final File tsFile;
+ private final PipePattern pattern;
+ private final long startTime;
+ private final long endTime;
+
+ protected final PipeTaskMeta pipeTaskMeta;
+ protected final PipeTsFileInsertionEvent sourceEvent;
+
+ public TsFileInsertionDataContainerProvider(
+ final File tsFile,
+ final PipePattern pipePattern,
+ final long startTime,
+ final long endTime,
+ final PipeTaskMeta pipeTaskMeta,
+ final PipeTsFileInsertionEvent sourceEvent) {
+ this.tsFile = tsFile;
+ this.pattern = pipePattern;
+ this.startTime = startTime;
+ this.endTime = endTime;
+ this.pipeTaskMeta = pipeTaskMeta;
+ this.sourceEvent = sourceEvent;
+ }
+
+ public TsFileInsertionDataContainer provide() throws IOException {
+ if (startTime != Long.MIN_VALUE
+ || endTime != Long.MAX_VALUE
+ || pattern instanceof IoTDBPipePattern
+ && !((IoTDBPipePattern)
pattern).mayMatchMultipleTimeSeriesInOneDevice()) {
+ // 1. If time filter exists, use query here because the scan container
may filter it
+ // row by row in single page chunk.
+ // 2. If the pattern matches only one time series in one device, use
query container here
+ // because there is no timestamps merge overhead.
+ //
+ // Note: We judge prefix pattern as matching multiple timeseries in one
device because it's
+ // hard to know whether it only matches one timeseries, while matching
multiple is often the
+ // case.
+ return new TsFileInsertionQueryDataContainer(
+ tsFile, pattern, startTime, endTime, pipeTaskMeta, sourceEvent);
+ }
+
+ final Map<IDeviceID, Boolean> deviceIsAlignedMap =
+
PipeDataNodeResourceManager.tsfile().getDeviceIsAlignedMapFromCache(tsFile,
false);
+ if (Objects.isNull(deviceIsAlignedMap)) {
+ // If we failed to get from cache, it indicates that the memory usage is
high.
+ // We use scan data container because it requires less memory.
+ return new TsFileInsertionScanDataContainer(
+ tsFile, pattern, startTime, endTime, pipeTaskMeta, sourceEvent);
+ }
+
+ final int originalSize = deviceIsAlignedMap.size();
+ final Map<IDeviceID, Boolean> filteredDeviceIsAlignedMap =
+ filterDeviceIsAlignedMapByPattern(deviceIsAlignedMap);
+ // Use scan data container if we need a large enough amount of data, since it's then
better to scan than query.
+ return (double) filteredDeviceIsAlignedMap.size() / originalSize
+ > PipeConfig.getInstance().getPipeTsFileScanParsingThreshold()
+ ? new TsFileInsertionScanDataContainer(
+ tsFile, pattern, startTime, endTime, pipeTaskMeta, sourceEvent)
+ : new TsFileInsertionQueryDataContainer(
+ tsFile,
+ pattern,
+ startTime,
+ endTime,
+ pipeTaskMeta,
+ sourceEvent,
+ filteredDeviceIsAlignedMap);
+ }
+
+ private Map<IDeviceID, Boolean> filterDeviceIsAlignedMapByPattern(
+ final Map<IDeviceID, Boolean> deviceIsAlignedMap) {
+ if (Objects.isNull(pattern) || pattern.isRoot()) {
+ return deviceIsAlignedMap;
+ }
+
+ return deviceIsAlignedMap.entrySet().stream()
+ .filter(
+ entry -> {
+ final String deviceId = ((PlainDeviceID)
entry.getKey()).toStringID();
+ return pattern.coversDevice(deviceId) ||
pattern.mayOverlapWithDevice(deviceId);
+ })
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java
similarity index 80%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java
index 4e324bb8afe..2a0c7c410f7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.event.common.tsfile;
+package org.apache.iotdb.db.pipe.event.common.tsfile.container.query;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.pipe.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.utils.TestOnly;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import
org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
@@ -38,10 +39,6 @@ import org.apache.tsfile.file.metadata.PlainDeviceID;
import org.apache.tsfile.read.TsFileDeviceIterator;
import org.apache.tsfile.read.TsFileReader;
import org.apache.tsfile.read.TsFileSequenceReader;
-import org.apache.tsfile.read.expression.IExpression;
-import org.apache.tsfile.read.expression.impl.BinaryExpression;
-import org.apache.tsfile.read.expression.impl.GlobalTimeExpression;
-import org.apache.tsfile.read.filter.factory.TimeFilterApi;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.write.record.Tablet;
import org.slf4j.Logger;
@@ -59,35 +56,26 @@ import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
-public class TsFileInsertionDataContainer implements AutoCloseable {
+public class TsFileInsertionQueryDataContainer extends
TsFileInsertionDataContainer {
- private static final Logger LOGGER =
LoggerFactory.getLogger(TsFileInsertionDataContainer.class);
-
- private final PipePattern pattern; // used to filter data
- private final IExpression timeFilterExpression; // used to filter data
-
- private final PipeTaskMeta pipeTaskMeta; // used to report progress
- private final EnrichedEvent sourceEvent; // used to report progress
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(TsFileInsertionQueryDataContainer.class);
private final PipeMemoryBlock allocatedMemoryBlock;
-
- private final TsFileSequenceReader tsFileSequenceReader;
private final TsFileReader tsFileReader;
private final Iterator<Map.Entry<IDeviceID, List<String>>>
deviceMeasurementsMapIterator;
private final Map<IDeviceID, Boolean> deviceIsAlignedMap;
private final Map<String, TSDataType> measurementDataTypeMap;
- private boolean shouldParsePattern = false;
-
@TestOnly
- public TsFileInsertionDataContainer(
+ public TsFileInsertionQueryDataContainer(
final File tsFile, final PipePattern pattern, final long startTime,
final long endTime)
throws IOException {
this(tsFile, pattern, startTime, endTime, null, null);
}
- public TsFileInsertionDataContainer(
+ public TsFileInsertionQueryDataContainer(
final File tsFile,
final PipePattern pattern,
final long startTime,
@@ -95,16 +83,19 @@ public class TsFileInsertionDataContainer implements
AutoCloseable {
final PipeTaskMeta pipeTaskMeta,
final EnrichedEvent sourceEvent)
throws IOException {
- this.pattern = pattern;
- timeFilterExpression =
- (startTime == Long.MIN_VALUE && endTime == Long.MAX_VALUE)
- ? null
- : BinaryExpression.and(
- new GlobalTimeExpression(TimeFilterApi.gtEq(startTime)),
- new GlobalTimeExpression(TimeFilterApi.ltEq(endTime)));
+ this(tsFile, pattern, startTime, endTime, pipeTaskMeta, sourceEvent, null);
+ }
- this.pipeTaskMeta = pipeTaskMeta;
- this.sourceEvent = sourceEvent;
+ public TsFileInsertionQueryDataContainer(
+ final File tsFile,
+ final PipePattern pattern,
+ final long startTime,
+ final long endTime,
+ final PipeTaskMeta pipeTaskMeta,
+ final EnrichedEvent sourceEvent,
+ final Map<IDeviceID, Boolean> deviceIsAlignedMap)
+ throws IOException {
+ super(pattern, startTime, endTime, pipeTaskMeta, sourceEvent);
try {
final PipeTsFileResourceManager tsFileResourceManager =
PipeDataNodeResourceManager.tsfile();
@@ -118,17 +109,27 @@ public class TsFileInsertionDataContainer implements
AutoCloseable {
if (tsFileResourceManager.cacheObjectsIfAbsent(tsFile)) {
// These read-only objects can be found in cache.
- deviceIsAlignedMap =
tsFileResourceManager.getDeviceIsAlignedMapFromCache(tsFile);
+ this.deviceIsAlignedMap =
+ Objects.nonNull(deviceIsAlignedMap)
+ ? deviceIsAlignedMap
+ : tsFileResourceManager.getDeviceIsAlignedMapFromCache(tsFile,
true);
measurementDataTypeMap =
tsFileResourceManager.getMeasurementDataTypeMapFromCache(tsFile);
deviceMeasurementsMap =
tsFileResourceManager.getDeviceMeasurementsMapFromCache(tsFile);
} else {
// We need to create these objects here and remove them later.
- deviceIsAlignedMap = readDeviceIsAlignedMap();
- memoryRequiredInBytes +=
PipeMemoryWeightUtil.memoryOfIDeviceId2Bool(deviceIsAlignedMap);
-
- // Filter devices that may overlap with pattern first
- // to avoid reading all time-series of all devices.
- final Set<IDeviceID> devices =
filterDevicesByPattern(deviceIsAlignedMap.keySet());
+ final Set<IDeviceID> devices;
+ if (Objects.isNull(deviceIsAlignedMap)) {
+ this.deviceIsAlignedMap = readDeviceIsAlignedMap();
+ memoryRequiredInBytes +=
+
PipeMemoryWeightUtil.memoryOfIDeviceId2Bool(this.deviceIsAlignedMap);
+
+ // Filter devices that may overlap with pattern first
+ // to avoid reading all time-series of all devices.
+ devices = filterDevicesByPattern(this.deviceIsAlignedMap.keySet());
+ } else {
+ this.deviceIsAlignedMap = deviceIsAlignedMap;
+ devices = deviceIsAlignedMap.keySet();
+ }
measurementDataTypeMap = readFilteredFullPathDataTypeMap(devices);
memoryRequiredInBytes +=
@@ -176,9 +177,6 @@ public class TsFileInsertionDataContainer implements
AutoCloseable {
for (final String measurement : entry.getValue()) {
if (pattern.matchesMeasurement(deviceId, measurement)) {
filteredMeasurements.add(measurement);
- } else {
- // Parse pattern iff there are measurements filtered out
- shouldParsePattern = true;
}
}
@@ -186,13 +184,6 @@ public class TsFileInsertionDataContainer implements
AutoCloseable {
filteredDeviceMeasurementsMap.put(new PlainDeviceID(deviceId),
filteredMeasurements);
}
}
-
- // case 3: for example, pattern is root.a.b.c and device is root.a.b.d
- // in this case, no data can be matched
- else {
- // Parse pattern iff there are measurements filtered out
- shouldParsePattern = true;
- }
}
return filteredDeviceMeasurementsMap;
@@ -232,7 +223,7 @@ public class TsFileInsertionDataContainer implements
AutoCloseable {
throws IOException {
final Map<String, TSDataType> result = new HashMap<>();
- for (IDeviceID device : devices) {
+ for (final IDeviceID device : devices) {
tsFileSequenceReader
.readDeviceMetadata(device)
.values()
@@ -256,7 +247,7 @@ public class TsFileInsertionDataContainer implements
AutoCloseable {
final Set<IDeviceID> devices) throws IOException {
final Map<IDeviceID, List<String>> result = new HashMap<>();
- for (IDeviceID device : devices) {
+ for (final IDeviceID device : devices) {
tsFileSequenceReader
.readDeviceMetadata(device)
.values()
@@ -270,14 +261,12 @@ public class TsFileInsertionDataContainer implements
AutoCloseable {
return result;
}
- /**
- * @return {@link TabletInsertionEvent} in a streaming way
- */
+ @Override
public Iterable<TabletInsertionEvent> toTabletInsertionEvents() {
return () ->
new Iterator<TabletInsertionEvent>() {
- private TsFileInsertionDataTabletIterator tabletIterator = null;
+ private TsFileInsertionQueryDataTabletIterator tabletIterator = null;
@Override
public boolean hasNext() {
@@ -291,13 +280,13 @@ public class TsFileInsertionDataContainer implements
AutoCloseable {
try {
tabletIterator =
- new TsFileInsertionDataTabletIterator(
+ new TsFileInsertionQueryDataTabletIterator(
tsFileReader,
measurementDataTypeMap,
((PlainDeviceID) entry.getKey()).toStringID(),
entry.getValue(),
timeFilterExpression);
- } catch (final IOException e) {
+ } catch (final Exception e) {
close();
throw new PipeException("failed to create
TsFileInsertionDataTabletIterator", e);
}
@@ -345,10 +334,6 @@ public class TsFileInsertionDataContainer implements
AutoCloseable {
};
}
- public boolean shouldParsePattern() {
- return shouldParsePattern;
- }
-
@Override
public void close() {
try {
@@ -359,13 +344,7 @@ public class TsFileInsertionDataContainer implements
AutoCloseable {
LOGGER.warn("Failed to close TsFileReader", e);
}
- try {
- if (tsFileSequenceReader != null) {
- tsFileSequenceReader.close();
- }
- } catch (final IOException e) {
- LOGGER.warn("Failed to close TsFileSequenceReader", e);
- }
+ super.close();
if (allocatedMemoryBlock != null) {
allocatedMemoryBlock.close();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataTabletIterator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataTabletIterator.java
similarity index 89%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataTabletIterator.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataTabletIterator.java
index 1e028ae870e..efe58f5ff60 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataTabletIterator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataTabletIterator.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.event.common.tsfile;
+package org.apache.iotdb.db.pipe.event.common.tsfile.container.query;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.pipe.api.exception.PipeException;
@@ -42,7 +42,7 @@ import java.util.Map;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;
-public class TsFileInsertionDataTabletIterator implements Iterator<Tablet> {
+public class TsFileInsertionQueryDataTabletIterator implements
Iterator<Tablet> {
private final TsFileReader tsFileReader;
private final Map<String, TSDataType> measurementDataTypeMap;
@@ -54,12 +54,12 @@ public class TsFileInsertionDataTabletIterator implements
Iterator<Tablet> {
private final QueryDataSet queryDataSet;
- public TsFileInsertionDataTabletIterator(
- TsFileReader tsFileReader,
- Map<String, TSDataType> measurementDataTypeMap,
- String deviceId,
- List<String> measurements,
- IExpression timeFilterExpression)
+ TsFileInsertionQueryDataTabletIterator(
+ final TsFileReader tsFileReader,
+ final Map<String, TSDataType> measurementDataTypeMap,
+ final String deviceId,
+ final List<String> measurements,
+ final IExpression timeFilterExpression)
throws IOException {
this.tsFileReader = tsFileReader;
this.measurementDataTypeMap = measurementDataTypeMap;
@@ -81,7 +81,7 @@ public class TsFileInsertionDataTabletIterator implements
Iterator<Tablet> {
private QueryDataSet buildQueryDataSet() throws IOException {
final List<Path> paths = new ArrayList<>();
- for (String measurement : measurements) {
+ for (final String measurement : measurements) {
paths.add(new Path(deviceId, measurement, false));
}
return tsFileReader.query(QueryExpression.create(paths,
timeFilterExpression));
@@ -91,7 +91,7 @@ public class TsFileInsertionDataTabletIterator implements
Iterator<Tablet> {
public boolean hasNext() {
try {
return queryDataSet.hasNext();
- } catch (IOException e) {
+ } catch (final IOException e) {
throw new PipeException("Failed to check next", e);
}
}
@@ -104,7 +104,7 @@ public class TsFileInsertionDataTabletIterator implements
Iterator<Tablet> {
try {
return buildNextTablet();
- } catch (IOException e) {
+ } catch (final IOException e) {
throw new PipeException("Failed to build tablet", e);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/AlignedSinglePageWholeChunkReader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/AlignedSinglePageWholeChunkReader.java
new file mode 100644
index 00000000000..740a1523d27
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/AlignedSinglePageWholeChunkReader.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.event.common.tsfile.container.scan;
+
+import org.apache.tsfile.encoding.decoder.Decoder;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.header.ChunkHeader;
+import org.apache.tsfile.file.header.PageHeader;
+import org.apache.tsfile.file.metadata.statistics.Statistics;
+import org.apache.tsfile.read.common.Chunk;
+import org.apache.tsfile.read.common.TimeRange;
+import org.apache.tsfile.read.reader.chunk.AbstractChunkReader;
+import org.apache.tsfile.read.reader.chunk.ChunkReader;
+import org.apache.tsfile.read.reader.page.AlignedPageReader;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The {@link AlignedSinglePageWholeChunkReader} is used to read a whole
single page aligned chunk
+ * without the need to pass in the statistics.
+ */
+public class AlignedSinglePageWholeChunkReader extends AbstractChunkReader {
+
+ // chunk header of the time column
+ private final ChunkHeader timeChunkHeader;
+ // chunk data of the time column
+ private final ByteBuffer timeChunkDataBuffer;
+
+ // chunk headers of all the sub sensors
+ private final List<ChunkHeader> valueChunkHeaderList = new ArrayList<>();
+ // chunk data of all the sub sensors
+ private final List<ByteBuffer> valueChunkDataBufferList = new ArrayList<>();
+ // deleted intervals of all the sub sensors
+ private final List<List<TimeRange>> valueDeleteIntervalsList = new
ArrayList<>();
+
+ public AlignedSinglePageWholeChunkReader(Chunk timeChunk, List<Chunk>
valueChunkList)
+ throws IOException {
+ super(Long.MIN_VALUE, null);
+ this.timeChunkHeader = timeChunk.getHeader();
+ this.timeChunkDataBuffer = timeChunk.getData();
+
+ valueChunkList.forEach(
+ chunk -> {
+ this.valueChunkHeaderList.add(chunk == null ? null :
chunk.getHeader());
+ this.valueChunkDataBufferList.add(chunk == null ? null :
chunk.getData());
+ this.valueDeleteIntervalsList.add(chunk == null ? null :
chunk.getDeleteIntervalList());
+ });
+
+ initAllPageReaders();
+ }
+
+ private void initAllPageReaders() throws IOException {
+ while (timeChunkDataBuffer.remaining() > 0) {
+ AlignedPageReader alignedPageReader = deserializeFromSinglePageChunk();
+ if (alignedPageReader != null) {
+ pageReaderList.add(alignedPageReader);
+ }
+ }
+ }
+
+ private AlignedPageReader deserializeFromSinglePageChunk() throws
IOException {
+ PageHeader timePageHeader =
+ PageHeader.deserializeFrom(timeChunkDataBuffer, (Statistics<? extends
Serializable>) null);
+ List<PageHeader> valuePageHeaderList = new ArrayList<>();
+
+ boolean isAllNull = true;
+ for (ByteBuffer byteBuffer : valueChunkDataBufferList) {
+ if (byteBuffer != null) {
+ isAllNull = false;
+ valuePageHeaderList.add(
+ PageHeader.deserializeFrom(byteBuffer, (Statistics<? extends
Serializable>) null));
+ } else {
+ valuePageHeaderList.add(null);
+ }
+ }
+
+ if (isAllNull) {
+ // when there is only one page in the chunk, the page statistic is the
same as the chunk, so
+ // we needn't filter the page again
+ skipCurrentPage(timePageHeader, valuePageHeaderList);
+ return null;
+ }
+ return constructAlignedPageReader(timePageHeader, valuePageHeaderList);
+ }
+
+ private void skipCurrentPage(PageHeader timePageHeader, List<PageHeader>
valuePageHeader) {
+ timeChunkDataBuffer.position(
+ timeChunkDataBuffer.position() + timePageHeader.getCompressedSize());
+ for (int i = 0; i < valuePageHeader.size(); i++) {
+ if (valuePageHeader.get(i) != null) {
+ valueChunkDataBufferList
+ .get(i)
+ .position(
+ valueChunkDataBufferList.get(i).position()
+ + valuePageHeader.get(i).getCompressedSize());
+ }
+ }
+ }
+
+ private AlignedPageReader constructAlignedPageReader(
+ PageHeader timePageHeader, List<PageHeader> rawValuePageHeaderList)
throws IOException {
+ ByteBuffer timePageData =
+ ChunkReader.deserializePageData(timePageHeader, timeChunkDataBuffer,
timeChunkHeader);
+
+ List<PageHeader> valuePageHeaderList = new ArrayList<>();
+ List<ByteBuffer> valuePageDataList = new ArrayList<>();
+ List<TSDataType> valueDataTypeList = new ArrayList<>();
+ List<Decoder> valueDecoderList = new ArrayList<>();
+
+ boolean isAllNull = true;
+ for (int i = 0; i < rawValuePageHeaderList.size(); i++) {
+ PageHeader valuePageHeader = rawValuePageHeaderList.get(i);
+
+ if (valuePageHeader == null || valuePageHeader.getUncompressedSize() ==
0) {
+ // Empty Page
+ valuePageHeaderList.add(null);
+ valuePageDataList.add(null);
+ valueDataTypeList.add(null);
+ valueDecoderList.add(null);
+ } else {
+ ChunkHeader valueChunkHeader = valueChunkHeaderList.get(i);
+ valuePageHeaderList.add(valuePageHeader);
+ valuePageDataList.add(
+ ChunkReader.deserializePageData(
+ valuePageHeader, valueChunkDataBufferList.get(i),
valueChunkHeader));
+ valueDataTypeList.add(valueChunkHeader.getDataType());
+ valueDecoderList.add(
+ Decoder.getDecoderByType(
+ valueChunkHeader.getEncodingType(),
valueChunkHeader.getDataType()));
+ isAllNull = false;
+ }
+ }
+ if (isAllNull) {
+ return null;
+ }
+ AlignedPageReader alignedPageReader =
+ new AlignedPageReader(
+ timePageHeader,
+ timePageData,
+ defaultTimeDecoder,
+ valuePageHeaderList,
+ valuePageDataList,
+ valueDataTypeList,
+ valueDecoderList,
+ queryFilter);
+ alignedPageReader.setDeleteIntervalList(valueDeleteIntervalsList);
+ return alignedPageReader;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/SinglePageWholeChunkReader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/SinglePageWholeChunkReader.java
new file mode 100644
index 00000000000..3f1aadbba0c
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/SinglePageWholeChunkReader.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.event.common.tsfile.container.scan;
+
+import org.apache.tsfile.compress.IUnCompressor;
+import org.apache.tsfile.encoding.decoder.Decoder;
+import org.apache.tsfile.file.header.ChunkHeader;
+import org.apache.tsfile.file.header.PageHeader;
+import org.apache.tsfile.file.metadata.statistics.Statistics;
+import org.apache.tsfile.read.common.Chunk;
+import org.apache.tsfile.read.reader.chunk.AbstractChunkReader;
+import org.apache.tsfile.read.reader.page.PageReader;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+
+/**
+ * Chunk reader that eagerly builds one {@link PageReader} for every page header found in a
+ * chunk's data buffer.
+ *
+ * <p>The page readers are created with {@code null} as their last constructor argument
+ * (presumably the filter slot - confirm against the tsfile {@code PageReader} API), so callers
+ * are expected to do any time-range filtering themselves.
+ */
+public class SinglePageWholeChunkReader extends AbstractChunkReader {
+  private final ChunkHeader chunkHeader;
+  private final ByteBuffer chunkDataBuffer;
+
+  /**
+   * Creates the reader and immediately materializes the page readers of the whole chunk.
+   *
+   * @param chunk the chunk whose header and data buffer are read
+   * @throws IOException if a page body is truncated or cannot be uncompressed
+   */
+  public SinglePageWholeChunkReader(Chunk chunk) throws IOException {
+    // NOTE(review): the super arguments look like "no stop time" and "no filter" - confirm
+    // against AbstractChunkReader.
+    super(Long.MIN_VALUE, null);
+
+    this.chunkHeader = chunk.getHeader();
+    this.chunkDataBuffer = chunk.getData();
+
+    initAllPageReaders();
+  }
+
+  /** Consumes the chunk buffer page by page, adding a reader for each page to the inherited list. */
+  private void initAllPageReaders() throws IOException {
+    while (chunkDataBuffer.remaining() > 0) {
+      // The explicit cast only selects the (ByteBuffer, Statistics) overload of deserializeFrom.
+      pageReaderList.add(
+          constructPageReader(
+              PageHeader.deserializeFrom(
+                  chunkDataBuffer, (Statistics<? extends Serializable>) null)));
+    }
+  }
+
+  /** Builds the reader of one page; the page body is read from the current buffer position. */
+  private PageReader constructPageReader(PageHeader pageHeader) throws IOException {
+    return new PageReader(
+        pageHeader,
+        deserializePageData(pageHeader, chunkDataBuffer, chunkHeader),
+        chunkHeader.getDataType(),
+        Decoder.getDecoderByType(chunkHeader.getEncodingType(), chunkHeader.getDataType()),
+        defaultTimeDecoder,
+        null);
+  }
+
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+  // util methods
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Copies the compressed body of one page out of {@code chunkBuffer}, advancing its position.
+   *
+   * @param pageHeader header providing the compressed size of the page
+   * @param chunkBuffer buffer positioned at the start of the page body
+   * @return a heap buffer wrapping exactly the compressed page body
+   * @throws IOException if fewer bytes remain in {@code chunkBuffer} than the header announces
+   */
+  public static ByteBuffer readCompressedPageData(PageHeader pageHeader, ByteBuffer chunkBuffer)
+      throws IOException {
+    int compressedPageBodyLength = pageHeader.getCompressedSize();
+    // Validate before allocating, so a corrupt header cannot trigger a useless allocation.
+    if (compressedPageBodyLength > chunkBuffer.remaining()) {
+      throw new IOException(
+          "do not has a complete page body. Expected:"
+              + compressedPageBodyLength
+              + ". Actual:"
+              + chunkBuffer.remaining());
+    }
+    byte[] compressedPageBody = new byte[compressedPageBodyLength];
+    chunkBuffer.get(compressedPageBody);
+    return ByteBuffer.wrap(compressedPageBody);
+  }
+
+  /**
+   * Uncompresses one page body.
+   *
+   * @param pageHeader header providing the compressed and uncompressed sizes
+   * @param unCompressor the uncompressor to apply
+   * @param compressedPageData array-backed buffer holding the compressed body at offset 0
+   * @return a heap buffer wrapping the uncompressed page data
+   * @throws IOException if uncompressing fails; the original failure is kept as the cause
+   */
+  public static ByteBuffer uncompressPageData(
+      PageHeader pageHeader, IUnCompressor unCompressor, ByteBuffer compressedPageData)
+      throws IOException {
+    int compressedPageBodyLength = pageHeader.getCompressedSize();
+    byte[] uncompressedPageData = new byte[pageHeader.getUncompressedSize()];
+    try {
+      unCompressor.uncompress(
+          compressedPageData.array(), 0, compressedPageBodyLength, uncompressedPageData, 0);
+    } catch (Exception e) {
+      // Chain the cause so the stack trace of the real failure is not lost.
+      throw new IOException(
+          "Uncompress error! uncompress size: "
+              + pageHeader.getUncompressedSize()
+              + ", compressed size: "
+              + pageHeader.getCompressedSize()
+              + ", page header: "
+              + pageHeader
+              + ", cause: "
+              + e.getMessage(),
+          e);
+    }
+
+    return ByteBuffer.wrap(uncompressedPageData);
+  }
+
+  /**
+   * Reads the next page body from {@code chunkBuffer} and uncompresses it with the compression
+   * type declared in {@code chunkHeader}.
+   *
+   * @param pageHeader header of the page to read
+   * @param chunkBuffer buffer positioned at the start of the page body
+   * @param chunkHeader header of the chunk the page belongs to
+   * @return the uncompressed page data
+   * @throws IOException if the page body is truncated or cannot be uncompressed
+   */
+  public static ByteBuffer deserializePageData(
+      PageHeader pageHeader, ByteBuffer chunkBuffer, ChunkHeader chunkHeader) throws IOException {
+    IUnCompressor unCompressor = IUnCompressor.getUnCompressor(chunkHeader.getCompressionType());
+    ByteBuffer compressedPageBody = readCompressedPageData(pageHeader, chunkBuffer);
+    return uncompressPageData(pageHeader, unCompressor, compressedPageBody);
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
new file mode 100644
index 00000000000..a841986064e
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.event.common.tsfile.container.scan;
+
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.pattern.PipePattern;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import
org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.common.constant.TsFileConstant;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.MetaMarker;
+import org.apache.tsfile.file.header.ChunkHeader;
+import org.apache.tsfile.file.metadata.PlainDeviceID;
+import org.apache.tsfile.read.TsFileSequenceReader;
+import org.apache.tsfile.read.common.BatchData;
+import org.apache.tsfile.read.common.Chunk;
+import org.apache.tsfile.read.filter.basic.Filter;
+import org.apache.tsfile.read.reader.IChunkReader;
+import org.apache.tsfile.read.reader.chunk.AlignedChunkReader;
+import org.apache.tsfile.read.reader.chunk.ChunkReader;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.DateUtils;
+import org.apache.tsfile.utils.TsPrimitiveType;
+import org.apache.tsfile.write.UnSupportedDataTypeException;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.LocalDate;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+
+public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContainer {
+
+ private final long startTime;
+ private final long endTime;
+ private final Filter filter;
+
+ private IChunkReader chunkReader;
+ private BatchData data;
+
+ private boolean isMultiPage;
+ private String currentDevice;
+ private boolean currentIsAligned;
+ private final List<MeasurementSchema> currentMeasurements = new
ArrayList<>();
+
+ private byte lastMarker = Byte.MIN_VALUE;
+
+  /**
+   * Opens {@code tsFile} for a sequential scan and loads the first non-empty batch of data.
+   *
+   * <p>If anything fails while opening or pre-loading, the container is closed before the
+   * exception is rethrown.
+   *
+   * @param tsFile the TsFile to scan
+   * @param pattern pattern used to select devices and measurements
+   * @param startTime lower bound (inclusive) applied to timestamps of single-page chunks
+   * @param endTime upper bound (inclusive) applied to timestamps of single-page chunks
+   * @param pipeTaskMeta passed through to the generated events
+   * @param sourceEvent passed through to the generated events, may be {@code null}
+   * @throws IOException if the file cannot be opened or read
+   */
+  public TsFileInsertionScanDataContainer(
+      final File tsFile,
+      final PipePattern pattern,
+      final long startTime,
+      final long endTime,
+      final PipeTaskMeta pipeTaskMeta,
+      final EnrichedEvent sourceEvent)
+      throws IOException {
+    super(pattern, startTime, endTime, pipeTaskMeta, sourceEvent);
+
+    this.startTime = startTime;
+    this.endTime = endTime;
+    if (Objects.isNull(timeFilterExpression)) {
+      filter = null;
+    } else {
+      filter = timeFilterExpression.getFilter();
+    }
+
+    try {
+      final String tsFilePath = tsFile.getAbsolutePath();
+      tsFileSequenceReader = new TsFileSequenceReader(tsFilePath, false, false);
+
+      // Start right after the magic string plus one extra byte (presumably the version
+      // number - confirm against the TsFile layout).
+      final long firstMarkerOffset = (long) TSFileConfig.MAGIC_STRING.getBytes().length + 1;
+      tsFileSequenceReader.position(firstMarkerOffset);
+
+      prepareData();
+    } catch (final Exception e) {
+      close();
+      throw e;
+    }
+  }
+
+  /**
+   * Exposes the remaining data of the TsFile as a sequence of tablet insertion events.
+   *
+   * <p>The returned iterators work directly on this container's scan state, so the data can be
+   * consumed only once. The container is closed when the last event is handed out, or when
+   * {@code next()} is called although nothing is left.
+   *
+   * @return an iterable producing one {@link PipeRawTabletInsertionEvent} per tablet
+   */
+  @Override
+  public Iterable<TabletInsertionEvent> toTabletInsertionEvents() {
+    return () ->
+        new Iterator<TabletInsertionEvent>() {
+
+          @Override
+          public boolean hasNext() {
+            return Objects.nonNull(chunkReader);
+          }
+
+          @Override
+          public TabletInsertionEvent next() {
+            if (!hasNext()) {
+              close();
+              throw new NoSuchElementException();
+            }
+
+            // Snapshot the flag first: getNextTablet() may already advance to the next chunk
+            // and overwrite currentIsAligned, which would then no longer describe the tablet
+            // that is returned here.
+            final boolean isAligned = currentIsAligned;
+            final Tablet tablet = getNextTablet();
+            final boolean hasNext = hasNext();
+            try {
+              return new PipeRawTabletInsertionEvent(
+                  tablet,
+                  isAligned,
+                  sourceEvent != null ? sourceEvent.getPipeName() : null,
+                  sourceEvent != null ? sourceEvent.getCreationTime() : 0,
+                  pipeTaskMeta,
+                  sourceEvent,
+                  !hasNext);
+            } finally {
+              if (!hasNext) {
+                close();
+              }
+            }
+          }
+        };
+  }
+
+  /**
+   * Builds the next tablet from the current chunk reader.
+   *
+   * <p>Rows are copied until the tablet is full or the current chunk is exhausted. For chunks
+   * that are not multi-page, rows outside {@code [startTime, endTime]} are dropped here; for
+   * multi-page chunks no time check is done in this method. When the chunk is exhausted, the
+   * scan is advanced to the next chunk that has data.
+   *
+   * @return the filled tablet (possibly with zero rows)
+   * @throws PipeException if reading fails; the container is closed before throwing
+   */
+  private Tablet getNextTablet() {
+    try {
+      final Tablet tablet =
+          new Tablet(
+              currentDevice,
+              currentMeasurements,
+              PipeConfig.getInstance().getPipeDataStructureTabletRowSize());
+      tablet.initBitMaps();
+
+      while (data.hasCurrent()) {
+        if (isMultiPage || (data.currentTime() >= startTime && data.currentTime() <= endTime)) {
+          final int rowIndex = tablet.rowSize;
+
+          tablet.addTimestamp(rowIndex, data.currentTime());
+          putValueToColumns(data, tablet, rowIndex);
+
+          tablet.rowSize++;
+        }
+
+        data.next();
+        while (!data.hasCurrent() && chunkReader.hasNextSatisfiedPage()) {
+          data = chunkReader.nextPageData();
+        }
+
+        if (tablet.rowSize == tablet.getMaxRowNumber()) {
+          break;
+        }
+      }
+
+      // Switch chunk reader iff current chunk is all consumed
+      if (!data.hasCurrent()) {
+        prepareData();
+      }
+
+      return tablet;
+    } catch (final Exception e) {
+      close();
+      throw new PipeException("Failed to get next tablet insertion event.", e);
+    }
+  }
+
+  /**
+   * Advances to the next chunk reader whose pages yield at least one row and stores its first
+   * non-empty batch in {@code data}. If no chunk reader is left, the container is closed and
+   * {@code chunkReader} stays {@code null}.
+   *
+   * @throws IOException if reading the TsFile fails
+   */
+  private void prepareData() throws IOException {
+    do {
+      do {
+        moveToNextChunkReader();
+      } while (Objects.nonNull(chunkReader) && !chunkReader.hasNextSatisfiedPage());
+
+      if (Objects.isNull(chunkReader)) {
+        close();
+        break;
+      }
+
+      do {
+        data = chunkReader.nextPageData();
+      } while (!data.hasCurrent() && chunkReader.hasNextSatisfiedPage());
+    } while (!data.hasCurrent());
+  }
+
+  /**
+   * Copies the current row of {@code data} into row {@code rowIndex} of {@code tablet}.
+   *
+   * <p>For VECTOR batches, a {@code null} entry in the vector is recorded by marking the
+   * column's bitmap instead of being dereferenced. NOTE(review): assumes the aligned page
+   * readers represent a missing value as a {@code null} vector entry - confirm against tsfile.
+   *
+   * @param data batch positioned at the row to copy
+   * @param tablet target tablet whose bitmaps have been initialized
+   * @param rowIndex index of the target row
+   * @throws UnSupportedDataTypeException if a column has a data type that is not handled
+   */
+  private void putValueToColumns(final BatchData data, final Tablet tablet, final int rowIndex) {
+    final Object[] columns = tablet.values;
+    final TSDataType type = data.getDataType();
+    if (type == TSDataType.VECTOR) {
+      final TsPrimitiveType[] vector = data.getVector();
+      for (int i = 0; i < columns.length; ++i) {
+        final TsPrimitiveType primitiveType = vector[i];
+        if (Objects.isNull(primitiveType)) {
+          // No value for this measurement at this timestamp
+          tablet.bitMaps[i].mark(rowIndex);
+          continue;
+        }
+        switch (primitiveType.getDataType()) {
+          case BOOLEAN:
+            ((boolean[]) columns[i])[rowIndex] = primitiveType.getBoolean();
+            break;
+          case INT32:
+            ((int[]) columns[i])[rowIndex] = primitiveType.getInt();
+            break;
+          case DATE:
+            ((LocalDate[]) columns[i])[rowIndex] =
+                DateUtils.parseIntToLocalDate(primitiveType.getInt());
+            break;
+          case INT64:
+          case TIMESTAMP:
+            ((long[]) columns[i])[rowIndex] = primitiveType.getLong();
+            break;
+          case FLOAT:
+            ((float[]) columns[i])[rowIndex] = primitiveType.getFloat();
+            break;
+          case DOUBLE:
+            ((double[]) columns[i])[rowIndex] = primitiveType.getDouble();
+            break;
+          case TEXT:
+          case BLOB:
+          case STRING:
+            ((Binary[]) columns[i])[rowIndex] = primitiveType.getBinary();
+            break;
+          default:
+            throw new UnSupportedDataTypeException("UnSupported" + primitiveType.getDataType());
+        }
+      }
+    } else {
+      switch (type) {
+        case BOOLEAN:
+          ((boolean[]) columns[0])[rowIndex] = data.getBoolean();
+          break;
+        case INT32:
+          ((int[]) columns[0])[rowIndex] = data.getInt();
+          break;
+        case DATE:
+          ((LocalDate[]) columns[0])[rowIndex] = DateUtils.parseIntToLocalDate(data.getInt());
+          break;
+        case INT64:
+        case TIMESTAMP:
+          ((long[]) columns[0])[rowIndex] = data.getLong();
+          break;
+        case FLOAT:
+          ((float[]) columns[0])[rowIndex] = data.getFloat();
+          break;
+        case DOUBLE:
+          ((double[]) columns[0])[rowIndex] = data.getDouble();
+          break;
+        case TEXT:
+        case BLOB:
+        case STRING:
+          ((Binary[]) columns[0])[rowIndex] = data.getBinary();
+          break;
+        default:
+          throw new UnSupportedDataTypeException("UnSupported" + data.getDataType());
+      }
+    }
+  }
+
+  /**
+   * Reads markers sequentially until the next chunk reader can be built, and stores it in
+   * {@code chunkReader} ({@code null} once the separator marker has been reached and nothing
+   * is pending).
+   *
+   * <p>A chunk that is not a time chunk and matches the pattern yields a reader immediately.
+   * A time chunk and the matching value chunks that follow it are collected, and the aligned
+   * reader is built when the next chunk header, chunk group header or the separator shows up.
+   * That terminating marker is remembered in {@code lastMarker} and handled again by the
+   * next call.
+   *
+   * @throws IOException if reading the TsFile fails
+   */
+  private void moveToNextChunkReader() throws IOException, IllegalStateException {
+    ChunkHeader chunkHeader;
+    Chunk timeChunk = null;
+    final List<Chunk> valueChunkList = new ArrayList<>();
+    currentMeasurements.clear();
+
+    if (lastMarker == MetaMarker.SEPARATOR) {
+      chunkReader = null;
+      return;
+    }
+
+    byte marker;
+    while ((marker =
+            lastMarker != Byte.MIN_VALUE ? lastMarker : tsFileSequenceReader.readMarker())
+        != MetaMarker.SEPARATOR) {
+      lastMarker = Byte.MIN_VALUE;
+      switch (marker) {
+        case MetaMarker.CHUNK_HEADER:
+        case MetaMarker.TIME_CHUNK_HEADER:
+        case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER:
+        case MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER:
+          // A new chunk starts: hand out the pending aligned chunk first, if any
+          if (buildAlignedChunkReaderIfReady(timeChunk, valueChunkList)) {
+            lastMarker = marker;
+            return;
+          }
+
+          isMultiPage = marker == MetaMarker.CHUNK_HEADER || marker == MetaMarker.TIME_CHUNK_HEADER;
+
+          chunkHeader = tsFileSequenceReader.readChunkHeader(marker);
+
+          if (Objects.isNull(currentDevice)) {
+            skipChunkBody(chunkHeader);
+            break;
+          }
+
+          if ((chunkHeader.getChunkType() & TsFileConstant.TIME_COLUMN_MASK)
+              == TsFileConstant.TIME_COLUMN_MASK) {
+            timeChunk = readChunkBody(chunkHeader);
+            break;
+          }
+
+          if (!pattern.matchesMeasurement(currentDevice, chunkHeader.getMeasurementID())) {
+            skipChunkBody(chunkHeader);
+            break;
+          }
+
+          final Chunk nonAlignedChunk = readChunkBody(chunkHeader);
+          chunkReader =
+              isMultiPage
+                  ? new ChunkReader(nonAlignedChunk, filter)
+                  : new SinglePageWholeChunkReader(nonAlignedChunk);
+          currentIsAligned = false;
+          currentMeasurements.add(
+              new MeasurementSchema(chunkHeader.getMeasurementID(), chunkHeader.getDataType()));
+          return;
+        case MetaMarker.VALUE_CHUNK_HEADER:
+        case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER:
+          chunkHeader = tsFileSequenceReader.readChunkHeader(marker);
+
+          if (Objects.isNull(currentDevice)
+              || !pattern.matchesMeasurement(currentDevice, chunkHeader.getMeasurementID())) {
+            skipChunkBody(chunkHeader);
+            break;
+          }
+
+          // Do not record empty chunk
+          if (chunkHeader.getDataSize() > 0) {
+            valueChunkList.add(readChunkBody(chunkHeader));
+            currentMeasurements.add(
+                new MeasurementSchema(chunkHeader.getMeasurementID(), chunkHeader.getDataType()));
+          }
+          break;
+        case MetaMarker.CHUNK_GROUP_HEADER:
+          // Return before "currentDevice" changes
+          if (buildAlignedChunkReaderIfReady(timeChunk, valueChunkList)) {
+            lastMarker = marker;
+            return;
+          }
+          // TODO: Replace it by IDeviceID
+          final String deviceID =
+              ((PlainDeviceID) tsFileSequenceReader.readChunkGroupHeader().getDeviceID())
+                  .toStringID();
+          currentDevice = pattern.mayOverlapWithDevice(deviceID) ? deviceID : null;
+          break;
+        case MetaMarker.OPERATION_INDEX_RANGE:
+          tsFileSequenceReader.readPlanIndex();
+          break;
+        default:
+          MetaMarker.handleUnexpectedMarker(marker);
+      }
+    }
+
+    lastMarker = marker;
+    if (!buildAlignedChunkReaderIfReady(timeChunk, valueChunkList)) {
+      chunkReader = null;
+    }
+  }
+
+  /**
+   * Builds the aligned chunk reader if a time chunk and at least one measurement were collected.
+   *
+   * @return {@code true} if {@code chunkReader} was set, {@code false} if nothing was pending
+   */
+  private boolean buildAlignedChunkReaderIfReady(
+      final Chunk timeChunk, final List<Chunk> valueChunkList) throws IOException {
+    if (Objects.isNull(timeChunk) || currentMeasurements.isEmpty()) {
+      return false;
+    }
+    chunkReader =
+        isMultiPage
+            ? new AlignedChunkReader(timeChunk, valueChunkList, filter)
+            : new AlignedSinglePageWholeChunkReader(timeChunk, valueChunkList);
+    currentIsAligned = true;
+    return true;
+  }
+
+  /** Moves the file position past the data of the chunk described by {@code chunkHeader}. */
+  private void skipChunkBody(final ChunkHeader chunkHeader) throws IOException {
+    tsFileSequenceReader.position(tsFileSequenceReader.position() + chunkHeader.getDataSize());
+  }
+
+  /** Reads the data of the chunk described by {@code chunkHeader} from the current position. */
+  private Chunk readChunkBody(final ChunkHeader chunkHeader) throws IOException {
+    return new Chunk(chunkHeader, tsFileSequenceReader.readChunk(-1, chunkHeader.getDataSize()));
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 34d331a0b6b..765c48536ab 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -510,7 +510,8 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
final Map<IDeviceID, Boolean> deviceIsAlignedMap =
PipeDataNodeResourceManager.tsfile()
.getDeviceIsAlignedMapFromCache(
-
PipeTsFileResourceManager.getHardlinkOrCopiedFileInPipeDir(resource.getTsFile()));
+
PipeTsFileResourceManager.getHardlinkOrCopiedFileInPipeDir(resource.getTsFile()),
+ false);
deviceSet =
Objects.nonNull(deviceIsAlignedMap) ? deviceIsAlignedMap.keySet() :
resource.getDevices();
} catch (final IOException e) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
index 95be9281d5b..7bb67e781f2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
@@ -164,9 +164,14 @@ public class PipeTsFileResource implements AutoCloseable {
return deviceMeasurementsMap;
}
- public synchronized Map<IDeviceID, Boolean> tryGetDeviceIsAlignedMap()
throws IOException {
+ public synchronized Map<IDeviceID, Boolean> tryGetDeviceIsAlignedMap(
+ final boolean cacheOtherMetadata) throws IOException {
if (deviceIsAlignedMap == null && isTsFile) {
- cacheObjectsIfAbsent();
+ if (cacheOtherMetadata) {
+ cacheObjectsIfAbsent();
+ } else {
+ cacheDeviceIsAlignedMapIfAbsent();
+ }
}
return deviceIsAlignedMap;
}
@@ -178,7 +183,7 @@ public class PipeTsFileResource implements AutoCloseable {
return measurementDataTypeMap;
}
- synchronized boolean cacheObjectsIfAbsent() throws IOException {
+ synchronized boolean cacheDeviceIsAlignedMapIfAbsent() throws IOException {
if (!isTsFile) {
return false;
}
@@ -205,11 +210,7 @@ public class PipeTsFileResource implements AutoCloseable {
long memoryRequiredInBytes = 0L;
try (TsFileSequenceReader sequenceReader =
- new TsFileSequenceReader(hardlinkOrCopiedFile.getPath(), true, true)) {
- deviceMeasurementsMap = sequenceReader.getDeviceMeasurementsMap();
- memoryRequiredInBytes +=
-
PipeMemoryWeightUtil.memoryOfIDeviceID2StrList(deviceMeasurementsMap);
-
+ new TsFileSequenceReader(hardlinkOrCopiedFile.getPath(), true, false))
{
deviceIsAlignedMap = new HashMap<>();
final TsFileDeviceIterator deviceIsAlignedIterator =
sequenceReader.getAllDevicesIteratorWithIsAligned();
@@ -218,6 +219,76 @@ public class PipeTsFileResource implements AutoCloseable {
deviceIsAlignedMap.put(deviceIsAlignedPair.getLeft(),
deviceIsAlignedPair.getRight());
}
memoryRequiredInBytes +=
PipeMemoryWeightUtil.memoryOfIDeviceId2Bool(deviceIsAlignedMap);
+ }
+ // Release memory of TsFileSequenceReader.
+ allocatedMemoryBlock.close();
+ allocatedMemoryBlock = null;
+
+ // Allocate again for the cached objects.
+ allocatedMemoryBlock =
+ PipeDataNodeResourceManager.memory()
+ .forceAllocateIfSufficient(memoryRequiredInBytes,
MEMORY_SUFFICIENT_THRESHOLD);
+ if (allocatedMemoryBlock == null) {
+ LOGGER.info(
+ "PipeTsFileResource: Failed to cache objects for tsfile {} in cache,
because memory usage is high",
+ hardlinkOrCopiedFile.getPath());
+ deviceIsAlignedMap = null;
+ return false;
+ }
+
+ LOGGER.info(
+ "PipeTsFileResource: Cached deviceIsAlignedMap for tsfile {}.",
+ hardlinkOrCopiedFile.getPath());
+ return true;
+ }
+
+ synchronized boolean cacheObjectsIfAbsent() throws IOException {
+ if (!isTsFile) {
+ return false;
+ }
+
+ if (allocatedMemoryBlock != null) {
+ if (deviceMeasurementsMap != null) {
+ return true;
+ } else {
+ // Recalculate it again because only deviceIsAligned map is cached
+ allocatedMemoryBlock.close();
+ allocatedMemoryBlock = null;
+ }
+ }
+
+ // See if pipe memory is sufficient to be allocated for
TsFileSequenceReader.
+ // Only allocate when pipe memory used is less than 50%, because memory
here
+ // is hard to shrink and may consume too much memory.
+ allocatedMemoryBlock =
+ PipeDataNodeResourceManager.memory()
+ .forceAllocateIfSufficient(
+
PipeConfig.getInstance().getPipeMemoryAllocateForTsFileSequenceReaderInBytes(),
+ MEMORY_SUFFICIENT_THRESHOLD);
+ if (allocatedMemoryBlock == null) {
+ LOGGER.info(
+ "PipeTsFileResource: Failed to create TsFileSequenceReader for
tsfile {} in cache, because memory usage is high",
+ hardlinkOrCopiedFile.getPath());
+ return false;
+ }
+
+ long memoryRequiredInBytes = 0L;
+ try (TsFileSequenceReader sequenceReader =
+ new TsFileSequenceReader(hardlinkOrCopiedFile.getPath(), true, true)) {
+ deviceMeasurementsMap = sequenceReader.getDeviceMeasurementsMap();
+ memoryRequiredInBytes +=
+
PipeMemoryWeightUtil.memoryOfIDeviceID2StrList(deviceMeasurementsMap);
+
+ if (Objects.isNull(deviceIsAlignedMap)) {
+ deviceIsAlignedMap = new HashMap<>();
+ final TsFileDeviceIterator deviceIsAlignedIterator =
+ sequenceReader.getAllDevicesIteratorWithIsAligned();
+ while (deviceIsAlignedIterator.hasNext()) {
+ final Pair<IDeviceID, Boolean> deviceIsAlignedPair =
deviceIsAlignedIterator.next();
+ deviceIsAlignedMap.put(deviceIsAlignedPair.getLeft(),
deviceIsAlignedPair.getRight());
+ }
+ }
+ memoryRequiredInBytes +=
PipeMemoryWeightUtil.memoryOfIDeviceId2Bool(deviceIsAlignedMap);
measurementDataTypeMap = sequenceReader.getFullPathDataTypeMap();
memoryRequiredInBytes +=
PipeMemoryWeightUtil.memoryOfStr2TSDataType(measurementDataTypeMap);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
index e40bafe03b7..e7367956126 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
@@ -278,13 +278,13 @@ public class PipeTsFileResourceManager {
}
}
- public Map<IDeviceID, Boolean> getDeviceIsAlignedMapFromCache(final File
hardlinkOrCopiedTsFile)
- throws IOException {
+ public Map<IDeviceID, Boolean> getDeviceIsAlignedMapFromCache(
+ final File hardlinkOrCopiedTsFile, final boolean cacheOtherMetadata)
throws IOException {
lock.lock();
try {
final PipeTsFileResource resource =
hardlinkOrCopiedFileToPipeTsFileResourceMap.get(hardlinkOrCopiedTsFile.getPath());
- return resource == null ? null : resource.tryGetDeviceIsAlignedMap();
+ return resource == null ? null :
resource.tryGetDeviceIsAlignedMap(cacheOtherMetadata);
} finally {
lock.unlock();
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
index 3f9f34f722e..74d500006f8 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
@@ -23,7 +23,9 @@ import org.apache.iotdb.commons.pipe.pattern.IoTDBPipePattern;
import org.apache.iotdb.commons.pipe.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.pattern.PrefixPipePattern;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
-import
org.apache.iotdb.db.pipe.event.common.tsfile.TsFileInsertionDataContainer;
+import
org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer;
+import
org.apache.iotdb.db.pipe.event.common.tsfile.container.query.TsFileInsertionQueryDataContainer;
+import
org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.TsFileInsertionScanDataContainer;
import org.apache.tsfile.file.metadata.PlainDeviceID;
import org.apache.tsfile.read.TsFileSequenceReader;
@@ -69,20 +71,33 @@ public class TsFileInsertionDataContainerTest {
}
@Test
- public void testToTabletInsertionEvents() throws Exception {
- Set<Integer> deviceNumbers = new HashSet<>();
+ public void testQueryContainer() throws Exception {
+ final long startTime = System.currentTimeMillis();
+ testToTabletInsertionEvents(true);
+ System.out.println(System.currentTimeMillis() - startTime);
+ }
+
+ @Test
+ public void testScanContainer() throws Exception {
+ final long startTime = System.currentTimeMillis();
+ testToTabletInsertionEvents(false);
+ System.out.println(System.currentTimeMillis() - startTime);
+ }
+
+ public void testToTabletInsertionEvents(final boolean isQuery) throws
Exception {
+ final Set<Integer> deviceNumbers = new HashSet<>();
deviceNumbers.add(1);
deviceNumbers.add(2);
- Set<Integer> measurementNumbers = new HashSet<>();
+ final Set<Integer> measurementNumbers = new HashSet<>();
measurementNumbers.add(1);
measurementNumbers.add(2);
- Set<String> patternFormats = new HashSet<>();
+ final Set<String> patternFormats = new HashSet<>();
patternFormats.add(PREFIX_FORMAT);
patternFormats.add(IOTDB_FORMAT);
- Set<Pair<Long, Long>> startEndTimes = new HashSet<>();
+ final Set<Pair<Long, Long>> startEndTimes = new HashSet<>();
startEndTimes.add(new Pair<>(100L, TSFILE_START_TIME - 1));
startEndTimes.add(new Pair<>(100L, TSFILE_START_TIME));
startEndTimes.add(new Pair<>(100L, TSFILE_START_TIME + 1));
@@ -100,31 +115,34 @@ public class TsFileInsertionDataContainerTest {
startEndTimes.add(new Pair<>(Long.MIN_VALUE, Long.MAX_VALUE));
- for (int deviceNumber : deviceNumbers) {
- for (int measurementNumber : measurementNumbers) {
- for (String patternFormat : patternFormats) {
- for (Pair<Long, Long> startEndTime : startEndTimes) {
+ for (final int deviceNumber : deviceNumbers) {
+ for (final int measurementNumber : measurementNumbers) {
+ for (final String patternFormat : patternFormats) {
+ for (final Pair<Long, Long> startEndTime : startEndTimes) {
testToTabletInsertionEvents(
deviceNumber,
measurementNumber,
0,
patternFormat,
startEndTime.left,
- startEndTime.right);
+ startEndTime.right,
+ isQuery);
testToTabletInsertionEvents(
deviceNumber,
measurementNumber,
1,
patternFormat,
startEndTime.left,
- startEndTime.right);
+ startEndTime.right,
+ isQuery);
testToTabletInsertionEvents(
deviceNumber,
measurementNumber,
2,
patternFormat,
startEndTime.left,
- startEndTime.right);
+ startEndTime.right,
+ isQuery);
testToTabletInsertionEvents(
deviceNumber,
@@ -132,21 +150,24 @@ public class TsFileInsertionDataContainerTest {
999,
patternFormat,
startEndTime.left,
- startEndTime.right);
+ startEndTime.right,
+ isQuery);
testToTabletInsertionEvents(
deviceNumber,
measurementNumber,
1000,
patternFormat,
startEndTime.left,
- startEndTime.right);
+ startEndTime.right,
+ isQuery);
testToTabletInsertionEvents(
deviceNumber,
measurementNumber,
1001,
patternFormat,
startEndTime.left,
- startEndTime.right);
+ startEndTime.right,
+ isQuery);
testToTabletInsertionEvents(
deviceNumber,
@@ -154,21 +175,24 @@ public class TsFileInsertionDataContainerTest {
999 * 2 + 1,
patternFormat,
startEndTime.left,
- startEndTime.right);
+ startEndTime.right,
+ isQuery);
testToTabletInsertionEvents(
deviceNumber,
measurementNumber,
1000,
patternFormat,
startEndTime.left,
- startEndTime.right);
+ startEndTime.right,
+ isQuery);
testToTabletInsertionEvents(
deviceNumber,
measurementNumber,
1001 * 2 - 1,
patternFormat,
startEndTime.left,
- startEndTime.right);
+ startEndTime.right,
+ isQuery);
testToTabletInsertionEvents(
deviceNumber,
@@ -176,21 +200,24 @@ public class TsFileInsertionDataContainerTest {
1023,
patternFormat,
startEndTime.left,
- startEndTime.right);
+ startEndTime.right,
+ isQuery);
testToTabletInsertionEvents(
deviceNumber,
measurementNumber,
1024,
patternFormat,
startEndTime.left,
- startEndTime.right);
+ startEndTime.right,
+ isQuery);
testToTabletInsertionEvents(
deviceNumber,
measurementNumber,
1025,
patternFormat,
startEndTime.left,
- startEndTime.right);
+ startEndTime.right,
+ isQuery);
testToTabletInsertionEvents(
deviceNumber,
@@ -198,21 +225,24 @@ public class TsFileInsertionDataContainerTest {
1023 * 2 + 1,
patternFormat,
startEndTime.left,
- startEndTime.right);
+ startEndTime.right,
+ isQuery);
testToTabletInsertionEvents(
deviceNumber,
measurementNumber,
1024 * 2,
patternFormat,
startEndTime.left,
- startEndTime.right);
+ startEndTime.right,
+ isQuery);
testToTabletInsertionEvents(
deviceNumber,
measurementNumber,
1025 * 2 - 1,
patternFormat,
startEndTime.left,
- startEndTime.right);
+ startEndTime.right,
+ isQuery);
testToTabletInsertionEvents(
deviceNumber,
@@ -220,7 +250,8 @@ public class TsFileInsertionDataContainerTest {
10001,
patternFormat,
startEndTime.left,
- startEndTime.right);
+ startEndTime.right,
+ isQuery);
}
}
}
@@ -228,12 +259,13 @@ public class TsFileInsertionDataContainerTest {
}
private void testToTabletInsertionEvents(
- int deviceNumber,
- int measurementNumber,
- int rowNumberInOneDevice,
- String patternFormat,
- long startTime,
- long endTime)
+ final int deviceNumber,
+ final int measurementNumber,
+ final int rowNumberInOneDevice,
+ final String patternFormat,
+ final long startTime,
+ final long endTime,
+ final boolean isQuery)
throws Exception {
LOGGER.debug(
"testToTabletInsertionEvents: deviceNumber: {}, measurementNumber: {},
rowNumberInOneDevice: {}, patternFormat: {}, startTime: {}, endTime: {}",
@@ -298,12 +330,20 @@ public class TsFileInsertionDataContainerTest {
}
try (final TsFileInsertionDataContainer alignedContainer =
- new TsFileInsertionDataContainer(alignedTsFile, rootPattern,
startTime, endTime);
+ isQuery
+ ? new TsFileInsertionQueryDataContainer(
+ alignedTsFile, rootPattern, startTime, endTime)
+ : new TsFileInsertionScanDataContainer(
+ alignedTsFile, rootPattern, startTime, endTime, null,
null);
final TsFileInsertionDataContainer nonalignedContainer =
- new TsFileInsertionDataContainer(nonalignedTsFile, rootPattern,
startTime, endTime)) {
- AtomicInteger count1 = new AtomicInteger(0);
- AtomicInteger count2 = new AtomicInteger(0);
- AtomicInteger count3 = new AtomicInteger(0);
+ isQuery
+ ? new TsFileInsertionQueryDataContainer(
+ nonalignedTsFile, rootPattern, startTime, endTime)
+ : new TsFileInsertionScanDataContainer(
+ nonalignedTsFile, rootPattern, startTime, endTime, null,
null)) {
+ final AtomicInteger count1 = new AtomicInteger(0);
+ final AtomicInteger count2 = new AtomicInteger(0);
+ final AtomicInteger count3 = new AtomicInteger(0);
alignedContainer
.toTabletInsertionEvents()
@@ -366,8 +406,7 @@ public class TsFileInsertionDataContainerTest {
(row, collector) -> {
try {
rowCollector.collectRow(row);
-
Assert.assertEquals(measurementNumber, row.size());
- count1.incrementAndGet();
+ count1.addAndGet(row.size());
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -379,8 +418,7 @@ public class TsFileInsertionDataContainerTest {
(row, collector) -> {
try {
collector.collectRow(row);
-
Assert.assertEquals(measurementNumber, row.size());
- count2.incrementAndGet();
+ count2.addAndGet(row.size());
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -391,31 +429,30 @@ public class TsFileInsertionDataContainerTest {
(row, collector) -> {
try {
collector.collectRow(row);
- Assert.assertEquals(
- measurementNumber,
row.size());
- count3.incrementAndGet();
+ count3.addAndGet(row.size());
} catch (IOException e) {
throw new
RuntimeException(e);
}
}))));
- Assert.assertEquals(deviceNumber * expectedRowNumber, count1.get());
- Assert.assertEquals(deviceNumber * expectedRowNumber, count2.get());
- Assert.assertEquals(deviceNumber * expectedRowNumber, count3.get());
- } catch (Exception e) {
+ // Calculate points in non-aligned tablets
+ Assert.assertEquals(deviceNumber * expectedRowNumber *
measurementNumber, count1.get());
+ Assert.assertEquals(deviceNumber * expectedRowNumber *
measurementNumber, count2.get());
+ Assert.assertEquals(deviceNumber * expectedRowNumber *
measurementNumber, count3.get());
+ } catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
- AtomicReference<String> oneDeviceInAlignedTsFile = new AtomicReference<>();
- AtomicReference<String> oneMeasurementInAlignedTsFile = new
AtomicReference<>();
+ final AtomicReference<String> oneDeviceInAlignedTsFile = new
AtomicReference<>();
+ final AtomicReference<String> oneMeasurementInAlignedTsFile = new
AtomicReference<>();
- AtomicReference<String> oneDeviceInUnalignedTsFile = new
AtomicReference<>();
- AtomicReference<String> oneMeasurementInUnalignedTsFile = new
AtomicReference<>();
+ final AtomicReference<String> oneDeviceInUnalignedTsFile = new
AtomicReference<>();
+ final AtomicReference<String> oneMeasurementInUnalignedTsFile = new
AtomicReference<>();
- try (TsFileSequenceReader alignedReader =
+ try (final TsFileSequenceReader alignedReader =
new TsFileSequenceReader(alignedTsFile.getAbsolutePath());
- TsFileSequenceReader nonalignedReader =
+ final TsFileSequenceReader nonalignedReader =
new TsFileSequenceReader(nonalignedTsFile.getAbsolutePath())) {
alignedReader
@@ -457,14 +494,20 @@ public class TsFileInsertionDataContainerTest {
}
try (final TsFileInsertionDataContainer alignedContainer =
- new TsFileInsertionDataContainer(
- alignedTsFile, oneAlignedDevicePattern, startTime, endTime);
+ isQuery
+ ? new TsFileInsertionQueryDataContainer(
+ alignedTsFile, oneAlignedDevicePattern, startTime, endTime)
+ : new TsFileInsertionScanDataContainer(
+ alignedTsFile, oneAlignedDevicePattern, startTime,
endTime, null, null);
final TsFileInsertionDataContainer nonalignedContainer =
- new TsFileInsertionDataContainer(
- nonalignedTsFile, oneNonAlignedDevicePattern, startTime,
endTime)) {
- AtomicInteger count1 = new AtomicInteger(0);
- AtomicInteger count2 = new AtomicInteger(0);
- AtomicInteger count3 = new AtomicInteger(0);
+ isQuery
+ ? new TsFileInsertionQueryDataContainer(
+ nonalignedTsFile, oneNonAlignedDevicePattern, startTime,
endTime)
+ : new TsFileInsertionScanDataContainer(
+ nonalignedTsFile, oneNonAlignedDevicePattern, startTime,
endTime, null, null)) {
+ final AtomicInteger count1 = new AtomicInteger(0);
+ final AtomicInteger count2 = new AtomicInteger(0);
+ final AtomicInteger count3 = new AtomicInteger(0);
alignedContainer
.toTabletInsertionEvents()
@@ -527,8 +570,7 @@ public class TsFileInsertionDataContainerTest {
(row, collector) -> {
try {
rowCollector.collectRow(row);
-
Assert.assertEquals(measurementNumber, row.size());
- count1.incrementAndGet();
+ count1.addAndGet(row.size());
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -540,8 +582,7 @@ public class TsFileInsertionDataContainerTest {
(row, collector) -> {
try {
collector.collectRow(row);
-
Assert.assertEquals(measurementNumber, row.size());
- count2.incrementAndGet();
+ count2.addAndGet(row.size());
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -552,18 +593,17 @@ public class TsFileInsertionDataContainerTest {
(row, collector) -> {
try {
collector.collectRow(row);
- Assert.assertEquals(
- measurementNumber,
row.size());
- count3.incrementAndGet();
+ count3.addAndGet(row.size());
} catch (IOException e) {
throw new
RuntimeException(e);
}
}))));
- Assert.assertEquals(expectedRowNumber, count1.get());
- Assert.assertEquals(expectedRowNumber, count2.get());
- Assert.assertEquals(expectedRowNumber, count3.get());
- } catch (Exception e) {
+ // Calculate points in non-aligned tablets
+ Assert.assertEquals(expectedRowNumber * measurementNumber, count1.get());
+ Assert.assertEquals(expectedRowNumber * measurementNumber, count2.get());
+ Assert.assertEquals(expectedRowNumber * measurementNumber, count3.get());
+ } catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
@@ -585,14 +625,25 @@ public class TsFileInsertionDataContainerTest {
}
try (final TsFileInsertionDataContainer alignedContainer =
- new TsFileInsertionDataContainer(
- alignedTsFile, oneAlignedMeasurementPattern, startTime,
endTime);
+ isQuery
+ ? new TsFileInsertionQueryDataContainer(
+ alignedTsFile, oneAlignedMeasurementPattern, startTime,
endTime)
+ : new TsFileInsertionScanDataContainer(
+ alignedTsFile, oneAlignedMeasurementPattern, startTime,
endTime, null, null);
final TsFileInsertionDataContainer nonalignedContainer =
- new TsFileInsertionDataContainer(
- nonalignedTsFile, oneNonAlignedMeasurementPattern, startTime,
endTime)) {
- AtomicInteger count1 = new AtomicInteger(0);
- AtomicInteger count2 = new AtomicInteger(0);
- AtomicInteger count3 = new AtomicInteger(0);
+ isQuery
+ ? new TsFileInsertionQueryDataContainer(
+ nonalignedTsFile, oneNonAlignedMeasurementPattern,
startTime, endTime)
+ : new TsFileInsertionScanDataContainer(
+ nonalignedTsFile,
+ oneNonAlignedMeasurementPattern,
+ startTime,
+ endTime,
+ null,
+ null)) {
+ final AtomicInteger count1 = new AtomicInteger(0);
+ final AtomicInteger count2 = new AtomicInteger(0);
+ final AtomicInteger count3 = new AtomicInteger(0);
alignedContainer
.toTabletInsertionEvents()
@@ -689,7 +740,7 @@ public class TsFileInsertionDataContainerTest {
Assert.assertEquals(expectedRowNumber, count1.get());
Assert.assertEquals(expectedRowNumber, count2.get());
Assert.assertEquals(expectedRowNumber, count3.get());
- } catch (Exception e) {
+ } catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
@@ -706,13 +757,20 @@ public class TsFileInsertionDataContainerTest {
}
try (final TsFileInsertionDataContainer alignedContainer =
- new TsFileInsertionDataContainer(alignedTsFile, notExistPattern,
startTime, endTime);
+ isQuery
+ ? new TsFileInsertionQueryDataContainer(
+ alignedTsFile, notExistPattern, startTime, endTime)
+ : new TsFileInsertionScanDataContainer(
+ alignedTsFile, notExistPattern, startTime, endTime, null,
null);
final TsFileInsertionDataContainer nonalignedContainer =
- new TsFileInsertionDataContainer(
- nonalignedTsFile, notExistPattern, startTime, endTime)) {
- AtomicInteger count1 = new AtomicInteger(0);
- AtomicInteger count2 = new AtomicInteger(0);
- AtomicInteger count3 = new AtomicInteger(0);
+ isQuery
+ ? new TsFileInsertionQueryDataContainer(
+ nonalignedTsFile, notExistPattern, startTime, endTime)
+ : new TsFileInsertionScanDataContainer(
+ nonalignedTsFile, notExistPattern, startTime, endTime,
null, null)) {
+ final AtomicInteger count1 = new AtomicInteger(0);
+ final AtomicInteger count2 = new AtomicInteger(0);
+ final AtomicInteger count3 = new AtomicInteger(0);
alignedContainer
.toTabletInsertionEvents()
@@ -809,7 +867,7 @@ public class TsFileInsertionDataContainerTest {
Assert.assertEquals(0, count1.get());
Assert.assertEquals(0, count2.get());
Assert.assertEquals(0, count3.get());
- } catch (Exception e) {
+ } catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 1c6fb77e835..aef1a020a7f 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -241,6 +241,7 @@ public class CommonConfig {
private long pipeRemainingTimeCommitRateAutoSwitchSeconds = 30;
private PipeRemainingTimeRateAverageTime
pipeRemainingTimeCommitRateAverageTime =
PipeRemainingTimeRateAverageTime.MEAN;
+ private double pipeTsFileScanParsingThreshold = 0.05;
private long twoStageAggregateMaxCombinerLiveTimeInMs = 8 * 60 * 1000L; // 8
minutes
private long twoStageAggregateDataRegionInfoCacheTimeInMs = 3 * 60 * 1000L;
// 3 minutes
@@ -1044,6 +1045,14 @@ public class CommonConfig {
this.pipeRemainingTimeCommitRateAverageTime =
pipeRemainingTimeCommitRateAverageTime;
}
+ public double getPipeTsFileScanParsingThreshold() {
+ return pipeTsFileScanParsingThreshold;
+ }
+
+ public void setPipeTsFileScanParsingThreshold(double
pipeTsFileScanParsingThreshold) {
+ this.pipeTsFileScanParsingThreshold = pipeTsFileScanParsingThreshold;
+ }
+
public double getPipeAllSinksRateLimitBytesPerSecond() {
return pipeAllSinksRateLimitBytesPerSecond;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index b989e8e5055..135cef409b0 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -559,6 +559,11 @@ public class CommonDescriptor {
"pipe_remaining_time_commit_rate_average_time",
String.valueOf(config.getPipeRemainingTimeCommitRateAverageTime()))
.trim()));
+ config.setPipeTsFileScanParsingThreshold(
+ Double.parseDouble(
+ properties.getProperty(
+ "pipe_tsfile_scan_parsing_threshold",
+ String.valueOf(config.getPipeTsFileScanParsingThreshold()))));
config.setTwoStageAggregateMaxCombinerLiveTimeInMs(
Long.parseLong(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 818ef38dd84..270a5d4fb8e 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -152,6 +152,10 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeRemainingTimeCommitRateAverageTime();
}
+ public double getPipeTsFileScanParsingThreshold() {
+ return COMMON_CONFIG.getPipeTsFileScanParsingThreshold();
+ }
+
/////////////////////////////// Meta Consistency
///////////////////////////////
public boolean isSeperatedPipeHeartbeatEnabled() {
@@ -341,6 +345,7 @@ public class PipeConfig {
getPipeRemainingTimeCommitAutoSwitchSeconds());
LOGGER.info(
"PipeRemainingTimeCommitRateAverageTime: {}",
getPipeRemainingTimeCommitRateAverageTime());
+ LOGGER.info("PipeTsFileScanParsingThreshold(): {}",
getPipeTsFileScanParsingThreshold());
LOGGER.info("PipeAsyncConnectorSelectorNumber: {}",
getPipeAsyncConnectorSelectorNumber());
LOGGER.info("PipeAsyncConnectorMaxClientNumber: {}",
getPipeAsyncConnectorMaxClientNumber());
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/IoTDBPipePattern.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/IoTDBPipePattern.java
index 30307d07711..4ca04ef35e1 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/IoTDBPipePattern.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/IoTDBPipePattern.java
@@ -185,6 +185,10 @@ public class IoTDBPipePattern extends PipePattern {
return !patternPartialPath.hasWildcard();
}
+ public boolean mayMatchMultipleTimeSeriesInOneDevice() {
+ return PathPatternUtil.hasWildcard(patternPartialPath.getTailNode());
+ }
+
@Override
public String toString() {
return "IoTDBPipePattern" + super.toString();