This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch check_lost_data_ty
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/check_lost_data_ty by this push:
     new a0c34a4  add checkers for InsertRowPlan and InsertRowsOfOneDevicePlan
a0c34a4 is described below

commit a0c34a4dbcb42746c4bf9118a66a0bb45ed3dd41
Author: Steve Yurong Su <[email protected]>
AuthorDate: Thu Jul 29 17:44:15 2021 +0800

    add checkers for InsertRowPlan and InsertRowsOfOneDevicePlan
---
 .../engine/storagegroup/StorageGroupProcessor.java |  2 ++
 .../db/engine/storagegroup/TsFileProcessor.java    |  2 ++
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |  1 +
 .../iotdb/db/qp/physical/crud/InsertRowPlan.java   | 15 ++++++++++
 .../physical/crud/InsertRowsOfOneDevicePlan.java   | 35 +++++++++++++++++-----
 5 files changed, 48 insertions(+), 7 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 8beca06..cecb564 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -1050,6 +1050,7 @@ public class StorageGroupProcessor {
       return;
     }
 
+    insertRowPlan.checkForTianYuan("StorageGroupProcessor#insertToTsFileProcessor");
     tsFileProcessor.insert(insertRowPlan);
 
     // try to update the latest time of the device of this tsRecord
@@ -2910,6 +2911,7 @@ public class StorageGroupProcessor {
     writeLock("InsertRowsOfOneDevice");
     try {
       boolean isSequence = false;
+      insertRowsOfOneDevicePlan.checkForTianYuan("StorageGroupProcessor#insert");
       InsertRowPlan[] rowPlans = insertRowsOfOneDevicePlan.getRowPlans();
       for (int i = 0, rowPlansLength = rowPlans.length; i < rowPlansLength; i++) {
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 9642271..fd2aaa8 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -188,6 +188,8 @@ public class TsFileProcessor {
       checkMemCostAndAddToTspInfo(insertRowPlan);
     }
 
+    insertRowPlan.checkForTianYuan("TsFileProcessor#InsertRowPlan");
+
     workMemTable.insert(insertRowPlan);
 
     if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index b07750f..ce2ddf1 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -1198,6 +1198,7 @@ public class PlanExecutor implements IPlanExecutor {
         // we do not need to infer data type for insertRowsOfOneDevicePlan
       }
       // ok, we can begin to write data into the engine..
+      insertRowsOfOneDevicePlan.checkForTianYuan("PlanExecutor#insert");
       StorageEngine.getInstance().insert(insertRowsOfOneDevicePlan);
 
       List<String> notExistedPaths = null;
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
index 8399527..f7caf0b 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
@@ -63,6 +63,21 @@ public class InsertRowPlan extends InsertPlan {
 
   private List<Object> failedValues;
 
+  /** Debug check: error-logs TY_0001_Raw_Packet values that do not match this plan's device. */
+  public void checkForTianYuan(String location) {
+    for (int j = 0; j < getMeasurements().length; j++) {
+      if (getMeasurements()[j].equals("TY_0001_Raw_Packet")) {
+        String raw = ((Binary) getValues()[j]).getStringValue();
+        // Clamp the end index: substring(0, 35) throws on packets shorter than 35 chars.
+        String value = raw.substring(0, Math.min(35, raw.length()));
+        if (!value.contains(getDeviceId().getMeasurement().substring(4))) {
+          logger.error("{}: receive error data,device:{}, value(first 35 bytes): {}",
+              location, getDeviceId(), value);
+        }
+      }
+    }
+  }
+
   public InsertRowPlan() {
     super(OperatorType.INSERT);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
index 46da4e0..a861adf 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
@@ -23,6 +23,10 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
 import org.apache.iotdb.db.qp.physical.BatchPlan;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -31,15 +35,29 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import org.apache.iotdb.tsfile.utils.Binary;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class InsertRowsOfOneDevicePlan extends InsertPlan implements BatchPlan {
   private static Logger logger = LoggerFactory.getLogger(InsertRowsOfOneDevicePlan.class);
   boolean[] isExecuted;
   private InsertRowPlan[] rowPlans;
 
+  /** Debug check: error-logs TY_0001_Raw_Packet values that do not match their row's device. */
+  public void checkForTianYuan(String location) {
+    for (InsertRowPlan rowPlan : rowPlans) {
+      for (int j = 0; j < rowPlan.getMeasurements().length; j++) {
+        if (rowPlan.getMeasurements()[j].equals("TY_0001_Raw_Packet")) {
+          String raw = ((Binary) rowPlan.getValues()[j]).getStringValue();
+          // Clamp the end index: substring(0, 35) throws on packets shorter than 35 chars.
+          String value = raw.substring(0, Math.min(35, raw.length()));
+          if (!value.contains(rowPlan.getDeviceId().getMeasurement().substring(4))) {
+            logger.error("{}: receive error data,device:{}, value(first 35 bytes): {}",
+                location, rowPlan.getDeviceId(), value);
+          }
+        }
+      }
+    }
+  }
+
   public InsertRowsOfOneDevicePlan(
       PartialPath deviceId,
       Long[] insertTimes,
@@ -67,12 +85,15 @@ public class InsertRowsOfOneDevicePlan extends InsertPlan implements BatchPlan {
                 + ", time:"
                 + insertTimes[i]);
       }
-      //Just for Tianyuan debug
-      for(int j = 0; j < rowPlans[i].getMeasurements().length; j ++) {
+      // Just for Tianyuan debug
+      for (int j = 0; j < rowPlans[i].getMeasurements().length; j++) {
         if (rowPlans[i].getMeasurements()[j].equals("TY_0001_Raw_Packet")) {
-          String value = ((Binary) rowPlans[i].getValues()[j]).getStringValue().substring(0,100);
+          String value = ((Binary) rowPlans[i].getValues()[j]).getStringValue().substring(0, 100);
           if (!value.contains(rowPlans[i].getDeviceId().getMeasurement().substring(4))) {
-            logger.error("receive error data,device:{}, value(first 100 bytes): {}", rowPlans[i].getDeviceId(), value);
+            logger.error(
+                "receive error data,device:{}, value(first 100 bytes): {}",
+                rowPlans[i].getDeviceId(),
+                value);
           }
         }
       }

Reply via email to