This is an automated email from the ASF dual-hosted git repository.
Caideyipi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 44d4f6d962e Optimized write performace by reducing separators (#17670)
44d4f6d962e is described below
commit 44d4f6d962ea4e0a03e07e8e3051b6e0c6e0e9b7
Author: Caideyipi <[email protected]>
AuthorDate: Mon Jun 1 18:16:39 2026 +0800
Optimized write performace by reducing separators (#17670)
---
.../dataregion/DataExecutionVisitor.java | 5 -
.../planner/plan/node/write/InsertRowNode.java | 6 +-
.../planner/plan/node/write/InsertRowsNode.java | 6 +-
.../planner/plan/node/write/InsertTabletNode.java | 6 +-
.../plan/node/write/RelationalInsertRowNode.java | 4 +-
.../plan/node/write/RelationalInsertRowsNode.java | 4 +-
.../plan/planner/plan/node/write/SearchNode.java | 45 +++++++
.../db/storageengine/dataregion/DataRegion.java | 82 ++++++++++--
.../storageengine/dataregion/wal/node/WALNode.java | 11 +-
.../planner/node/write/InsertRowNodeSerdeTest.java | 90 +++++++++++++
.../node/write/InsertRowsNodeSerdeTest.java | 141 +++++++++++++++++++++
.../node/write/InsertTabletNodeSerdeTest.java | 67 ++++++++++
.../wal/node/WALNodeWaitForRollFileTest.java | 29 ++++-
13 files changed, 463 insertions(+), 33 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
index 88d23360b54..e96ae983e58 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
@@ -76,7 +76,6 @@ public class DataExecutionVisitor implements
PlanVisitor<TSStatus, DataRegion> {
public TSStatus visitInsertRow(InsertRowNode node, DataRegion dataRegion) {
try {
dataRegion.insert(node);
- dataRegion.insertSeparatorToWAL();
return StatusUtils.OK;
} catch (OutOfTTLException e) {
LOGGER.warn(DataNodeMiscMessages.ERROR_EXECUTING_PLAN_NODE_CAUSED, node,
e.getMessage());
@@ -100,7 +99,6 @@ public class DataExecutionVisitor implements
PlanVisitor<TSStatus, DataRegion> {
public TSStatus visitInsertTablet(final InsertTabletNode node, final
DataRegion dataRegion) {
try {
dataRegion.insertTablet(node);
- dataRegion.insertSeparatorToWAL();
return StatusUtils.OK;
} catch (final OutOfTTLException e) {
LOGGER.debug(DataNodeMiscMessages.ERROR_EXECUTING_PLAN_NODE_CAUSED,
node, e.getMessage());
@@ -137,7 +135,6 @@ public class DataExecutionVisitor implements
PlanVisitor<TSStatus, DataRegion> {
public TSStatus visitInsertRows(InsertRowsNode node, DataRegion dataRegion) {
try {
dataRegion.insert(node);
- dataRegion.insertSeparatorToWAL();
return StatusUtils.OK;
} catch (WriteProcessRejectException e) {
LOGGER.warn(DataNodeMiscMessages.REJECT_EXECUTING_PLAN_NODE, node,
e.getMessage());
@@ -174,7 +171,6 @@ public class DataExecutionVisitor implements
PlanVisitor<TSStatus, DataRegion> {
public TSStatus visitInsertMultiTablets(InsertMultiTabletsNode node,
DataRegion dataRegion) {
try {
dataRegion.insertTablets(node);
- dataRegion.insertSeparatorToWAL();
return StatusUtils.OK;
} catch (WriteProcessRejectException e) {
LOGGER.warn(DataNodeMiscMessages.REJECT_EXECUTING_PLAN_NODE, node,
e.getMessage());
@@ -209,7 +205,6 @@ public class DataExecutionVisitor implements
PlanVisitor<TSStatus, DataRegion> {
InsertRowsOfOneDeviceNode node, DataRegion dataRegion) {
try {
dataRegion.insert(node);
- dataRegion.insertSeparatorToWAL();
return StatusUtils.OK;
} catch (WriteProcessRejectException e) {
LOGGER.warn(DataNodeMiscMessages.REJECT_EXECUTING_PLAN_NODE, node,
e.getMessage());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
index 448c8df4545..708dac1d33a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
@@ -621,7 +621,7 @@ public class InsertRowNode extends InsertNode implements
WALEntryValue {
@Override
public void serializeToWAL(IWALByteBufferView buffer) {
buffer.putShort(getType().getNodeType());
- buffer.putLong(searchIndex);
+ buffer.putLong(getEncodedSearchIndex());
subSerialize(buffer);
}
@@ -702,7 +702,7 @@ public class InsertRowNode extends InsertNode implements
WALEntryValue {
public static InsertRowNode deserializeFromWAL(DataInputStream stream)
throws IOException {
long searchIndex = stream.readLong();
InsertRowNode insertNode = subDeserializeFromWAL(stream);
- insertNode.setSearchIndex(searchIndex);
+ insertNode.setSearchIndexFromWAL(searchIndex);
return insertNode;
}
@@ -793,7 +793,7 @@ public class InsertRowNode extends InsertNode implements
WALEntryValue {
public static InsertRowNode deserializeFromWAL(ByteBuffer buffer) {
long searchIndex = buffer.getLong();
InsertRowNode insertNode = subDeserializeFromWAL(buffer);
- insertNode.setSearchIndex(searchIndex);
+ insertNode.setSearchIndexFromWAL(searchIndex);
return insertNode;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
index 5571cd25616..957b22cfcb2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
@@ -350,7 +350,7 @@ public class InsertRowsNode extends InsertNode implements
WALEntryValue {
@Override
public void serializeToWAL(IWALByteBufferView buffer) {
buffer.putShort(getType().getNodeType());
- buffer.putLong(searchIndex);
+ buffer.putLong(getEncodedSearchIndex());
subSerialize(buffer);
}
@@ -378,7 +378,7 @@ public class InsertRowsNode extends InsertNode implements
WALEntryValue {
InsertRowNode insertRowNode =
InsertRowNode.subDeserializeFromWAL(stream);
insertRowsNode.addOneInsertRowNode(insertRowNode, i);
}
- insertRowsNode.setSearchIndex(searchIndex);
+ insertRowsNode.setSearchIndexFromWAL(searchIndex);
return insertRowsNode;
}
@@ -398,7 +398,7 @@ public class InsertRowsNode extends InsertNode implements
WALEntryValue {
InsertRowNode insertRowNode =
InsertRowNode.subDeserializeFromWAL(buffer);
insertRowsNode.addOneInsertRowNode(insertRowNode, i);
}
- insertRowsNode.setSearchIndex(searchIndex);
+ insertRowsNode.setSearchIndexFromWAL(searchIndex);
return insertRowsNode;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
index adf14a39381..a00b418da9f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
@@ -905,7 +905,7 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
}
void subSerialize(IWALByteBufferView buffer, List<int[]> rangeList) {
- buffer.putLong(searchIndex);
+ buffer.putLong(getEncodedSearchIndex());
WALWriteUtils.write(targetPath.getFullPath(), buffer);
// data types are serialized in measurement schemas
writeMeasurementSchemas(buffer);
@@ -1037,7 +1037,7 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
}
protected void subDeserializeFromWAL(DataInputStream stream) throws
IOException {
- searchIndex = stream.readLong();
+ setSearchIndexFromWAL(stream.readLong());
try {
targetPath = readTargetPath(stream);
} catch (IllegalPathException e) {
@@ -1073,7 +1073,7 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
}
protected void subDeserializeFromWAL(ByteBuffer buffer) {
- searchIndex = buffer.getLong();
+ setSearchIndexFromWAL(buffer.getLong());
try {
targetPath = readTargetPath(buffer);
} catch (IllegalPathException e) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowNode.java
index e622dba30b9..8ef6802f047 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowNode.java
@@ -128,14 +128,14 @@ public class RelationalInsertRowNode extends
InsertRowNode {
throws IOException {
long searchIndex = stream.readLong();
RelationalInsertRowNode insertNode = subDeserializeFromWAL(stream);
- insertNode.setSearchIndex(searchIndex);
+ insertNode.setSearchIndexFromWAL(searchIndex);
return insertNode;
}
public static RelationalInsertRowNode deserializeFromWAL(ByteBuffer buffer) {
long searchIndex = buffer.getLong();
RelationalInsertRowNode insertNode = subDeserializeFromWAL(buffer);
- insertNode.setSearchIndex(searchIndex);
+ insertNode.setSearchIndexFromWAL(searchIndex);
return insertNode;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java
index 83498ceefc9..741c45f3256 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java
@@ -124,7 +124,7 @@ public class RelationalInsertRowsNode extends
InsertRowsNode {
RelationalInsertRowNode insertRowNode =
RelationalInsertRowNode.subDeserializeFromWAL(stream);
insertRowsNode.addOneInsertRowNode(insertRowNode, i);
}
- insertRowsNode.setSearchIndex(searchIndex);
+ insertRowsNode.setSearchIndexFromWAL(searchIndex);
return insertRowsNode;
}
@@ -144,7 +144,7 @@ public class RelationalInsertRowsNode extends
InsertRowsNode {
RelationalInsertRowNode insertRowNode =
RelationalInsertRowNode.subDeserializeFromWAL(buffer);
insertRowsNode.addOneInsertRowNode(insertRowNode, i);
}
- insertRowsNode.setSearchIndex(searchIndex);
+ insertRowsNode.setSearchIndexFromWAL(searchIndex);
return insertRowsNode;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/SearchNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/SearchNode.java
index 5a7e5d9cf78..975ac1f4844 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/SearchNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/SearchNode.java
@@ -28,15 +28,22 @@ import java.util.List;
public abstract class SearchNode extends WritePlanNode implements
ComparableConsensusRequest {
+ private static final long LAST_FRAGMENT_MASK = Long.MIN_VALUE;
+
/** this insert node doesn't need to participate in iot consensus */
public static final long NO_CONSENSUS_INDEX =
ConsensusReqReader.DEFAULT_SEARCH_INDEX;
+ // Preserve last-fragment state for WAL entries that do not have a consensus
search index.
+ private static final long NO_CONSENSUS_INDEX_WITH_LAST_FRAGMENT =
Long.MIN_VALUE;
+
/**
* this index is used by wal search, its order should be protected by the
upper layer, and the
* value should start from 1
*/
protected long searchIndex = NO_CONSENSUS_INDEX;
+ protected boolean isLastFragment = false;
+
protected SearchNode(PlanNodeId id) {
super(id);
}
@@ -51,5 +58,43 @@ public abstract class SearchNode extends WritePlanNode
implements ComparableCons
return this;
}
+ public boolean isLastFragment() {
+ return isLastFragment;
+ }
+
+ public SearchNode setLastFragment(boolean lastFragment) {
+ isLastFragment = lastFragment;
+ return this;
+ }
+
+ protected long getEncodedSearchIndex() {
+ if (!isLastFragment) {
+ return searchIndex;
+ }
+ if (searchIndex == NO_CONSENSUS_INDEX) {
+ return NO_CONSENSUS_INDEX_WITH_LAST_FRAGMENT;
+ }
+ return searchIndex | LAST_FRAGMENT_MASK;
+ }
+
+ public static long extractSearchIndex(long encodedSearchIndex) {
+ if (encodedSearchIndex == NO_CONSENSUS_INDEX
+ || encodedSearchIndex == NO_CONSENSUS_INDEX_WITH_LAST_FRAGMENT) {
+ return NO_CONSENSUS_INDEX;
+ }
+ return encodedSearchIndex & ~LAST_FRAGMENT_MASK;
+ }
+
+ public static boolean isLastFragment(long encodedSearchIndex) {
+ return encodedSearchIndex == NO_CONSENSUS_INDEX_WITH_LAST_FRAGMENT
+ || (encodedSearchIndex != NO_CONSENSUS_INDEX
+ && (encodedSearchIndex & LAST_FRAGMENT_MASK) != 0);
+ }
+
+ protected void setSearchIndexFromWAL(long encodedSearchIndex) {
+ this.searchIndex = extractSearchIndex(encodedSearchIndex);
+ this.isLastFragment = isLastFragment(encodedSearchIndex);
+ }
+
public abstract SearchNode merge(List<SearchNode> searchNodes);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 784d4fa4771..93a04bf401a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -1205,6 +1205,7 @@ public class DataRegion implements IDataRegionForQuery {
// init map
long timePartitionId =
TimePartitionUtils.getTimePartitionId(insertRowNode.getTime());
initFlushTimeMap(timePartitionId);
+ insertRowNode.setLastFragment(true);
boolean isSequence =
config.isEnableSeparateData()
@@ -1333,14 +1334,29 @@ public class DataRegion implements IDataRegionForQuery {
InsertTabletNode insertTabletNode,
Map<Long, List<int[]>[]> splitMap,
TSStatus[] results,
- long[] infoForMetrics)
+ long[] infoForMetrics,
+ boolean markLastFragmentOnFinalWrite)
throws DataTypeInconsistentException {
boolean noFailure = true;
+ int remainingFragmentCount = 0;
+ if (markLastFragmentOnFinalWrite) {
+ for (Entry<Long, List<int[]>[]> entry : splitMap.entrySet()) {
+ List<int[]>[] rangeLists = entry.getValue();
+ if (rangeLists[1] != null) {
+ remainingFragmentCount++;
+ }
+ if (rangeLists[0] != null) {
+ remainingFragmentCount++;
+ }
+ }
+ }
for (Entry<Long, List<int[]>[]> entry : splitMap.entrySet()) {
long timePartitionId = entry.getKey();
List<int[]>[] rangeLists = entry.getValue();
List<int[]> sequenceRangeList = rangeLists[1];
if (sequenceRangeList != null) {
+ insertTabletNode.setLastFragment(
+ markLastFragmentOnFinalWrite && remainingFragmentCount == 1);
noFailure =
insertTabletToTsFileProcessor(
insertTabletNode,
@@ -1351,9 +1367,12 @@ public class DataRegion implements IDataRegionForQuery {
noFailure,
infoForMetrics)
&& noFailure;
+ remainingFragmentCount--;
}
List<int[]> unSequenceRangeList = rangeLists[0];
if (unSequenceRangeList != null) {
+ insertTabletNode.setLastFragment(
+ markLastFragmentOnFinalWrite && remainingFragmentCount == 1);
noFailure =
insertTabletToTsFileProcessor(
insertTabletNode,
@@ -1364,6 +1383,7 @@ public class DataRegion implements IDataRegionForQuery {
noFailure,
infoForMetrics)
&& noFailure;
+ remainingFragmentCount--;
}
}
return noFailure;
@@ -1402,7 +1422,7 @@ public class DataRegion implements IDataRegionForQuery {
// infoForMetrics[2]: ScheduleWalTimeCost
// infoForMetrics[3]: ScheduleMemTableTimeCost
// infoForMetrics[4]: InsertedPointsNumber
- boolean noFailure = executeInsertTablet(insertTabletNode, results,
infoForMetrics);
+ boolean noFailure = executeInsertTablet(insertTabletNode, results,
infoForMetrics, true);
updateTsFileProcessorMetric(insertTabletNode, infoForMetrics);
if (!noFailure) {
@@ -1418,7 +1438,8 @@ public class DataRegion implements IDataRegionForQuery {
InsertTabletNode insertTabletNode,
TSStatus[] results,
long[] infoForMetrics,
- List<Pair<IDeviceID, Integer>> deviceEndOffsetPairs) {
+ List<Pair<IDeviceID, Integer>> deviceEndOffsetPairs,
+ boolean markLastFragmentOnFinalWrite) {
final int initialStart = start;
try {
Map<Long, List<int[]>[]> splitInfo = new HashMap<>();
@@ -1427,7 +1448,8 @@ public class DataRegion implements IDataRegionForQuery {
split(insertTabletNode, start, end, splitInfo);
start = end;
}
- return doInsert(insertTabletNode, splitInfo, results, infoForMetrics);
+ return doInsert(
+ insertTabletNode, splitInfo, results, infoForMetrics,
markLastFragmentOnFinalWrite);
} catch (DataTypeInconsistentException e) {
// the exception will trigger a flush, which requires the flush time to
be recalculated
start = initialStart;
@@ -1438,7 +1460,8 @@ public class DataRegion implements IDataRegionForQuery {
start = end;
}
try {
- return doInsert(insertTabletNode, splitInfo, results, infoForMetrics);
+ return doInsert(
+ insertTabletNode, splitInfo, results, infoForMetrics,
markLastFragmentOnFinalWrite);
} catch (DataTypeInconsistentException ex) {
logger.error(StorageEngineMessages.DATA_INCONSISTENT_NOT_TRIGGER_TWICE, ex);
return false;
@@ -1447,7 +1470,10 @@ public class DataRegion implements IDataRegionForQuery {
}
private boolean executeInsertTablet(
- InsertTabletNode insertTabletNode, TSStatus[] results, long[]
infoForMetrics)
+ InsertTabletNode insertTabletNode,
+ TSStatus[] results,
+ long[] infoForMetrics,
+ boolean markLastFragmentOnFinalWrite)
throws OutOfTTLException {
boolean noFailure;
int loc =
@@ -1458,7 +1484,13 @@ public class DataRegion implements IDataRegionForQuery {
List<Pair<IDeviceID, Integer>> deviceEndOffsetPairs =
insertTabletNode.splitByDevice(loc, insertTabletNode.getRowCount());
noFailure =
- splitAndInsert(loc, insertTabletNode, results, infoForMetrics,
deviceEndOffsetPairs)
+ splitAndInsert(
+ loc,
+ insertTabletNode,
+ results,
+ infoForMetrics,
+ deviceEndOffsetPairs,
+ markLastFragmentOnFinalWrite)
&& noFailure;
if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()
@@ -1471,6 +1503,25 @@ public class DataRegion implements IDataRegionForQuery {
return noFailure;
}
+ private int findLastInsertTabletIndexToMark(final InsertMultiTabletsNode
insertMultiTabletsNode) {
+ for (int i = insertMultiTabletsNode.getInsertTabletNodeList().size() - 1;
i >= 0; i--) {
+ final InsertTabletNode insertTabletNode =
+ insertMultiTabletsNode.getInsertTabletNodeList().get(i);
+ if (insertTabletNode.getRowCount() <= 0 ||
insertTabletNode.allMeasurementFailed()) {
+ continue;
+ }
+ if (!insertTabletNode.shouldCheckTTL()) {
+ return i;
+ }
+ final long[] times = insertTabletNode.getTimes();
+ if (times.length > 0
+ && CommonUtils.isAlive(times[times.length - 1],
getTTL(insertTabletNode))) {
+ return i;
+ }
+ }
+ return -1;
+ }
+
private void initFlushTimeMap(long timePartitionId) {
if (config.isEnableSeparateData()
&&
!lastFlushTimeMap.checkAndCreateFlushedTimePartition(timePartitionId, true)) {
@@ -1752,8 +1803,10 @@ public class DataRegion implements IDataRegionForQuery {
}
List<InsertRowNode> executedInsertRowNodeList = new ArrayList<>();
+ int remainingFragments = tsFileProcessorMap.size();
for (Map.Entry<TsFileProcessor, InsertRowsNode> entry :
tsFileProcessorMap.entrySet()) {
InsertRowsNode subInsertRowsNode = entry.getValue();
+ subInsertRowsNode.setLastFragment(--remainingFragments == 0);
try {
List<TsFileProcessor> insertedProcessors =
insertRowsWithTypeConsistencyCheck(entry.getKey(),
subInsertRowsNode, infoForMetrics);
@@ -1865,10 +1918,14 @@ public class DataRegion implements IDataRegionForQuery {
}
final List<TsFileProcessor> insertedProcessors = new
ArrayList<>(retriedProcessorMap.size());
+ int remainingRetriedFragments = retriedProcessorMap.size();
for (Entry<TsFileProcessor, InsertRowsNode> retriedEntry :
retriedProcessorMap.entrySet()) {
final TsFileProcessor retriedProcessor = retriedEntry.getKey();
- registerToTsFile(retriedEntry.getValue(), retriedProcessor);
- retriedProcessor.insertRows(retriedEntry.getValue(), infoForMetrics);
+ final InsertRowsNode retriedInsertRowsNode = retriedEntry.getValue();
+ retriedInsertRowsNode.setLastFragment(
+ subInsertRowsNode.isLastFragment() && --remainingRetriedFragments ==
0);
+ registerToTsFile(retriedInsertRowsNode, retriedProcessor);
+ retriedProcessor.insertRows(retriedInsertRowsNode, infoForMetrics);
insertedProcessors.add(retriedProcessor);
}
return insertedProcessors;
@@ -4619,8 +4676,10 @@ public class DataRegion implements IDataRegionForQuery {
// infoForMetrics[2]: ScheduleWalTimeCost
// infoForMetrics[3]: ScheduleMemTableTimeCost
// infoForMetrics[4]: InsertedPointsNumber
+ int remainingFragments = tsFileProcessorMap.size();
for (Map.Entry<TsFileProcessor, InsertRowsNode> entry :
tsFileProcessorMap.entrySet()) {
InsertRowsNode subInsertRowsNode = entry.getValue();
+ subInsertRowsNode.setLastFragment(--remainingFragments == 0);
try {
List<TsFileProcessor> insertedProcessors =
insertRowsWithTypeConsistencyCheck(entry.getKey(),
subInsertRowsNode, infoForMetrics);
@@ -4761,6 +4820,7 @@ public class DataRegion implements IDataRegionForQuery {
// infoForMetrics[2]: ScheduleWalTimeCost
// infoForMetrics[3]: ScheduleMemTableTimeCost
// infoForMetrics[4]: InsertedPointsNumbe
+ final int lastTabletIndexToMark =
findLastInsertTabletIndexToMark(insertMultiTabletsNode);
for (int i = 0; i <
insertMultiTabletsNode.getInsertTabletNodeList().size(); i++) {
InsertTabletNode insertTabletNode =
insertMultiTabletsNode.getInsertTabletNodeList().get(i);
long[] times = insertTabletNode.getTimes();
@@ -4774,7 +4834,9 @@ public class DataRegion implements IDataRegionForQuery {
Arrays.fill(results, RpcUtils.SUCCESS_STATUS);
boolean noFailure = false;
try {
- noFailure = executeInsertTablet(insertTabletNode, results,
infoForMetrics);
+ noFailure =
+ executeInsertTablet(
+ insertTabletNode, results, infoForMetrics, i ==
lastTabletIndexToMark);
} catch (WriteProcessException e) {
insertMultiTabletsNode
.getResults()
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
index 4057914f9db..86e2b173027 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
@@ -36,6 +36,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNo
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.ObjectNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalDeleteDataNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.SearchNode;
import org.apache.iotdb.db.service.metrics.WritingMetrics;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
@@ -690,7 +691,9 @@ public class WALNode implements IWALNode {
if (type.needSearch()) {
// see WALInfoEntry#serialize, entry type + memtable id + plan
node type
buffer.position(WALInfoEntry.FIXED_SERIALIZED_SIZE +
PlanNodeType.BYTES);
- final long currentWalEntryIndex = buffer.getLong();
+ final long encodedSearchIndex = buffer.getLong();
+ final long currentWalEntryIndex =
SearchNode.extractSearchIndex(encodedSearchIndex);
+ final boolean isLastFragment =
SearchNode.isLastFragment(encodedSearchIndex);
buffer.clear();
if (currentWalEntryIndex == -1) {
// WAL entry of targetIndex has been fully collected, so put
them into insertNodes
@@ -715,6 +718,9 @@ public class WALNode implements IWALNode {
tmpNodes.get().add(new IoTConsensusRequest(buffer));
memorySize += buffer.remaining();
}
+ if (isLastFragment) {
+ tryToCollectInsertNodeAndBumpIndex.run();
+ }
} else {
// currentWalEntryIndex > targetIndex
// WAL entry of targetIndex has been fully collected, put them
into insertNodes
@@ -743,6 +749,9 @@ public class WALNode implements IWALNode {
tmpNodes.get().add(new IoTConsensusRequest(buffer));
memorySize += buffer.remaining();
}
+ if (isLastFragment) {
+ tryToCollectInsertNodeAndBumpIndex.run();
+ }
}
} else {
tryToCollectInsertNodeAndBumpIndex.run();
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertRowNodeSerdeTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertRowNodeSerdeTest.java
index dbe69da2462..21bc9e0d120 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertRowNodeSerdeTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertRowNodeSerdeTest.java
@@ -23,10 +23,13 @@ import
org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId;
import
org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertRowNode;
import
org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALByteBufferForTest;
import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.junit.Assert;
import org.junit.Test;
@@ -35,6 +38,7 @@ import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
public class InsertRowNodeSerdeTest {
@@ -72,6 +76,7 @@ public class InsertRowNodeSerdeTest {
@Test
public void TestSerializeAndDeserializeForWAL() throws IllegalPathException,
IOException {
InsertRowNode insertRowNode = getInsertRowNodeWithMeasurementSchemas();
+ insertRowNode.setLastFragment(true);
int serializedSize = insertRowNode.serializedSize();
@@ -96,6 +101,67 @@ public class InsertRowNodeSerdeTest {
new MeasurementSchema("s5", TSDataType.BOOLEAN)
});
Assert.assertEquals(insertRowNode, tmpNode);
+ Assert.assertTrue(tmpNode.isLastFragment());
+ }
+
+ @Test
+ public void testDeserializeLegacyWAL() throws IllegalPathException,
IOException {
+ InsertRowNode insertRowNode = getInsertRowNodeWithMeasurementSchemas();
+ insertRowNode.setSearchIndex(123L);
+
+ byte[] bytes = new byte[insertRowNode.serializedSize()];
+ WALByteBufferForTest walBuffer = new
WALByteBufferForTest(ByteBuffer.wrap(bytes));
+ insertRowNode.serializeToWAL(walBuffer);
+
+ ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+ Assert.assertEquals(PlanNodeType.INSERT_ROW.getNodeType(),
byteBuffer.getShort());
+ Assert.assertEquals(123L, byteBuffer.getLong());
+
+ DataInputStream dataInputStream = new DataInputStream(new
ByteArrayInputStream(bytes));
+ dataInputStream.readShort();
+
+ InsertRowNode tmpNode = InsertRowNode.deserializeFromWAL(dataInputStream);
+ tmpNode.setPlanNodeId(insertRowNode.getPlanNodeId());
+ tmpNode.setMeasurementSchemas(
+ new MeasurementSchema[] {
+ new MeasurementSchema("s1", TSDataType.DOUBLE),
+ new MeasurementSchema("s2", TSDataType.FLOAT),
+ new MeasurementSchema("s3", TSDataType.INT64),
+ new MeasurementSchema("s4", TSDataType.INT32),
+ new MeasurementSchema("s5", TSDataType.BOOLEAN)
+ });
+ Assert.assertEquals(insertRowNode, tmpNode);
+ Assert.assertEquals(123L, tmpNode.getSearchIndex());
+ Assert.assertFalse(tmpNode.isLastFragment());
+ }
+
+ @Test
+ public void testDeserializeLegacyWALRelational() throws IOException {
+ RelationalInsertRowNode insertRowNode =
getRelationalInsertRowNodeWithMeasurementSchemas();
+ insertRowNode.setSearchIndex(123L);
+
+ byte[] bytes = new byte[insertRowNode.serializedSize()];
+ WALByteBufferForTest walBuffer = new
WALByteBufferForTest(ByteBuffer.wrap(bytes));
+ insertRowNode.serializeToWAL(walBuffer);
+
+ ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+ Assert.assertEquals(PlanNodeType.RELATIONAL_INSERT_ROW.getNodeType(),
byteBuffer.getShort());
+ Assert.assertEquals(123L, byteBuffer.getLong());
+
+ DataInputStream dataInputStream = new DataInputStream(new
ByteArrayInputStream(bytes));
+ dataInputStream.readShort();
+
+ RelationalInsertRowNode tmpNode =
RelationalInsertRowNode.deserializeFromWAL(dataInputStream);
+ tmpNode.setPlanNodeId(insertRowNode.getPlanNodeId());
+ tmpNode.setMeasurementSchemas(
+ new MeasurementSchema[] {
+ new MeasurementSchema("id", TSDataType.STRING),
+ new MeasurementSchema("attr", TSDataType.TEXT),
+ new MeasurementSchema("value", TSDataType.INT64)
+ });
+ Assert.assertEquals(insertRowNode, tmpNode);
+ Assert.assertEquals(123L, tmpNode.getSearchIndex());
+ Assert.assertFalse(tmpNode.isLastFragment());
}
private InsertRowNode getInsertRowNode() throws IllegalPathException {
@@ -199,4 +265,28 @@ public class InsertRowNodeSerdeTest {
insertRowNode.setNeedInferType(true);
return insertRowNode;
}
+
+ private RelationalInsertRowNode
getRelationalInsertRowNodeWithMeasurementSchemas() {
+ return new RelationalInsertRowNode(
+ new PlanNodeId("plannode 3"),
+ new PartialPath("table1", false),
+ false,
+ new String[] {"id", "attr", "value"},
+ new TSDataType[] {TSDataType.STRING, TSDataType.TEXT,
TSDataType.INT64},
+ new MeasurementSchema[] {
+ new MeasurementSchema("id", TSDataType.STRING),
+ new MeasurementSchema("attr", TSDataType.TEXT),
+ new MeasurementSchema("value", TSDataType.INT64)
+ },
+ 90L,
+ new Object[] {
+ new Binary("d1".getBytes(StandardCharsets.UTF_8)),
+ new Binary("v1".getBytes(StandardCharsets.UTF_8)),
+ 1L
+ },
+ false,
+ new TsTableColumnCategory[] {
+ TsTableColumnCategory.TAG, TsTableColumnCategory.ATTRIBUTE,
TsTableColumnCategory.FIELD
+ });
+ }
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertRowsNodeSerdeTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertRowsNodeSerdeTest.java
index 08819830557..034d4cb57de 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertRowsNodeSerdeTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertRowsNodeSerdeTest.java
@@ -129,6 +129,7 @@ public class InsertRowsNodeSerdeTest {
new Object[] {2.0, false},
false),
1);
+ insertRowsNode.setLastFragment(true);
int serializedSize = insertRowsNode.serializedSize();
@@ -145,6 +146,70 @@ public class InsertRowsNodeSerdeTest {
InsertRowsNode tmpNode =
InsertRowsNode.deserializeFromWAL(dataInputStream);
tmpNode.setPlanNodeId(insertRowsNode.getPlanNodeId());
Assert.assertEquals(insertRowsNode, tmpNode);
+ Assert.assertTrue(tmpNode.isLastFragment());
+ }
+
+ @Test
+ public void testDeserializeLegacyWAL() throws IllegalPathException,
IOException {
+ InsertRowsNode insertRowsNode = new InsertRowsNode(new PlanNodeId("plan
node 1"));
+ insertRowsNode.addOneInsertRowNode(
+ new InsertRowNode(
+ new PlanNodeId(""),
+ new PartialPath("root.sg.d1"),
+ false,
+ new String[] {"s1", "s2", "s3", "s4", "s5"},
+ new TSDataType[] {
+ TSDataType.DOUBLE,
+ TSDataType.FLOAT,
+ TSDataType.INT64,
+ TSDataType.TEXT,
+ TSDataType.STRING
+ },
+ new MeasurementSchema[] {
+ new MeasurementSchema("s1", TSDataType.DOUBLE),
+ new MeasurementSchema("s2", TSDataType.FLOAT),
+ new MeasurementSchema("s3", TSDataType.INT64),
+ new MeasurementSchema("s4", TSDataType.TEXT),
+ new MeasurementSchema("s5", TSDataType.STRING)
+ },
+ 1000L,
+ new Object[] {1.0, 2f, 300L, new
Binary("444".getBytes(StandardCharsets.UTF_8)), null},
+ false),
+ 0);
+
+ insertRowsNode.addOneInsertRowNode(
+ new InsertRowNode(
+ new PlanNodeId(""),
+ new PartialPath("root.sg.d2"),
+ false,
+ new String[] {"s1", "s4"},
+ new TSDataType[] {TSDataType.DOUBLE, TSDataType.BOOLEAN},
+ new MeasurementSchema[] {
+ new MeasurementSchema("s1", TSDataType.DOUBLE),
+ new MeasurementSchema("s4", TSDataType.BOOLEAN),
+ },
+ 2000L,
+ new Object[] {2.0, false},
+ false),
+ 1);
+ insertRowsNode.setSearchIndex(123L);
+
+ byte[] bytes = new byte[insertRowsNode.serializedSize()];
+ WALByteBufferForTest walBuffer = new
WALByteBufferForTest(ByteBuffer.wrap(bytes));
+ insertRowsNode.serializeToWAL(walBuffer);
+
+ ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+ Assert.assertEquals(PlanNodeType.INSERT_ROWS.getNodeType(),
byteBuffer.getShort());
+ Assert.assertEquals(123L, byteBuffer.getLong());
+
+ DataInputStream dataInputStream = new DataInputStream(new
ByteArrayInputStream(bytes));
+ dataInputStream.readShort();
+
+ InsertRowsNode tmpNode =
InsertRowsNode.deserializeFromWAL(dataInputStream);
+ tmpNode.setPlanNodeId(insertRowsNode.getPlanNodeId());
+ Assert.assertEquals(insertRowsNode, tmpNode);
+ Assert.assertEquals(123L, tmpNode.getSearchIndex());
+ Assert.assertFalse(tmpNode.isLastFragment());
}
@Test
@@ -262,6 +327,7 @@ public class InsertRowsNodeSerdeTest {
TsTableColumnCategory.TAG, TsTableColumnCategory.ATTRIBUTE
}),
1);
+ insertRowsNode.setLastFragment(true);
int serializedSize = insertRowsNode.serializedSize();
@@ -280,6 +346,81 @@ public class InsertRowsNodeSerdeTest {
RelationalInsertRowsNode.deserializeFromWAL(dataInputStream);
tmpNode.setPlanNodeId(insertRowsNode.getPlanNodeId());
Assert.assertEquals(insertRowsNode, tmpNode);
+ Assert.assertTrue(tmpNode.isLastFragment());
}
}
+
+ @Test
+ public void testDeserializeLegacyWALRelational() throws IOException {
+ RelationalInsertRowsNode insertRowsNode =
+ new RelationalInsertRowsNode(new PlanNodeId("plan node 1"));
+ insertRowsNode.addOneInsertRowNode(
+ new RelationalInsertRowNode(
+ new PlanNodeId(""),
+ new PartialPath("table1", false),
+ false,
+ new String[] {"s1", "s2", "s3", "s4", "s5"},
+ new TSDataType[] {
+ TSDataType.DOUBLE,
+ TSDataType.FLOAT,
+ TSDataType.INT64,
+ TSDataType.TEXT,
+ TSDataType.STRING
+ },
+ new MeasurementSchema[] {
+ new MeasurementSchema("s1", TSDataType.DOUBLE),
+ new MeasurementSchema("s2", TSDataType.FLOAT),
+ new MeasurementSchema("s3", TSDataType.INT64),
+ new MeasurementSchema("s4", TSDataType.TEXT),
+ new MeasurementSchema("s5", TSDataType.STRING)
+ },
+ 1000L,
+ new Object[] {1.0, 2f, 300L, new
Binary("444".getBytes(StandardCharsets.UTF_8)), null},
+ false,
+ new TsTableColumnCategory[] {
+ TsTableColumnCategory.TAG,
+ TsTableColumnCategory.ATTRIBUTE,
+ TsTableColumnCategory.FIELD,
+ TsTableColumnCategory.FIELD,
+ TsTableColumnCategory.FIELD
+ }),
+ 0);
+
+ insertRowsNode.addOneInsertRowNode(
+ new RelationalInsertRowNode(
+ new PlanNodeId(""),
+ new PartialPath("table1", false),
+ false,
+ new String[] {"s1", "s4"},
+ new TSDataType[] {TSDataType.DOUBLE, TSDataType.BOOLEAN},
+ new MeasurementSchema[] {
+ new MeasurementSchema("s1", TSDataType.DOUBLE),
+ new MeasurementSchema("s4", TSDataType.BOOLEAN),
+ },
+ 2000L,
+ new Object[] {2.0, false},
+ false,
+ new TsTableColumnCategory[] {
+ TsTableColumnCategory.TAG, TsTableColumnCategory.ATTRIBUTE
+ }),
+ 1);
+ insertRowsNode.setSearchIndex(123L);
+
+ byte[] bytes = new byte[insertRowsNode.serializedSize()];
+ WALByteBufferForTest walBuffer = new
WALByteBufferForTest(ByteBuffer.wrap(bytes));
+ insertRowsNode.serializeToWAL(walBuffer);
+
+ ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+ Assert.assertEquals(PlanNodeType.RELATIONAL_INSERT_ROWS.getNodeType(),
byteBuffer.getShort());
+ Assert.assertEquals(123L, byteBuffer.getLong());
+
+ DataInputStream dataInputStream = new DataInputStream(new
ByteArrayInputStream(bytes));
+ dataInputStream.readShort();
+
+ RelationalInsertRowsNode tmpNode =
RelationalInsertRowsNode.deserializeFromWAL(dataInputStream);
+ tmpNode.setPlanNodeId(insertRowsNode.getPlanNodeId());
+ Assert.assertEquals(insertRowsNode, tmpNode);
+ Assert.assertEquals(123L, tmpNode.getSearchIndex());
+ Assert.assertFalse(tmpNode.isLastFragment());
+ }
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertTabletNodeSerdeTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertTabletNodeSerdeTest.java
index 4d8f5f6e4a8..ddc35e1eda9 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertTabletNodeSerdeTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertTabletNodeSerdeTest.java
@@ -68,6 +68,7 @@ public class InsertTabletNodeSerdeTest {
@Test
public void testSerializeAndDeserializeForWAL() throws IllegalPathException,
IOException {
InsertTabletNode insertTabletNode = getInsertTabletNodeWithSchema();
+ insertTabletNode.setLastFragment(true);
int serializedSize = insertTabletNode.serializedSize();
@@ -93,6 +94,38 @@ public class InsertTabletNodeSerdeTest {
new MeasurementSchema("s5", TSDataType.BOOLEAN)
});
Assert.assertEquals(insertTabletNode, tmpNode);
+ Assert.assertTrue(tmpNode.isLastFragment());
+ }
+
+ @Test
+ public void testDeserializeLegacyWAL() throws IllegalPathException,
IOException {
+ InsertTabletNode insertTabletNode = getInsertTabletNodeWithSchema();
+ insertTabletNode.setSearchIndex(123L);
+
+ byte[] bytes = new byte[insertTabletNode.serializedSize()];
+ WALByteBufferForTest walBuffer = new
WALByteBufferForTest(ByteBuffer.wrap(bytes));
+ insertTabletNode.serializeToWAL(walBuffer);
+
+ ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+ Assert.assertEquals(PlanNodeType.INSERT_TABLET.getNodeType(),
byteBuffer.getShort());
+ Assert.assertEquals(123L, byteBuffer.getLong());
+
+ DataInputStream dataInputStream = new DataInputStream(new
ByteArrayInputStream(bytes));
+ dataInputStream.readShort();
+
+ InsertTabletNode tmpNode =
InsertTabletNode.deserializeFromWAL(dataInputStream);
+ tmpNode.setPlanNodeId(insertTabletNode.getPlanNodeId());
+ tmpNode.setMeasurementSchemas(
+ new MeasurementSchema[] {
+ new MeasurementSchema("s1", TSDataType.DOUBLE),
+ new MeasurementSchema("s2", TSDataType.FLOAT),
+ new MeasurementSchema("s3", TSDataType.INT64),
+ new MeasurementSchema("s4", TSDataType.INT32),
+ new MeasurementSchema("s5", TSDataType.BOOLEAN)
+ });
+ Assert.assertEquals(insertTabletNode, tmpNode);
+ Assert.assertEquals(123L, tmpNode.getSearchIndex());
+ Assert.assertFalse(tmpNode.isLastFragment());
}
@Test
@@ -126,6 +159,7 @@ public class InsertTabletNodeSerdeTest {
for (String tableName : new String[] {"table1", "ta`ble1", "root.table1"})
{
RelationalInsertTabletNode insertTabletNode =
getRelationalInsertTabletNodeWithSchema(tableName);
+ insertTabletNode.setLastFragment(true);
int serializedSize = insertTabletNode.serializedSize();
@@ -153,9 +187,42 @@ public class InsertTabletNodeSerdeTest {
new MeasurementSchema("s5", TSDataType.BOOLEAN)
});
Assert.assertEquals(insertTabletNode, tmpNode);
+ Assert.assertTrue(tmpNode.isLastFragment());
}
}
+ @Test
+ public void testDeserializeLegacyWALRelational() throws IOException {
+ RelationalInsertTabletNode insertTabletNode =
getRelationalInsertTabletNodeWithSchema("table1");
+ insertTabletNode.setSearchIndex(123L);
+
+ byte[] bytes = new byte[insertTabletNode.serializedSize()];
+ WALByteBufferForTest walBuffer = new
WALByteBufferForTest(ByteBuffer.wrap(bytes));
+ insertTabletNode.serializeToWAL(walBuffer);
+
+ ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+ Assert.assertEquals(PlanNodeType.RELATIONAL_INSERT_TABLET.getNodeType(),
byteBuffer.getShort());
+ Assert.assertEquals(123L, byteBuffer.getLong());
+
+ DataInputStream dataInputStream = new DataInputStream(new
ByteArrayInputStream(bytes));
+ dataInputStream.readShort();
+
+ RelationalInsertTabletNode tmpNode =
+ RelationalInsertTabletNode.deserializeFromWAL(dataInputStream);
+ tmpNode.setPlanNodeId(insertTabletNode.getPlanNodeId());
+ tmpNode.setMeasurementSchemas(
+ new MeasurementSchema[] {
+ new MeasurementSchema("s1", TSDataType.DOUBLE),
+ new MeasurementSchema("s2", TSDataType.FLOAT),
+ new MeasurementSchema("s3", TSDataType.INT64),
+ new MeasurementSchema("s4", TSDataType.INT32),
+ new MeasurementSchema("s5", TSDataType.BOOLEAN)
+ });
+ Assert.assertEquals(insertTabletNode, tmpNode);
+ Assert.assertEquals(123L, tmpNode.getSearchIndex());
+ Assert.assertFalse(tmpNode.isLastFragment());
+ }
+
@Test
public void testInitTabletValuesWithAllTypes()
throws NoSuchMethodException, InvocationTargetException,
IllegalAccessException {
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeWaitForRollFileTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeWaitForRollFileTest.java
index 2977841d5e6..b24a8cd29cf 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeWaitForRollFileTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeWaitForRollFileTest.java
@@ -178,6 +178,29 @@ public class WALNodeWaitForRollFileTest {
assertNotNull(iterator.next());
}
+ @Test
+ public void testLegacySeparatorStillWorksAfterRollFile() throws Exception {
+ IMemTable memTable = new PrimitiveMemTable(databasePath, dataRegionId);
+ walNode.onMemTableCreated(memTable, logDirectory + File.separator +
"test.tsfile");
+
+ InsertTabletNode insertTabletNode = getInsertTabletNode(devicePath, new
long[] {1});
+ insertTabletNode.setSearchIndex(1);
+ walNode.log(
+ memTable.getMemTableId(),
+ insertTabletNode,
+ Collections.singletonList(new int[] {0,
insertTabletNode.getRowCount()}));
+ walNode.log(memTable.getMemTableId(), new
ContinuousSameSearchIndexSeparatorNode());
+
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() ->
walNode.isAllWALEntriesConsumed());
+
+ walNode.rollWALFile();
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() ->
walNode.isAllWALEntriesConsumed());
+
+ ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(1);
+ assertTrue(iterator.hasNext());
+ assertNotNull(iterator.next());
+ }
+
/**
* Verifies that waitForNextReady wakes up when a WAL file roll is triggered
concurrently. A
* background thread rolls the WAL file while the main thread waits on the
iterator.
@@ -190,12 +213,11 @@ public class WALNodeWaitForRollFileTest {
// write data with search index
InsertTabletNode insertTabletNode = getInsertTabletNode(devicePath, new
long[] {1});
insertTabletNode.setSearchIndex(1);
+ insertTabletNode.setLastFragment(true);
walNode.log(
memTable.getMemTableId(),
insertTabletNode,
Collections.singletonList(new int[] {0,
insertTabletNode.getRowCount()}));
- walNode.log(
- memTable.getMemTableId(), new
ContinuousSameSearchIndexSeparatorNode(new PlanNodeId("")));
Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() ->
walNode.isAllWALEntriesConsumed());
@@ -318,12 +340,11 @@ public class WALNodeWaitForRollFileTest {
// write data with search index — stays in the current (active) WAL file
InsertTabletNode insertTabletNode = getInsertTabletNode(devicePath, new
long[] {1});
insertTabletNode.setSearchIndex(1);
+ insertTabletNode.setLastFragment(true);
walNode.log(
memTable.getMemTableId(),
insertTabletNode,
Collections.singletonList(new int[] {0,
insertTabletNode.getRowCount()}));
- walNode.log(
- memTable.getMemTableId(), new
ContinuousSameSearchIndexSeparatorNode(new PlanNodeId("")));
Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() ->
walNode.isAllWALEntriesConsumed());