This is an automated email from the ASF dual-hosted git repository. qiaojialin pushed a commit to branch add_get_sleep in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 30bd5c41e73af8a109c8ddb93e38481519506ac1 Author: qiaojialin <[email protected]> AuthorDate: Wed Jun 3 13:13:39 2020 +0800 tmp --- .../main/java/org/apache/iotdb/SessionExample.java | 554 +++++++++------------ .../org/apache/iotdb/db/metadata/MManager.java | 74 ++- .../java/org/apache/iotdb/db/metadata/MTree.java | 11 +- .../iotdb/db/metadata/mnode/StorageGroupMNode.java | 4 +- .../iotdb/db/qp/physical/crud/InsertPlan.java | 7 +- 5 files changed, 292 insertions(+), 358 deletions(-) diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java index 01b05e5..c39c70f 100644 --- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java +++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java @@ -19,17 +19,14 @@ package org.apache.iotdb; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; +import java.util.Random; import org.apache.iotdb.rpc.BatchExecutionException; import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.StatementExecutionException; -import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.session.Session; -import org.apache.iotdb.session.SessionDataSet; -import org.apache.iotdb.session.SessionDataSet.DataIterator; -import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; +import org.apache.iotdb.session.pool.SessionDataSetWrapper; +import org.apache.iotdb.session.pool.SessionPool; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.write.record.Tablet; @@ -37,365 +34,274 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; public class SessionExample { - private static Session session; + static SessionPool sessionPool = new SessionPool("127.0.0.1", 6667, "root", "root", 10); - public static void main(String[] args) - throws IoTDBConnectionException, StatementExecutionException, BatchExecutionException { - session = new Session("127.0.0.1", 6667, "root", "root"); - session.open(false); + public static void main(String[] args) { - try { - session.setStorageGroup("root.sg1"); - } catch (StatementExecutionException e) { - if (e.getStatusCode() != TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode()) - throw e; + for (int i = 0; i < 6; i++) { + new Thread(new WriteThread(1)).start(); } - createTimeseries(); - createMultiTimeseries(); - insertRecord(); - insertTablet(); - insertTablets(); - insertRecords(); - nonQuery(); - query(); - queryByIterator(); - deleteData(); - deleteTimeseries(); - session.close(); +// try { +// Thread.sleep(10000); +// } catch (InterruptedException e) { +// e.printStackTrace(); +// } +// +// for (int i = 0; i < 6; i++) { +// new Thread(new ReadLastThread(i)).start(); +// new Thread(new ReadRawDataThread(i)).start(); +// new Thread(new ReadGroupByThread(i)).start(); +// } +// +// try { +// Thread.sleep(10000); +// } catch (InterruptedException e) { +// e.printStackTrace(); +// } +// +// new Thread(new WriteHistThread(1)).start(); + } - private static void createTimeseries() - throws IoTDBConnectionException, StatementExecutionException { + static class WriteThread implements Runnable{ + int device; - if (!session.checkTimeseriesExists("root.sg1.d1.s1")) { - session.createTimeseries("root.sg1.d1.s1", TSDataType.INT64, TSEncoding.RLE, - CompressionType.SNAPPY); - } - if (!session.checkTimeseriesExists("root.sg1.d1.s2")) { - session.createTimeseries("root.sg1.d1.s2", TSDataType.INT64, TSEncoding.RLE, - CompressionType.SNAPPY); - } - if (!session.checkTimeseriesExists("root.sg1.d1.s3")) { - session.createTimeseries("root.sg1.d1.s3", TSDataType.INT64, TSEncoding.RLE, - CompressionType.SNAPPY); + WriteThread(int device) { + this.device = device; } - // create timeseries with tags and attributes - if (!session.checkTimeseriesExists("root.sg1.d1.s4")) { - Map<String, String> tags = new HashMap<>(); - tags.put("tag1", "v1"); - Map<String, String> attributes = new HashMap<>(); - tags.put("description", "v1"); - session.createTimeseries("root.sg1.d1.s4", TSDataType.INT64, TSEncoding.RLE, - CompressionType.SNAPPY, null, tags, attributes, "temperature"); + @Override + public void run() { + Session session = new Session("127.0.0.1", 6667, "root", "root"); + try { + session.open(); + } catch (IoTDBConnectionException e) { + e.printStackTrace(); + } + long time = 1; + Random random = new Random(); + while (true) { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + e.printStackTrace(); + } + long start = System.currentTimeMillis(); + + time += 100; + String deviceId = "root.sg1.d" + time; + List<String> measurements = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + measurements.add("s" + (time + i)); + } + + List<String> values = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + values.add(random.nextInt() + ""); + } + + try { + session.insertRecord(deviceId, time, measurements, values); + } catch (IoTDBConnectionException | StatementExecutionException e) { + e.printStackTrace(); + } + System.out.println( + Thread.currentThread().getName() + " write 1 cost: " + (System.currentTimeMillis() - start)); + } } } - private static void createMultiTimeseries() - throws IoTDBConnectionException, BatchExecutionException, StatementExecutionException { - - if (!session.checkTimeseriesExists("root.sg1.d2.s1") && !session - .checkTimeseriesExists("root.sg1.d2.s2")) { - List<String> paths = new ArrayList<>(); - paths.add("root.sg1.d2.s1"); - paths.add("root.sg1.d2.s2"); - List<TSDataType> tsDataTypes = new ArrayList<>(); - tsDataTypes.add(TSDataType.INT64); - tsDataTypes.add(TSDataType.INT64); - List<TSEncoding> tsEncodings = new ArrayList<>(); - tsEncodings.add(TSEncoding.RLE); - tsEncodings.add(TSEncoding.RLE); - List<CompressionType> compressionTypes = new ArrayList<>(); - compressionTypes.add(CompressionType.SNAPPY); - compressionTypes.add(CompressionType.SNAPPY); - - List<Map<String, String>> tagsList = new ArrayList<>(); - Map<String, String> tags = new HashMap<>(); - tags.put("unit", "kg"); - tagsList.add(tags); - tagsList.add(tags); - - List<Map<String, String>> attributesList = new ArrayList<>(); - Map<String, String> attributes = new HashMap<>(); - attributes.put("minValue", "1"); - attributes.put("maxValue", "100"); - attributesList.add(attributes); - attributesList.add(attributes); - - List<String> alias = new ArrayList<>(); - alias.add("weight1"); - alias.add("weight2"); - - session - .createMultiTimeseries(paths, tsDataTypes, tsEncodings, compressionTypes, null, tagsList, - attributesList, alias); - } - } + static class WriteHistThread implements Runnable{ + int device; - private static void insertRecord() throws IoTDBConnectionException, StatementExecutionException { - String deviceId = "root.sg1.d1"; - List<String> measurements = new ArrayList<>(); - List<TSDataType> types = new ArrayList<>(); - measurements.add("s1"); - measurements.add("s2"); - measurements.add("s3"); - types.add(TSDataType.INT64); - types.add(TSDataType.INT64); - types.add(TSDataType.INT64); - - for (long time = 0; time < 100; time++) { - List<Object> values = new ArrayList<>(); - values.add(1L); - values.add(2L); - values.add(3L); - session.insertRecord(deviceId, time, measurements, types, values); + WriteHistThread(int device) { + this.device = device; } - } - private static void insertStrRecord() throws IoTDBConnectionException, StatementExecutionException { - String deviceId = "root.sg1.d1"; - List<String> measurements = new ArrayList<>(); - measurements.add("s1"); - measurements.add("s2"); - measurements.add("s3"); - - for (long time = 0; time < 10; time++) { - List<String> values = new ArrayList<>(); - values.add("1"); - values.add("2"); - values.add("3"); - session.insertRecord(deviceId, time, measurements, values); + @Override + public void run() { + long time = 86400000000L; + Random random = new Random(); + while (true) { + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + long start = System.currentTimeMillis(); + + int index = random.nextInt(250000); + + List<MeasurementSchema> schemaList = new ArrayList<>(); + schemaList.add(new MeasurementSchema("s" + index, TSDataType.FLOAT, TSEncoding.RLE)); + schemaList.add(new MeasurementSchema("s" + (index+1), TSDataType.FLOAT, TSEncoding.RLE)); + schemaList.add(new MeasurementSchema("s" + (index+2), TSDataType.FLOAT, TSEncoding.RLE)); + schemaList.add(new MeasurementSchema("s" + (index+3), TSDataType.FLOAT, TSEncoding.RLE)); + schemaList.add(new MeasurementSchema("s" + (index+4), TSDataType.FLOAT, TSEncoding.RLE)); + schemaList.add(new MeasurementSchema("s" + (index+5), TSDataType.FLOAT, TSEncoding.RLE)); + schemaList.add(new MeasurementSchema("s" + (index+6), TSDataType.FLOAT, TSEncoding.RLE)); + schemaList.add(new MeasurementSchema("s" + (index+7), TSDataType.FLOAT, TSEncoding.RLE)); + schemaList.add(new MeasurementSchema("s" + (index+8), TSDataType.FLOAT, TSEncoding.RLE)); + schemaList.add(new MeasurementSchema("s" + (index+9), TSDataType.FLOAT, TSEncoding.RLE)); + + + Tablet tablet = new Tablet("root.sg1.d1", schemaList, 50000); + + long[] timestamps = tablet.timestamps; + Object[] values = tablet.values; + + for (int num = 0; num < 50000; num++) { + int row = tablet.rowSize++; + timestamps[row] = time; + time += 5000; + for (int i = 0; i < 10; i++) { + float[] sensor = (float[]) values[i]; + sensor[row] = random.nextFloat(); + } + if (tablet.rowSize == tablet.getMaxRowNumber()) { + try { + sessionPool.insertTablet(tablet, true); + } catch (IoTDBConnectionException | BatchExecutionException e) { + e.printStackTrace(); + } + tablet.reset(); + } + } + + if (tablet.rowSize != 0) { + try { + sessionPool.insertTablet(tablet); + } catch (IoTDBConnectionException | BatchExecutionException e) { + e.printStackTrace(); + } + tablet.reset(); + } + System.out.println( + Thread.currentThread().getName() + " write 500000 future point cost: " + (System.currentTimeMillis() - start) + "to s" + index); + } } } - private static void insertRecordInObject() - throws IoTDBConnectionException, StatementExecutionException { - String deviceId = "root.sg1.d1"; - List<String> measurements = new ArrayList<>(); - List<TSDataType> types = new ArrayList<>(); - measurements.add("s1"); - measurements.add("s2"); - measurements.add("s3"); - types.add(TSDataType.INT64); - types.add(TSDataType.INT64); - types.add(TSDataType.INT64); - - for (long time = 0; time < 100; time++) { - session.insertRecord(deviceId, time, measurements, types, 1L, 1L, 1L); - } - } + static class ReadLastThread implements Runnable { + int device; - private static void insertRecords() throws IoTDBConnectionException, BatchExecutionException { - String deviceId = "root.sg1.d1"; - List<String> measurements = new ArrayList<>(); - measurements.add("s1"); - measurements.add("s2"); - measurements.add("s3"); - List<String> deviceIds = new ArrayList<>(); - List<List<String>> measurementsList = new ArrayList<>(); - List<List<Object>> valuesList = new ArrayList<>(); - List<Long> timestamps = new ArrayList<>(); - List<List<TSDataType>> typesList = new ArrayList<>(); - - for (long time = 0; time < 500; time++) { - List<Object> values = new ArrayList<>(); - List<TSDataType> types = new ArrayList<>(); - values.add(1L); - values.add(2L); - values.add(3L); - types.add(TSDataType.INT64); - types.add(TSDataType.INT64); - types.add(TSDataType.INT64); - - deviceIds.add(deviceId); - measurementsList.add(measurements); - valuesList.add(values); - typesList.add(types); - timestamps.add(time); - if (time != 0 && time % 100 == 0) { - session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList); - deviceIds.clear(); - measurementsList.clear(); - valuesList.clear(); - timestamps.clear(); - } + ReadLastThread(int device) { + this.device = device; } - session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList); - } - /** - * insert the data of a device. For each timestamp, the number of measurements is the same. - * - * a Tablet example: - * - * device1 - * time s1, s2, s3 - * 1, 1, 1, 1 - * 2, 2, 2, 2 - * 3, 3, 3, 3 - * - * Users need to control the count of Tablet and write a batch when it reaches the maxBatchSize - */ - private static void insertTablet() throws IoTDBConnectionException, BatchExecutionException { - // The schema of sensors of one device - List<MeasurementSchema> schemaList = new ArrayList<>(); - schemaList.add(new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.RLE)); - schemaList.add(new MeasurementSchema("s2", TSDataType.INT64, TSEncoding.RLE)); - schemaList.add(new MeasurementSchema("s3", TSDataType.INT64, TSEncoding.RLE)); - - Tablet tablet = new Tablet("root.sg1.d1", schemaList, 100); - - long[] timestamps = tablet.timestamps; - Object[] values = tablet.values; - - for (long time = 0; time < 100; time++) { - int row = tablet.rowSize++; - timestamps[row] = time; - for (int i = 0; i < 3; i++) { - long[] sensor = (long[]) values[i]; - sensor[row] = i; - } - if (tablet.rowSize == tablet.getMaxRowNumber()) { - session.insertTablet(tablet, true); - tablet.reset(); + @Override + public void run() { + SessionDataSetWrapper dataSet = null; + + try { + while (true) { + Thread.sleep(5000); + long start = System.currentTimeMillis(); + + StringBuilder builder = new StringBuilder("select last "); + for (int c = 50000*device; c < 50000*device + 49999; c++) { + builder.append("s").append(c).append(","); + } + + builder.append("s" + ((device+1)*50000-1)); + builder.append(" from root.sg1.d1"); + + dataSet = sessionPool.executeQueryStatement(builder.toString()); + int a = 0; + while (dataSet.hasNext()) { + a++; + dataSet.next(); + } + System.out.println(Thread.currentThread().getName() + " last query: " + a + " cost: " + (System.currentTimeMillis() - start)); + sessionPool.closeResultSet(dataSet); + } + } catch (Exception e) { + e.printStackTrace(); } - } - if (tablet.rowSize != 0) { - session.insertTablet(tablet); - tablet.reset(); } } - private static void insertTablets() throws IoTDBConnectionException, BatchExecutionException { - // The schema of sensors of one device - List<MeasurementSchema> schemaList = new ArrayList<>(); - schemaList.add(new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.RLE)); - schemaList.add(new MeasurementSchema("s2", TSDataType.INT64, TSEncoding.RLE)); - schemaList.add(new MeasurementSchema("s3", TSDataType.INT64, TSEncoding.RLE)); - - Tablet tablet1 = new Tablet("root.sg1.d1", schemaList, 100); - Tablet tablet2 = new Tablet("root.sg1.d2", schemaList, 100); - Tablet tablet3 = new Tablet("root.sg1.d3", schemaList, 100); - - Map<String, Tablet> tabletMap = new HashMap<>(); - tabletMap.put("root.sg1.d1", tablet1); - tabletMap.put("root.sg1.d2", tablet2); - tabletMap.put("root.sg1.d3", tablet3); - - long[] timestamps1 = tablet1.timestamps; - Object[] values1 = tablet1.values; - long[] timestamps2 = tablet2.timestamps; - Object[] values2 = tablet2.values; - long[] timestamps3 = tablet3.timestamps; - Object[] values3 = tablet3.values; - - for (long time = 0; time < 100; time++) { - int row1 = tablet1.rowSize++; - int row2 = tablet2.rowSize++; - int row3 = tablet3.rowSize++; - timestamps1[row1] = time; - timestamps2[row2] = time; - timestamps3[row3] = time; - for (int i = 0; i < 3; i++) { - long[] sensor1 = (long[]) values1[i]; - sensor1[row1] = i; - long[] sensor2 = (long[]) values2[i]; - sensor2[row2] = i; - long[] sensor3 = (long[]) values3[i]; - sensor3[row3] = i; - } - if (tablet1.rowSize == tablet1.getMaxRowNumber()) { - session.insertTablets(tabletMap, true); + static class ReadRawDataThread implements Runnable { + int device; - tablet1.reset(); - tablet2.reset(); - tablet3.reset(); - } + ReadRawDataThread(int device) { + this.device = device; } - if (tablet1.rowSize != 0) { - session.insertTablets(tabletMap, true); - tablet1.reset(); - tablet2.reset(); - tablet3.reset(); - } - } + @Override + public void run() { + SessionDataSetWrapper dataSet = null; + + Random random = new Random(); + long time = 86400000; + try { + while (true) { + Thread.sleep(5000); + long start = System.currentTimeMillis(); + + StringBuilder builder = new StringBuilder("select "); + + time += 5000; + builder.append("s" + random.nextInt(300000)); + builder.append(" from root.sg1.d1 where time >= " + (time-86400000) + " and time <= " + time); + + dataSet = sessionPool.executeQueryStatement(builder.toString()); + int a = 0; + while (dataSet.hasNext()) { + a++; + dataSet.next(); + } + System.out.println(Thread.currentThread().getName() + " raw data query: " + a + " cost: " + (System.currentTimeMillis() - start)); + sessionPool.closeResultSet(dataSet); + } + } catch (Exception e) { + e.printStackTrace(); + } - private static void deleteData() throws IoTDBConnectionException, StatementExecutionException { - String path = "root.sg1.d1.s1"; - long deleteTime = 99; - session.deleteData(path, deleteTime); + } } - private static void deleteTimeseries() - throws IoTDBConnectionException, StatementExecutionException { - List<String> paths = new ArrayList<>(); - paths.add("root.sg1.d1.s1"); - paths.add("root.sg1.d1.s2"); - paths.add("root.sg1.d1.s3"); - session.deleteTimeseries(paths); - } + static class ReadGroupByThread implements Runnable { + int device; - private static void query() throws IoTDBConnectionException, StatementExecutionException { - SessionDataSet dataSet; - dataSet = session.executeQueryStatement("select * from root.sg1.d1"); - System.out.println(dataSet.getColumnNames()); - dataSet.setFetchSize(1024); // default is 512 - while (dataSet.hasNext()) { - System.out.println(dataSet.next()); + ReadGroupByThread(int device) { + this.device = device; } - dataSet.closeOperationHandle(); - } + @Override + public void run() { + SessionDataSetWrapper dataSet = null; - private static void queryByIterator() - throws IoTDBConnectionException, StatementExecutionException { - SessionDataSet dataSet; - dataSet = session.executeQueryStatement("select * from root.sg1.d1"); - DataIterator iterator = dataSet.iterator(); - System.out.println(dataSet.getColumnNames()); - dataSet.setFetchSize(1024); // default is 512 - while (iterator.next()) { - StringBuilder builder = new StringBuilder(); - // get time - builder.append(iterator.getLong(1)).append(","); - // get second column - if (!iterator.isNull(2)) { - builder.append(iterator.getLong(2)).append(","); - } else { - builder.append("null").append(","); - } + Random random = new Random(); + long time = 86400000; + try { + while (true) { + Thread.sleep(5000); + long start = System.currentTimeMillis(); - // get third column - if (!iterator.isNull("root.sg1.d1.s2")) { - builder.append(iterator.getLong("root.sg1.d1.s2")).append(","); - } else { - builder.append("null").append(","); - } + time += 5000; - // get forth column - if (!iterator.isNull(4)) { - builder.append(iterator.getLong(4)).append(","); - } else { - builder.append("null").append(","); - } + StringBuilder builder = new StringBuilder("select "); - // get fifth column - if (!iterator.isNull("root.sg1.d1.s4")) { - builder.append(iterator.getObject("root.sg1.d1.s4")); - } else { - builder.append("null"); - } + builder.append("last_value(s" + random.nextInt(300000) + ")"); + builder.append(" from root.sg1.d1 group by ([" + (time-86400000) + "," + time + "), 5m) fill(int64[PREVIOUSUNTILLAST])"); - System.out.println(builder.toString()); - } + dataSet = sessionPool.executeQueryStatement(builder.toString()); - dataSet.closeOperationHandle(); - } + int a = 0; + while (dataSet.hasNext()) { + a++; + dataSet.next(); + } + System.out.println(Thread.currentThread().getName() + " down sampling query: " + a + " cost: " + (System.currentTimeMillis() - start)); + sessionPool.closeResultSet(dataSet); + } + } catch (Exception e) { + e.printStackTrace(); + } - private static void nonQuery() throws IoTDBConnectionException, StatementExecutionException { - session.executeNonQueryStatement("insert into root.sg1.d1(timestamp,s1) values(200, 1);"); + } } } \ No newline at end of file diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java index 20f42fd..791a168 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.metadata; import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.conf.adapter.ActiveTimeSeriesCounter; import org.apache.iotdb.db.conf.adapter.IoTDBConfigDynamicAdapter; @@ -110,21 +111,21 @@ public class MManager { writeToLog = false; int cacheSize = config.getmManagerCacheSize(); - mNodeCache = - new RandomDeleteCache<String, MNode>(cacheSize) { - - @Override - public MNode loadObjectByKey(String key) throws CacheException { - lock.readLock().lock(); - try { - return mtree.getNodeByPathWithStorageGroupCheck(key); - } catch (MetadataException e) { - throw new CacheException(e); - } finally { - lock.readLock().unlock(); - } - } - }; +// mNodeCache = +// new RandomDeleteCache<String, MNode>(cacheSize) { +// +// @Override +// public MNode loadObjectByKey(String key) throws CacheException { +// lock.readLock().lock(); +// try { +// return mtree.getNodeByPathWithStorageGroupCheck(key); +// } catch (MetadataException e) { +// throw new CacheException(e); +// } finally { +// lock.readLock().unlock(); +// } +// } +// }; } public static MManager getInstance() { @@ -188,7 +189,7 @@ public class MManager { lock.writeLock().lock(); try { this.mtree = new MTree(); - this.mNodeCache.clear(); +// this.mNodeCache.clear(); this.tagIndex.clear(); this.seriesNumberInStorageGroups.clear(); this.maxSeriesNumberAmongStorageGroup = 0; @@ -288,7 +289,11 @@ public class MManager { } storageGroupName = MetaUtils.getStorageGroupNameByLevel(path, config.getDefaultStorageGroupLevel()); - setStorageGroup(storageGroupName); +// try { + setStorageGroup(storageGroupName); +// } catch (StorageGroupAlreadySetException e1) { +// ignore +// } } // check memory @@ -374,7 +379,7 @@ public class MManager { } } - mNodeCache.clear(); +// mNodeCache.clear(); } try { Set<String> emptyStorageGroups = new HashSet<>(); @@ -440,7 +445,7 @@ public class MManager { String storageGroupName = pair.left; // TODO: delete the path node and all its ancestors - mNodeCache.clear(); +// mNodeCache.clear(); try { IoTDBConfigDynamicAdapter.getInstance().addOrDeleteTimeSeries(-1); } catch (ConfigAdjusterException e) { @@ -508,7 +513,7 @@ public class MManager { for (LeafMNode leafMNode : leafMNodes) { removeFromTagInvertedIndex(leafMNode); } - mNodeCache.clear(); +// mNodeCache.clear(); if (config.isEnableParameterAdapter()) { IoTDBConfigDynamicAdapter.getInstance().addOrDeleteStorageGroup(-1); @@ -875,6 +880,23 @@ public class MManager { logger.warn("Current thread is interrupted, ignore"); } if (tempCount % 1000 == 0) { + MNode realDevice = null; + try { + realDevice = getNodeByPath(parent.getFullPath()); + MNode realChild = realDevice.getChild(child); + if (realChild == null) { + MNode fullNode = getNodeByPath(parent.getFullPath() + IoTDBConstant.PATH_SEPARATOR + child); + if (fullNode == null) { + logger.warn("realChild is null, qilepale"); + } + } else { + logger.warn("current device: == realDevice ? {}", parent.equals(realDevice)); + logger.warn("current device {} realDevice {}", parent, realDevice); + return realChild; + } + } catch (MetadataException e) { + e.printStackTrace(); + } logger.warn("try to get child {} for {} times from {}", child, tempCount, info); } childNode = parent.getChild(child); @@ -908,9 +930,9 @@ public class MManager { MNode node = null; boolean shouldSetStorageGroup; try { - node = mNodeCache.get(path); + node = mtree.getNodeByPathWithStorageGroupCheck(path); return node; - } catch (CacheException e) { + } catch (MetadataException e) { if (!autoCreateSchema) { throw new PathNotExistException(path); } @@ -924,9 +946,9 @@ public class MManager { lock.writeLock().lock(); try { try { - node = mNodeCache.get(path); + node = mtree.getNodeByPathWithStorageGroupCheck(path); return node; - } catch (CacheException e) { + } catch (MetadataException e) { shouldSetStorageGroup = e.getCause() instanceof StorageGroupNotSetException; } @@ -934,11 +956,11 @@ public class MManager { String storageGroupName = MetaUtils.getStorageGroupNameByLevel(path, sgLevel); setStorageGroup(storageGroupName); } - node = mtree.getDeviceNodeWithAutoCreating(path); + node = mtree.getDeviceNodeWithAutoCreating(path, sgLevel); return node; } catch (StorageGroupAlreadySetException e) { // ignore set storage group concurrently - node = mtree.getDeviceNodeWithAutoCreating(path); + node = mtree.getNodeByPathWithStorageGroupCheck(path); return node; } finally { if (node != null) { diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java index 4551f0a..86134b3 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java @@ -138,7 +138,7 @@ public class MTree implements Serializable { * * <p>e.g., get root.sg.d1, get or create all internal nodes and return the node of d1 */ - MNode getDeviceNodeWithAutoCreating(String deviceId) throws MetadataException { + MNode getDeviceNodeWithAutoCreating(String deviceId, int sgLevel) throws MetadataException { String[] nodeNames = MetaUtils.getNodeNames(deviceId); if (nodeNames.length <= 1 || !nodeNames[0].equals(root.getName())) { throw new IllegalPathException(deviceId); @@ -146,7 +146,12 @@ public class MTree implements Serializable { MNode cur = root; for (int i = 1; i < nodeNames.length; i++) { if (!cur.hasChild(nodeNames[i])) { - cur.addChild(nodeNames[i], new InternalMNode(cur, nodeNames[i])); + if (i == sgLevel) { + cur.addChild(nodeNames[i], new StorageGroupMNode(cur, nodeNames[i], + IoTDBDescriptor.getInstance().getConfig().getDefaultTTL())); + } else { + cur.addChild(nodeNames[i], new InternalMNode(cur, nodeNames[i])); + } } cur = cur.getChild(nodeNames[i]); } @@ -205,7 +210,7 @@ public class MTree implements Serializable { } else { StorageGroupMNode storageGroupMNode = new StorageGroupMNode( - cur, nodeNames[i], path, IoTDBDescriptor.getInstance().getConfig().getDefaultTTL()); + cur, nodeNames[i], IoTDBDescriptor.getInstance().getConfig().getDefaultTTL()); cur.addChild(nodeNames[i], storageGroupMNode); } } diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/StorageGroupMNode.java b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/StorageGroupMNode.java index a122d95..02c668f 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/StorageGroupMNode.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/StorageGroupMNode.java @@ -29,10 +29,10 @@ public class StorageGroupMNode extends InternalMNode { private long dataTTL; - public StorageGroupMNode(MNode parent, String name, String fullPath, long dataTTL) { + public StorageGroupMNode(MNode parent, String name, long dataTTL) { super(parent, name); this.dataTTL = dataTTL; - this.fullPath = fullPath; + this.fullPath = getFullPath(); } public long getDataTTL() { diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java index c28095c..21aee59 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java @@ -204,7 +204,6 @@ public class InsertPlan extends PhysicalPlan { failedMeasurements = new ArrayList<>(); } failedMeasurements.add(measurements[index]); - schemas[index] = null; measurements[index] = null; types[index] = null; values[index] = null; @@ -279,8 +278,10 @@ public class InsertPlan extends PhysicalPlan { } } - for (MeasurementSchema schema : schemas) { - schema.serializeTo(stream); + for (int i = 0; i < measurements.length; i++) { + if (measurements[i] != null) { + schemas[i].serializeTo(stream); + } } try {
