This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch batch_wal in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 82ba91908023c54f6083d73f76dfbb0f2b3f4d7a Author: HTHou <[email protected]> AuthorDate: Mon Apr 1 15:31:07 2024 +0800 finishing --- .../plan/planner/plan/node/PlanNodeType.java | 4 + .../planner/plan/node/write/InsertRowsNode.java | 75 ++++++++++++++++++- .../db/storageengine/dataregion/DataRegion.java | 26 ++++--- .../dataregion/memtable/TsFileProcessor.java | 86 ++++++++++++++++++++++ .../dataregion/wal/buffer/WALEntryType.java | 2 + .../dataregion/wal/buffer/WALInfoEntry.java | 1 + .../dataregion/wal/node/IWALNode.java | 4 + .../dataregion/wal/node/WALFakeNode.java | 6 ++ .../storageengine/dataregion/wal/node/WALNode.java | 7 ++ .../wal/recover/file/TsFilePlanRedoer.java | 25 +++++++ .../file/UnsealedTsFileRecoverPerformer.java | 4 + 11 files changed, 230 insertions(+), 10 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java index bda66843a97..d76d4cdfb05 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java @@ -238,6 +238,8 @@ public enum PlanNodeType { return InsertTabletNode.deserializeFromWAL(stream); case 14: return InsertRowNode.deserializeFromWAL(stream); + case 15: + return InsertRowsNode.deserializeFromWAL(stream); case 44: return DeleteDataNode.deserializeFromWAL(stream); default: @@ -252,6 +254,8 @@ public enum PlanNodeType { return InsertTabletNode.deserializeFromWAL(buffer); case 14: return InsertRowNode.deserializeFromWAL(buffer); + case 15: + return InsertRowsNode.deserializeFromWAL(buffer); case 44: return DeleteDataNode.deserializeFromWAL(buffer); default: diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java index 12e229470be..77c2de63d8e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java @@ -30,9 +30,12 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode; +import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView; +import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue; import org.apache.iotdb.tsfile.exception.NotImplementedException; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; +import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; @@ -43,7 +46,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; -public class InsertRowsNode extends InsertNode { +public class InsertRowsNode extends InsertNode implements WALEntryValue { /** * Suppose there is an InsertRowsNode, which contains 5 InsertRowNodes, @@ -265,4 +268,74 @@ public class InsertRowsNode extends InsertNode { this.progressIndex = progressIndex; insertRowNodeList.forEach(insertRowNode -> insertRowNode.setProgressIndex(progressIndex)); } + + // region serialize & deserialize methods for WAL + /** Serialized size for wal. */ + @Override + public int serializedSize() { + return Short.BYTES + subSerializeSize(); + } + + private int subSerializeSize() { + int size = Integer.BYTES; + for (InsertRowNode insertRowNode : insertRowNodeList) { + size += insertRowNode.serializedSize(); + } + return size; + } + + /** + * Compared with {@link this#serialize(ByteBuffer)}, more info: search index, less info: + * isNeedInferType + */ + @Override + public void serializeToWAL(IWALByteBufferView buffer) { + buffer.putShort(PlanNodeType.INSERT_ROWS.getNodeType()); + subSerialize(buffer); + } + + private void subSerialize(IWALByteBufferView buffer) { + buffer.putInt(insertRowNodeList.size()); + for (InsertRowNode insertRowNode : insertRowNodeList) { + insertRowNode.serializeToWAL(buffer); + } + } + + /** + * Deserialize from wal. + * + * @param stream - DataInputStream + * @return InsertRowNode + * @throws IOException - If an I/O error occurs. + * @throws IllegalArgumentException - If meets illegal argument. + */ + public static InsertRowsNode deserializeFromWAL(DataInputStream stream) throws IOException { + // we do not store plan node id in wal entry + InsertRowsNode insertRowsNode = new InsertRowsNode(new PlanNodeId("")); + int listSize = stream.readInt(); + for (int i = 0; i < listSize; i++) { + InsertRowNode insertRowNode = InsertRowNode.deserializeFromWAL(stream); + insertRowsNode.addOneInsertRowNode(insertRowNode, i); + } + return insertRowsNode; + } + + /** + * Deserialize from wal. + * + * @param buffer - ByteBuffer + * @return InsertRowNode + * @throws IllegalArgumentException - If meets illegal argument + */ + public static InsertRowsNode deserializeFromWAL(ByteBuffer buffer) { + // we do not store plan node id in wal entry + InsertRowsNode insertRowsNode = new InsertRowsNode(new PlanNodeId("")); + int listSize = buffer.getInt(); + for (int i = 0; i < listSize; i++) { + InsertRowNode insertRowNode = InsertRowNode.deserializeFromWAL(buffer); + insertRowsNode.addOneInsertRowNode(insertRowNode, i); + } + return insertRowsNode; + } + // endregion } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 2a5602e315e..98e92d7f2fb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -1181,14 +1181,18 @@ public class DataRegion implements IDataRegionForQuery { if (tsFileProcessor == null) { continue; } - tsFileProcessorMap.compute(tsFileProcessor, (k, v) -> { - if (v == null) { - v = new InsertRowsNode(insertRowsNode.getPlanNodeId()); - v.setSearchIndex(insertRowNode.getSearchIndex()); - } - v.addOneInsertRowNode(insertRowNode, v.getInsertRowNodeList().size()); - return v; - }); + int finalI = i; + tsFileProcessorMap.compute( + tsFileProcessor, + (k, v) -> { + if (v == null) { + v = new InsertRowsNode(insertRowsNode.getPlanNodeId()); + v.setSearchIndex(insertRowNode.getSearchIndex()); + v.setAligned(insertRowNode.isAligned()); + } + v.addOneInsertRowNode(insertRowNode, finalI); + return v; + }); try { tsFileProcessor.insert(insertRowNode, costsForMetrics); } catch (WriteProcessException e) { @@ -1204,7 +1208,11 @@ public class DataRegion implements IDataRegionForQuery { try { tsFileProcessor.insert(subInsertRowsNode, costsForMetrics); } catch (WriteProcessException e) { - insertRowsNode.getResults().put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage())); + insertRowsNode + .getResults() + .put( + subInsertRowsNode.getInsertRowNodeIndexList().get(0), + RpcUtils.getStatus(e.getErrorCode(), e.getMessage())); } if (entry.getKey().shouldFlush()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java index 71d93647a0d..e42249dae17 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java @@ -38,6 +38,7 @@ import org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet; import org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; import org.apache.iotdb.db.schemaengine.schemaregion.utils.ResourceByPathUtils; import org.apache.iotdb.db.service.metrics.WritingMetrics; @@ -317,6 +318,91 @@ public class TsFileProcessor { costsForMetrics[3] += System.nanoTime() - startTime; } + public void insert(InsertRowsNode insertRowsNode, long[] costsForMetrics) + throws WriteProcessException { + + if (workMemTable == null) { + long startTime = System.nanoTime(); + createNewWorkingMemTable(); + // recordCreateMemtableBlockCost + costsForMetrics[0] += System.nanoTime() - startTime; + WritingMetrics.getInstance() + .recordActiveMemTableCount(dataRegionInfo.getDataRegion().getDataRegionId(), 1); + } + + long[] memIncrements = null; + + long memControlStartTime = System.nanoTime(); + if (insertRowsNode.isAligned()) { + for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) { + memIncrements = + checkAlignedMemCostAndAddToTspInfo( + insertRowNode.getDeviceID(), insertRowNode.getMeasurements(), + insertRowNode.getDataTypes(), insertRowNode.getValues()); + } + } else { + for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) { + memIncrements = + checkMemCostAndAddToTspInfo( + insertRowNode.getDeviceID(), insertRowNode.getMeasurements(), + insertRowNode.getDataTypes(), insertRowNode.getValues()); + } + } + // recordScheduleMemoryBlockCost + costsForMetrics[1] += System.nanoTime() - memControlStartTime; + + long startTime = System.nanoTime(); + WALFlushListener walFlushListener; + try { + walFlushListener = walNode.log(workMemTable.getMemTableId(), insertRowsNode); + if (walFlushListener.waitForResult() == AbstractResultListener.Status.FAILURE) { + throw walFlushListener.getCause(); + } + } catch (Exception e) { + rollbackMemoryInfo(memIncrements); + throw new WriteProcessException( + String.format( + "%s: %s write WAL failed", + storageGroupName, tsFileResource.getTsFile().getAbsolutePath()), + e); + } finally { + // recordScheduleWalCost + costsForMetrics[2] += System.nanoTime() - startTime; + } + + startTime = System.nanoTime(); + for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) { + PipeAgent.runtime().assignSimpleProgressIndexIfNeeded(insertRowNode); + if (!insertRowNode.isGeneratedByPipe()) { + workMemTable.markAsNotGeneratedByPipe(); + } + PipeInsertionDataNodeListener.getInstance() + .listenToInsertNode( + dataRegionInfo.getDataRegion().getDataRegionId(), + walFlushListener.getWalEntryHandler(), + insertRowNode, + tsFileResource); + + if (insertRowNode.isAligned()) { + workMemTable.insertAlignedRow(insertRowNode); + } else { + workMemTable.insert(insertRowNode); + } + + // update start time of this memtable + tsFileResource.updateStartTime(insertRowNode.getDeviceID(), insertRowNode.getTime()); + // for sequence tsfile, we update the endTime only when the file is prepared to be closed. + // for unsequence tsfile, we have to update the endTime for each insertion. + if (!sequence) { + tsFileResource.updateEndTime(insertRowNode.getDeviceID(), insertRowNode.getTime()); + } + + tsFileResource.updateProgressIndex(insertRowNode.getProgressIndex()); + } + // recordScheduleMemTableCost + costsForMetrics[3] += System.nanoTime() - startTime; + } + private void createNewWorkingMemTable() { workMemTable = MemTableManager.getInstance() diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntryType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntryType.java index defaf07ffd6..c304d605695 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntryType.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntryType.java @@ -38,6 +38,8 @@ public enum WALEntryType { DELETE_DATA_NODE((byte) 6), /** {@link org.apache.iotdb.db.storageengine.dataregion.wal.checkpoint.Checkpoint} */ MEMORY_TABLE_CHECKPOINT((byte) 7), + /** {@link org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode} */ + INSERT_ROWS_NODE((byte) 8), // endregion // region signal entry type // signal wal buffer has been closed diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALInfoEntry.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALInfoEntry.java index e8fe5633f83..966361857f5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALInfoEntry.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALInfoEntry.java @@ -73,6 +73,7 @@ public class WALInfoEntry extends WALEntry { .serializeToWAL(buffer, tabletInfo.tabletStart, tabletInfo.tabletEnd); break; case INSERT_ROW_NODE: + case INSERT_ROWS_NODE: case DELETE_DATA_NODE: case MEMORY_TABLE_SNAPSHOT: value.serializeToWAL(buffer); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/IWALNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/IWALNode.java index 251d2bdbcde..7e10c3a70c6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/IWALNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/IWALNode.java @@ -23,6 +23,7 @@ import org.apache.iotdb.consensus.common.DataSet; import org.apache.iotdb.consensus.iot.log.ConsensusReqReader; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; import org.apache.iotdb.db.storageengine.dataregion.flush.FlushListener; import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable; @@ -34,6 +35,9 @@ public interface IWALNode extends FlushListener, AutoCloseable, ConsensusReqRead /** Log InsertRowNode. */ WALFlushListener log(long memTableId, InsertRowNode insertRowNode); + /** Log InsertRowsNode. */ + WALFlushListener log(long memTableId, InsertRowsNode insertRowsNode); + /** Log InsertTabletNode. */ WALFlushListener log(long memTableId, InsertTabletNode insertTabletNode, int start, int end); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALFakeNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALFakeNode.java index 9c2f926c930..2902a6dcae9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALFakeNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALFakeNode.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.storageengine.dataregion.wal.node; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable; import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALException; @@ -50,6 +51,11 @@ public class WALFakeNode implements IWALNode { return getResult(); } + @Override + public WALFlushListener log(long memTableId, InsertRowsNode insertRowsNode) { + return getResult(); + } + @Override public WALFlushListener log( long memTableId, InsertTabletNode insertTabletNode, int start, int end) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java index c89f2d8cb9c..57003b957af 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java @@ -30,6 +30,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; import org.apache.iotdb.db.service.metrics.WritingMetrics; import org.apache.iotdb.db.storageengine.StorageEngine; @@ -130,6 +131,12 @@ public class WALNode implements IWALNode { return log(walEntry); } + @Override + public WALFlushListener log(long memTableId, InsertRowsNode insertRowsNode) { + WALEntry walEntry = new WALInfoEntry(memTableId, insertRowsNode); + return log(walEntry); + } + @Override public WALFlushListener log( long memTableId, InsertTabletNode insertTabletNode, int start, int end) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoer.java index 2ecb64e28d2..67addc2b5ad 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoer.java @@ -24,6 +24,7 @@ import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; import org.apache.iotdb.db.service.metrics.WritingMetrics; import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable; @@ -111,6 +112,30 @@ public class TsFilePlanRedoer { } } + void redoInsertRows(InsertRowsNode insertRowsNode) { + for (InsertRowNode node : insertRowsNode.getInsertRowNodeList()) { + if (!node.hasValidMeasurements()) { + continue; + } + if (tsFileResource != null) { + // orders of insert node is guaranteed by storage engine, just check time in the file + // the last chunk group may contain the same data with the logs, ignore such logs in seq + // file + long lastEndTime = tsFileResource.getEndTime(node.getDeviceID()); + long minTimeInNode; + minTimeInNode = node.getTime(); + if (lastEndTime != Long.MIN_VALUE && lastEndTime >= minTimeInNode) { + return; + } + } + if (node.isAligned()) { + recoveryMemTable.insertAlignedRow(node); + } else { + recoveryMemTable.insert(node); + } + } + } + void resetRecoveryMemTable(IMemTable memTable) { this.recoveryMemTable = memTable; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformer.java index d2b3fd07d9a..4d39dd30dfa 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformer.java @@ -23,6 +23,7 @@ import org.apache.iotdb.db.exception.DataRegionException; import org.apache.iotdb.db.pipe.agent.PipeAgent; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode; import org.apache.iotdb.db.storageengine.dataregion.flush.MemTableFlushTask; import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable; import org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunk; @@ -197,6 +198,9 @@ public class UnsealedTsFileRecoverPerformer extends AbstractTsFileRecoverPerform case INSERT_TABLET_NODE: walRedoer.redoInsert((InsertNode) walEntry.getValue()); break; + case INSERT_ROWS_NODE: + walRedoer.redoInsertRows((InsertRowsNode) walEntry.getValue()); + break; case DELETE_DATA_NODE: walRedoer.redoDelete((DeleteDataNode) walEntry.getValue()); break;
