This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch update_last_cache
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit ca9d5439f77cd372967c571a773bae7af52ca630
Author: HTHou <[email protected]>
AuthorDate: Fri May 27 16:57:06 2022 +0800

    Update last cache when insert
---
 .../iotdb/db/engine/storagegroup/DataRegion.java   | 42 +++++++++++++++--
 .../planner/plan/node/write/InsertRowNode.java     | 10 ++++
 .../planner/plan/node/write/InsertTabletNode.java  | 55 ++++++++++++++++++++++
 3 files changed, 104 insertions(+), 3 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index 7bb7abe855..2073fc8637 100755
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -57,6 +57,7 @@ import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.exception.WriteProcessRejectException;
 import org.apache.iotdb.db.exception.query.OutOfTTLException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
 import org.apache.iotdb.db.metadata.idtable.IDTable;
 import org.apache.iotdb.db.metadata.idtable.IDTableManager;
 import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
@@ -1078,8 +1079,7 @@ public class DataRegion {
       }
       long globalLatestFlushedTime =
           
lastFlushTimeManager.getGlobalFlushedTime(insertTabletNode.getDevicePath().getFullPath());
-      // TODO:LAST CACHE
-      //      tryToUpdateBatchInsertLastCache(insertTabletNode, 
globalLatestFlushedTime);
+      tryToUpdateBatchInsertLastCache(insertTabletNode, 
globalLatestFlushedTime);
 
       if (!noFailure) {
         throw new BatchProcessException(results);
@@ -1253,6 +1253,24 @@ public class DataRegion {
     }
   }
 
+  private void tryToUpdateBatchInsertLastCache(InsertTabletNode node, long 
latestFlushedTime) {
+    if (!IoTDBDescriptor.getInstance().getConfig().isLastCacheEnabled()) {
+      return;
+    }
+    for (int i = 0; i < node.getColumns().length; i++) {
+      if (node.getColumns()[i] == null) {
+        continue;
+      }
+      // Update cached last value with high priority
+      DataNodeSchemaCache.getInstance()
+          .updateLastCache(
+              node.getDevicePath().concatNode(node.getMeasurements()[i]),
+              node.composeLastTimeValuePair(i),
+              true,
+              latestFlushedTime);
+    }
+  }
+
   private void insertToTsFileProcessor(
       InsertRowPlan insertRowPlan, boolean sequence, long timePartitionId)
       throws WriteProcessException {
@@ -1295,7 +1313,7 @@ public class DataRegion {
     long globalLatestFlushTime =
         
lastFlushTimeManager.getGlobalFlushedTime(insertRowNode.getDevicePath().getFullPath());
 
-    // tryToUpdateInsertLastCache(insertRowNode, globalLatestFlushTime);
+    tryToUpdateInsertLastCache(insertRowNode, globalLatestFlushTime);
 
     // check memtable size and may asyncTryToFlush the work memtable
     if (tsFileProcessor.shouldFlush()) {
@@ -1328,6 +1346,24 @@ public class DataRegion {
     }
   }
 
+  private void tryToUpdateInsertLastCache(InsertRowNode node, long 
latestFlushedTime) {
+    if (!IoTDBDescriptor.getInstance().getConfig().isLastCacheEnabled()) {
+      return;
+    }
+    for (int i = 0; i < node.getValues().length; i++) {
+      if (node.getValues()[i] == null) {
+        continue;
+      }
+      // Update cached last value with high priority
+      DataNodeSchemaCache.getInstance()
+          .updateLastCache(
+              node.getDevicePath().concatNode(node.getMeasurements()[i]),
+              node.composeTimeValuePair(i),
+              true,
+              latestFlushedTime);
+    }
+  }
+
   /**
    * WAL module uses this method to flush memTable
    *
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java
index 014d08d211..8e40d44869 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java
@@ -42,8 +42,10 @@ import org.apache.iotdb.db.wal.buffer.WALEntryValue;
 import org.apache.iotdb.db.wal.utils.WALWriteUtils;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import org.slf4j.Logger;
@@ -614,4 +616,12 @@ public class InsertRowNode extends InsertNode implements 
WALEntryValue {
   public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
     return visitor.visitInsertRow(this, context);
   }
+
+  public TimeValuePair composeTimeValuePair(int columnIndex) {
+    if (columnIndex >= values.length) {
+      return null;
+    }
+    Object value = values[columnIndex];
+    return new TimeValuePair(time, 
TsPrimitiveType.getByType(dataTypes[columnIndex], value));
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
index e076209edc..21422d124d 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
@@ -39,10 +39,12 @@ import org.apache.iotdb.db.wal.utils.WALWriteUtils;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.BitMap;
 import org.apache.iotdb.tsfile.utils.BytesUtils;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import java.io.DataInputStream;
@@ -834,4 +836,57 @@ public class InsertTabletNode extends InsertNode 
implements WALEntryValue {
   public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
     return visitor.visitInsertTablet(this, context);
   }
+
+  public TimeValuePair composeLastTimeValuePair(int measurementIndex) {
+    if (measurementIndex >= columns.length) {
+      return null;
+    }
+
+    // get non-null value
+    int lastIdx = rowCount - 1;
+    if (bitMaps != null && bitMaps[measurementIndex] != null) {
+      BitMap bitMap = bitMaps[measurementIndex];
+      while (lastIdx >= 0) {
+        if (!bitMap.isMarked(lastIdx)) {
+          break;
+        }
+        lastIdx--;
+      }
+    }
+    if (lastIdx < 0) {
+      return null;
+    }
+
+    TsPrimitiveType value;
+    switch (dataTypes[measurementIndex]) {
+      case INT32:
+        int[] intValues = (int[]) columns[measurementIndex];
+        value = new TsPrimitiveType.TsInt(intValues[lastIdx]);
+        break;
+      case INT64:
+        long[] longValues = (long[]) columns[measurementIndex];
+        value = new TsPrimitiveType.TsLong(longValues[lastIdx]);
+        break;
+      case FLOAT:
+        float[] floatValues = (float[]) columns[measurementIndex];
+        value = new TsPrimitiveType.TsFloat(floatValues[lastIdx]);
+        break;
+      case DOUBLE:
+        double[] doubleValues = (double[]) columns[measurementIndex];
+        value = new TsPrimitiveType.TsDouble(doubleValues[lastIdx]);
+        break;
+      case BOOLEAN:
+        boolean[] boolValues = (boolean[]) columns[measurementIndex];
+        value = new TsPrimitiveType.TsBoolean(boolValues[lastIdx]);
+        break;
+      case TEXT:
+        Binary[] binaryValues = (Binary[]) columns[measurementIndex];
+        value = new TsPrimitiveType.TsBinary(binaryValues[lastIdx]);
+        break;
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format(DATATYPE_UNSUPPORTED, dataTypes[measurementIndex]));
+    }
+    return new TimeValuePair(times[lastIdx], value);
+  }
 }

Reply via email to