This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch ty-graduate in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f3d2b321428d3cf204579d3d90548cd561111113 Author: JackieTien97 <[email protected]> AuthorDate: Wed Mar 2 10:28:52 2022 +0800 change experiment code --- .../main/java/org/apache/iotdb/SessionExample.java | 702 +-------------------- .../iotdb/db/engine/flush/MemTableFlushTask.java | 14 +- 2 files changed, 36 insertions(+), 680 deletions(-) diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java index 34c5279..5c66cbd 100644 --- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java +++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java @@ -77,7 +77,7 @@ public class SessionExample { int sensorNum = Integer.parseInt(args[2]); long startTime = System.currentTimeMillis(); if ("aligned".equals(args[0])) { - insertAlignedTablet(totalRowNum, sensorNum); + insertTablet(totalRowNum, sensorNum, true); System.out.println( "Insert aligned " + totalRowNum @@ -85,7 +85,7 @@ public class SessionExample { + (System.currentTimeMillis() - startTime) + "ms."); } else if ("nonAligned".equals(args[0])) { - insertTablet(totalRowNum, sensorNum); + insertTablet(totalRowNum, sensorNum, false); System.out.println( "Insert nonAligned " + totalRowNum @@ -99,307 +99,7 @@ public class SessionExample { session.close(); } - private static void createAndDropContinuousQueries() - throws StatementExecutionException, IoTDBConnectionException { - session.executeNonQueryStatement( - "CREATE CONTINUOUS QUERY cq1 " - + "BEGIN SELECT max_value(s1) INTO temperature_max FROM root.sg1.* " - + "GROUP BY time(10s) END"); - session.executeNonQueryStatement( - "CREATE CONTINUOUS QUERY cq2 " - + "BEGIN SELECT count(s2) INTO temperature_cnt FROM root.sg1.* " - + "GROUP BY time(10s), level=1 END"); - session.executeNonQueryStatement( - "CREATE CONTINUOUS QUERY cq3 " - + "RESAMPLE EVERY 20s FOR 20s " - + "BEGIN SELECT avg(s3) INTO temperature_avg FROM root.sg1.* " - + "GROUP BY time(10s), level=1 END"); - session.executeNonQueryStatement("DROP CONTINUOUS QUERY cq1"); - session.executeNonQueryStatement("DROP CONTINUOUS QUERY cq2"); - session.executeNonQueryStatement("DROP CONTINUOUS QUERY cq3"); - } - - private static void createTimeseries() - throws IoTDBConnectionException, StatementExecutionException { - - if (!session.checkTimeseriesExists(ROOT_SG1_D1_S1)) { - session.createTimeseries( - ROOT_SG1_D1_S1, TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY); - } - if (!session.checkTimeseriesExists(ROOT_SG1_D1_S2)) { - session.createTimeseries( - ROOT_SG1_D1_S2, TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY); - } - if (!session.checkTimeseriesExists(ROOT_SG1_D1_S3)) { - session.createTimeseries( - ROOT_SG1_D1_S3, TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY); - } - - // create timeseries with tags and attributes - if (!session.checkTimeseriesExists(ROOT_SG1_D1_S4)) { - Map<String, String> tags = new HashMap<>(); - tags.put("tag1", "v1"); - Map<String, String> attributes = new HashMap<>(); - attributes.put("description", "v1"); - session.createTimeseries( - ROOT_SG1_D1_S4, - TSDataType.INT64, - TSEncoding.RLE, - CompressionType.SNAPPY, - null, - tags, - attributes, - "temperature"); - } - - // create timeseries with SDT property, SDT will take place when flushing - if (!session.checkTimeseriesExists(ROOT_SG1_D1_S5)) { - // COMPDEV is required - // COMPMAXTIME and COMPMINTIME are optional and their unit is ms - Map<String, String> props = new HashMap<>(); - props.put("LOSS", "sdt"); - props.put("COMPDEV", "0.01"); - props.put("COMPMINTIME", "2"); - props.put("COMPMAXTIME", "10"); - session.createTimeseries( - ROOT_SG1_D1_S5, - TSDataType.INT64, - TSEncoding.RLE, - CompressionType.SNAPPY, - props, - null, - null, - null); - } - } - - private static void createMultiTimeseries() - throws IoTDBConnectionException, StatementExecutionException { - - if (!session.checkTimeseriesExists("root.sg1.d2.s1") - && !session.checkTimeseriesExists("root.sg1.d2.s2")) { - List<String> paths = new ArrayList<>(); - paths.add("root.sg1.d2.s1"); - paths.add("root.sg1.d2.s2"); - List<TSDataType> tsDataTypes = new ArrayList<>(); - tsDataTypes.add(TSDataType.INT64); - tsDataTypes.add(TSDataType.INT64); - List<TSEncoding> tsEncodings = new ArrayList<>(); - tsEncodings.add(TSEncoding.RLE); - tsEncodings.add(TSEncoding.RLE); - List<CompressionType> compressionTypes = new ArrayList<>(); - compressionTypes.add(CompressionType.SNAPPY); - compressionTypes.add(CompressionType.SNAPPY); - - List<Map<String, String>> tagsList = new ArrayList<>(); - Map<String, String> tags = new HashMap<>(); - tags.put("unit", "kg"); - tagsList.add(tags); - tagsList.add(tags); - - List<Map<String, String>> attributesList = new ArrayList<>(); - Map<String, String> attributes = new HashMap<>(); - attributes.put("minValue", "1"); - attributes.put("maxValue", "100"); - attributesList.add(attributes); - attributesList.add(attributes); - - List<String> alias = new ArrayList<>(); - alias.add("weight1"); - alias.add("weight2"); - - session.createMultiTimeseries( - paths, tsDataTypes, tsEncodings, compressionTypes, null, tagsList, attributesList, alias); - } - } - - private static void createTemplate() - throws IoTDBConnectionException, StatementExecutionException, IOException { - - Template template = new Template("template1", false); - MeasurementNode mNodeS1 = - new MeasurementNode("s1", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY); - MeasurementNode mNodeS2 = - new MeasurementNode("s2", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY); - MeasurementNode mNodeS3 = - new MeasurementNode("s3", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY); - - template.addToTemplate(mNodeS1); - template.addToTemplate(mNodeS2); - template.addToTemplate(mNodeS3); - - session.createSchemaTemplate(template); - session.setSchemaTemplate("template1", "root.sg1"); - } - - private static void insertRecord() throws IoTDBConnectionException, StatementExecutionException { - String deviceId = ROOT_SG1_D1; - List<String> measurements = new ArrayList<>(); - List<TSDataType> types = new ArrayList<>(); - measurements.add("s1"); - measurements.add("s2"); - measurements.add("s3"); - types.add(TSDataType.INT64); - types.add(TSDataType.INT64); - types.add(TSDataType.INT64); - - for (long time = 0; time < 100; time++) { - List<Object> values = new ArrayList<>(); - values.add(1L); - values.add(2L); - values.add(3L); - session.insertRecord(deviceId, time, measurements, types, values); - } - } - - private static void insertRecord4Redirect() - throws IoTDBConnectionException, StatementExecutionException { - for (int i = 0; i < 6; i++) { - for (int j = 0; j < 2; j++) { - String deviceId = "root.redirect" + i + ".d" + j; - List<String> measurements = new ArrayList<>(); - measurements.add("s1"); - measurements.add("s2"); - measurements.add("s3"); - List<TSDataType> types = new ArrayList<>(); - types.add(TSDataType.INT64); - types.add(TSDataType.INT64); - types.add(TSDataType.INT64); - - for (long time = 0; time < 5; time++) { - List<Object> values = new ArrayList<>(); - values.add(1L + time); - values.add(2L + time); - values.add(3L + time); - session.insertRecord(deviceId, time, measurements, types, values); - } - } - } - } - - private static void insertStrRecord() - throws IoTDBConnectionException, StatementExecutionException { - String deviceId = ROOT_SG1_D1; - List<String> measurements = new ArrayList<>(); - measurements.add("s1"); - measurements.add("s2"); - measurements.add("s3"); - - for (long time = 0; time < 10; time++) { - List<String> values = new ArrayList<>(); - values.add("1"); - values.add("2"); - values.add("3"); - session.insertRecord(deviceId, time, measurements, values); - } - } - - private static void insertRecordInObject() - throws IoTDBConnectionException, StatementExecutionException { - String deviceId = ROOT_SG1_D1; - List<String> measurements = new ArrayList<>(); - List<TSDataType> types = new ArrayList<>(); - measurements.add("s1"); - measurements.add("s2"); - measurements.add("s3"); - types.add(TSDataType.INT64); - types.add(TSDataType.INT64); - types.add(TSDataType.INT64); - - for (long time = 0; time < 100; time++) { - session.insertRecord(deviceId, time, measurements, types, 1L, 1L, 1L); - } - } - - private static void insertRecords() throws IoTDBConnectionException, StatementExecutionException { - String deviceId = ROOT_SG1_D1; - List<String> measurements = new ArrayList<>(); - measurements.add("s1"); - measurements.add("s2"); - measurements.add("s3"); - List<String> deviceIds = new ArrayList<>(); - List<List<String>> measurementsList = new ArrayList<>(); - List<List<Object>> valuesList = new ArrayList<>(); - List<Long> timestamps = new ArrayList<>(); - List<List<TSDataType>> typesList = new ArrayList<>(); - - for (long time = 0; time < 500; time++) { - List<Object> values = new ArrayList<>(); - List<TSDataType> types = new ArrayList<>(); - values.add(1L); - values.add(2L); - values.add(3L); - types.add(TSDataType.INT64); - types.add(TSDataType.INT64); - types.add(TSDataType.INT64); - - deviceIds.add(deviceId); - measurementsList.add(measurements); - valuesList.add(values); - typesList.add(types); - timestamps.add(time); - if (time != 0 && time % 100 == 0) { - session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList); - deviceIds.clear(); - measurementsList.clear(); - valuesList.clear(); - typesList.clear(); - timestamps.clear(); - } - } - - session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList); - } - - /** - * insert the data of a device. For each timestamp, the number of measurements is the same. - * - * <p>Users need to control the count of Tablet and write a batch when it reaches the maxBatchSize - */ - private static void insertTablet(long totalRowNum, int sensorNum) - throws IoTDBConnectionException, StatementExecutionException { - /* - * A Tablet example: - * device1 - * time s1, s2, s3 - * 1, 1, 1, 1 - * 2, 2, 2, 2 - * 3, 3, 3, 3 - */ - // The schema of measurements of one device - // only measurementId and data type in MeasurementSchema take effects in Tablet - List<MeasurementSchema> schemaList = new ArrayList<>(); - for (int i = 0; i < sensorNum; i++) { - schemaList.add(new MeasurementSchema("s" + i, TSDataType.FLOAT)); - } - - Tablet tablet = new Tablet(ROOT_SG1_D1, schemaList, 10000); - - // Method 1 to add tablet data - long timestamp = 1646134492000L; - Random random = new Random(123456); - for (long row = 0; row < totalRowNum; row++) { - int rowIndex = tablet.rowSize++; - tablet.addTimestamp(rowIndex, timestamp); - for (int s = 0; s < sensorNum; s++) { - float value = -100.0f + 200.0f * random.nextFloat(); - tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, value); - } - if (tablet.rowSize == tablet.getMaxRowNumber()) { - session.insertTablet(tablet, true); - tablet.reset(); - System.out.println("already insert: " + row + " rows."); - } - timestamp++; - } - - if (tablet.rowSize != 0) { - session.insertTablet(tablet); - tablet.reset(); - } - } - - private static void insertAlignedTablet(long totalRowNum, int sensorNum) + private static void insertTablet(long totalRowNum, int sensorNum, boolean isAligned) throws IoTDBConnectionException, StatementExecutionException { /* * A Tablet example: @@ -416,228 +116,41 @@ public class SessionExample { schemaList.add(new MeasurementSchema("s" + i, TSDataType.FLOAT)); } - Tablet tablet = new Tablet(ROOT_SG1_D1, schemaList, 10000); + Tablet tablet = new Tablet(ROOT_SG1_D1, schemaList, 1000); long timestamp = 1646134492000L; Random random = new Random(123456); - for (long row = 0; row < totalRowNum; row++) { - int rowIndex = tablet.rowSize++; - tablet.addTimestamp(rowIndex, timestamp); - for (int s = 0; s < sensorNum; s++) { - float value = -100.0f + 200.0f * random.nextFloat(); - tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, value); - } - if (tablet.rowSize == tablet.getMaxRowNumber()) { - session.insertAlignedTablet(tablet, true); - tablet.reset(); - System.out.println("already insert: " + row + " rows."); - } - timestamp++; - } - - if (tablet.rowSize != 0) { - session.insertAlignedTablet(tablet); - tablet.reset(); - } - } - - private static void insertTabletWithNullValues() - throws IoTDBConnectionException, StatementExecutionException { - /* - * A Tablet example: - * device1 - * time s1, s2, s3 - * 1, null, 1, 1 - * 2, 2, null, 2 - * 3, 3, 3, null - */ - // The schema of measurements of one device - // only measurementId and data type in MeasurementSchema take effects in Tablet - List<MeasurementSchema> schemaList = new ArrayList<>(); - schemaList.add(new MeasurementSchema("s1", TSDataType.INT64)); - schemaList.add(new MeasurementSchema("s2", TSDataType.INT64)); - schemaList.add(new MeasurementSchema("s3", TSDataType.INT64)); - - Tablet tablet = new Tablet(ROOT_SG1_D1, schemaList, 100); - - // Method 1 to add tablet data - tablet.bitMaps = new BitMap[schemaList.size()]; - for (int s = 0; s < 3; s++) { - tablet.bitMaps[s] = new BitMap(tablet.getMaxRowNumber()); - } - - long timestamp = System.currentTimeMillis(); - for (long row = 0; row < 100; row++) { - int rowIndex = tablet.rowSize++; - tablet.addTimestamp(rowIndex, timestamp); - for (int s = 0; s < 3; s++) { - long value = new Random().nextLong(); - // mark null value - if (row % 3 == s) { - tablet.bitMaps[s].mark((int) row); + for (long row = 0; row < totalRowNum; row += 10_000) { + for (int i = 9_999; i >= 0; i--) { + int rowIndex = tablet.rowSize++; + tablet.addTimestamp(rowIndex, timestamp + i); + for (int s = 0; s < sensorNum; s++) { + float value = -100.0f + 200.0f * random.nextFloat(); + tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, value); } - tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, value); - } - if (tablet.rowSize == tablet.getMaxRowNumber()) { - session.insertTablet(tablet, true); - tablet.reset(); - } - timestamp++; - } - - if (tablet.rowSize != 0) { - session.insertTablet(tablet); - tablet.reset(); - } - - // Method 2 to add tablet data - long[] timestamps = tablet.timestamps; - Object[] values = tablet.values; - BitMap[] bitMaps = new BitMap[schemaList.size()]; - for (int s = 0; s < 3; s++) { - bitMaps[s] = new BitMap(tablet.getMaxRowNumber()); - } - tablet.bitMaps = bitMaps; - - for (long time = 0; time < 100; time++) { - int row = tablet.rowSize++; - timestamps[row] = time; - for (int i = 0; i < 3; i++) { - long[] sensor = (long[]) values[i]; - // mark null value - if (row % 3 == i) { - bitMaps[i].mark(row); + if (tablet.rowSize == tablet.getMaxRowNumber()) { + if (isAligned) { + session.insertAlignedTablet(tablet, false); + } else { + session.insertTablet(tablet, false); + } + tablet.reset(); } - sensor[row] = i; - } - if (tablet.rowSize == tablet.getMaxRowNumber()) { - session.insertTablet(tablet, true); - tablet.reset(); } + System.out.println("already insert: " + row + " rows."); + timestamp += 10_000; } if (tablet.rowSize != 0) { - session.insertTablet(tablet); - tablet.reset(); - } - } - - private static void insertTablets() throws IoTDBConnectionException, StatementExecutionException { - // The schema of measurements of one device - // only measurementId and data type in MeasurementSchema take effects in Tablet - List<MeasurementSchema> schemaList = new ArrayList<>(); - schemaList.add(new MeasurementSchema("s1", TSDataType.INT64)); - schemaList.add(new MeasurementSchema("s2", TSDataType.INT64)); - schemaList.add(new MeasurementSchema("s3", TSDataType.INT64)); - - Tablet tablet1 = new Tablet(ROOT_SG1_D1, schemaList, 100); - Tablet tablet2 = new Tablet("root.sg1.d2", schemaList, 100); - Tablet tablet3 = new Tablet("root.sg1.d3", schemaList, 100); - - Map<String, Tablet> tabletMap = new HashMap<>(); - tabletMap.put(ROOT_SG1_D1, tablet1); - tabletMap.put("root.sg1.d2", tablet2); - tabletMap.put("root.sg1.d3", tablet3); - - // Method 1 to add tablet data - long timestamp = System.currentTimeMillis(); - for (long row = 0; row < 100; row++) { - int row1 = tablet1.rowSize++; - int row2 = tablet2.rowSize++; - int row3 = tablet3.rowSize++; - tablet1.addTimestamp(row1, timestamp); - tablet2.addTimestamp(row2, timestamp); - tablet3.addTimestamp(row3, timestamp); - for (int i = 0; i < 3; i++) { - long value = new Random().nextLong(); - tablet1.addValue(schemaList.get(i).getMeasurementId(), row1, value); - tablet2.addValue(schemaList.get(i).getMeasurementId(), row2, value); - tablet3.addValue(schemaList.get(i).getMeasurementId(), row3, value); - } - if (tablet1.rowSize == tablet1.getMaxRowNumber()) { - session.insertTablets(tabletMap, true); - tablet1.reset(); - tablet2.reset(); - tablet3.reset(); - } - timestamp++; - } - - if (tablet1.rowSize != 0) { - session.insertTablets(tabletMap, true); - tablet1.reset(); - tablet2.reset(); - tablet3.reset(); - } - - // Method 2 to add tablet data - long[] timestamps1 = tablet1.timestamps; - Object[] values1 = tablet1.values; - long[] timestamps2 = tablet2.timestamps; - Object[] values2 = tablet2.values; - long[] timestamps3 = tablet3.timestamps; - Object[] values3 = tablet3.values; - - for (long time = 0; time < 100; time++) { - int row1 = tablet1.rowSize++; - int row2 = tablet2.rowSize++; - int row3 = tablet3.rowSize++; - timestamps1[row1] = time; - timestamps2[row2] = time; - timestamps3[row3] = time; - for (int i = 0; i < 3; i++) { - long[] sensor1 = (long[]) values1[i]; - sensor1[row1] = i; - long[] sensor2 = (long[]) values2[i]; - sensor2[row2] = i; - long[] sensor3 = (long[]) values3[i]; - sensor3[row3] = i; - } - if (tablet1.rowSize == tablet1.getMaxRowNumber()) { - session.insertTablets(tabletMap, true); - - tablet1.reset(); - tablet2.reset(); - tablet3.reset(); - } - } - - if (tablet1.rowSize != 0) { - session.insertTablets(tabletMap, true); - tablet1.reset(); - tablet2.reset(); - tablet3.reset(); - } - } - - private static void selectInto() throws IoTDBConnectionException, StatementExecutionException { - session.executeNonQueryStatement( - "select s1, s2, s3 into into_s1, into_s2, into_s3 from root.sg1.d1"); - - try (SessionDataSet dataSet = - session.executeQueryStatement("select into_s1, into_s2, into_s3 from root.sg1.d1")) { - System.out.println(dataSet.getColumnNames()); - while (dataSet.hasNext()) { - System.out.println(dataSet.next()); + if (isAligned) { + session.insertAlignedTablet(tablet, false); + } else { + session.insertTablet(tablet, false); } + tablet.reset(); } } - private static void deleteData() throws IoTDBConnectionException, StatementExecutionException { - String path = ROOT_SG1_D1_S1; - long deleteTime = 99; - session.deleteData(path, deleteTime); - } - - private static void deleteTimeseries() - throws IoTDBConnectionException, StatementExecutionException { - List<String> paths = new ArrayList<>(); - paths.add(ROOT_SG1_D1_S1); - paths.add(ROOT_SG1_D1_S2); - paths.add(ROOT_SG1_D1_S3); - session.deleteTimeseries(paths); - } - private static void query(String sql) throws IoTDBConnectionException, StatementExecutionException { long startTime = System.currentTimeMillis(); @@ -649,169 +162,4 @@ public class SessionExample { } System.out.println("cost: " + (System.currentTimeMillis() - startTime) + "ms"); } - - private static void query4Redirect() - throws IoTDBConnectionException, StatementExecutionException { - String selectPrefix = "select * from root.redirect"; - for (int i = 0; i < 6; i++) { - try (SessionDataSet dataSet = - sessionEnableRedirect.executeQueryStatement(selectPrefix + i + ".d1")) { - - System.out.println(dataSet.getColumnNames()); - dataSet.setFetchSize(1024); // default is 10000 - while (dataSet.hasNext()) { - System.out.println(dataSet.next()); - } - } - } - - for (int i = 0; i < 6; i++) { - try (SessionDataSet dataSet = - sessionEnableRedirect.executeQueryStatement( - selectPrefix + i + ".d1 where time >= 1 and time < 10")) { - - System.out.println(dataSet.getColumnNames()); - dataSet.setFetchSize(1024); // default is 10000 - while (dataSet.hasNext()) { - System.out.println(dataSet.next()); - } - } - } - - for (int i = 0; i < 6; i++) { - try (SessionDataSet dataSet = - sessionEnableRedirect.executeQueryStatement( - selectPrefix + i + ".d1 where time >= 1 and time < 10 align by device")) { - - System.out.println(dataSet.getColumnNames()); - dataSet.setFetchSize(1024); // default is 10000 - while (dataSet.hasNext()) { - System.out.println(dataSet.next()); - } - } - } - - for (int i = 0; i < 6; i++) { - try (SessionDataSet dataSet = - sessionEnableRedirect.executeQueryStatement( - selectPrefix - + i - + ".d1 where time >= 1 and time < 10 and root.redirect" - + i - + ".d1.s1 > 1")) { - System.out.println(dataSet.getColumnNames()); - dataSet.setFetchSize(1024); // default is 10000 - while (dataSet.hasNext()) { - System.out.println(dataSet.next()); - } - } - } - } - - private static void queryWithTimeout() - throws IoTDBConnectionException, StatementExecutionException { - try (SessionDataSet dataSet = - session.executeQueryStatement("select * from root.sg1.d1", 2000)) { - System.out.println(dataSet.getColumnNames()); - dataSet.setFetchSize(1024); // default is 10000 - while (dataSet.hasNext()) { - System.out.println(dataSet.next()); - } - } - } - - private static void rawDataQuery() throws IoTDBConnectionException, StatementExecutionException { - List<String> paths = new ArrayList<>(); - paths.add(ROOT_SG1_D1_S1); - paths.add(ROOT_SG1_D1_S2); - paths.add(ROOT_SG1_D1_S3); - long startTime = 10L; - long endTime = 200L; - - try (SessionDataSet dataSet = session.executeRawDataQuery(paths, startTime, endTime)) { - - System.out.println(dataSet.getColumnNames()); - dataSet.setFetchSize(1024); - while (dataSet.hasNext()) { - System.out.println(dataSet.next()); - } - } - } - - private static void lastDataQuery() throws IoTDBConnectionException, StatementExecutionException { - List<String> paths = new ArrayList<>(); - paths.add(ROOT_SG1_D1_S1); - paths.add(ROOT_SG1_D1_S2); - paths.add(ROOT_SG1_D1_S3); - try (SessionDataSet sessionDataSet = session.executeLastDataQuery(paths, 3)) { - System.out.println(sessionDataSet.getColumnNames()); - sessionDataSet.setFetchSize(1024); - while (sessionDataSet.hasNext()) { - System.out.println(sessionDataSet.next()); - } - } - } - - private static void queryByIterator() - throws IoTDBConnectionException, StatementExecutionException { - try (SessionDataSet dataSet = session.executeQueryStatement("select * from root.sg1.d1")) { - - DataIterator iterator = dataSet.iterator(); - System.out.println(dataSet.getColumnNames()); - dataSet.setFetchSize(1024); // default is 10000 - while (iterator.next()) { - StringBuilder builder = new StringBuilder(); - // get time - builder.append(iterator.getLong(1)).append(","); - // get second column - if (!iterator.isNull(2)) { - builder.append(iterator.getLong(2)).append(","); - } else { - builder.append("null").append(","); - } - - // get third column - if (!iterator.isNull(ROOT_SG1_D1_S2)) { - builder.append(iterator.getLong(ROOT_SG1_D1_S2)).append(","); - } else { - builder.append("null").append(","); - } - - // get forth column - if (!iterator.isNull(4)) { - builder.append(iterator.getLong(4)).append(","); - } else { - builder.append("null").append(","); - } - - // get fifth column - if (!iterator.isNull(ROOT_SG1_D1_S4)) { - builder.append(iterator.getObject(ROOT_SG1_D1_S4)); - } else { - builder.append("null"); - } - - System.out.println(builder); - } - } - } - - private static void nonQuery() throws IoTDBConnectionException, StatementExecutionException { - session.executeNonQueryStatement("insert into root.sg1.d1(timestamp,s1) values(200, 1)"); - } - - private static void setTimeout() throws StatementExecutionException { - Session tempSession = new Session(LOCAL_HOST, 6667, "root", "root", 10000, 20000); - tempSession.setQueryTimeout(60000); - } - - private static void createClusterSession() throws IoTDBConnectionException { - ArrayList<String> nodeList = new ArrayList<>(); - nodeList.add("127.0.0.1:6669"); - nodeList.add("127.0.0.1:6667"); - nodeList.add("127.0.0.1:6668"); - Session clusterSession = new Session(nodeList, "root", "root"); - clusterSession.open(); - clusterSession.close(); - } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java index 5c022b4..b1d0223 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java @@ -43,6 +43,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; /** * flush task to flush one memtable using a pipeline model to flush, which is sort memtable -> @@ -71,7 +72,12 @@ public class MemTableFlushTask { private volatile long memSerializeTime = 0L; private volatile long ioTime = 0L; - private long totalCostTime = 0L; + private static final AtomicLong totalCostTime = new AtomicLong(0); + + private static final AtomicLong totalEncodingTime = new AtomicLong(0); + + private static final AtomicLong totalIOTime = new AtomicLong(0); + /** * @param memTable the memTable to flush @@ -175,8 +181,10 @@ public class MemTableFlushTask { memTable, costTime); - totalCostTime += costTime; - LOGGER.info("Cumulative flush cost time: {}", totalCostTime); + totalCostTime.addAndGet(costTime); + totalEncodingTime.addAndGet(memSerializeTime); + totalIOTime.addAndGet(ioTime); + LOGGER.info("Cumulative sort cost time: {}, encoding cost time: {}, io cost time: {}, flush cost time: {}", totalCostTime.get() - totalEncodingTime.get() - totalIOTime.get(), totalEncodingTime.get(), totalIOTime.get(), totalCostTime.get()); } /** encoding task (second task of pipeline) */
