This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 397f36edafd [IOTDB-5973] Pipe: fix ClassCastException when using
pipe.core.event.view & support collector.pattern in historical collector
(#10058)
397f36edafd is described below
commit 397f36edafdab5146f3ea9a7cd943fc3c4262e89
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed Jun 7 02:04:24 2023 +0800
[IOTDB-5973] Pipe: fix ClassCastException when using pipe.core.event.view &
support collector.pattern in historical collector (#10058)
* remove pipe_connector_session_id in config
* fix ClassCastException when using pipe.core.event.view
* support collector.pattern in historical collector
---
.../event/dml/insertion/TabletInsertionEvent.java | 12 +-
.../apache/iotdb/commons/conf/CommonConfig.java | 9 -
.../iotdb/commons/conf/CommonDescriptor.java | 4 -
.../iotdb/commons/pipe/config/PipeConfig.java | 4 -
.../PipeHistoricalDataRegionTsFileCollector.java | 19 +-
.../impl/iotdb/v1/IoTDBThriftConnectorV1.java | 23 +-
.../event/impl/PipeEmptyTabletInsertionEvent.java | 39 --
.../impl/PipeInsertNodeTabletInsertionEvent.java | 32 +-
...Event.java => PipeRawTabletInsertionEvent.java} | 20 +-
.../core/event/impl/PipeTsFileInsertionEvent.java | 7 +-
.../db/pipe/core/event/view/access/PipeRow.java | 32 +-
.../event/view/collector/PipeRowCollector.java | 23 +-
.../TabletInsertionDataContainer.java | 179 +++---
.../TsFileInsertionDataContainer.java | 145 +++--
.../TsFileInsertionDataTabletIterator.java | 286 +++-------
.../core/processor/PipeDoNothingProcessor.java | 14 +-
...Test.java => PipeTabletInsertionEventTest.java} | 47 +-
.../event/TsFileInsertionDataContainerTest.java | 608 +++++++++++++++++++++
.../iotdb/tsfile/read/TsFileSequenceReader.java | 18 +
19 files changed, 994 insertions(+), 527 deletions(-)
diff --git
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java
index 4a8073a5a26..09b129a9cc4 100644
---
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java
+++
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java
@@ -32,16 +32,16 @@ public interface TabletInsertionEvent extends Event {
/**
* The consumer processes the data row by row and collects the results by
RowCollector.
*
- * @return TabletInsertionEvent a new TabletInsertionEvent contains the
results collected by the
- * RowCollector
+ * @return Iterable<TabletInsertionEvent> a list of new TabletInsertionEvent
contains the results
+ * collected by the RowCollector
*/
- TabletInsertionEvent processRowByRow(BiConsumer<Row, RowCollector> consumer);
+ Iterable<TabletInsertionEvent> processRowByRow(BiConsumer<Row, RowCollector>
consumer);
/**
* The consumer processes the Tablet directly and collects the results by
RowCollector.
*
- * @return TabletInsertionEvent a new TabletInsertionEvent contains the
results collected by the
- * RowCollector
+ * @return Iterable<TabletInsertionEvent> a list of new TabletInsertionEvent
contains the results
+ * collected by the RowCollector
*/
- TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector>
consumer);
+ Iterable<TabletInsertionEvent> processTablet(BiConsumer<Tablet,
RowCollector> consumer);
}
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 4f70504681e..b01128cefcc 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -156,7 +156,6 @@ public class CommonConfig {
private int pipeConnectorReadFileBufferSize = 8388608;
private long pipeConnectorRetryIntervalMs = 1000L;
private int pipeConnectorPendingQueueSize = 1024;
- private long pipeConnectorSessionId = Long.MAX_VALUE / 2;
private int pipeHeartbeatLoopCyclesForCollectingPipeMeta = 100;
private long pipeMetaSyncerInitialSyncDelayMinutes = 3;
@@ -551,12 +550,4 @@ public class CommonConfig {
this.pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs =
pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs;
}
-
- public long getPipeConnectorSessionId() {
- return pipeConnectorSessionId;
- }
-
- public void setPipeConnectorSessionId(long pipeSessionId) {
- this.pipeConnectorSessionId = pipeSessionId;
- }
}
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index e45c4af45c5..bd13ad3c53b 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -285,10 +285,6 @@ public class CommonDescriptor {
properties.getProperty(
"pipe_connector_pending_queue_size",
String.valueOf(config.getPipeConnectorPendingQueueSize()))));
- config.setPipeConnectorSessionId(
- Long.parseLong(
- properties.getProperty(
- "pipe_connector_session_id",
String.valueOf(config.getPipeConnectorSessionId()))));
config.setPipeHeartbeatLoopCyclesForCollectingPipeMeta(
Integer.parseInt(
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index af0fcd379ff..365ea0b8003 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -85,10 +85,6 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeConnectorPendingQueueSize();
}
- public long getPipeConnectorSessionId() {
- return COMMON_CONFIG.getPipeConnectorSessionId();
- }
-
/////////////////////////////// Meta Consistency
///////////////////////////////
public int getHeartbeatLoopCyclesForCollectingPipeMeta() {
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
index a1742f9fce3..583cbd5d853 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
@@ -56,6 +56,7 @@ public class PipeHistoricalDataRegionTsFileCollector extends
PipeHistoricalDataR
private final PipeTaskMeta pipeTaskMeta;
private final ProgressIndex startIndex;
+ private String pattern;
private int dataRegionId;
private final long historicalDataCollectionTimeLowerBound;
@@ -80,6 +81,10 @@ public class PipeHistoricalDataRegionTsFileCollector extends
PipeHistoricalDataR
@Override
public void customize(
PipeParameters parameters, PipeCollectorRuntimeConfiguration
configuration) {
+ pattern =
+ parameters.getStringOrDefault(
+ PipeCollectorConstant.COLLECTOR_PATTERN_KEY,
+ PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE);
dataRegionId = parameters.getInt(DATA_REGION_KEY);
historicalDataCollectionStartTime =
parameters.hasAttribute(COLLECTOR_HISTORY_START_TIME)
@@ -153,12 +158,7 @@ public class PipeHistoricalDataRegionTsFileCollector
extends PipeHistoricalDataR
!startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
&&
isTsFileResourceOverlappedWithTimeRange(resource)
&&
isTsFileGeneratedAfterCollectionTimeLowerBound(resource))
- .map(
- resource ->
- new PipeTsFileInsertionEvent(
- resource,
- pipeTaskMeta,
-
PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE))
+ .map(resource -> new PipeTsFileInsertionEvent(resource,
pipeTaskMeta, pattern))
.collect(Collectors.toList()));
pendingQueue.addAll(
tsFileManager.getTsFileList(false).stream()
@@ -167,12 +167,7 @@ public class PipeHistoricalDataRegionTsFileCollector
extends PipeHistoricalDataR
!startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
&&
isTsFileResourceOverlappedWithTimeRange(resource)
&&
isTsFileGeneratedAfterCollectionTimeLowerBound(resource))
- .map(
- resource ->
- new PipeTsFileInsertionEvent(
- resource,
- pipeTaskMeta,
-
PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE))
+ .map(resource -> new PipeTsFileInsertionEvent(resource,
pipeTaskMeta, pattern))
.collect(Collectors.toList()));
pendingQueue.forEach(
event ->
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
index c67e3966cc2..4c0b1c0e54e 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
@@ -33,9 +33,8 @@ import
org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransfe
import
org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferHandshakeReq;
import
org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferInsertNodeReq;
import
org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferTabletReq;
-import org.apache.iotdb.db.pipe.core.event.impl.PipeEmptyTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.core.event.impl.PipeInsertNodeTabletInsertionEvent;
-import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.core.event.impl.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.core.event.impl.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.wal.exception.WALPipeException;
import org.apache.iotdb.pipe.api.PipeConnector;
@@ -120,13 +119,11 @@ public class IoTDBThriftConnectorV1 implements
PipeConnector {
try {
if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
doTransfer((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent);
- } else if (tabletInsertionEvent instanceof
PipeTabletTabletInsertionEvent) {
- doTransfer((PipeTabletTabletInsertionEvent) tabletInsertionEvent);
- } else if (tabletInsertionEvent instanceof
PipeEmptyTabletInsertionEvent) {
- doTransfer((PipeEmptyTabletInsertionEvent) tabletInsertionEvent);
+ } else if (tabletInsertionEvent instanceof PipeRawTabletInsertionEvent) {
+ doTransfer((PipeRawTabletInsertionEvent) tabletInsertionEvent);
} else {
throw new NotImplementedException(
- "IoTDBThriftConnectorV1 only support
PipeInsertNodeTabletInsertionEvent and PipeTabletTabletInsertionEvent.");
+ "IoTDBThriftConnectorV1 only support
PipeInsertNodeTabletInsertionEvent and PipeRawTabletInsertionEvent.");
}
} catch (TException e) {
LOGGER.error(
@@ -154,25 +151,21 @@ public class IoTDBThriftConnectorV1 implements
PipeConnector {
}
}
- private void doTransfer(PipeTabletTabletInsertionEvent
pipeTabletTabletInsertionEvent)
+ private void doTransfer(PipeRawTabletInsertionEvent
pipeRawTabletInsertionEvent)
throws PipeException, TException, IOException {
final TPipeTransferResp resp =
client.pipeTransfer(
PipeTransferTabletReq.toTPipeTransferReq(
- pipeTabletTabletInsertionEvent.convertToTablet()));
+ pipeRawTabletInsertionEvent.convertToTablet()));
if (resp.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new PipeException(
String.format(
- "Transfer PipeTabletTabletInsertionEvent %s error, result status
%s",
- pipeTabletTabletInsertionEvent, resp.status));
+ "Transfer PipeRawTabletInsertionEvent %s error, result status
%s",
+ pipeRawTabletInsertionEvent, resp.status));
}
}
- private void doTransfer(PipeEmptyTabletInsertionEvent
pipeEmptyTabletInsertionEvent) {
- // do nothing
- }
-
@Override
public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws
Exception {
// PipeProcessor can change the type of TabletInsertionEvent
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeEmptyTabletInsertionEvent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeEmptyTabletInsertionEvent.java
deleted file mode 100644
index 855da8fa8b0..00000000000
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeEmptyTabletInsertionEvent.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.core.event.impl;
-
-import org.apache.iotdb.pipe.api.access.Row;
-import org.apache.iotdb.pipe.api.collector.RowCollector;
-import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
-import org.apache.iotdb.tsfile.write.record.Tablet;
-
-import java.util.function.BiConsumer;
-
-public class PipeEmptyTabletInsertionEvent implements TabletInsertionEvent {
- @Override
- public TabletInsertionEvent processRowByRow(BiConsumer<Row, RowCollector>
consumer) {
- return this;
- }
-
- @Override
- public TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector>
consumer) {
- return this;
- }
-}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeInsertNodeTabletInsertionEvent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeInsertNodeTabletInsertionEvent.java
index 416fea8b734..dd65ad4c319 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeInsertNodeTabletInsertionEvent.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeInsertNodeTabletInsertionEvent.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.pipe.core.event.impl;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
-import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
import
org.apache.iotdb.db.pipe.core.event.view.datastructure.TabletInsertionDataContainer;
@@ -116,7 +115,7 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
/////////////////////////// TabletInsertionEvent ///////////////////////////
@Override
- public TabletInsertionEvent processRowByRow(BiConsumer<Row, RowCollector>
consumer) {
+ public Iterable<TabletInsertionEvent> processRowByRow(BiConsumer<Row,
RowCollector> consumer) {
try {
if (dataContainer == null) {
dataContainer = new TabletInsertionDataContainer(getInsertNode(),
getPattern());
@@ -129,7 +128,7 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
}
@Override
- public TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector>
consumer) {
+ public Iterable<TabletInsertionEvent> processTablet(BiConsumer<Tablet,
RowCollector> consumer) {
try {
if (dataContainer == null) {
dataContainer = new TabletInsertionDataContainer(getInsertNode(),
getPattern());
@@ -141,36 +140,11 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
}
}
- public Tablet convertToTablet() {
- try {
- if (dataContainer == null) {
- dataContainer = new TabletInsertionDataContainer(getInsertNode(),
getPattern());
- }
- return dataContainer.convertToTablet();
- } catch (Exception e) {
- LOGGER.error("Process tablet error.", e);
- throw new PipeException("Process tablet error.", e);
- }
- }
-
- @TestOnly
- public Tablet convertToTabletForTest(InsertNode insertNode, String pattern) {
- try {
- if (dataContainer == null) {
- dataContainer = new TabletInsertionDataContainer(insertNode, pattern);
- }
- return dataContainer.convertToTablet();
- } catch (Exception e) {
- LOGGER.error("Process tablet error.", e);
- throw new PipeException("Process tablet error.", e);
- }
- }
-
/////////////////////////// Object ///////////////////////////
@Override
public String toString() {
- return "PipeTabletTabletInsertionEvent{"
+ return "PipeRawTabletInsertionEvent{"
+ "walEntryHandler="
+ walEntryHandler
+ ", progressIndex="
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletTabletInsertionEvent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeRawTabletInsertionEvent.java
similarity index 74%
rename from
server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletTabletInsertionEvent.java
rename to
server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeRawTabletInsertionEvent.java
index 014972abd7f..abb33534eb8 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletTabletInsertionEvent.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeRawTabletInsertionEvent.java
@@ -29,18 +29,18 @@ import org.apache.iotdb.tsfile.write.record.Tablet;
import java.util.Objects;
import java.util.function.BiConsumer;
-public class PipeTabletTabletInsertionEvent implements TabletInsertionEvent {
+public class PipeRawTabletInsertionEvent implements TabletInsertionEvent {
private final Tablet tablet;
private final String pattern;
private TabletInsertionDataContainer dataContainer;
- public PipeTabletTabletInsertionEvent(Tablet tablet) {
+ public PipeRawTabletInsertionEvent(Tablet tablet) {
this(Objects.requireNonNull(tablet), null);
}
- public PipeTabletTabletInsertionEvent(Tablet tablet, String pattern) {
+ public PipeRawTabletInsertionEvent(Tablet tablet, String pattern) {
this.tablet = Objects.requireNonNull(tablet);
this.pattern = pattern;
}
@@ -52,7 +52,7 @@ public class PipeTabletTabletInsertionEvent implements
TabletInsertionEvent {
/////////////////////////// TabletInsertionEvent ///////////////////////////
@Override
- public TabletInsertionEvent processRowByRow(BiConsumer<Row, RowCollector>
consumer) {
+ public Iterable<TabletInsertionEvent> processRowByRow(BiConsumer<Row,
RowCollector> consumer) {
if (dataContainer == null) {
dataContainer = new TabletInsertionDataContainer(tablet, getPattern());
}
@@ -60,7 +60,7 @@ public class PipeTabletTabletInsertionEvent implements
TabletInsertionEvent {
}
@Override
- public TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector>
consumer) {
+ public Iterable<TabletInsertionEvent> processTablet(BiConsumer<Tablet,
RowCollector> consumer) {
if (dataContainer == null) {
dataContainer = new TabletInsertionDataContainer(tablet, getPattern());
}
@@ -68,8 +68,16 @@ public class PipeTabletTabletInsertionEvent implements
TabletInsertionEvent {
}
public Tablet convertToTablet() {
+ final String pattern = getPattern();
+
+ // if pattern is "root", we don't need to convert, just return the
original tablet
+ if (pattern.equals(PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE))
{
+ return tablet;
+ }
+
+ // if pattern is not "root", we need to convert the tablet
if (dataContainer == null) {
- dataContainer = new TabletInsertionDataContainer(tablet, getPattern());
+ dataContainer = new TabletInsertionDataContainer(tablet, pattern);
}
return dataContainer.convertToTablet();
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
index c813619be9a..2de0cd31d19 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
@@ -35,6 +35,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
+import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
public class PipeTsFileInsertionEvent extends EnrichedEvent implements
TsFileInsertionEvent {
@@ -152,12 +153,16 @@ public class PipeTsFileInsertionEvent extends
EnrichedEvent implements TsFileIns
}
return dataContainer.toTabletInsertionEvents();
} catch (InterruptedException e) {
- String errorMsg =
+ final String errorMsg =
String.format(
"Interrupted when waiting for closing TsFile %s.",
resource.getTsFilePath());
LOGGER.warn(errorMsg);
Thread.currentThread().interrupt();
throw new PipeException(errorMsg);
+ } catch (IOException e) {
+ final String errorMsg = String.format("Read TsFile %s error.",
resource.getTsFilePath());
+ LOGGER.warn(errorMsg);
+ throw new PipeException(errorMsg);
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRow.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRow.java
index dfab1e1aa2b..85f4a210ccb 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRow.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRow.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.pipe.api.type.Binary;
import org.apache.iotdb.pipe.api.type.Type;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.utils.BitMap;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import java.util.Arrays;
@@ -39,8 +40,9 @@ public class PipeRow implements Row {
private final MeasurementSchema[] measurementSchemaList;
private final long[] timestampColumn;
- private final Object[] valueColumns;
private final TSDataType[] valueColumnTypes;
+ private final Object[] valueColumns;
+ private final BitMap[] bitMaps;
private final String[] columnNameStringList;
@@ -49,15 +51,17 @@ public class PipeRow implements Row {
String deviceId,
MeasurementSchema[] measurementSchemaList,
long[] timestampColumn,
- Object[] valueColumns,
TSDataType[] valueColumnTypes,
+ Object[] valueColumns,
+ BitMap[] bitMaps,
String[] columnNameStringList) {
this.rowIndex = rowIndex;
this.deviceId = deviceId;
this.measurementSchemaList = measurementSchemaList;
this.timestampColumn = timestampColumn;
- this.valueColumns = valueColumns;
this.valueColumnTypes = valueColumnTypes;
+ this.valueColumns = valueColumns;
+ this.bitMaps = bitMaps;
this.columnNameStringList = columnNameStringList;
}
@@ -103,7 +107,25 @@ public class PipeRow implements Row {
@Override
public Object getObject(int columnIndex) {
- return ((Object[]) valueColumns[columnIndex])[rowIndex];
+ switch (getDataType(columnIndex)) {
+ case INT32:
+ return getInt(columnIndex);
+ case INT64:
+ return getLong(columnIndex);
+ case FLOAT:
+ return getFloat(columnIndex);
+ case DOUBLE:
+ return getDouble(columnIndex);
+ case BOOLEAN:
+ return getBoolean(columnIndex);
+ case TEXT:
+ return getBinary(columnIndex);
+ default:
+ throw new UnsupportedOperationException(
+ String.format(
+ "unsupported data type %s for column %s",
+ getDataType(columnIndex), columnNameStringList[columnIndex]));
+ }
}
@Override
@@ -113,7 +135,7 @@ public class PipeRow implements Row {
@Override
public boolean isNull(int columnIndex) {
- return ((Object[]) valueColumns[columnIndex])[rowIndex] == null;
+ return bitMaps[columnIndex].isMarked(rowIndex);
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeRowCollector.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeRowCollector.java
index ab0371252f3..8010b67e57e 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeRowCollector.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeRowCollector.java
@@ -19,8 +19,7 @@
package org.apache.iotdb.db.pipe.core.event.view.collector;
-import org.apache.iotdb.db.pipe.core.event.impl.PipeEmptyTabletInsertionEvent;
-import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.core.event.impl.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.core.event.view.access.PipeRow;
import org.apache.iotdb.pipe.api.access.Row;
import org.apache.iotdb.pipe.api.collector.RowCollector;
@@ -35,6 +34,7 @@ import java.util.List;
public class PipeRowCollector implements RowCollector {
+ private final List<TabletInsertionEvent> tabletInsertionEventList = new
ArrayList<>();
private Tablet tablet = null;
@Override
@@ -63,16 +63,21 @@ public class PipeRowCollector implements RowCollector {
}
}
tablet.rowSize++;
- }
- public TabletInsertionEvent toTabletInsertionEvent() {
- if (tablet == null) {
- return new PipeEmptyTabletInsertionEvent();
+ if (tablet.rowSize == tablet.getMaxRowNumber()) {
+ collectTabletInsertionEvent();
}
+ }
- PipeTabletTabletInsertionEvent tabletInsertionEvent =
- new PipeTabletTabletInsertionEvent(tablet);
+ private void collectTabletInsertionEvent() {
+ if (tablet != null) {
+ tabletInsertionEventList.add(new PipeRawTabletInsertionEvent(tablet));
+ }
this.tablet = null;
- return tabletInsertionEvent;
+ }
+
+ public Iterable<TabletInsertionEvent> convertToTabletInsertionEvents() {
+ collectTabletInsertionEvent();
+ return tabletInsertionEventList;
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TabletInsertionDataContainer.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TabletInsertionDataContainer.java
index 16d21f73138..542847074c0 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TabletInsertionDataContainer.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TabletInsertionDataContainer.java
@@ -23,7 +23,6 @@ import
org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
-import org.apache.iotdb.db.pipe.core.event.impl.PipeEmptyTabletInsertionEvent;
import org.apache.iotdb.db.pipe.core.event.view.access.PipeRow;
import org.apache.iotdb.db.pipe.core.event.view.collector.PipeRowCollector;
import org.apache.iotdb.pipe.api.access.Row;
@@ -40,6 +39,7 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.function.BiConsumer;
@@ -52,8 +52,9 @@ public class TabletInsertionDataContainer {
private String[] columnNameStringList;
private long[] timestampColumn;
- private Object[] valueColumns;
private TSDataType[] valueColumnTypes;
+ // each column of Object[] is a column of primitive type array
+ private Object[] valueColumns;
private BitMap[] nullValueColumnBitmaps;
private int rowCount;
@@ -89,7 +90,8 @@ public class TabletInsertionDataContainer {
this.timestampColumn = new long[] {insertRowNode.getTime()};
generateColumnIndexMapper(
- insertRowNode, pattern,
originColumnIndex2FilteredColumnIndexMapperList);
+ insertRowNode.getMeasurements(), pattern,
originColumnIndex2FilteredColumnIndexMapperList);
+
final int filteredColumnSize =
Arrays.stream(originColumnIndex2FilteredColumnIndexMapperList)
.filter(Objects::nonNull)
@@ -98,22 +100,46 @@ public class TabletInsertionDataContainer {
this.measurementSchemaList = new MeasurementSchema[filteredColumnSize];
this.columnNameStringList = new String[filteredColumnSize];
- this.valueColumns = new Object[filteredColumnSize];
this.valueColumnTypes = new TSDataType[filteredColumnSize];
+ this.valueColumns = new Object[filteredColumnSize];
this.nullValueColumnBitmaps = new BitMap[filteredColumnSize];
final MeasurementSchema[] originMeasurementSchemaList =
insertRowNode.getMeasurementSchemas();
final String[] originColumnNameStringList =
insertRowNode.getMeasurements();
- final Object[] originValueColumns = insertRowNode.getValues();
final TSDataType[] originValueColumnTypes = insertRowNode.getDataTypes();
+ final Object[] originValueColumns = insertRowNode.getValues();
for (int i = 0; i <
originColumnIndex2FilteredColumnIndexMapperList.length; i++) {
if (originColumnIndex2FilteredColumnIndexMapperList[i] != null) {
final int filteredColumnIndex =
originColumnIndex2FilteredColumnIndexMapperList[i];
this.measurementSchemaList[filteredColumnIndex] =
originMeasurementSchemaList[i];
this.columnNameStringList[filteredColumnIndex] =
originColumnNameStringList[i];
- this.valueColumns[filteredColumnIndex] = originValueColumns[i];
this.valueColumnTypes[filteredColumnIndex] = originValueColumnTypes[i];
+ switch (originValueColumnTypes[i]) {
+ case INT32:
+ this.valueColumns[filteredColumnIndex] = new int[] {(Integer)
originValueColumns[i]};
+ break;
+ case INT64:
+ this.valueColumns[filteredColumnIndex] = new long[] {(Long)
originValueColumns[i]};
+ break;
+ case FLOAT:
+ this.valueColumns[filteredColumnIndex] = new float[] {(Float)
originValueColumns[i]};
+ break;
+ case DOUBLE:
+ this.valueColumns[filteredColumnIndex] = new double[] {(Double)
originValueColumns[i]};
+ break;
+ case BOOLEAN:
+ this.valueColumns[filteredColumnIndex] =
+ new boolean[] {(Boolean) originValueColumns[i]};
+ break;
+ case TEXT:
+ this.valueColumns[filteredColumnIndex] = new Binary[] {(Binary)
originValueColumns[i]};
+ break;
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format(
+ "Data type %s is not supported.",
originValueColumnTypes[i].toString()));
+ }
this.nullValueColumnBitmaps[filteredColumnIndex] = new BitMap(1);
}
}
@@ -130,7 +156,9 @@ public class TabletInsertionDataContainer {
this.timestampColumn = insertTabletNode.getTimes();
generateColumnIndexMapper(
- insertTabletNode, pattern,
originColumnIndex2FilteredColumnIndexMapperList);
+ insertTabletNode.getMeasurements(),
+ pattern,
+ originColumnIndex2FilteredColumnIndexMapperList);
final int filteredColumnSize =
Arrays.stream(originColumnIndex2FilteredColumnIndexMapperList)
@@ -140,15 +168,15 @@ public class TabletInsertionDataContainer {
this.measurementSchemaList = new MeasurementSchema[filteredColumnSize];
this.columnNameStringList = new String[filteredColumnSize];
- this.valueColumns = new Object[filteredColumnSize];
this.valueColumnTypes = new TSDataType[filteredColumnSize];
+ this.valueColumns = new Object[filteredColumnSize];
this.nullValueColumnBitmaps = new BitMap[filteredColumnSize];
final MeasurementSchema[] originMeasurementSchemaList =
insertTabletNode.getMeasurementSchemas();
final String[] originColumnNameStringList =
insertTabletNode.getMeasurements();
- final Object[] originValueColumns = insertTabletNode.getColumns();
final TSDataType[] originValueColumnTypes =
insertTabletNode.getDataTypes();
+ final Object[] originValueColumns = insertTabletNode.getColumns();
final BitMap[] originBitMapList =
(insertTabletNode.getBitMaps() == null
? IntStream.range(0, originColumnSize)
@@ -156,15 +184,19 @@ public class TabletInsertionDataContainer {
.map(o -> new BitMap(timestampColumn.length))
.toArray(BitMap[]::new)
: insertTabletNode.getBitMaps());
+ for (int i = 0; i < originBitMapList.length; i++) {
+ if (originBitMapList[i] == null) {
+ originBitMapList[i] = new BitMap(timestampColumn.length);
+ }
+ }
for (int i = 0; i <
originColumnIndex2FilteredColumnIndexMapperList.length; i++) {
if (originColumnIndex2FilteredColumnIndexMapperList[i] != null) {
final int filteredColumnIndex =
originColumnIndex2FilteredColumnIndexMapperList[i];
this.measurementSchemaList[filteredColumnIndex] =
originMeasurementSchemaList[i];
this.columnNameStringList[filteredColumnIndex] =
originColumnNameStringList[i];
- this.valueColumns[filteredColumnIndex] =
- convertToColumn(originValueColumns[i], originValueColumnTypes[i],
originBitMapList[i]);
this.valueColumnTypes[filteredColumnIndex] = originValueColumnTypes[i];
+ this.valueColumns[filteredColumnIndex] = originValueColumns[i];
this.nullValueColumnBitmaps[filteredColumnIndex] = originBitMapList[i];
}
}
@@ -179,7 +211,13 @@ public class TabletInsertionDataContainer {
this.deviceId = tablet.deviceId;
this.timestampColumn = tablet.timestamps;
- generateColumnIndexMapper(tablet, pattern,
originColumnIndex2FilteredColumnIndexMapperList);
+ final List<MeasurementSchema> originMeasurementSchemaList =
tablet.getSchemas();
+ final String[] originMeasurementList = new
String[originMeasurementSchemaList.size()];
+ for (int i = 0; i < originMeasurementSchemaList.size(); i++) {
+ originMeasurementList[i] =
originMeasurementSchemaList.get(i).getMeasurementId();
+ }
+ generateColumnIndexMapper(
+ originMeasurementList, pattern,
originColumnIndex2FilteredColumnIndexMapperList);
final int filteredColumnSize =
Arrays.stream(originColumnIndex2FilteredColumnIndexMapperList)
@@ -189,11 +227,10 @@ public class TabletInsertionDataContainer {
this.measurementSchemaList = new MeasurementSchema[filteredColumnSize];
this.columnNameStringList = new String[filteredColumnSize];
- this.valueColumns = new Object[filteredColumnSize];
this.valueColumnTypes = new TSDataType[filteredColumnSize];
+ this.valueColumns = new Object[filteredColumnSize];
this.nullValueColumnBitmaps = new BitMap[filteredColumnSize];
- final List<MeasurementSchema> originMeasurementSchemaList =
tablet.getSchemas();
final String[] originColumnNameStringList = new String[originColumnSize];
final TSDataType[] originValueColumnTypes = new
TSDataType[originColumnSize];
for (int i = 0; i < originColumnSize; i++) {
@@ -201,16 +238,26 @@ public class TabletInsertionDataContainer {
originValueColumnTypes[i] = originMeasurementSchemaList.get(i).getType();
}
final Object[] originValueColumns = tablet.values;
- final BitMap[] originBitMapList = tablet.bitMaps;
+ final BitMap[] originBitMapList =
+ tablet.bitMaps == null
+ ? IntStream.range(0, originColumnSize)
+ .boxed()
+ .map(o -> new BitMap(timestampColumn.length))
+ .toArray(BitMap[]::new)
+ : tablet.bitMaps;
+ for (int i = 0; i < originBitMapList.length; i++) {
+ if (originBitMapList[i] == null) {
+ originBitMapList[i] = new BitMap(timestampColumn.length);
+ }
+ }
for (int i = 0; i <
originColumnIndex2FilteredColumnIndexMapperList.length; i++) {
if (originColumnIndex2FilteredColumnIndexMapperList[i] != null) {
final int filteredColumnIndex =
originColumnIndex2FilteredColumnIndexMapperList[i];
this.measurementSchemaList[filteredColumnIndex] =
originMeasurementSchemaList.get(i);
this.columnNameStringList[filteredColumnIndex] =
originColumnNameStringList[i];
- this.valueColumns[filteredColumnIndex] =
- convertToColumn(originValueColumns[i], originValueColumnTypes[i],
originBitMapList[i]);
this.valueColumnTypes[filteredColumnIndex] = originValueColumnTypes[i];
+ this.valueColumns[filteredColumnIndex] = originValueColumns[i];
this.nullValueColumnBitmaps[filteredColumnIndex] = originBitMapList[i];
}
}
@@ -218,6 +265,7 @@ public class TabletInsertionDataContainer {
rowCount = tablet.rowSize;
}
+ // TODO: cache the result keyed by deviceId to improve performance
private void generateColumnIndexMapper(
String[] originMeasurementList,
String pattern,
@@ -243,7 +291,6 @@ public class TabletInsertionDataContainer {
// low cost check comes first
if (pattern.length() == deviceId.length() + measurement.length() + 1
// high cost check comes later
- && pattern.startsWith(deviceId)
&& pattern.endsWith(TsFileConstant.PATH_SEPARATOR + measurement)) {
originColumnIndex2FilteredColumnIndexMapperList[i] = filteredCount++;
}
@@ -251,107 +298,34 @@ public class TabletInsertionDataContainer {
}
}
- private void generateColumnIndexMapper(
- InsertNode insertNode,
- String pattern,
- Integer[] originColumnIndex2FilteredColumnIndexMapperList) {
- generateColumnIndexMapper(
- insertNode.getMeasurements(), pattern,
originColumnIndex2FilteredColumnIndexMapperList);
- }
-
- private void generateColumnIndexMapper(
- Tablet tablet, String pattern, Integer[]
originColumnIndex2FilteredColumnIndexMapperList) {
- final List<MeasurementSchema> originMeasurementSchemaList =
tablet.getSchemas();
- final String[] originMeasurementList = new
String[originMeasurementSchemaList.size()];
- for (int i = 0; i < originMeasurementSchemaList.size(); i++) {
- originMeasurementList[i] =
originMeasurementSchemaList.get(i).getMeasurementId();
- }
- generateColumnIndexMapper(
- originMeasurementList, pattern,
originColumnIndex2FilteredColumnIndexMapperList);
- }
-
- private Object convertToColumn(Object originColumn, TSDataType dataType,
BitMap bitMap) {
- switch (dataType) {
- case INT32:
- final int[] intValues = (int[]) originColumn;
- final int[] integerValues = new int[intValues.length];
- for (int i = 0; i < intValues.length; i++) {
- integerValues[i] = bitMap != null && bitMap.isMarked(i) ? 0 :
intValues[i];
- }
- return integerValues;
- case INT64:
- final long[] longValues = (long[]) originColumn;
- final long[] longValues2 = new long[longValues.length];
- for (int i = 0; i < longValues.length; i++) {
- longValues2[i] = bitMap != null && bitMap.isMarked(i) ? 0 :
longValues[i];
- }
- return longValues2;
- case FLOAT:
- final float[] floatValues = (float[]) originColumn;
- final float[] floatValues2 = new float[floatValues.length];
- for (int i = 0; i < floatValues.length; i++) {
- floatValues2[i] = bitMap != null && bitMap.isMarked(i) ? 0 :
floatValues[i];
- }
- return floatValues2;
- case DOUBLE:
- final double[] doubleValues = (double[]) originColumn;
- final double[] doubleValues2 = new double[doubleValues.length];
- for (int i = 0; i < doubleValues.length; i++) {
- doubleValues2[i] = bitMap != null && bitMap.isMarked(i) ? 0 :
doubleValues[i];
- }
- return doubleValues2;
- case BOOLEAN:
- final boolean[] booleanValues = (boolean[]) originColumn;
- final boolean[] booleanValues2 = new boolean[booleanValues.length];
- for (int i = 0; i < booleanValues.length; i++) {
- booleanValues2[i] = (bitMap == null || !bitMap.isMarked(i)) &&
booleanValues[i];
- }
- return booleanValues2;
- case TEXT:
- final Binary[] binaryValues = (Binary[]) originColumn;
- final Binary[] stringValues = new Binary[binaryValues.length];
- for (int i = 0; i < binaryValues.length; i++) {
- stringValues[i] =
- bitMap != null && bitMap.isMarked(i)
- ? null
- : (binaryValues[i] == null
- ? null
- : Binary.valueOf(binaryValues[i].getStringValue()));
- }
- return stringValues;
- default:
- throw new UnSupportedDataTypeException(
- String.format("Data type %s is not supported.", dataType));
- }
- }
-
//////////////////////////// process ////////////////////////////
- public TabletInsertionEvent processRowByRow(BiConsumer<Row, RowCollector>
consumer) {
- final PipeRowCollector rowCollector = new PipeRowCollector();
- if (valueColumns.length == 0) {
- return new PipeEmptyTabletInsertionEvent();
+ public Iterable<TabletInsertionEvent> processRowByRow(BiConsumer<Row,
RowCollector> consumer) {
+ if (valueColumns.length == 0 || timestampColumn.length == 0) {
+ return Collections.emptyList();
}
- for (int i = 0; i < timestampColumn.length; i++) {
+ final PipeRowCollector rowCollector = new PipeRowCollector();
+ for (int i = 0; i < rowCount; i++) {
consumer.accept(
new PipeRow(
i,
deviceId,
measurementSchemaList,
timestampColumn,
- valueColumns,
valueColumnTypes,
+ valueColumns,
+ nullValueColumnBitmaps,
columnNameStringList),
rowCollector);
}
- return rowCollector.toTabletInsertionEvent();
+ return rowCollector.convertToTabletInsertionEvents();
}
- public TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector>
consumer) {
+ public Iterable<TabletInsertionEvent> processTablet(BiConsumer<Tablet,
RowCollector> consumer) {
final PipeRowCollector rowCollector = new PipeRowCollector();
consumer.accept(convertToTablet(), rowCollector);
- return rowCollector.toTabletInsertionEvent();
+ return rowCollector.convertToTabletInsertionEvents();
}
//////////////////////////// convert ////////////////////////////
@@ -362,10 +336,8 @@ public class TabletInsertionDataContainer {
}
final int columnSize = measurementSchemaList.length;
-
final List<MeasurementSchema> measurementSchemaArrayList =
new ArrayList<>(Arrays.asList(measurementSchemaList).subList(0,
columnSize));
-
final Tablet newTablet = new Tablet(deviceId, measurementSchemaArrayList,
rowCount);
newTablet.timestamps = timestampColumn;
newTablet.bitMaps = nullValueColumnBitmaps;
@@ -373,6 +345,7 @@ public class TabletInsertionDataContainer {
newTablet.rowSize = rowCount;
tablet = newTablet;
+
return tablet;
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataContainer.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataContainer.java
index 9035a8fe0bb..79274e48fc7 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataContainer.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataContainer.java
@@ -19,13 +19,13 @@
package org.apache.iotdb.db.pipe.core.event.view.datastructure;
-import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.core.event.impl.PipeRawTabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
-import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TsFileReader;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
-import org.apache.iotdb.tsfile.write.record.Tablet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,100 +37,127 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.NoSuchElementException;
-public class TsFileInsertionDataContainer {
+public class TsFileInsertionDataContainer implements AutoCloseable {
private static final Logger LOGGER =
LoggerFactory.getLogger(TsFileInsertionDataContainer.class);
- private final File tsFile;
private final String pattern;
- private TimeseriesMetadata vectorTimeseriesMetadata;
+ private final TsFileSequenceReader tsFileSequenceReader;
+ private final TsFileReader tsFileReader;
- private final Map<String, List<TimeseriesMetadata>>
device2TimeseriesMetadataMap;
+ private final Iterator<Map.Entry<String, List<String>>>
deviceMeasurementsMapIterator;
+ private final Map<String, TSDataType> measurementDataTypeMap;
- public TsFileInsertionDataContainer(File tsFile, String pattern) {
- this.tsFile = tsFile;
+ public TsFileInsertionDataContainer(File tsFile, String pattern) throws
IOException {
this.pattern = pattern;
- this.device2TimeseriesMetadataMap = collectDevice2TimeseriesMetadataMap();
+ tsFileSequenceReader = new TsFileSequenceReader(tsFile.getAbsolutePath());
+ tsFileReader = new TsFileReader(tsFileSequenceReader);
+
+ deviceMeasurementsMapIterator =
filterDeviceMeasurementsMapByPattern().entrySet().iterator();
+ measurementDataTypeMap = tsFileSequenceReader.getFullPathDataTypeMap();
}
- private Map<String, List<TimeseriesMetadata>>
collectDevice2TimeseriesMetadataMap() {
- final Map<String, List<TimeseriesMetadata>> result = new HashMap<>();
+ private Map<String, List<String>> filterDeviceMeasurementsMapByPattern()
throws IOException {
+ final Map<String, List<String>> filteredDeviceMeasurementsMap = new
HashMap<>();
- try (TsFileSequenceReader reader = new
TsFileSequenceReader(tsFile.getPath())) {
- // match pattern
- for (Map.Entry<String, List<TimeseriesMetadata>> entry :
- reader.getAllTimeseriesMetadata(true).entrySet()) {
- final String device = entry.getKey();
- boolean isVector = false;
+ for (Map.Entry<String, List<String>> entry :
+ tsFileSequenceReader.getDeviceMeasurementsMap().entrySet()) {
+ final String deviceId = entry.getKey();
- // case 1: for example, pattern is root.a.b or pattern is null and
device is root.a.b.c
- // in this case, all data can be matched without checking the
measurements
- if (pattern == null || pattern.length() <= device.length() &&
device.startsWith(pattern)) {
- result.put(device, entry.getValue());
+ // case 1: for example, pattern is root.a.b or pattern is null and
device is root.a.b.c
+ // in this case, all data can be matched without checking the
measurements
+ if (pattern == null
+ || pattern.length() <= deviceId.length() &&
deviceId.startsWith(pattern)) {
+ if (!entry.getValue().isEmpty()) {
+ filteredDeviceMeasurementsMap.put(deviceId, entry.getValue());
}
+ }
- // case 2: for example, pattern is root.a.b.c and device is root.a.b
- // in this case, we need to check the full path
- else {
- final List<TimeseriesMetadata> timeseriesMetadataList = new
ArrayList<>();
-
- for (TimeseriesMetadata timeseriesMetadata : entry.getValue()) {
- // TODO: test me!!!
- if (timeseriesMetadata.getTSDataType() == TSDataType.VECTOR) {
- vectorTimeseriesMetadata = timeseriesMetadata;
- isVector = false;
- continue;
- }
-
- final String measurement = timeseriesMetadata.getMeasurementId();
- // low cost check comes first
- if (pattern.length() == measurement.length() + device.length() + 1
- // high cost check comes later
- && pattern.startsWith(device)
- && pattern.endsWith(TsFileConstant.PATH_SEPARATOR +
measurement)) {
- if (!isVector) {
- isVector = true;
- timeseriesMetadataList.add(vectorTimeseriesMetadata);
- }
- timeseriesMetadataList.add(timeseriesMetadata);
- }
+ // case 2: for example, pattern is root.a.b.c and device is root.a.b
+ // in this case, we need to check the full path
+ else if (pattern.length() > deviceId.length() &&
pattern.startsWith(deviceId)) {
+ final List<String> filteredMeasurements = new ArrayList<>();
+
+ for (final String measurement : entry.getValue()) {
+ // low cost check comes first
+ if (pattern.length() == deviceId.length() + measurement.length() + 1
+ // high cost check comes later
+ && pattern.endsWith(TsFileConstant.PATH_SEPARATOR +
measurement)) {
+ filteredMeasurements.add(measurement);
}
+ }
- if (!timeseriesMetadataList.isEmpty()) {
- result.put(device, timeseriesMetadataList);
- }
+ if (!filteredMeasurements.isEmpty()) {
+ filteredDeviceMeasurementsMap.put(deviceId, filteredMeasurements);
}
}
- } catch (IOException e) {
- LOGGER.error("Cannot read TsFile {}.", tsFile.getPath(), e);
}
- return result;
+ return filteredDeviceMeasurementsMap;
}
+ /** @return TabletInsertionEvent in a streaming way */
public Iterable<TabletInsertionEvent> toTabletInsertionEvents() {
return () ->
new Iterator<TabletInsertionEvent>() {
- private final Iterator<Tablet> tabletIterator =
constructTabletIterable().iterator();
+ private TsFileInsertionDataTabletIterator tabletIterator = null;
@Override
public boolean hasNext() {
- return tabletIterator.hasNext();
+ return (tabletIterator != null && tabletIterator.hasNext())
+ || deviceMeasurementsMapIterator.hasNext();
}
@Override
public TabletInsertionEvent next() {
- return new PipeTabletTabletInsertionEvent(tabletIterator.next());
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+
+ while (tabletIterator == null || !tabletIterator.hasNext()) {
+ if (!deviceMeasurementsMapIterator.hasNext()) {
+ throw new NoSuchElementException();
+ }
+
+ final Map.Entry<String, List<String>> entry =
deviceMeasurementsMapIterator.next();
+
+ try {
+ tabletIterator =
+ new TsFileInsertionDataTabletIterator(
+ tsFileReader, measurementDataTypeMap, entry.getKey(),
entry.getValue());
+ } catch (IOException e) {
+ throw new PipeException("failed to create
TsFileInsertionDataTabletIterator", e);
+ }
+ }
+
+ final TabletInsertionEvent next =
+ new PipeRawTabletInsertionEvent(tabletIterator.next());
+
+ if (!hasNext()) {
+ try {
+ close();
+ } catch (Exception e) {
+ LOGGER.warn("Failed to close TsFileInsertionDataContainer", e);
+ }
+ }
+
+ return next;
}
};
}
- private Iterable<Tablet> constructTabletIterable() {
- return () ->
- new TsFileInsertionDataTabletIterator(tsFile.getPath(),
device2TimeseriesMetadataMap);
+ @Override
+ public void close() throws Exception {
+ if (tsFileReader != null) {
+ tsFileReader.close();
+ }
+ if (tsFileSequenceReader != null) {
+ tsFileSequenceReader.close();
+ }
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataTabletIterator.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataTabletIterator.java
index 7100e68b753..15959adb7ee 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataTabletIterator.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataTabletIterator.java
@@ -20,101 +20,72 @@
package org.apache.iotdb.db.pipe.core.event.view.datastructure;
import org.apache.iotdb.pipe.api.exception.PipeException;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
-import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
-import org.apache.iotdb.tsfile.file.MetaMarker;
-import org.apache.iotdb.tsfile.file.header.ChunkHeader;
-import org.apache.iotdb.tsfile.file.header.PageHeader;
-import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
-import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
-import org.apache.iotdb.tsfile.read.common.BatchData;
-import org.apache.iotdb.tsfile.read.reader.page.PageReader;
-import org.apache.iotdb.tsfile.read.reader.page.TimePageReader;
-import org.apache.iotdb.tsfile.read.reader.page.ValuePageReader;
-import org.apache.iotdb.tsfile.utils.BitMap;
-import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+import org.apache.iotdb.tsfile.read.TsFileReader;
+import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.expression.QueryExpression;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
public class TsFileInsertionDataTabletIterator implements Iterator<Tablet> {
- private static Logger LOGGER =
LoggerFactory.getLogger(TsFileInsertionDataTabletIterator.class);
- private final TsFileSequenceReader reader;
- private final String filePath;
- private final Iterator<Map.Entry<String, List<TimeseriesMetadata>>>
entriesIterator;
- private Map.Entry<String, List<TimeseriesMetadata>> currentEntry;
- private Iterator<TimeseriesMetadata> timeseriesMetadataIterator;
- private TimeseriesMetadata currentTimeseriesMetadata;
- private List<MeasurementSchema> measurementSchemas;
+ private final TsFileReader tsFileReader;
+ private final Map<String, TSDataType> measurementDataTypeMap;
- private boolean isAligned;
- private final List<long[]> timeBatches;
- private long[] timestampsForAligned;
+ private final String deviceId;
+ private final List<String> measurements;
- public TsFileInsertionDataTabletIterator(
- String filePath, Map<String, List<TimeseriesMetadata>>
device2TimeseriesMetadataMap) {
- this.filePath = filePath;
- this.entriesIterator = device2TimeseriesMetadataMap.entrySet().iterator();
- this.timeBatches = new ArrayList<>();
- this.currentEntry = null;
- this.timeseriesMetadataIterator = null;
- this.currentTimeseriesMetadata = null;
- this.measurementSchemas = null;
- this.isAligned = false;
- this.timestampsForAligned = null;
- try {
- this.reader = new TsFileSequenceReader(filePath);
- } catch (IOException e) {
- throw new PipeException("Cannot create TsFileSequenceReader for file " +
filePath, e);
- }
+ private final QueryDataSet queryDataSet;
- // Initialize timeseriesMetadataIterator if there is a next entry
- if (entriesIterator.hasNext()) {
- currentEntry = entriesIterator.next();
- timeseriesMetadataIterator = currentEntry.getValue().iterator();
- } else {
- timeseriesMetadataIterator =
- new Iterator<TimeseriesMetadata>() {
- @Override
- public boolean hasNext() {
- return false;
- }
+ public TsFileInsertionDataTabletIterator(
+ TsFileReader tsFileReader,
+ Map<String, TSDataType> measurementDataTypeMap,
+ String deviceId,
+ List<String> measurements)
+ throws IOException {
+ this.tsFileReader = tsFileReader;
+ this.measurementDataTypeMap = measurementDataTypeMap;
+
+ this.deviceId = deviceId;
+ this.measurements =
+ measurements.stream()
+ .filter(
+ measurement ->
+ // time column in aligned time-series should not be a
query column
+ measurement != null && !measurement.isEmpty())
+ .sorted()
+ .collect(Collectors.toList());
+
+ this.queryDataSet = buildQueryDataSet();
+ }
- @Override
- public TimeseriesMetadata next() {
- return null;
- }
- };
+ private QueryDataSet buildQueryDataSet() throws IOException {
+ final List<Path> paths = new ArrayList<>();
+ for (String measurement : measurements) {
+ paths.add(new Path(deviceId, measurement, false));
}
+ return tsFileReader.query(QueryExpression.create(paths, null));
}
@Override
public boolean hasNext() {
- boolean hasNext = timeseriesMetadataIterator.hasNext() ||
entriesIterator.hasNext();
- if (!hasNext) {
- try {
- reader.close();
- } catch (IOException e) {
- LOGGER.warn("Cannot close TsFileSequenceReader for file {}", filePath,
e);
- }
+ try {
+ return queryDataSet.hasNext();
+ } catch (IOException e) {
+ throw new PipeException("Failed to check next", e);
}
- return hasNext;
}
@Override
@@ -123,165 +94,48 @@ public class TsFileInsertionDataTabletIterator implements
Iterator<Tablet> {
throw new NoSuchElementException();
}
- if (!timeseriesMetadataIterator.hasNext()) {
- currentEntry = entriesIterator.next();
- timeseriesMetadataIterator = currentEntry.getValue().iterator();
- }
- currentTimeseriesMetadata = timeseriesMetadataIterator.next();
- measurementSchemas = new ArrayList<>();
-
- try (TsFileSequenceReader reader = new TsFileSequenceReader(filePath)) {
- if (currentTimeseriesMetadata.getTSDataType() == TSDataType.VECTOR) {
- processTimeseriesMetadata(currentTimeseriesMetadata, reader);
- currentTimeseriesMetadata = timeseriesMetadataIterator.next();
- }
- return processTimeseriesMetadata(currentTimeseriesMetadata, reader);
+ try {
+ return buildNextTablet();
} catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- }
-
- private Tablet createTablet(long[] timestamps, Object[] values, BitMap[]
bitMaps) {
- long[] tmp;
-
- if (isAligned) {
- if (timestampsForAligned == null) {
- timestampsForAligned = timestamps;
- return null;
- }
- tmp = timestampsForAligned;
- } else {
- tmp = timestamps;
+ throw new PipeException("Failed to build tablet", e);
}
-
- // create tablet
- int rowSize = tmp.length;
- Tablet tablet = new Tablet(currentEntry.getKey(), measurementSchemas,
rowSize);
- tablet.timestamps = tmp;
- tablet.values = values;
- tablet.rowSize = rowSize;
- tablet.bitMaps = bitMaps;
-
- return tablet;
}
- private Tablet processTimeseriesMetadata(
- TimeseriesMetadata timeseriesMetadata, TsFileSequenceReader reader) {
- int pageIndex = 0;
- if (timeseriesMetadata.getTSDataType() == TSDataType.VECTOR) {
- isAligned = true;
- timeBatches.clear();
- } else {
- MeasurementSchema measurementSchema =
- new MeasurementSchema(
- timeseriesMetadata.getMeasurementId(),
timeseriesMetadata.getTSDataType());
- measurementSchemas.add(measurementSchema);
+ private Tablet buildNextTablet() throws IOException {
+ final List<MeasurementSchema> schemas = new ArrayList<>();
+ for (final String measurement : measurements) {
+ final TSDataType dataType =
+ measurementDataTypeMap.get(deviceId + TsFileConstant.PATH_SEPARATOR
+ measurement);
+ schemas.add(new MeasurementSchema(measurement, dataType));
}
+ final Tablet tablet = new Tablet(deviceId, schemas);
+ tablet.initBitMaps();
- List<Byte> bitMapBytes = new ArrayList<>();
- List<Object> measurementValues = new ArrayList<>();
- List<Long> measurementTimestamps = new ArrayList<>();
-
- for (IChunkMetadata chunkMetadata :
timeseriesMetadata.getChunkMetadataList()) {
- long offset = chunkMetadata.getOffsetOfChunkHeader();
- try {
- reader.position(offset);
- ChunkHeader header = reader.readChunkHeader(reader.readMarker());
- int dataSize = header.getDataSize();
-
- Decoder defaultTimeDecoder =
- Decoder.getDecoderByType(
-
TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
- TSDataType.INT64);
- Decoder valueDecoder =
- Decoder.getDecoderByType(header.getEncodingType(),
header.getDataType());
- pageIndex = 0;
- if (header.getDataType() == TSDataType.VECTOR) {
- timeBatches.clear();
- }
-
- while (dataSize > 0) {
- PageHeader pageHeader =
- reader.readPageHeader(
- header.getDataType(), (header.getChunkType() & 0x3F) ==
MetaMarker.CHUNK_HEADER);
- ByteBuffer pageData = reader.readPage(pageHeader,
header.getCompressionType());
+ while (queryDataSet.hasNext()) {
+ final RowRecord rowRecord = queryDataSet.next();
- // Time column chunk
- if ((header.getChunkType() & TsFileConstant.TIME_COLUMN_MASK)
- == TsFileConstant.TIME_COLUMN_MASK) {
- TimePageReader timePageReader =
- new TimePageReader(pageHeader, pageData, defaultTimeDecoder);
- long[] timeBatch = timePageReader.getNextTimeBatch();
- timeBatches.add(timeBatch);
+ final int rowIndex = tablet.rowSize;
- for (long time : timeBatch) {
- measurementTimestamps.add(time);
- }
- }
- // Value column chunk
- else if ((header.getChunkType() & TsFileConstant.VALUE_COLUMN_MASK)
- == TsFileConstant.VALUE_COLUMN_MASK) {
- ValuePageReader valuePageReader =
- new ValuePageReader(pageHeader, pageData,
header.getDataType(), valueDecoder);
+ tablet.addTimestamp(rowIndex, rowRecord.getTimestamp());
- for (byte value : valuePageReader.getBitmap()) {
- bitMapBytes.add(value);
- }
-
- for (TsPrimitiveType value :
- valuePageReader.nextValueBatch(timeBatches.get(pageIndex))) {
- measurementValues.add(value.getValue());
- }
- }
-
- // NonAligned Chunk
- else {
- PageReader pageReader =
- new PageReader(
- pageData, header.getDataType(), valueDecoder,
defaultTimeDecoder, null);
- BatchData batchData = pageReader.getAllSatisfiedPageData();
- List<Integer> isNullList = new ArrayList<>();
- int index = 0;
- while (batchData.hasCurrent()) {
- measurementTimestamps.add(batchData.currentTime());
- Object value = batchData.currentValue();
-
- if (value == null) {
- isNullList.add(index);
- }
- measurementValues.add(value);
- index++;
- batchData.next();
- }
-
- BitMap bitmap = new BitMap(measurementTimestamps.size());
- for (int isNull : isNullList) {
- bitmap.mark(isNull);
- }
- byte[] bytes = bitmap.getByteArray();
- for (byte value : bytes) {
- bitMapBytes.add(value);
- }
- }
- pageIndex++;
- dataSize -= pageHeader.getSerializedPageSize();
+ final List<Field> fields = rowRecord.getFields();
+ final int fieldSize = fields.size();
+ for (int i = 0; i < fieldSize; i++) {
+ final Field field = fields.get(i);
+ if (field == null || field.getDataType() == null) {
+ tablet.bitMaps[i].mark(rowIndex);
+ } else {
+ tablet.addValue(measurements.get(i), rowIndex,
field.getObjectValue(field.getDataType()));
}
- } catch (IOException e) {
- throw new UncheckedIOException(e);
}
- }
- long[] timestamps = new long[measurementTimestamps.size()];
- for (int i = 0; i < measurementTimestamps.size(); i++) {
- timestamps[i] = measurementTimestamps.get(i);
- }
+ tablet.rowSize++;
- byte[] byteArray = new byte[bitMapBytes.size()];
- for (int i = 0; i < bitMapBytes.size(); i++) {
- byteArray[i] = bitMapBytes.get(i);
+ if (tablet.rowSize == tablet.getMaxRowNumber()) {
+ break;
+ }
}
- BitMap[] bitMaps = new BitMap[] {new BitMap(byteArray.length, byteArray)};
- return createTablet(timestamps, measurementValues.toArray(), bitMaps);
+ return tablet;
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/processor/PipeDoNothingProcessor.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/processor/PipeDoNothingProcessor.java
index 62979b7d52c..c9774b4618a 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/processor/PipeDoNothingProcessor.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/processor/PipeDoNothingProcessor.java
@@ -56,15 +56,23 @@ public class PipeDoNothingProcessor implements
PipeProcessor {
.equals(PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE)) {
eventCollector.collect(tabletInsertionEvent);
} else {
- eventCollector.collect(
- tabletInsertionEvent.processRowByRow(
+ tabletInsertionEvent
+ .processRowByRow(
(row, rowCollector) -> {
try {
rowCollector.collectRow(row);
} catch (IOException e) {
throw new PipeException("Failed to collect row", e);
}
- }));
+ })
+ .forEach(
+ event -> {
+ try {
+ eventCollector.collect(event);
+ } catch (IOException e) {
+ throw new PipeException("Failed to collect event", e);
+ }
+ });
}
} else {
eventCollector.collect(tabletInsertionEvent);
diff --git
a/server/src/test/java/org/apache/iotdb/db/pipe/core/event/PipeInsertNodeTabletInsertionEventTest.java
b/server/src/test/java/org/apache/iotdb/db/pipe/core/event/PipeTabletInsertionEventTest.java
similarity index 78%
rename from
server/src/test/java/org/apache/iotdb/db/pipe/core/event/PipeInsertNodeTabletInsertionEventTest.java
rename to
server/src/test/java/org/apache/iotdb/db/pipe/core/event/PipeTabletInsertionEventTest.java
index 5a211fc2cff..6a68b5ad7cd 100644
---
a/server/src/test/java/org/apache/iotdb/db/pipe/core/event/PipeInsertNodeTabletInsertionEventTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/pipe/core/event/PipeTabletInsertionEventTest.java
@@ -24,7 +24,8 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
-import
org.apache.iotdb.db.pipe.core.event.impl.PipeInsertNodeTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.core.event.impl.PipeRawTabletInsertionEvent;
+import
org.apache.iotdb.db.pipe.core.event.view.datastructure.TabletInsertionDataContainer;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.BitMap;
@@ -37,7 +38,7 @@ import org.junit.Test;
import java.util.Arrays;
-public class PipeInsertNodeTabletInsertionEventTest {
+public class PipeTabletInsertionEventTest {
InsertRowNode insertRowNode;
InsertTabletNode insertTabletNode;
@@ -56,7 +57,6 @@ public class PipeInsertNodeTabletInsertionEventTest {
};
final MeasurementSchema[] schemas = new MeasurementSchema[6];
- final Object[] values = new Object[6];
final String pattern = "root.sg.d1";
@@ -78,6 +78,14 @@ public class PipeInsertNodeTabletInsertionEventTest {
}
private void createInsertRowNode() throws IllegalPathException {
+ final Object[] values = new Object[6];
+
+ values[0] = 100;
+ values[1] = 10000L;
+ values[2] = 2F;
+ values[3] = 1.0;
+ values[4] = false;
+ values[5] = Binary.valueOf("text");
insertRowNode =
new InsertRowNode(
@@ -93,6 +101,24 @@ public class PipeInsertNodeTabletInsertionEventTest {
}
private void createInsertTabletNode() throws IllegalPathException {
+ final Object[] values = new Object[6];
+
+ values[0] = new int[5];
+ values[1] = new long[5];
+ values[2] = new float[5];
+ values[3] = new double[5];
+ values[4] = new boolean[5];
+ values[5] = new Binary[5];
+
+ for (int r = 0; r < 5; r++) {
+ ((int[]) values[0])[r] = 100;
+ ((long[]) values[1])[r] = 10000;
+ ((float[]) values[2])[r] = 2;
+ ((double[]) values[3])[r] = 1.0;
+ ((boolean[]) values[4])[r] = false;
+ ((Binary[]) values[5])[r] = Binary.valueOf("text");
+ }
+
this.insertTabletNode =
new InsertTabletNode(
new PlanNodeId("plannode 1"),
@@ -108,6 +134,7 @@ public class PipeInsertNodeTabletInsertionEventTest {
}
private void createTablet() {
+ final Object[] values = new Object[6];
// create tablet for insertRowNode
BitMap[] bitMapsForInsertRowNode = new BitMap[6];
@@ -167,12 +194,18 @@ public class PipeInsertNodeTabletInsertionEventTest {
@Test
public void convertToTabletForTest() {
- PipeInsertNodeTabletInsertionEvent event1 = new
PipeInsertNodeTabletInsertionEvent(null, null);
- Tablet tablet1 = event1.convertToTabletForTest(insertRowNode, pattern);
+ Tablet tablet1 = new TabletInsertionDataContainer(insertRowNode,
pattern).convertToTablet();
Assert.assertEquals(tablet1, tabletForInsertRowNode);
- PipeInsertNodeTabletInsertionEvent event2 = new
PipeInsertNodeTabletInsertionEvent(null, null);
- Tablet tablet2 = event2.convertToTabletForTest(insertTabletNode, pattern);
+ Tablet tablet2 = new TabletInsertionDataContainer(insertTabletNode,
pattern).convertToTablet();
Assert.assertEquals(tablet2, tabletForInsertTabletNode);
+
+ PipeRawTabletInsertionEvent event3 = new
PipeRawTabletInsertionEvent(tablet1, pattern);
+ Tablet tablet3 = event3.convertToTablet();
+ Assert.assertEquals(tablet1, tablet3);
+
+ PipeRawTabletInsertionEvent event4 = new
PipeRawTabletInsertionEvent(tablet2, pattern);
+ Tablet tablet4 = event4.convertToTablet();
+ Assert.assertEquals(tablet2, tablet4);
}
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/pipe/core/event/TsFileInsertionDataContainerTest.java
b/server/src/test/java/org/apache/iotdb/db/pipe/core/event/TsFileInsertionDataContainerTest.java
new file mode 100644
index 00000000000..d74f790fd15
--- /dev/null
+++
b/server/src/test/java/org/apache/iotdb/db/pipe/core/event/TsFileInsertionDataContainerTest.java
@@ -0,0 +1,608 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.event;
+
+import org.apache.iotdb.db.pipe.core.event.impl.PipeRawTabletInsertionEvent;
+import
org.apache.iotdb.db.pipe.core.event.view.datastructure.TsFileInsertionDataContainer;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.utils.TsFileGeneratorUtils;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.fail;
+
+public class TsFileInsertionDataContainerTest {
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(TsFileInsertionDataContainerTest.class);
+
+ private File alignedTsFile;
+ private File nonalignedTsFile;
+
+ @After
+ public void tearDown() throws Exception {
+ if (alignedTsFile != null) {
+ alignedTsFile.delete();
+ }
+ if (nonalignedTsFile != null) {
+ nonalignedTsFile.delete();
+ }
+ }
+
+ @Test
+ public void testToTabletInsertionEvents() throws Exception {
+ Set<Integer> deviceNumbers = new HashSet<>();
+ deviceNumbers.add(1);
+ deviceNumbers.add(2);
+
+ Set<Integer> measurementNumbers = new HashSet<>();
+ measurementNumbers.add(1);
+ measurementNumbers.add(2);
+
+ for (int deviceNumber : deviceNumbers) {
+ for (int measurementNumber : measurementNumbers) {
+ testToTabletInsertionEvents(deviceNumber, measurementNumber, 0);
+ testToTabletInsertionEvents(deviceNumber, measurementNumber, 1);
+ testToTabletInsertionEvents(deviceNumber, measurementNumber, 2);
+
+ testToTabletInsertionEvents(deviceNumber, measurementNumber, 999);
+ testToTabletInsertionEvents(deviceNumber, measurementNumber, 1000);
+ testToTabletInsertionEvents(deviceNumber, measurementNumber, 1001);
+
+ testToTabletInsertionEvents(deviceNumber, measurementNumber, 999 * 2 +
1);
+ testToTabletInsertionEvents(deviceNumber, measurementNumber, 1000);
+ testToTabletInsertionEvents(deviceNumber, measurementNumber, 1001 * 2
- 1);
+
+ testToTabletInsertionEvents(deviceNumber, measurementNumber, 1023);
+ testToTabletInsertionEvents(deviceNumber, measurementNumber, 1024);
+ testToTabletInsertionEvents(deviceNumber, measurementNumber, 1025);
+
+ testToTabletInsertionEvents(deviceNumber, measurementNumber, 1023 * 2
+ 1);
+ testToTabletInsertionEvents(deviceNumber, measurementNumber, 1024 * 2);
+ testToTabletInsertionEvents(deviceNumber, measurementNumber, 1025 * 2
- 1);
+
+ testToTabletInsertionEvents(deviceNumber, measurementNumber, 10001);
+ }
+ }
+ }
+
+ private void testToTabletInsertionEvents(
+ int deviceNumber, int measurementNumber, int rowNumberInOneDevice)
throws Exception {
+ LOGGER.info(
+ "testToTabletInsertionEvents: deviceNumber = {}, measurementNumber =
{}, rowNumberInOneDevice = {}",
+ deviceNumber,
+ measurementNumber,
+ rowNumberInOneDevice);
+
+ alignedTsFile =
+ TsFileGeneratorUtils.generateAlignedTsFile(
+ "aligned.tsfile",
+ deviceNumber,
+ measurementNumber,
+ rowNumberInOneDevice,
+ 300,
+ 10000,
+ 700,
+ 50);
+ nonalignedTsFile =
+ TsFileGeneratorUtils.generateNonAlignedTsFile(
+ "nonaligned.tsfile",
+ deviceNumber,
+ measurementNumber,
+ rowNumberInOneDevice,
+ 300,
+ 10000,
+ 700,
+ 50);
+
+ try (final TsFileInsertionDataContainer alignedContainer =
+ new TsFileInsertionDataContainer(alignedTsFile, "root");
+ final TsFileInsertionDataContainer nonalignedContainer =
+ new TsFileInsertionDataContainer(nonalignedTsFile, "root"); ) {
+ AtomicInteger count1 = new AtomicInteger(0);
+ AtomicInteger count2 = new AtomicInteger(0);
+ AtomicInteger count3 = new AtomicInteger(0);
+
+ alignedContainer
+ .toTabletInsertionEvents()
+ .forEach(
+ event ->
+ event
+ .processRowByRow(
+ (row, collector) -> {
+ try {
+ collector.collectRow(row);
+ Assert.assertEquals(measurementNumber,
row.size());
+ count1.incrementAndGet();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .forEach(
+ tabletInsertionEvent1 ->
+ tabletInsertionEvent1
+ .processRowByRow(
+ (row, collector) -> {
+ try {
+ collector.collectRow(row);
+
Assert.assertEquals(measurementNumber, row.size());
+ count2.incrementAndGet();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .forEach(
+ tabletInsertionEvent2 ->
+ tabletInsertionEvent2.processTablet(
+ (tablet, rowCollector) -> {
+ new
PipeRawTabletInsertionEvent(tablet)
+ .processRowByRow(
+ (row, collector) -> {
+ try {
+
rowCollector.collectRow(row);
+
Assert.assertEquals(
+
measurementNumber, row.size());
+
count3.incrementAndGet();
+ } catch (IOException
e) {
+ throw new
RuntimeException(e);
+ }
+ });
+ }))));
+
+ Assert.assertEquals(count1.getAndSet(0), deviceNumber *
rowNumberInOneDevice);
+ Assert.assertEquals(count2.getAndSet(0), deviceNumber *
rowNumberInOneDevice);
+ Assert.assertEquals(count3.getAndSet(0), deviceNumber *
rowNumberInOneDevice);
+
+ nonalignedContainer
+ .toTabletInsertionEvents()
+ .forEach(
+ event ->
+ event
+ .processTablet(
+ (tablet, rowCollector) -> {
+ new PipeRawTabletInsertionEvent(tablet)
+ .processRowByRow(
+ (row, collector) -> {
+ try {
+ rowCollector.collectRow(row);
+ Assert.assertEquals(measurementNumber,
row.size());
+ count1.incrementAndGet();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ })
+ .forEach(
+ tabletInsertionEvent1 ->
+ tabletInsertionEvent1
+ .processRowByRow(
+ (row, collector) -> {
+ try {
+ collector.collectRow(row);
+
Assert.assertEquals(measurementNumber, row.size());
+ count2.incrementAndGet();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .forEach(
+ tabletInsertionEvent2 ->
+
tabletInsertionEvent2.processRowByRow(
+ (row, collector) -> {
+ try {
+ collector.collectRow(row);
+ Assert.assertEquals(
+ measurementNumber,
row.size());
+ count3.incrementAndGet();
+ } catch (IOException e) {
+ throw new
RuntimeException(e);
+ }
+ }))));
+
+ Assert.assertEquals(count1.get(), deviceNumber * rowNumberInOneDevice);
+ Assert.assertEquals(count2.get(), deviceNumber * rowNumberInOneDevice);
+ Assert.assertEquals(count3.get(), deviceNumber * rowNumberInOneDevice);
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ AtomicReference<String> oneDeviceInAlignedTsFile = new AtomicReference<>();
+ AtomicReference<String> oneMeasurementInAlignedTsFile = new
AtomicReference<>();
+
+ AtomicReference<String> oneDeviceInUnalignedTsFile = new
AtomicReference<>();
+ AtomicReference<String> oneMeasurementInUnalignedTsFile = new
AtomicReference<>();
+
+ try (TsFileSequenceReader alignedReader =
+ new TsFileSequenceReader(alignedTsFile.getAbsolutePath());
+ TsFileSequenceReader nonalignedReader =
+ new TsFileSequenceReader(nonalignedTsFile.getAbsolutePath())) {
+
+ alignedReader
+ .getDeviceMeasurementsMap()
+ .forEach(
+ (k, v) ->
+ v.stream()
+ .filter(p -> p != null && !p.isEmpty())
+ .forEach(
+ p -> {
+ oneDeviceInAlignedTsFile.set(k);
+ oneMeasurementInAlignedTsFile.set(new Path(k, p,
false).toString());
+ }));
+ nonalignedReader
+ .getDeviceMeasurementsMap()
+ .forEach(
+ (k, v) ->
+ v.stream()
+ .filter(p -> p != null && !p.isEmpty())
+ .forEach(
+ p -> {
+ oneDeviceInUnalignedTsFile.set(k);
+ oneMeasurementInUnalignedTsFile.set(new Path(k, p,
false).toString());
+ }));
+ }
+
+ try (final TsFileInsertionDataContainer alignedContainer =
+ new TsFileInsertionDataContainer(alignedTsFile,
oneDeviceInAlignedTsFile.get());
+ final TsFileInsertionDataContainer nonalignedContainer =
+ new TsFileInsertionDataContainer(
+ nonalignedTsFile, oneDeviceInUnalignedTsFile.get()); ) {
+ AtomicInteger count1 = new AtomicInteger(0);
+ AtomicInteger count2 = new AtomicInteger(0);
+ AtomicInteger count3 = new AtomicInteger(0);
+
+ alignedContainer
+ .toTabletInsertionEvents()
+ .forEach(
+ event ->
+ event
+ .processRowByRow(
+ (row, collector) -> {
+ try {
+ collector.collectRow(row);
+ Assert.assertEquals(measurementNumber,
row.size());
+ count1.incrementAndGet();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .forEach(
+ tabletInsertionEvent1 ->
+ tabletInsertionEvent1
+ .processRowByRow(
+ (row, collector) -> {
+ try {
+ collector.collectRow(row);
+
Assert.assertEquals(measurementNumber, row.size());
+ count2.incrementAndGet();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .forEach(
+ tabletInsertionEvent2 ->
+ tabletInsertionEvent2.processTablet(
+ (tablet, rowCollector) -> {
+ new
PipeRawTabletInsertionEvent(tablet)
+ .processRowByRow(
+ (row, collector) -> {
+ try {
+
rowCollector.collectRow(row);
+
Assert.assertEquals(
+
measurementNumber, row.size());
+
count3.incrementAndGet();
+ } catch (IOException
e) {
+ throw new
RuntimeException(e);
+ }
+ });
+ }))));
+
+ Assert.assertEquals(count1.getAndSet(0), rowNumberInOneDevice);
+ Assert.assertEquals(count2.getAndSet(0), rowNumberInOneDevice);
+ Assert.assertEquals(count3.getAndSet(0), rowNumberInOneDevice);
+
+ nonalignedContainer
+ .toTabletInsertionEvents()
+ .forEach(
+ event ->
+ event
+ .processTablet(
+ (tablet, rowCollector) -> {
+ new PipeRawTabletInsertionEvent(tablet)
+ .processRowByRow(
+ (row, collector) -> {
+ try {
+ rowCollector.collectRow(row);
+ Assert.assertEquals(measurementNumber,
row.size());
+ count1.incrementAndGet();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ })
+ .forEach(
+ tabletInsertionEvent1 ->
+ tabletInsertionEvent1
+ .processRowByRow(
+ (row, collector) -> {
+ try {
+ collector.collectRow(row);
+
Assert.assertEquals(measurementNumber, row.size());
+ count2.incrementAndGet();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .forEach(
+ tabletInsertionEvent2 ->
+
tabletInsertionEvent2.processRowByRow(
+ (row, collector) -> {
+ try {
+ collector.collectRow(row);
+ Assert.assertEquals(
+ measurementNumber,
row.size());
+ count3.incrementAndGet();
+ } catch (IOException e) {
+ throw new
RuntimeException(e);
+ }
+ }))));
+
+ Assert.assertEquals(count1.get(), rowNumberInOneDevice);
+ Assert.assertEquals(count2.get(), rowNumberInOneDevice);
+ Assert.assertEquals(count3.get(), rowNumberInOneDevice);
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ try (final TsFileInsertionDataContainer alignedContainer =
+ new TsFileInsertionDataContainer(alignedTsFile,
oneMeasurementInAlignedTsFile.get());
+ final TsFileInsertionDataContainer nonalignedContainer =
+ new TsFileInsertionDataContainer(
+ nonalignedTsFile, oneMeasurementInUnalignedTsFile.get()); ) {
+ AtomicInteger count1 = new AtomicInteger(0);
+ AtomicInteger count2 = new AtomicInteger(0);
+ AtomicInteger count3 = new AtomicInteger(0);
+
+ alignedContainer
+ .toTabletInsertionEvents()
+ .forEach(
+ event ->
+ event
+ .processRowByRow(
+ (row, collector) -> {
+ try {
+ collector.collectRow(row);
+ Assert.assertEquals(1, row.size());
+ count1.incrementAndGet();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .forEach(
+ tabletInsertionEvent1 ->
+ tabletInsertionEvent1
+ .processRowByRow(
+ (row, collector) -> {
+ try {
+ collector.collectRow(row);
+ Assert.assertEquals(1, row.size());
+ count2.incrementAndGet();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .forEach(
+ tabletInsertionEvent2 ->
+ tabletInsertionEvent2.processTablet(
+ (tablet, rowCollector) -> {
+ new
PipeRawTabletInsertionEvent(tablet)
+ .processRowByRow(
+ (row, collector) -> {
+ try {
+
rowCollector.collectRow(row);
+
Assert.assertEquals(1, row.size());
+
count3.incrementAndGet();
+ } catch (IOException
e) {
+ throw new
RuntimeException(e);
+ }
+ });
+ }))));
+
+ Assert.assertEquals(count1.getAndSet(0), rowNumberInOneDevice);
+ Assert.assertEquals(count2.getAndSet(0), rowNumberInOneDevice);
+ Assert.assertEquals(count3.getAndSet(0), rowNumberInOneDevice);
+
+ nonalignedContainer
+ .toTabletInsertionEvents()
+ .forEach(
+ event ->
+ event
+ .processTablet(
+ (tablet, rowCollector) -> {
+ new PipeRawTabletInsertionEvent(tablet)
+ .processRowByRow(
+ (row, collector) -> {
+ try {
+ rowCollector.collectRow(row);
+ Assert.assertEquals(1, row.size());
+ count1.incrementAndGet();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ })
+ .forEach(
+ tabletInsertionEvent1 ->
+ tabletInsertionEvent1
+ .processRowByRow(
+ (row, collector) -> {
+ try {
+ collector.collectRow(row);
+ Assert.assertEquals(1, row.size());
+ count2.incrementAndGet();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .forEach(
+ tabletInsertionEvent2 ->
+
tabletInsertionEvent2.processRowByRow(
+ (row, collector) -> {
+ try {
+ collector.collectRow(row);
+ Assert.assertEquals(1,
row.size());
+ count3.incrementAndGet();
+ } catch (IOException e) {
+ throw new
RuntimeException(e);
+ }
+ }))));
+
+ Assert.assertEquals(count1.get(), rowNumberInOneDevice);
+ Assert.assertEquals(count2.get(), rowNumberInOneDevice);
+ Assert.assertEquals(count3.get(), rowNumberInOneDevice);
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ try (final TsFileInsertionDataContainer alignedContainer =
+ new TsFileInsertionDataContainer(alignedTsFile,
"not-exist-pattern");
+ final TsFileInsertionDataContainer nonalignedContainer =
+ new TsFileInsertionDataContainer(nonalignedTsFile,
"not-exist-pattern"); ) {
+ AtomicInteger count1 = new AtomicInteger(0);
+ AtomicInteger count2 = new AtomicInteger(0);
+ AtomicInteger count3 = new AtomicInteger(0);
+
+ alignedContainer
+ .toTabletInsertionEvents()
+ .forEach(
+ event ->
+ event
+ .processRowByRow(
+ (row, collector) -> {
+ try {
+ collector.collectRow(row);
+ Assert.assertEquals(0, row.size());
+ count1.incrementAndGet();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .forEach(
+ tabletInsertionEvent1 ->
+ tabletInsertionEvent1
+ .processRowByRow(
+ (row, collector) -> {
+ try {
+ collector.collectRow(row);
+ Assert.assertEquals(0, row.size());
+ count2.incrementAndGet();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .forEach(
+ tabletInsertionEvent2 ->
+ tabletInsertionEvent2.processTablet(
+ (tablet, rowCollector) -> {
+ new
PipeRawTabletInsertionEvent(tablet)
+ .processRowByRow(
+ (row, collector) -> {
+ try {
+
rowCollector.collectRow(row);
+
Assert.assertEquals(0, row.size());
+
count3.incrementAndGet();
+ } catch (IOException
e) {
+ throw new
RuntimeException(e);
+ }
+ });
+ }))));
+
+ Assert.assertEquals(count1.getAndSet(0), 0);
+ Assert.assertEquals(count2.getAndSet(0), 0);
+ Assert.assertEquals(count3.getAndSet(0), 0);
+
+ nonalignedContainer
+ .toTabletInsertionEvents()
+ .forEach(
+ event ->
+ event
+ .processTablet(
+ (tablet, rowCollector) -> {
+ new PipeRawTabletInsertionEvent(tablet)
+ .processRowByRow(
+ (row, collector) -> {
+ try {
+ rowCollector.collectRow(row);
+ Assert.assertEquals(0, row.size());
+ count1.incrementAndGet();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ })
+ .forEach(
+ tabletInsertionEvent1 ->
+ tabletInsertionEvent1
+ .processRowByRow(
+ (row, collector) -> {
+ try {
+ collector.collectRow(row);
+ Assert.assertEquals(0, row.size());
+ count2.incrementAndGet();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .forEach(
+ tabletInsertionEvent2 ->
+
tabletInsertionEvent2.processRowByRow(
+ (row, collector) -> {
+ try {
+ collector.collectRow(row);
+ Assert.assertEquals(0,
row.size());
+ count3.incrementAndGet();
+ } catch (IOException e) {
+ throw new
RuntimeException(e);
+ }
+ }))));
+
+ Assert.assertEquals(count1.get(), 0);
+ Assert.assertEquals(count2.get(), 0);
+ Assert.assertEquals(count3.get(), 0);
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index 6c29e7c4c44..8e760edaef4 100644
---
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -2049,6 +2049,24 @@ public class TsFileSequenceReader implements
AutoCloseable {
return result;
}
+ /**
+ * get all types of measurements in this file
+ *
+ * @return full path -> datatype
+ */
+ public Map<String, TSDataType> getFullPathDataTypeMap() throws IOException {
+ final Map<String, TSDataType> result = new HashMap<>();
+ for (final String device : getAllDevices()) {
+ Map<String, TimeseriesMetadata> timeseriesMetadataMap =
readDeviceMetadata(device);
+ for (TimeseriesMetadata timeseriesMetadata :
timeseriesMetadataMap.values()) {
+ result.put(
+ device + TsFileConstant.PATH_SEPARATOR +
timeseriesMetadata.getMeasurementId(),
+ timeseriesMetadata.getTSDataType());
+ }
+ }
+ return result;
+ }
+
public Map<String, List<String>> getDeviceMeasurementsMap() throws
IOException {
Map<String, List<String>> result = new HashMap<>();
for (String device : getAllDevices()) {