This is an automated email from the ASF dual-hosted git repository. qiaojialin pushed a commit to branch fix_tablet_last in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit effcddce34e34149e745538828902eed33cb4a88 Author: qiaojialin <[email protected]> AuthorDate: Sun Apr 26 13:16:33 2020 +0800 try to fix last cache --- .../db/engine/storagegroup/StorageGroupProcessor.java | 7 ++++--- .../iotdb/db/qp/physical/crud/InsertTabletPlan.java | 16 ++++++++-------- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java index 803e147..74bfa63 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java @@ -575,6 +575,7 @@ public class StorageGroupProcessor { return; } + logger.info("@+++<<<: current batch start {} end {}", start, end); TsFileProcessor tsFileProcessor = getOrCreateTsFileProcessor(timePartitionId, sequence); if (tsFileProcessor == null) { for (int i = start; i < end; i++) { @@ -601,7 +602,7 @@ public class StorageGroupProcessor { } long globalLatestFlushedTime = globalLatestFlushedTimeForEachDevice.getOrDefault( insertTabletPlan.getDeviceId(), Long.MIN_VALUE); - tryToUpdateBatchInsertLastCache(insertTabletPlan, globalLatestFlushedTime); + tryToUpdateBatchInsertLastCache(insertTabletPlan, end -1, globalLatestFlushedTime); // check memtable size and may async try to flush the work memtable if (tsFileProcessor.shouldFlush()) { @@ -609,7 +610,7 @@ public class StorageGroupProcessor { } } - public void tryToUpdateBatchInsertLastCache(InsertTabletPlan plan, Long latestFlushedTime) + public void tryToUpdateBatchInsertLastCache(InsertTabletPlan plan, int lastIndex, Long latestFlushedTime) throws WriteProcessException { MNode node = null; try { @@ -619,7 +620,7 @@ public class StorageGroupProcessor { // Update cached last value with high priority MNode measurementNode = node.getChild(measurementList[i]); ((LeafMNode) measurementNode) - .updateCachedLast(plan.composeLastTimeValuePair(i), true, latestFlushedTime); + .updateCachedLast(plan.composeLastTimeValuePair(i, lastIndex), true, latestFlushedTime); } } catch (MetadataException e) { throw new WriteProcessException(e); diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java index a26ac3b..65abe11 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java @@ -417,7 +417,7 @@ public class InsertTabletPlan extends PhysicalPlan { return tmpMaxTime; } - public TimeValuePair composeLastTimeValuePair(int measurementIndex) { + public TimeValuePair composeLastTimeValuePair(int measurementIndex, int lastIndex) { if (measurementIndex >= columns.length) { return null; } @@ -425,33 +425,33 @@ public class InsertTabletPlan extends PhysicalPlan { switch (dataTypes[measurementIndex]) { case INT32: int[] intValues = (int[]) columns[measurementIndex]; - value = new TsInt(intValues[end - 1]); + value = new TsInt(intValues[lastIndex]); break; case INT64: long[] longValues = (long[]) columns[measurementIndex]; - value = new TsLong(longValues[end - 1]); + value = new TsLong(longValues[lastIndex]); break; case FLOAT: float[] floatValues = (float[]) columns[measurementIndex]; - value = new TsFloat(floatValues[end - 1]); + value = new TsFloat(floatValues[lastIndex]); break; case DOUBLE: double[] doubleValues = (double[]) columns[measurementIndex]; - value = new TsDouble(doubleValues[end - 1]); + value = new TsDouble(doubleValues[lastIndex]); break; case BOOLEAN: boolean[] boolValues = (boolean[]) columns[measurementIndex]; - value = new TsBoolean(boolValues[end - 1]); + value = new TsBoolean(boolValues[lastIndex]); break; case TEXT: Binary[] binaryValues = (Binary[]) columns[measurementIndex]; - value = new TsBinary(binaryValues[end - 1]); + value = new TsBinary(binaryValues[lastIndex]); break; default: throw new UnSupportedDataTypeException( String.format(DATATYPE_UNSUPPORTED, dataTypes[measurementIndex])); } - return new TimeValuePair(times[end - 1], value); + return new TimeValuePair(times[lastIndex], value); } public long[] getTimes() {
