This is an automated email from the ASF dual-hosted git repository. tanxinyu pushed a commit to branch vector_insertrowplan_cluster in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 15ca772e548361c5ea718f0f8e029b07465d1e1a Author: LebronAl <[email protected]> AuthorDate: Wed May 19 17:59:23 2021 +0800 fix --- .../apache/iotdb/cluster/metadata/CMManager.java | 26 ++++++++--- .../org/apache/iotdb/db/metadata/MManager.java | 29 +++++++++--- .../apache/iotdb/db/qp/executor/PlanExecutor.java | 2 +- .../iotdb/db/metadata/MManagerImproveTest.java | 2 +- .../test/java/org/apache/iotdb/db/sql/Cases.java | 53 +++++++++++++++++++++- 5 files changed, 95 insertions(+), 17 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java index d2700a3..cb1e237 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java @@ -511,12 +511,13 @@ public class CMManager extends MManager { } @Override - public Pair<MNode, Template> getDeviceNodeWithAutoCreate(PartialPath path) + public Pair<MNode, Template> getDeviceNodeWithAutoCreate(PartialPath path, boolean isTemplate) throws MetadataException, IOException { return getDeviceNodeWithAutoCreate( path, - ClusterDescriptor.getInstance().getConfig().isEnableAutoCreateSchema(), + config.isAutoCreateSchemaEnabled(), false, + isTemplate, config.getDefaultStorageGroupLevel()); } @@ -742,7 +743,7 @@ public class CMManager extends MManager { seriesList.add(deviceId.getFullPath() + TsFileConstant.PATH_SEPARATOR + measurementId); } if (hasVector) { - return createAlignedTimeseries(seriesList, (InsertTabletPlan) insertPlan); + return createAlignedTimeseries(seriesList, insertPlan); } PartitionGroup partitionGroup = metaGroupMember.getPartitionTable().route(storageGroupName.getFullPath(), 0); @@ -755,16 +756,27 @@ public class CMManager extends MManager { return createTimeseries(unregisteredSeriesList, seriesList, insertPlan); } - private boolean createAlignedTimeseries(List<String> seriesList, InsertTabletPlan insertPlan) + private boolean createAlignedTimeseries(List<String> seriesList, InsertPlan insertPlan) throws IllegalPathException { List<String> measurements = new ArrayList<>(); for (String series : seriesList) { measurements.addAll(MetaUtils.getMeasurementsInPartialPath(new PartialPath(series))); } - List<TSDataType> dataTypes = new ArrayList<>(); - List<TSEncoding> encodings = new ArrayList<>(); - for (TSDataType dataType : insertPlan.getDataTypes()) { + List<TSDataType> dataTypes = new ArrayList<>(measurements.size()); + List<TSEncoding> encodings = new ArrayList<>(measurements.size()); + for (int index = 0; index < measurements.size(); index++) { + TSDataType dataType; + if (insertPlan.getDataTypes() != null && insertPlan.getDataTypes()[index] != null) { + dataType = insertPlan.getDataTypes()[index]; + } else { + dataType = + TypeInferenceUtils.getPredictedDataType( + insertPlan instanceof InsertTabletPlan + ? Array.get(((InsertTabletPlan) insertPlan).getColumns()[index], 0) + : ((InsertRowPlan) insertPlan).getValues()[index], + true); + } dataTypes.add(dataType); encodings.add(getDefaultEncoding(dataType)); } diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java index 53fc126..474f4c6 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java @@ -1354,9 +1354,20 @@ public class MManager { * <p>(we develop this method as we need to get the node's lock after we get the lock.writeLock()) * * @param path path + * @param isTemplate, If path doesn't exists in mNodeCache. For the call to create the template, + * it needs to guarantee the success of the creation if no sg will be created; For the call to + * create the device, it needs to make sure that a PathNotExistException will be thrown. + * * @param isTemplate, If path doesn't exists in mNodeCache. For the call to create the + * @param allowCreateSg, The stand-alone version can create an sg at will, but the cluster version + * needs to make the Meta group aware of the creation of an SG, so an exception needs to be + * thrown here */ public Pair<MNode, Template> getDeviceNodeWithAutoCreate( - PartialPath path, boolean autoCreateSchema, boolean allowCreateSg, int sgLevel) + PartialPath path, + boolean autoCreateSchema, + boolean allowCreateSg, + boolean isTemplate, + int sgLevel) throws IOException, MetadataException { Pair<MNode, Template> node; boolean shouldSetStorageGroup; @@ -1364,12 +1375,11 @@ public class MManager { node = mNodeCache.get(path); return node; } catch (CacheException e) { - if (!autoCreateSchema) { + if (!autoCreateSchema && !isTemplate) { throw new PathNotExistException(path.getFullPath()); } shouldSetStorageGroup = e.getCause() instanceof StorageGroupNotSetException; } - try { if (shouldSetStorageGroup) { if (allowCreateSg) { @@ -1379,6 +1389,7 @@ public class MManager { throw new StorageGroupNotSetException(path.getFullPath()); } } + node = mtree.getDeviceNodeWithAutoCreating(path, sgLevel); if (!(node.left instanceof StorageGroupMNode)) { logWriter.autoCreateDeviceMNode(new AutoCreateDeviceMNodePlan(node.left.getPartialPath())); @@ -1395,10 +1406,14 @@ public class MManager { } /** !!!!!!Attention!!!!! must call the return node's readUnlock() if you call this method. */ - public Pair<MNode, Template> getDeviceNodeWithAutoCreate(PartialPath path) + public Pair<MNode, Template> getDeviceNodeWithAutoCreate(PartialPath path, boolean isTemplate) throws MetadataException, IOException { return getDeviceNodeWithAutoCreate( - path, config.isAutoCreateSchemaEnabled(), true, config.getDefaultStorageGroupLevel()); + path, + config.isAutoCreateSchemaEnabled(), + true, + isTemplate, + config.getDefaultStorageGroupLevel()); } // attention: this path must be a device node @@ -2161,7 +2176,7 @@ public class MManager { MeasurementMNode[] measurementMNodes = plan.getMeasurementMNodes(); // 1. get device node - Pair<MNode, Template> deviceMNode = getDeviceNodeWithAutoCreate(deviceId); + Pair<MNode, Template> deviceMNode = getDeviceNodeWithAutoCreate(deviceId, false); if (deviceMNode.left.getDeviceTemplate() != null) { deviceMNode.right = deviceMNode.left.getDeviceTemplate(); } @@ -2425,7 +2440,7 @@ public class MManager { // get mnode and update template should be atomic synchronized (this) { Pair<MNode, Template> node = - getDeviceNodeWithAutoCreate(new PartialPath(plan.getPrefixPath())); + getDeviceNodeWithAutoCreate(new PartialPath(plan.getPrefixPath()), true); if (node.left.getDeviceTemplate() != null) { if (node.left.getDeviceTemplate().equals(template)) { diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java index 09eecd1..6d4ef15 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java @@ -1051,7 +1051,7 @@ public class PlanExecutor implements IPlanExecutor { String device = chunkGroupMetadata.getDevice(); MNode node = IoTDB.metaManager.getDeviceNodeWithAutoCreate( - new PartialPath(device), true, true, sgLevel) + new PartialPath(device), true, true, false, sgLevel) .left; for (ChunkMetadata chunkMetadata : chunkGroupMetadata.getChunkMetadataList()) { PartialPath series = diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/MManagerImproveTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/MManagerImproveTest.java index 4b52b54..b16c130 100644 --- a/server/src/test/java/org/apache/iotdb/db/metadata/MManagerImproveTest.java +++ b/server/src/test/java/org/apache/iotdb/db/metadata/MManagerImproveTest.java @@ -138,7 +138,7 @@ public class MManagerImproveTest { private void doCacheTest(String deviceId, List<String> measurementList) throws MetadataException { try { - MNode node = mManager.getDeviceNodeWithAutoCreate(new PartialPath(deviceId)).left; + MNode node = mManager.getDeviceNodeWithAutoCreate(new PartialPath(deviceId), false).left; for (String s : measurementList) { assertTrue(node.hasChild(s)); MeasurementMNode measurementNode = (MeasurementMNode) node.getChild(s); diff --git a/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java b/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java index 4655c61..15b706c 100644 --- a/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java +++ b/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java @@ -131,7 +131,7 @@ public abstract class Cases { String timeSeriesSuffix = ".temperature WITH DATATYPE=DOUBLE, ENCODING=RLE"; String timeSeries; for (int i = 0; i < n; i++) { - timeSeries = timeSeriesPrefix + String.valueOf(i) + timeSeriesSuffix; + timeSeries = timeSeriesPrefix + i + timeSeriesSuffix; writeStatement.execute(String.format("create timeseries %s ", timeSeries)); } @@ -146,4 +146,55 @@ public abstract class Cases { resultSet.close(); } } + + @Test + public void testInsertAlignedValues() throws SQLException { + writeStatement.execute( + "insert into root.t1.wf01.wt01(time, (status, temperature)) values (4000, (true, 17.1))"); + writeStatement.execute( + "insert into root.t1.wf01.wt01(time, (status, temperature)) values (5000, (false, 20.1))"); + writeStatement.execute( + "insert into root.t1.wf01.wt01(time, (status, temperature)) values (6000, (true, 22))"); + // auto-create-schema test + // same sg, but different device + writeStatement.execute( + "insert into root.t1.wf01.wt02(time, (status, temperature)) values (6000, (false, 22))"); + writeStatement.close(); + + for (Statement readStatement : readStatements) { + ResultSet rs1 = readStatement.executeQuery("select status from root.t1.wf01.wt01"); + rs1.next(); + Assert.assertTrue(rs1.getBoolean(2)); + rs1.close(); + + ResultSet rs2 = readStatement.executeQuery("select status from root.t1.wf01.wt02"); + rs2.next(); + Assert.assertFalse(rs2.getBoolean(2)); + rs2.close(); + + ResultSet rs3 = readStatement.executeQuery("select * from root.t1.wf01.wt01"); + rs3.next(); + Assert.assertEquals(4000, rs3.getLong(1)); + Assert.assertTrue(rs3.getBoolean(2)); + Assert.assertEquals(17.1, rs3.getFloat(3), 0.1); + + rs3.next(); + Assert.assertEquals(5000, rs3.getLong(1)); + Assert.assertFalse(rs3.getBoolean(2)); + Assert.assertEquals(20.1, rs3.getFloat(3), 0.1); + + rs3.next(); + Assert.assertEquals(6000, rs3.getLong(1)); + Assert.assertTrue(rs3.getBoolean(2)); + Assert.assertEquals(22, rs3.getFloat(3), 0.1); + rs3.close(); + + ResultSet rs4 = readStatement.executeQuery("select * from root.t1.wf01.wt02"); + rs4.next(); + Assert.assertEquals(6000, rs4.getLong(1)); + Assert.assertFalse(rs4.getBoolean(2)); + Assert.assertEquals(22, rs4.getFloat(3), 0.1); + rs4.close(); + } + } }
