This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch update_last_cache in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ca9d5439f77cd372967c571a773bae7af52ca630 Author: HTHou <[email protected]> AuthorDate: Fri May 27 16:57:06 2022 +0800 Update last cache when insert --- .../iotdb/db/engine/storagegroup/DataRegion.java | 42 +++++++++++++++-- .../planner/plan/node/write/InsertRowNode.java | 10 ++++ .../planner/plan/node/write/InsertTabletNode.java | 55 ++++++++++++++++++++++ 3 files changed, 104 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java index 7bb7abe855..2073fc8637 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java @@ -57,6 +57,7 @@ import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.exception.WriteProcessRejectException; import org.apache.iotdb.db.exception.query.OutOfTTLException; import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache; import org.apache.iotdb.db.metadata.idtable.IDTable; import org.apache.iotdb.db.metadata.idtable.IDTableManager; import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode; @@ -1078,8 +1079,7 @@ public class DataRegion { } long globalLatestFlushedTime = lastFlushTimeManager.getGlobalFlushedTime(insertTabletNode.getDevicePath().getFullPath()); - // TODO:LAST CACHE - // tryToUpdateBatchInsertLastCache(insertTabletNode, globalLatestFlushedTime); + tryToUpdateBatchInsertLastCache(insertTabletNode, globalLatestFlushedTime); if (!noFailure) { throw new BatchProcessException(results); @@ -1253,6 +1253,24 @@ public class DataRegion { } } + private void tryToUpdateBatchInsertLastCache(InsertTabletNode node, long latestFlushedTime) { + if (!IoTDBDescriptor.getInstance().getConfig().isLastCacheEnabled()) { + return; + } + for (int i = 0; i < node.getColumns().length; i++) 
{ + if (node.getColumns()[i] == null) { + continue; + } + // Update cached last value with high priority + DataNodeSchemaCache.getInstance() + .updateLastCache( + node.getDevicePath().concatNode(node.getMeasurements()[i]), + node.composeLastTimeValuePair(i), + true, + latestFlushedTime); + } + } + private void insertToTsFileProcessor( InsertRowPlan insertRowPlan, boolean sequence, long timePartitionId) throws WriteProcessException { @@ -1295,7 +1313,7 @@ public class DataRegion { long globalLatestFlushTime = lastFlushTimeManager.getGlobalFlushedTime(insertRowNode.getDevicePath().getFullPath()); - // tryToUpdateInsertLastCache(insertRowNode, globalLatestFlushTime); + tryToUpdateInsertLastCache(insertRowNode, globalLatestFlushTime); // check memtable size and may asyncTryToFlush the work memtable if (tsFileProcessor.shouldFlush()) { @@ -1328,6 +1346,24 @@ public class DataRegion { } } + private void tryToUpdateInsertLastCache(InsertRowNode node, long latestFlushedTime) { + if (!IoTDBDescriptor.getInstance().getConfig().isLastCacheEnabled()) { + return; + } + for (int i = 0; i < node.getValues().length; i++) { + if (node.getValues()[i] == null) { + continue; + } + // Update cached last value with high priority + DataNodeSchemaCache.getInstance() + .updateLastCache( + node.getDevicePath().concatNode(node.getMeasurements()[i]), + node.composeTimeValuePair(i), + true, + latestFlushedTime); + } + } + /** * WAL module uses this method to flush memTable * diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java index 014d08d211..8e40d44869 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java @@ -42,8 +42,10 @@ import org.apache.iotdb.db.wal.buffer.WALEntryValue; import 
org.apache.iotdb.db.wal.utils.WALWriteUtils; import org.apache.iotdb.tsfile.exception.NotImplementedException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.TimeValuePair; import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; +import org.apache.iotdb.tsfile.utils.TsPrimitiveType; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import org.slf4j.Logger; @@ -614,4 +616,12 @@ public class InsertRowNode extends InsertNode implements WALEntryValue { public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { return visitor.visitInsertRow(this, context); } + + public TimeValuePair composeTimeValuePair(int columnIndex) { + if (columnIndex >= values.length) { + return null; + } + Object value = values[columnIndex]; + return new TimeValuePair(time, TsPrimitiveType.getByType(dataTypes[columnIndex], value)); + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java index e076209edc..21422d124d 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java @@ -39,10 +39,12 @@ import org.apache.iotdb.db.wal.utils.WALWriteUtils; import org.apache.iotdb.tsfile.exception.NotImplementedException; import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.TimeValuePair; import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.utils.BitMap; import org.apache.iotdb.tsfile.utils.BytesUtils; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; +import org.apache.iotdb.tsfile.utils.TsPrimitiveType; import 
org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import java.io.DataInputStream; @@ -834,4 +836,57 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue { public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { return visitor.visitInsertTablet(this, context); } + + public TimeValuePair composeLastTimeValuePair(int measurementIndex) { + if (measurementIndex >= columns.length) { + return null; + } + + // get non-null value + int lastIdx = rowCount - 1; + if (bitMaps != null && bitMaps[measurementIndex] != null) { + BitMap bitMap = bitMaps[measurementIndex]; + while (lastIdx >= 0) { + if (!bitMap.isMarked(lastIdx)) { + break; + } + lastIdx--; + } + } + if (lastIdx < 0) { + return null; + } + + TsPrimitiveType value; + switch (dataTypes[measurementIndex]) { + case INT32: + int[] intValues = (int[]) columns[measurementIndex]; + value = new TsPrimitiveType.TsInt(intValues[lastIdx]); + break; + case INT64: + long[] longValues = (long[]) columns[measurementIndex]; + value = new TsPrimitiveType.TsLong(longValues[lastIdx]); + break; + case FLOAT: + float[] floatValues = (float[]) columns[measurementIndex]; + value = new TsPrimitiveType.TsFloat(floatValues[lastIdx]); + break; + case DOUBLE: + double[] doubleValues = (double[]) columns[measurementIndex]; + value = new TsPrimitiveType.TsDouble(doubleValues[lastIdx]); + break; + case BOOLEAN: + boolean[] boolValues = (boolean[]) columns[measurementIndex]; + value = new TsPrimitiveType.TsBoolean(boolValues[lastIdx]); + break; + case TEXT: + Binary[] binaryValues = (Binary[]) columns[measurementIndex]; + value = new TsPrimitiveType.TsBinary(binaryValues[lastIdx]); + break; + default: + throw new UnSupportedDataTypeException( + String.format(DATATYPE_UNSUPPORTED, dataTypes[measurementIndex])); + } + return new TimeValuePair(times[lastIdx], value); + } }
