This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch jira_2047 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8005e4e1890c02b4ab1a5da70b2d2ca0a7126f52 Author: HTHou <[email protected]> AuthorDate: Mon Nov 22 17:53:21 2021 +0800 [IOTDB-2047] Fix Partial insert NPE for aligned and non-aligned series --- .../org/apache/iotdb/db/metadata/MManager.java | 16 +- .../iotdb/db/qp/physical/crud/InsertPlan.java | 18 -- .../java/org/apache/iotdb/db/utils/MemUtils.java | 12 ++ .../iotdb/db/metadata/MManagerBasicTest.java | 4 - .../session/IoTDBSessionDisableMemControlIT.java | 205 +++++++++++++++++++++ .../apache/iotdb/session/IoTDBSessionSimpleIT.java | 68 +++++++ .../iotdb/session/IoTDBSessionVectorInsertIT.java | 75 ++++++++ 7 files changed, 363 insertions(+), 35 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java index d7c2eb6..433bd31 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java @@ -1776,19 +1776,9 @@ public class MManager { if (!config.isEnablePartialInsert()) { throw mismatchException; } else { - if (plan.isAligned()) { - // mark failed measurement - plan.markFailedMeasurementAlignedInsertion(mismatchException); - for (int j = 0; j < i; j++) { - // all the measurementMNodes should be null - measurementMNodes[j] = null; - } - break; - } else { - // mark failed measurement - plan.markFailedMeasurementInsertion(i, mismatchException); - continue; - } + // mark failed measurement + plan.markFailedMeasurementInsertion(i, mismatchException); + continue; } } measurementMNodes[i] = measurementMNode; diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java index b709896..737351c 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java @@ -120,24 +120,6 @@ public 
abstract class InsertPlan extends PhysicalPlan { measurements[index] = null; } - public void markFailedMeasurementAlignedInsertion(Exception e) { - if (failedMeasurements == null) { - failedMeasurements = new ArrayList<>(); - failedExceptions = new ArrayList<>(); - failedIndices = new ArrayList<>(); - } - - for (int i = 0; i < measurements.length; i++) { - if (measurements[i] == null) { - continue; - } - failedMeasurements.add(measurements[i]); - failedExceptions.add(e); - failedIndices.add(i); - measurements[i] = null; - } - } - /** * Reconstruct this plan with the failed measurements. * diff --git a/server/src/main/java/org/apache/iotdb/db/utils/MemUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/MemUtils.java index 10617cc..ac7adcc 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/MemUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/MemUtils.java @@ -63,6 +63,9 @@ public class MemUtils { List<TSDataType> dataTypes, Object[] value, boolean addingTextDataSize) { long memSize = 0L; for (int i = 0; i < dataTypes.size(); i++) { + if (value[i] == null) { + continue; + } memSize += getRecordSize(dataTypes.get(i), value[i], addingTextDataSize); } return memSize; @@ -77,6 +80,9 @@ public class MemUtils { // time and index size long memSize = 8L + 4L; for (int i = 0; i < dataTypes.size(); i++) { + if (value[i] == null) { + continue; + } if (dataTypes.get(i) == TSDataType.TEXT) { memSize += (addingTextDataSize ? 
getBinarySize((Binary) value[i]) : 0); } else { @@ -110,6 +116,9 @@ public class MemUtils { } long memSize = 0; for (int i = 0; i < insertTabletPlan.getMeasurements().length; i++) { + if (insertTabletPlan.getColumns()[i] == null) { + continue; + } // time column memSize memSize += (end - start) * 8L; if (insertTabletPlan.getDataTypes()[i] == TSDataType.TEXT && addingTextDataSize) { @@ -130,6 +139,9 @@ public class MemUtils { } long memSize = 0; for (int i = 0; i < insertTabletPlan.getMeasurements().length; i++) { + if (insertTabletPlan.getColumns()[i] == null) { + continue; + } TSDataType valueType; // value columns memSize valueType = insertTabletPlan.getDataTypes()[i]; diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java index 92642ed..d1227cc 100644 --- a/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java +++ b/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java @@ -1794,10 +1794,6 @@ public class MManagerBasicTest { // call getSeriesSchemasAndReadLockDevice IMNode node = manager.getSeriesSchemasAndReadLockDevice(insertRowPlan); assertEquals(3, manager.getAllTimeseriesCount(node.getPartialPath().concatNode("**"))); - assertNull(insertRowPlan.getMeasurementMNodes()[0]); - assertNull(insertRowPlan.getMeasurementMNodes()[1]); - assertNull(insertRowPlan.getMeasurementMNodes()[2]); - assertEquals(3, insertRowPlan.getFailedMeasurementNumber()); } catch (Exception e) { e.printStackTrace(); diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionDisableMemControlIT.java b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionDisableMemControlIT.java new file mode 100644 index 0000000..a2bff7f --- /dev/null +++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionDisableMemControlIT.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.session; + +import org.apache.iotdb.db.conf.IoTDBConstant; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.utils.EnvironmentUtils; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; +import org.apache.iotdb.tsfile.read.common.RowRecord; +import org.apache.iotdb.tsfile.utils.Binary; +import org.apache.iotdb.tsfile.write.record.Tablet; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; +import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; + +public class IoTDBSessionDisableMemControlIT { + + private static Logger logger = LoggerFactory.getLogger(IoTDBSessionDisableMemControlIT.class); + + private Session session; + + @Before + public void setUp() { + System.setProperty(IoTDBConstant.IOTDB_CONF, 
"src/test/resources/"); + EnvironmentUtils.closeStatMonitor(); + IoTDBDescriptor.getInstance().getConfig().setEnableMemControl(false); + EnvironmentUtils.envSetUp(); + } + + @After + public void tearDown() throws Exception { + session.close(); + EnvironmentUtils.cleanEnv(); + IoTDBDescriptor.getInstance().getConfig().setEnableMemControl(true); + } + + @Test + public void testInsertPartialTablet() + throws IoTDBConnectionException, StatementExecutionException { + session = new Session("127.0.0.1", 6667, "root", "root"); + session.open(); + + if (!session.checkTimeseriesExists("root.sg.d.s1")) { + session.createTimeseries( + "root.sg.d.s1", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY); + } + if (!session.checkTimeseriesExists("root.sg.d.s2")) { + session.createTimeseries( + "root.sg.d.s2", TSDataType.DOUBLE, TSEncoding.GORILLA, CompressionType.SNAPPY); + } + if (!session.checkTimeseriesExists("root.sg.d.s3")) { + session.createTimeseries( + "root.sg.d.s3", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY); + } + List<IMeasurementSchema> schemaList = new ArrayList<>(); + schemaList.add(new UnaryMeasurementSchema("s1", TSDataType.INT64)); + schemaList.add(new UnaryMeasurementSchema("s2", TSDataType.DOUBLE)); + schemaList.add(new UnaryMeasurementSchema("s3", TSDataType.TEXT)); + + Tablet tablet = new Tablet("root.sg.d", schemaList, 10); + + long timestamp = System.currentTimeMillis(); + + for (long row = 0; row < 15; row++) { + int rowIndex = tablet.rowSize++; + tablet.addTimestamp(rowIndex, timestamp); + tablet.addValue("s1", rowIndex, 1L); + tablet.addValue("s2", rowIndex, 1D); + tablet.addValue("s3", rowIndex, new Binary("1")); + if (tablet.rowSize == tablet.getMaxRowNumber()) { + try { + session.insertTablet(tablet, true); + } catch (StatementExecutionException e) { + Assert.assertEquals( + "313: failed to insert measurements [s3] caused by DataType mismatch, Insert measurement s3 type TEXT, metadata tree type INT64", + e.getMessage()); + } + 
tablet.reset(); + } + timestamp++; + } + + if (tablet.rowSize != 0) { + try { + session.insertTablet(tablet); + } catch (StatementExecutionException e) { + Assert.assertEquals( + "313: failed to insert measurements [s3] caused by DataType mismatch, Insert measurement s3 type TEXT, metadata tree type INT64", + e.getMessage()); + } + tablet.reset(); + } + + SessionDataSet dataSet = + session.executeQueryStatement("select count(s1), count(s2), count(s3) from root.sg.d"); + while (dataSet.hasNext()) { + RowRecord rowRecord = dataSet.next(); + Assert.assertEquals(15L, rowRecord.getFields().get(0).getLongV()); + Assert.assertEquals(15L, rowRecord.getFields().get(1).getLongV()); + Assert.assertEquals(0L, rowRecord.getFields().get(2).getLongV()); + } + session.close(); + } + + @Test + public void testInsertPartialAlignedTablet() + throws IoTDBConnectionException, StatementExecutionException { + session = new Session("127.0.0.1", 6667, "root", "root"); + session.open(); + + List<String> multiMeasurementComponents = new ArrayList<>(); + for (int i = 1; i <= 3; i++) { + multiMeasurementComponents.add("s" + i); + } + List<TSDataType> dataTypes = new ArrayList<>(); + dataTypes.add(TSDataType.INT64); + dataTypes.add(TSDataType.DOUBLE); + dataTypes.add(TSDataType.INT64); + List<TSEncoding> encodings = new ArrayList<>(); + List<CompressionType> compressors = new ArrayList<>(); + for (int i = 1; i <= 3; i++) { + encodings.add(TSEncoding.PLAIN); + compressors.add(CompressionType.SNAPPY); + } + session.createAlignedTimeseries( + "root.sg.d", multiMeasurementComponents, dataTypes, encodings, compressors, null); + List<IMeasurementSchema> schemaList = new ArrayList<>(); + schemaList.add(new UnaryMeasurementSchema("s1", TSDataType.INT64)); + schemaList.add(new UnaryMeasurementSchema("s2", TSDataType.DOUBLE)); + schemaList.add(new UnaryMeasurementSchema("s3", TSDataType.TEXT)); + + Tablet tablet = new Tablet("root.sg.d", schemaList, 10); + + long timestamp = System.currentTimeMillis(); + 
+ for (long row = 0; row < 15; row++) { + int rowIndex = tablet.rowSize++; + tablet.addTimestamp(rowIndex, timestamp); + tablet.addValue("s1", rowIndex, 1L); + tablet.addValue("s2", rowIndex, 1D); + tablet.addValue("s3", rowIndex, new Binary("1")); + if (tablet.rowSize == tablet.getMaxRowNumber()) { + try { + session.insertAlignedTablet(tablet, true); + } catch (StatementExecutionException e) { + Assert.assertEquals( + "313: failed to insert measurements [s3] caused by DataType mismatch, Insert measurement s3 type TEXT, metadata tree type INT64", + e.getMessage()); + } + tablet.reset(); + } + timestamp++; + } + + if (tablet.rowSize != 0) { + try { + session.insertAlignedTablet(tablet); + } catch (StatementExecutionException e) { + Assert.assertEquals( + "313: failed to insert measurements [s3] caused by DataType mismatch, Insert measurement s3 type TEXT, metadata tree type INT64", + e.getMessage()); + } + tablet.reset(); + } + + SessionDataSet dataSet = session.executeQueryStatement("select s1, s2, s3 from root.sg.d"); + while (dataSet.hasNext()) { + RowRecord rowRecord = dataSet.next(); + Assert.assertEquals(1, rowRecord.getFields().get(0).getLongV()); + Assert.assertEquals(1.0, rowRecord.getFields().get(1).getDoubleV(), 0.01); + Assert.assertEquals(null, rowRecord.getFields().get(2).getObjectValue(TSDataType.TEXT)); + } + session.close(); + } +} diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionSimpleIT.java b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionSimpleIT.java index 57f4520..961eadf 100644 --- a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionSimpleIT.java +++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionSimpleIT.java @@ -1300,6 +1300,74 @@ public class IoTDBSessionSimpleIT { assertTrue(checkSet.isEmpty()); } + @Test + public void testInsertPartialTablet2() + throws IoTDBConnectionException, StatementExecutionException { + session = new Session("127.0.0.1", 6667, "root", "root"); + 
session.open(); + + if (!session.checkTimeseriesExists("root.sg.d.s1")) { + session.createTimeseries( + "root.sg.d.s1", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY); + } + if (!session.checkTimeseriesExists("root.sg.d.s2")) { + session.createTimeseries( + "root.sg.d.s2", TSDataType.DOUBLE, TSEncoding.GORILLA, CompressionType.SNAPPY); + } + if (!session.checkTimeseriesExists("root.sg.d.s3")) { + session.createTimeseries( + "root.sg.d.s3", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY); + } + List<IMeasurementSchema> schemaList = new ArrayList<>(); + schemaList.add(new UnaryMeasurementSchema("s1", TSDataType.INT64)); + schemaList.add(new UnaryMeasurementSchema("s2", TSDataType.DOUBLE)); + schemaList.add(new UnaryMeasurementSchema("s3", TSDataType.TEXT)); + + Tablet tablet = new Tablet("root.sg.d", schemaList, 10); + + long timestamp = System.currentTimeMillis(); + + for (long row = 0; row < 15; row++) { + int rowIndex = tablet.rowSize++; + tablet.addTimestamp(rowIndex, timestamp); + tablet.addValue("s1", rowIndex, 1L); + tablet.addValue("s2", rowIndex, 1D); + tablet.addValue("s3", rowIndex, new Binary("1")); + if (tablet.rowSize == tablet.getMaxRowNumber()) { + try { + session.insertTablet(tablet, true); + } catch (StatementExecutionException e) { + Assert.assertEquals( + "313: failed to insert measurements [s3] caused by DataType mismatch, Insert measurement s3 type TEXT, metadata tree type INT64", + e.getMessage()); + } + tablet.reset(); + } + timestamp++; + } + + if (tablet.rowSize != 0) { + try { + session.insertTablet(tablet); + } catch (StatementExecutionException e) { + Assert.assertEquals( + "313: failed to insert measurements [s3] caused by DataType mismatch, Insert measurement s3 type TEXT, metadata tree type INT64", + e.getMessage()); + } + tablet.reset(); + } + + SessionDataSet dataSet = + session.executeQueryStatement("select count(s1), count(s2), count(s3) from root.sg.d"); + while (dataSet.hasNext()) { + RowRecord rowRecord = 
dataSet.next(); + Assert.assertEquals(15L, rowRecord.getFields().get(0).getLongV()); + Assert.assertEquals(15L, rowRecord.getFields().get(1).getLongV()); + Assert.assertEquals(0L, rowRecord.getFields().get(2).getLongV()); + } + session.close(); + } + private void initTreeTemplate(String path) throws IoTDBConnectionException, StatementExecutionException, IOException { Template sessionTemplate = new Template("treeTemplate", true); diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorInsertIT.java b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorInsertIT.java index 8ba3c37..808a79b 100644 --- a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorInsertIT.java +++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorInsertIT.java @@ -24,13 +24,17 @@ import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.read.common.RowRecord; +import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.write.record.Tablet; import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -442,4 +446,75 @@ public class IoTDBSessionVectorInsertIT { session.insertAlignedRecordsOfOneDevice( prefixPath, times, subMeasurementsList, typeList, valueList); } + + @Test + public void testInsertPartialAlignedTablet() + throws IoTDBConnectionException, StatementExecutionException { + session = new Session("127.0.0.1", 6667, "root", "root"); + session.open(); + + 
List<String> multiMeasurementComponents = new ArrayList<>(); + for (int i = 1; i <= 3; i++) { + multiMeasurementComponents.add("s" + i); + } + List<TSDataType> dataTypes = new ArrayList<>(); + dataTypes.add(TSDataType.INT64); + dataTypes.add(TSDataType.DOUBLE); + dataTypes.add(TSDataType.INT64); + List<TSEncoding> encodings = new ArrayList<>(); + List<CompressionType> compressors = new ArrayList<>(); + for (int i = 1; i <= 3; i++) { + encodings.add(TSEncoding.PLAIN); + compressors.add(CompressionType.SNAPPY); + } + session.createAlignedTimeseries( + "root.sg.d", multiMeasurementComponents, dataTypes, encodings, compressors, null); + List<IMeasurementSchema> schemaList = new ArrayList<>(); + schemaList.add(new UnaryMeasurementSchema("s1", TSDataType.INT64)); + schemaList.add(new UnaryMeasurementSchema("s2", TSDataType.DOUBLE)); + schemaList.add(new UnaryMeasurementSchema("s3", TSDataType.TEXT)); + + Tablet tablet = new Tablet("root.sg.d", schemaList, 10); + + long timestamp = System.currentTimeMillis(); + + for (long row = 0; row < 15; row++) { + int rowIndex = tablet.rowSize++; + tablet.addTimestamp(rowIndex, timestamp); + tablet.addValue("s1", rowIndex, 1L); + tablet.addValue("s2", rowIndex, 1D); + tablet.addValue("s3", rowIndex, new Binary("1")); + if (tablet.rowSize == tablet.getMaxRowNumber()) { + try { + session.insertAlignedTablet(tablet, true); + } catch (StatementExecutionException e) { + Assert.assertEquals( + "313: failed to insert measurements [s3] caused by DataType mismatch, Insert measurement s3 type TEXT, metadata tree type INT64", + e.getMessage()); + } + tablet.reset(); + } + timestamp++; + } + + if (tablet.rowSize != 0) { + try { + session.insertAlignedTablet(tablet); + } catch (StatementExecutionException e) { + Assert.assertEquals( + "313: failed to insert measurements [s3] caused by DataType mismatch, Insert measurement s3 type TEXT, metadata tree type INT64", + e.getMessage()); + } + tablet.reset(); + } + + SessionDataSet dataSet = 
session.executeQueryStatement("select s1, s2, s3 from root.sg.d"); + while (dataSet.hasNext()) { + RowRecord rowRecord = dataSet.next(); + Assert.assertEquals(1, rowRecord.getFields().get(0).getLongV()); + Assert.assertEquals(1.0, rowRecord.getFields().get(1).getDoubleV(), 0.01); + Assert.assertEquals(null, rowRecord.getFields().get(2).getObjectValue(TSDataType.TEXT)); + } + session.close(); + } }
