This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch opt_py_dataset in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f03ad248a47b017c6f25cd532945cd5f02a91953 Author: HTHou <[email protected]> AuthorDate: Tue Sep 26 18:59:59 2023 +0800 opt_python_query --- .../main/java/org/apache/iotdb/SessionExample.java | 879 +-------------------- iotdb-client/client-py/SessionExample.py | 403 +--------- iotdb-client/client-py/iotdb/utils/Field.py | 93 +-- .../client-py/iotdb/utils/IoTDBRpcDataSet.py | 88 ++- .../client-py/iotdb/utils/SessionDataSet.py | 66 +- 5 files changed, 143 insertions(+), 1386 deletions(-) diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java index 60bf282ed8f..bb750fd8af6 100644 --- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java +++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java @@ -1,49 +1,19 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - package org.apache.iotdb; -import org.apache.iotdb.common.rpc.thrift.TAggregationType; import org.apache.iotdb.isession.SessionDataSet; -import org.apache.iotdb.isession.SessionDataSet.DataIterator; -import org.apache.iotdb.isession.template.Template; import org.apache.iotdb.isession.util.Version; import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.StatementExecutionException; -import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.session.Session; -import org.apache.iotdb.session.template.MeasurementNode; import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; -import org.apache.iotdb.tsfile.utils.Binary; -import org.apache.iotdb.tsfile.utils.BitMap; import org.apache.iotdb.tsfile.write.record.Tablet; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; -import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Random; @SuppressWarnings({"squid:S106", "squid:S1144", "squid:S125"}) @@ -51,15 +21,8 @@ public class SessionExample { private static Session session; private static Session sessionEnableRedirect; - private static final String ROOT_SG1_D1_S1 = "root.sg1.d1.s1"; - private static final String ROOT_SG1_D1_S2 = "root.sg1.d1.s2"; - private static final String ROOT_SG1_D1_S3 = "root.sg1.d1.s3"; - private static final String ROOT_SG1_D1_S4 = "root.sg1.d1.s4"; - private static final String ROOT_SG1_D1_S5 = "root.sg1.d1.s5"; - private static final String ROOT_SG1_D1 = "root.sg1.d1"; - private static final String ROOT_SG1 = "root.sg1"; private static final String LOCAL_HOST = "127.0.0.1"; - public static final String SELECT_D1 = "select * from root.sg1.d1"; + private static final String ROOT_SG1_D1 = "root.sg.d"; private static Random random = new Random(); @@ -74,838 +37,18 @@ public class SessionExample { .version(Version.V_1_0) .build(); session.open(false); - - // set session fetchSize - session.setFetchSize(10000); - - try { - session.createDatabase("root.sg1"); - } catch (StatementExecutionException e) { - if (e.getStatusCode() != TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) { - throw e; - } - } - - // createTemplate(); - createTimeseries(); - createMultiTimeseries(); - insertRecord(); - insertTablet(); - // insertTabletWithNullValues(); - // insertTablets(); - // insertRecords(); - // insertText(); - // selectInto(); - // createAndDropContinuousQueries(); - // nonQuery(); - query(); - // queryWithTimeout(); - rawDataQuery(); - lastDataQuery(); - aggregationQuery(); - groupByQuery(); - // queryByIterator(); - // deleteData(); - // deleteTimeseries(); - // setTimeout(); - - sessionEnableRedirect = new Session(LOCAL_HOST, 6667, "root", "root"); - sessionEnableRedirect.setEnableQueryRedirection(true); - sessionEnableRedirect.open(false); - - // set session fetchSize - sessionEnableRedirect.setFetchSize(10000); - - fastLastDataQueryForOneDevice(); - insertRecord4Redirect(); - query4Redirect(); - sessionEnableRedirect.close(); - session.close(); - } - - private static void createAndDropContinuousQueries() - throws StatementExecutionException, IoTDBConnectionException { - session.executeNonQueryStatement( - "CREATE CONTINUOUS QUERY cq1 " - + "BEGIN SELECT max_value(s1) INTO temperature_max FROM root.sg1.* " - + "GROUP BY time(10s) END"); - session.executeNonQueryStatement( - "CREATE CONTINUOUS QUERY cq2 " - + "BEGIN SELECT count(s2) INTO temperature_cnt FROM root.sg1.* " - + "GROUP BY time(10s), level=1 END"); - session.executeNonQueryStatement( - "CREATE CONTINUOUS QUERY cq3 " - + "RESAMPLE EVERY 20s FOR 20s " - + "BEGIN SELECT avg(s3) INTO temperature_avg FROM root.sg1.* " - + "GROUP BY time(10s), level=1 END"); - session.executeNonQueryStatement("DROP CONTINUOUS QUERY cq1"); - session.executeNonQueryStatement("DROP CONTINUOUS QUERY cq2"); - session.executeNonQueryStatement("DROP CONTINUOUS QUERY cq3"); - } - - private static void createTimeseries() - throws IoTDBConnectionException, StatementExecutionException { - - if (!session.checkTimeseriesExists(ROOT_SG1_D1_S1)) { - session.createTimeseries( - ROOT_SG1_D1_S1, TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY); - } - if (!session.checkTimeseriesExists(ROOT_SG1_D1_S2)) { - session.createTimeseries( - ROOT_SG1_D1_S2, TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY); - } - if (!session.checkTimeseriesExists(ROOT_SG1_D1_S3)) { - session.createTimeseries( - ROOT_SG1_D1_S3, TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY); - } - - // create timeseries with tags and attributes - if (!session.checkTimeseriesExists(ROOT_SG1_D1_S4)) { - Map<String, String> tags = new HashMap<>(); - tags.put("tag1", "v1"); - Map<String, String> attributes = new HashMap<>(); - attributes.put("description", "v1"); - session.createTimeseries( - ROOT_SG1_D1_S4, - TSDataType.INT64, - TSEncoding.RLE, - CompressionType.SNAPPY, - null, - tags, - attributes, - "temperature"); - } - - // create timeseries with SDT property, SDT will take place when flushing - if (!session.checkTimeseriesExists(ROOT_SG1_D1_S5)) { - // COMPDEV is required - // COMPMAXTIME and COMPMINTIME are optional and their unit is ms - Map<String, String> props = new HashMap<>(); - props.put("LOSS", "sdt"); - props.put("COMPDEV", "0.01"); - props.put("COMPMINTIME", "2"); - props.put("COMPMAXTIME", "10"); - session.createTimeseries( - ROOT_SG1_D1_S5, - TSDataType.INT64, - TSEncoding.RLE, - CompressionType.SNAPPY, - props, - null, - null, - null); - } - } - - private static void createMultiTimeseries() - throws IoTDBConnectionException, StatementExecutionException { - - if (!session.checkTimeseriesExists("root.sg1.d2.s1") - && !session.checkTimeseriesExists("root.sg1.d2.s2")) { - List<String> paths = new ArrayList<>(); - paths.add("root.sg1.d2.s1"); - paths.add("root.sg1.d2.s2"); - List<TSDataType> tsDataTypes = new ArrayList<>(); - tsDataTypes.add(TSDataType.INT64); - tsDataTypes.add(TSDataType.INT64); - List<TSEncoding> tsEncodings = new ArrayList<>(); - tsEncodings.add(TSEncoding.RLE); - tsEncodings.add(TSEncoding.RLE); - List<CompressionType> compressionTypes = new ArrayList<>(); - compressionTypes.add(CompressionType.SNAPPY); - compressionTypes.add(CompressionType.SNAPPY); - - List<Map<String, String>> tagsList = new ArrayList<>(); - Map<String, String> tags = new HashMap<>(); - tags.put("unit", "kg"); - tagsList.add(tags); - tagsList.add(tags); - - List<Map<String, String>> attributesList = new ArrayList<>(); - Map<String, String> attributes = new HashMap<>(); - attributes.put("minValue", "1"); - attributes.put("maxValue", "100"); - attributesList.add(attributes); - attributesList.add(attributes); - - List<String> alias = new ArrayList<>(); - alias.add("weight1"); - alias.add("weight2"); - - session.createMultiTimeseries( - paths, tsDataTypes, tsEncodings, compressionTypes, null, tagsList, attributesList, alias); - } - } - - private static void createTemplate() - throws IoTDBConnectionException, StatementExecutionException, IOException { - - Template template = new Template("template1", false); - MeasurementNode mNodeS1 = - new MeasurementNode("s1", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY); - MeasurementNode mNodeS2 = - new MeasurementNode("s2", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY); - MeasurementNode mNodeS3 = - new MeasurementNode("s3", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY); - - template.addToTemplate(mNodeS1); - template.addToTemplate(mNodeS2); - template.addToTemplate(mNodeS3); - - session.createSchemaTemplate(template); - session.setSchemaTemplate("template1", "root.sg1"); - } - - private static void insertRecord() throws IoTDBConnectionException, StatementExecutionException { - String deviceId = ROOT_SG1_D1; - List<String> measurements = new ArrayList<>(); - List<TSDataType> types = new ArrayList<>(); - measurements.add("s1"); - measurements.add("s2"); - measurements.add("s3"); - types.add(TSDataType.INT64); - types.add(TSDataType.INT64); - types.add(TSDataType.INT64); - - for (long time = 0; time < 100; time++) { - List<Object> values = new ArrayList<>(); - values.add(1L); - values.add(2L); - values.add(3L); - session.insertRecord(deviceId, time, measurements, types, values); - } - } - - private static void insertRecord4Redirect() - throws IoTDBConnectionException, StatementExecutionException { - for (int i = 0; i < 6; i++) { - for (int j = 0; j < 2; j++) { - String deviceId = "root.redirect" + i + ".d" + j; - List<String> measurements = new ArrayList<>(); - measurements.add("s1"); - measurements.add("s2"); - measurements.add("s3"); - List<TSDataType> types = new ArrayList<>(); - types.add(TSDataType.INT64); - types.add(TSDataType.INT64); - types.add(TSDataType.INT64); - - for (long time = 0; time < 5; time++) { - List<Object> values = new ArrayList<>(); - values.add(1L + time); - values.add(2L + time); - values.add(3L + time); - session.insertRecord(deviceId, time, measurements, types, values); - } - } - } - } - - private static void insertStrRecord() - throws IoTDBConnectionException, StatementExecutionException { - String deviceId = ROOT_SG1_D1; - List<String> measurements = new ArrayList<>(); - measurements.add("s1"); - measurements.add("s2"); - measurements.add("s3"); - - for (long time = 0; time < 10; time++) { - List<String> values = new ArrayList<>(); - values.add("1"); - values.add("2"); - values.add("3"); - session.insertRecord(deviceId, time, measurements, values); - } - } - - private static void insertRecordInObject() - throws IoTDBConnectionException, StatementExecutionException { - String deviceId = ROOT_SG1_D1; - List<String> measurements = new ArrayList<>(); - List<TSDataType> types = new ArrayList<>(); - measurements.add("s1"); - measurements.add("s2"); - measurements.add("s3"); - types.add(TSDataType.INT64); - types.add(TSDataType.INT64); - types.add(TSDataType.INT64); - - for (long time = 0; time < 100; time++) { - session.insertRecord(deviceId, time, measurements, types, 1L, 1L, 1L); - } - } - - private static void insertRecords() throws IoTDBConnectionException, StatementExecutionException { - String deviceId = ROOT_SG1_D1; - List<String> measurements = new ArrayList<>(); - measurements.add("s1"); - measurements.add("s2"); - measurements.add("s3"); - List<String> deviceIds = new ArrayList<>(); - List<List<String>> measurementsList = new ArrayList<>(); - List<List<Object>> valuesList = new ArrayList<>(); - List<Long> timestamps = new ArrayList<>(); - List<List<TSDataType>> typesList = new ArrayList<>(); - - for (long time = 0; time < 500; time++) { - List<Object> values = new ArrayList<>(); - List<TSDataType> types = new ArrayList<>(); - values.add(1L); - values.add(2L); - values.add(3L); - types.add(TSDataType.INT64); - types.add(TSDataType.INT64); - types.add(TSDataType.INT64); - - deviceIds.add(deviceId); - measurementsList.add(measurements); - valuesList.add(values); - typesList.add(types); - timestamps.add(time); - if (time != 0 && time % 100 == 0) { - session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList); - deviceIds.clear(); - measurementsList.clear(); - valuesList.clear(); - typesList.clear(); - timestamps.clear(); - } - } - - session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList); - } - - /** - * insert the data of a device. For each timestamp, the number of measurements is the same. - * - * <p>Users need to control the count of Tablet and write a batch when it reaches the maxBatchSize - */ - private static void insertTablet() throws IoTDBConnectionException, StatementExecutionException { - /* - * A Tablet example: - * device1 - * time s1, s2, s3 - * 1, 1, 1, 1 - * 2, 2, 2, 2 - * 3, 3, 3, 3 - */ - // The schema of measurements of one device - // only measurementId and data type in MeasurementSchema take effects in Tablet - List<MeasurementSchema> schemaList = new ArrayList<>(); - schemaList.add(new MeasurementSchema("s1", TSDataType.INT64)); - schemaList.add(new MeasurementSchema("s2", TSDataType.INT64)); - schemaList.add(new MeasurementSchema("s3", TSDataType.INT64)); - - Tablet tablet = new Tablet(ROOT_SG1_D1, schemaList, 100); - - // Method 1 to add tablet data - long timestamp = System.currentTimeMillis(); - - for (long row = 0; row < 100; row++) { - int rowIndex = tablet.rowSize++; - tablet.addTimestamp(rowIndex, timestamp); - for (int s = 0; s < 3; s++) { - long value = random.nextLong(); - tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, value); - } - if (tablet.rowSize == tablet.getMaxRowNumber()) { - session.insertTablet(tablet, true); - tablet.reset(); - } - timestamp++; - } - - if (tablet.rowSize != 0) { - session.insertTablet(tablet); - tablet.reset(); - } - - // Method 2 to add tablet data - long[] timestamps = tablet.timestamps; - Object[] values = tablet.values; - - for (long time = 0; time < 100; time++) { - int row = tablet.rowSize++; - timestamps[row] = time; - for (int i = 0; i < 3; i++) { - long[] sensor = (long[]) values[i]; - sensor[row] = i; - } - if (tablet.rowSize == tablet.getMaxRowNumber()) { - session.insertTablet(tablet, true); - tablet.reset(); - } - } - - if (tablet.rowSize != 0) { - session.insertTablet(tablet); - tablet.reset(); - } - } - - private static void insertTabletWithNullValues() - throws IoTDBConnectionException, StatementExecutionException { - /* - * A Tablet example: - * device1 - * time s1, s2, s3 - * 1, null, 1, 1 - * 2, 2, null, 2 - * 3, 3, 3, null - */ - // The schema of measurements of one device - // only measurementId and data type in MeasurementSchema take effects in Tablet - List<MeasurementSchema> schemaList = new ArrayList<>(); - schemaList.add(new MeasurementSchema("s1", TSDataType.INT64)); - schemaList.add(new MeasurementSchema("s2", TSDataType.INT64)); - schemaList.add(new MeasurementSchema("s3", TSDataType.INT64)); - - Tablet tablet = new Tablet(ROOT_SG1_D1, schemaList, 100); - - // Method 1 to add tablet data - insertTablet1(schemaList, tablet); - - // Method 2 to add tablet data - insertTablet2(schemaList, tablet); - } - - private static void insertTablet1(List<MeasurementSchema> schemaList, Tablet tablet) - throws IoTDBConnectionException, StatementExecutionException { - tablet.initBitMaps(); - - long timestamp = System.currentTimeMillis(); - for (long row = 0; row < 100; row++) { - int rowIndex = tablet.rowSize++; - tablet.addTimestamp(rowIndex, timestamp); - for (int s = 0; s < 3; s++) { - long value = random.nextLong(); - // mark null value - if (row % 3 == s) { - tablet.bitMaps[s].mark((int) row); - } - tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, value); - } - if (tablet.rowSize == tablet.getMaxRowNumber()) { - session.insertTablet(tablet, true); - tablet.reset(); - } - timestamp++; - } - - if (tablet.rowSize != 0) { - session.insertTablet(tablet); - tablet.reset(); - } - } - - private static void insertTablet2(List<MeasurementSchema> schemaList, Tablet tablet) - throws IoTDBConnectionException, StatementExecutionException { - long[] timestamps = tablet.timestamps; - Object[] values = tablet.values; - BitMap[] bitMaps = new BitMap[schemaList.size()]; - for (int s = 0; s < 3; s++) { - bitMaps[s] = new BitMap(tablet.getMaxRowNumber()); - } - tablet.bitMaps = bitMaps; - - for (long time = 0; time < 100; time++) { - int row = tablet.rowSize++; - timestamps[row] = time; - for (int i = 0; i < 3; i++) { - long[] sensor = (long[]) values[i]; - // mark null value - if (row % 3 == i) { - bitMaps[i].mark(row); - } - sensor[row] = i; - } - if (tablet.rowSize == tablet.getMaxRowNumber()) { - session.insertTablet(tablet, true); - tablet.reset(); - } - } - - if (tablet.rowSize != 0) { - session.insertTablet(tablet); - tablet.reset(); - } - } - - private static void insertTablets() throws IoTDBConnectionException, StatementExecutionException { - // The schema of measurements of one device - // only measurementId and data type in MeasurementSchema take effects in Tablet - List<MeasurementSchema> schemaList = new ArrayList<>(); - schemaList.add(new MeasurementSchema("s1", TSDataType.INT64)); - schemaList.add(new MeasurementSchema("s2", TSDataType.INT64)); - schemaList.add(new MeasurementSchema("s3", TSDataType.INT64)); - - Tablet tablet1 = new Tablet(ROOT_SG1_D1, schemaList, 100); - Tablet tablet2 = new Tablet("root.sg1.d2", schemaList, 100); - Tablet tablet3 = new Tablet("root.sg1.d3", schemaList, 100); - - Map<String, Tablet> tabletMap = new HashMap<>(); - tabletMap.put(ROOT_SG1_D1, tablet1); - tabletMap.put("root.sg1.d2", tablet2); - tabletMap.put("root.sg1.d3", tablet3); - - // Method 1 to add tablet data - long timestamp = System.currentTimeMillis(); - for (long row = 0; row < 100; row++) { - int row1 = tablet1.rowSize++; - int row2 = tablet2.rowSize++; - int row3 = tablet3.rowSize++; - tablet1.addTimestamp(row1, timestamp); - tablet2.addTimestamp(row2, timestamp); - tablet3.addTimestamp(row3, timestamp); - for (int i = 0; i < 3; i++) { - long value = random.nextLong(); - tablet1.addValue(schemaList.get(i).getMeasurementId(), row1, value); - tablet2.addValue(schemaList.get(i).getMeasurementId(), row2, value); - tablet3.addValue(schemaList.get(i).getMeasurementId(), row3, value); - } - if (tablet1.rowSize == tablet1.getMaxRowNumber()) { - session.insertTablets(tabletMap, true); - tablet1.reset(); - tablet2.reset(); - tablet3.reset(); - } - timestamp++; - } - - if (tablet1.rowSize != 0) { - session.insertTablets(tabletMap, true); - tablet1.reset(); - tablet2.reset(); - tablet3.reset(); - } - - // Method 2 to add tablet data - long[] timestamps1 = tablet1.timestamps; - Object[] values1 = tablet1.values; - long[] timestamps2 = tablet2.timestamps; - Object[] values2 = tablet2.values; - long[] timestamps3 = tablet3.timestamps; - Object[] values3 = tablet3.values; - - for (long time = 0; time < 100; time++) { - int row1 = tablet1.rowSize++; - int row2 = tablet2.rowSize++; - int row3 = tablet3.rowSize++; - timestamps1[row1] = time; - timestamps2[row2] = time; - timestamps3[row3] = time; - for (int i = 0; i < 3; i++) { - long[] sensor1 = (long[]) values1[i]; - sensor1[row1] = i; - long[] sensor2 = (long[]) values2[i]; - sensor2[row2] = i; - long[] sensor3 = (long[]) values3[i]; - sensor3[row3] = i; - } - if (tablet1.rowSize == tablet1.getMaxRowNumber()) { - session.insertTablets(tabletMap, true); - - tablet1.reset(); - tablet2.reset(); - tablet3.reset(); - } - } - - if (tablet1.rowSize != 0) { - session.insertTablets(tabletMap, true); - tablet1.reset(); - tablet2.reset(); - tablet3.reset(); - } - } - - /** - * This example shows how to insert data of TSDataType.TEXT. You can use the session interface to - * write data of String type or Binary type. - */ - private static void insertText() throws IoTDBConnectionException, StatementExecutionException { - String device = "root.sg1.text"; - // the first data is String type and the second data is Binary type - List<Object> datas = Arrays.asList("String", new Binary("Binary")); - // insertRecord example - for (int i = 0; i < datas.size(); i++) { - // write data of String type or Binary type - session.insertRecord( - device, - i, - Collections.singletonList("s1"), - Collections.singletonList(TSDataType.TEXT), - datas.get(i)); - } - - // insertTablet example - List<MeasurementSchema> schemaList = new ArrayList<>(); - schemaList.add(new MeasurementSchema("s2", TSDataType.TEXT)); - Tablet tablet = new Tablet(device, schemaList, 100); - for (int i = 0; i < datas.size(); i++) { - int rowIndex = tablet.rowSize++; - tablet.addTimestamp(rowIndex, i); - // write data of String type or Binary type - tablet.addValue(schemaList.get(0).getMeasurementId(), rowIndex, datas.get(i)); - } - session.insertTablet(tablet); - try (SessionDataSet dataSet = session.executeQueryStatement("select s1, s2 from " + device)) { - System.out.println(dataSet.getColumnNames()); + long start = System.currentTimeMillis(); + try (SessionDataSet dataSet = session.executeQueryStatement("select ** from root")) { + dataSet.getColumnNames(); + dataSet.getColumnTypes(); +// System.out.println(dataSet.getColumnNames()); +// System.out.println(dataSet.getColumnTypes()); while (dataSet.hasNext()) { - System.out.println(dataSet.next()); - } - } - } - - private static void selectInto() throws IoTDBConnectionException, StatementExecutionException { - session.executeNonQueryStatement( - "select s1, s2, s3 into into_s1, into_s2, into_s3 from root.sg1.d1"); - - try (SessionDataSet dataSet = - session.executeQueryStatement("select into_s1, into_s2, into_s3 from root.sg1.d1")) { - System.out.println(dataSet.getColumnNames()); - while (dataSet.hasNext()) { - System.out.println(dataSet.next()); - } - } - } - - private static void deleteData() throws IoTDBConnectionException, StatementExecutionException { - String path = ROOT_SG1_D1_S1; - long deleteTime = 99; - session.deleteData(path, deleteTime); - } - - private static void deleteTimeseries() - throws IoTDBConnectionException, StatementExecutionException { - List<String> paths = new ArrayList<>(); - paths.add(ROOT_SG1_D1_S1); - paths.add(ROOT_SG1_D1_S2); - paths.add(ROOT_SG1_D1_S3); - session.deleteTimeseries(paths); - } - - private static void query() throws IoTDBConnectionException, StatementExecutionException { - try (SessionDataSet dataSet = session.executeQueryStatement(SELECT_D1)) { - System.out.println(dataSet.getColumnNames()); - dataSet.setFetchSize(1024); // default is 10000 - while (dataSet.hasNext()) { - System.out.println(dataSet.next()); - } - } - } - - private static void query4Redirect() - throws IoTDBConnectionException, StatementExecutionException { - String selectPrefix = "select * from root.redirect"; - for (int i = 0; i < 6; i++) { - try (SessionDataSet dataSet = - sessionEnableRedirect.executeQueryStatement(selectPrefix + i + ".d1")) { - - System.out.println(dataSet.getColumnNames()); - dataSet.setFetchSize(1024); // default is 10000 - while (dataSet.hasNext()) { - System.out.println(dataSet.next()); - } - } - } - - for (int i = 0; i < 6; i++) { - try (SessionDataSet dataSet = - sessionEnableRedirect.executeQueryStatement( - selectPrefix + i + ".d1 where time >= 1 and time < 10")) { - - System.out.println(dataSet.getColumnNames()); - dataSet.setFetchSize(1024); // default is 10000 - while (dataSet.hasNext()) { - System.out.println(dataSet.next()); - } - } - } - - for (int i = 0; i < 6; i++) { - try (SessionDataSet dataSet = - sessionEnableRedirect.executeQueryStatement( - selectPrefix + i + ".d1 where time >= 1 and time < 10 align by device")) { - - System.out.println(dataSet.getColumnNames()); - dataSet.setFetchSize(1024); // default is 10000 - while (dataSet.hasNext()) { - System.out.println(dataSet.next()); - } + dataSet.next(); + // System.out.println(dataSet.next()); } } - - for (int i = 0; i < 6; i++) { - try (SessionDataSet dataSet = - sessionEnableRedirect.executeQueryStatement( - selectPrefix - + i - + ".d1 where time >= 1 and time < 10 and root.redirect" - + i - + ".d1.s1 > 1")) { - System.out.println(dataSet.getColumnNames()); - dataSet.setFetchSize(1024); // default is 10000 - while (dataSet.hasNext()) { - System.out.println(dataSet.next()); - } - } - } - } - - private static void queryWithTimeout() - throws IoTDBConnectionException, StatementExecutionException { - try (SessionDataSet dataSet = session.executeQueryStatement(SELECT_D1, 2000)) { - System.out.println(dataSet.getColumnNames()); - dataSet.setFetchSize(1024); // default is 10000 - while (dataSet.hasNext()) { - System.out.println(dataSet.next()); - } - } - } - - private static void rawDataQuery() throws IoTDBConnectionException, StatementExecutionException { - List<String> paths = new ArrayList<>(); - paths.add(ROOT_SG1_D1_S1); - paths.add(ROOT_SG1_D1_S2); - paths.add(ROOT_SG1_D1_S3); - long startTime = 10L; - long endTime = 200L; - long timeOut = 60000; - - try (SessionDataSet dataSet = session.executeRawDataQuery(paths, startTime, endTime, timeOut)) { - - System.out.println(dataSet.getColumnNames()); - dataSet.setFetchSize(1024); - while (dataSet.hasNext()) { - System.out.println(dataSet.next()); - } - } - } - - private static void lastDataQuery() throws IoTDBConnectionException, StatementExecutionException { - List<String> paths = new ArrayList<>(); - paths.add(ROOT_SG1_D1_S1); - paths.add(ROOT_SG1_D1_S2); - paths.add(ROOT_SG1_D1_S3); - try (SessionDataSet sessionDataSet = session.executeLastDataQuery(paths, 3, 60000)) { - System.out.println(sessionDataSet.getColumnNames()); - sessionDataSet.setFetchSize(1024); - while (sessionDataSet.hasNext()) { - System.out.println(sessionDataSet.next()); - } - } - } - - private static void fastLastDataQueryForOneDevice() - throws IoTDBConnectionException, StatementExecutionException { - System.out.println("-------fastLastQuery------"); - List<String> paths = new ArrayList<>(); - paths.add("s1"); - paths.add("s2"); - paths.add("s3"); - try (SessionDataSet sessionDataSet = - sessionEnableRedirect.executeLastDataQueryForOneDevice( - ROOT_SG1, ROOT_SG1_D1, paths, true)) { - System.out.println(sessionDataSet.getColumnNames()); - sessionDataSet.setFetchSize(1024); - while (sessionDataSet.hasNext()) { - System.out.println(sessionDataSet.next()); - } - } - } - - private static void aggregationQuery() - throws IoTDBConnectionException, StatementExecutionException { - List<String> paths = new ArrayList<>(); - paths.add(ROOT_SG1_D1_S1); - paths.add(ROOT_SG1_D1_S2); - paths.add(ROOT_SG1_D1_S3); - - List<TAggregationType> aggregations = new ArrayList<>(); - aggregations.add(TAggregationType.COUNT); - aggregations.add(TAggregationType.SUM); - aggregations.add(TAggregationType.MAX_VALUE); - try (SessionDataSet sessionDataSet = session.executeAggregationQuery(paths, aggregations)) { - System.out.println(sessionDataSet.getColumnNames()); - sessionDataSet.setFetchSize(1024); - while (sessionDataSet.hasNext()) { - System.out.println(sessionDataSet.next()); - } - } - } - - private static void groupByQuery() throws IoTDBConnectionException, StatementExecutionException { - List<String> paths = new ArrayList<>(); - paths.add(ROOT_SG1_D1_S1); - paths.add(ROOT_SG1_D1_S2); - paths.add(ROOT_SG1_D1_S3); - - List<TAggregationType> aggregations = new ArrayList<>(); - aggregations.add(TAggregationType.COUNT); - aggregations.add(TAggregationType.SUM); - aggregations.add(TAggregationType.MAX_VALUE); - try (SessionDataSet sessionDataSet = - session.executeAggregationQuery(paths, aggregations, 0, 100, 10, 20)) { - System.out.println(sessionDataSet.getColumnNames()); - sessionDataSet.setFetchSize(1024); - while (sessionDataSet.hasNext()) { - System.out.println(sessionDataSet.next()); - } - } - } - - private static void queryByIterator() - throws IoTDBConnectionException, StatementExecutionException { - try (SessionDataSet dataSet = session.executeQueryStatement(SELECT_D1)) { - - DataIterator iterator = dataSet.iterator(); - System.out.println(dataSet.getColumnNames()); - dataSet.setFetchSize(1024); // default is 10000 - while (iterator.next()) { - StringBuilder builder = new StringBuilder(); - // get time - builder.append(iterator.getLong(1)).append(","); - // get second column - if (!iterator.isNull(2)) { - builder.append(iterator.getLong(2)).append(","); - } else { - builder.append("null").append(","); - } - - // get third column - if (!iterator.isNull(ROOT_SG1_D1_S2)) { - builder.append(iterator.getLong(ROOT_SG1_D1_S2)).append(","); - } else { - builder.append("null").append(","); - } - - // get forth column - if (!iterator.isNull(4)) { - builder.append(iterator.getLong(4)).append(","); - } else { - builder.append("null").append(","); - } - - // get fifth column - if (!iterator.isNull(ROOT_SG1_D1_S4)) { - builder.append(iterator.getObject(ROOT_SG1_D1_S4)); - } else { - builder.append("null"); - } - - System.out.println(builder); - } - } - } - - private static void nonQuery() throws IoTDBConnectionException, StatementExecutionException { - session.executeNonQueryStatement("insert into root.sg1.d1(timestamp,s1) values(200, 1)"); - } - - private static void setTimeout() throws IoTDBConnectionException { - try (Session tempSession = new Session(LOCAL_HOST, 6667, "root", "root", 10000, 20000)) { - tempSession.setQueryTimeout(60000); - } + System.out.println("query test costs: " + (System.currentTimeMillis() - start) + "ms"); + session.close(); } } diff --git a/iotdb-client/client-py/SessionExample.py b/iotdb-client/client-py/SessionExample.py index d7223b16146..79680acd68c 100644 --- a/iotdb-client/client-py/SessionExample.py +++ b/iotdb-client/client-py/SessionExample.py @@ -1,30 +1,8 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# Uncomment the following line to use apache-iotdb module installed by pip3 +from datetime import datetime import numpy as np from iotdb.Session import Session -from iotdb.template.MeasurementNode import MeasurementNode -from iotdb.template.Template import Template -from iotdb.utils.BitMap import BitMap from iotdb.utils.IoTDBConstants import TSDataType, TSEncoding, Compressor -from iotdb.utils.Tablet import Tablet from iotdb.utils.NumpyTablet import NumpyTablet # creating session connection. @@ -32,366 +10,27 @@ ip = "127.0.0.1" port_ = "6667" username_ = "root" password_ = "root" -# session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8", enable_redirection=True) -session = Session.init_from_node_urls( - node_urls=["127.0.0.1:6667", "127.0.0.1:6668", "127.0.0.1:6669"], - user="root", - password="root", +session = Session( + ip, + port_, + username_, + password_, fetch_size=1024, zone_id="UTC+8", - enable_redirection=True, -) -session.open(False) - -# create and delete databases -session.set_storage_group("root.sg_test_01") -session.set_storage_group("root.sg_test_02") -session.set_storage_group("root.sg_test_03") -session.set_storage_group("root.sg_test_04") -session.delete_storage_group("root.sg_test_02") -session.delete_storage_groups(["root.sg_test_03", "root.sg_test_04"]) - -# setting time series. -session.create_time_series( - "root.sg_test_01.d_01.s_01", TSDataType.BOOLEAN, TSEncoding.PLAIN, Compressor.SNAPPY -) -session.create_time_series( - "root.sg_test_01.d_01.s_02", TSDataType.INT32, TSEncoding.PLAIN, Compressor.SNAPPY -) -session.create_time_series( - "root.sg_test_01.d_01.s_03", TSDataType.INT64, TSEncoding.PLAIN, Compressor.SNAPPY -) -session.create_time_series( - "root.sg_test_01.d_02.s_01", - TSDataType.BOOLEAN, - TSEncoding.PLAIN, - Compressor.SNAPPY, - None, - {"tag1": "v1"}, - {"description": "v1"}, - "temperature", -) - -# setting multiple time series once. -ts_path_lst_ = [ - "root.sg_test_01.d_01.s_04", - "root.sg_test_01.d_01.s_05", - "root.sg_test_01.d_01.s_06", - "root.sg_test_01.d_01.s_07", - "root.sg_test_01.d_01.s_08", - "root.sg_test_01.d_01.s_09", -] -data_type_lst_ = [ - TSDataType.FLOAT, - TSDataType.DOUBLE, - TSDataType.TEXT, - TSDataType.FLOAT, - TSDataType.DOUBLE, - TSDataType.TEXT, -] -encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))] -compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))] -session.create_multi_time_series( - ts_path_lst_, data_type_lst_, encoding_lst_, compressor_lst_ -) - -ts_path_lst_ = [ - "root.sg_test_01.d_02.s_04", - "root.sg_test_01.d_02.s_05", - "root.sg_test_01.d_02.s_06", - "root.sg_test_01.d_02.s_07", - "root.sg_test_01.d_02.s_08", - "root.sg_test_01.d_02.s_09", -] -data_type_lst_ = [ - TSDataType.FLOAT, - TSDataType.DOUBLE, - TSDataType.TEXT, - TSDataType.FLOAT, - TSDataType.DOUBLE, - TSDataType.TEXT, -] -encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))] -compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))] -tags_lst_ = [{"tag2": "v2"} for _ in range(len(data_type_lst_))] -attributes_lst_ = [{"description": "v2"} for _ in range(len(data_type_lst_))] -session.create_multi_time_series( - ts_path_lst_, - data_type_lst_, - encoding_lst_, - compressor_lst_, - None, - tags_lst_, - attributes_lst_, - None, -) - -# delete time series -session.delete_time_series( - [ - "root.sg_test_01.d_01.s_07", - "root.sg_test_01.d_01.s_08", - "root.sg_test_01.d_01.s_09", - ] + enable_redirection=False, ) - -# checking time series -print( - "s_07 expecting False, checking result: ", - session.check_time_series_exists("root.sg_test_01.d_01.s_07"), -) -print( - "s_03 expecting True, checking result: ", - session.check_time_series_exists("root.sg_test_01.d_01.s_03"), -) -print( - "d_02.s_01 expecting True, checking result: ", - session.check_time_series_exists("root.sg_test_01.d_02.s_01"), -) -print( - "d_02.s_06 expecting True, checking result: ", - session.check_time_series_exists("root.sg_test_01.d_02.s_06"), -) - -# insert one record into the database. -measurements_ = ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"] -values_ = [False, 10, 11, 1.1, 10011.1, "test_record"] -data_types_ = [ - TSDataType.BOOLEAN, - TSDataType.INT32, - TSDataType.INT64, - TSDataType.FLOAT, - TSDataType.DOUBLE, - TSDataType.TEXT, -] -session.insert_record("root.sg_test_01.d_01", 1, measurements_, data_types_, values_) - -# insert multiple records into database -measurements_list_ = [ - ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"], - ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"], -] -values_list_ = [ - [False, 22, 33, 4.4, 55.1, "test_records01"], - [True, 77, 88, 1.25, 8.125, bytes("test_records02", "utf-8")], -] -data_type_list_ = [data_types_, data_types_] -device_ids_ = ["root.sg_test_01.d_01", "root.sg_test_01.d_01"] -session.insert_records( - device_ids_, [2, 3], measurements_list_, data_type_list_, values_list_ -) - -# insert one tablet into the database. -values_ = [ - [False, 10, 11, 1.1, 10011.1, "test01"], - [True, 100, 11111, 1.25, 101.0, "test02"], - [False, 100, 1, 188.1, 688.25, "test03"], - [True, 0, 0, 0, 6.25, "test04"], -] # Non-ASCII text will cause error since bytes can only hold 0-128 nums. -timestamps_ = [4, 5, 6, 7] -tablet_ = Tablet( - "root.sg_test_01.d_01", measurements_, data_types_, values_, timestamps_ -) -session.insert_tablet(tablet_) - -# insert one numpy tablet into the database. -np_values_ = [ - np.array([False, True, False, True], TSDataType.BOOLEAN.np_dtype()), - np.array([10, 100, 100, 0], TSDataType.INT32.np_dtype()), - np.array([11, 11111, 1, 0], TSDataType.INT64.np_dtype()), - np.array([1.1, 1.25, 188.1, 0], TSDataType.FLOAT.np_dtype()), - np.array([10011.1, 101.0, 688.25, 6.25], TSDataType.DOUBLE.np_dtype()), - np.array(["test01", "test02", "test03", "test04"], TSDataType.TEXT.np_dtype()), -] -np_timestamps_ = np.array([1, 2, 3, 4], TSDataType.INT64.np_dtype()) -np_tablet_ = NumpyTablet( - "root.sg_test_01.d_02", measurements_, data_types_, np_values_, np_timestamps_ -) -session.insert_tablet(np_tablet_) - -# insert one unsorted numpy tablet into the database. -np_values_unsorted = [ - np.array([False, False, False, True, True], np.dtype(">?")), - np.array([0, 10, 100, 1000, 10000], np.dtype(">i4")), - np.array([1, 11, 111, 1111, 11111], np.dtype(">i8")), - np.array([1.1, 1.25, 188.1, 0, 8.999], np.dtype(">f4")), - np.array([10011.1, 101.0, 688.25, 6.25, 8, 776], np.dtype(">f8")), - np.array(["test09", "test08", "test07", "test06", "test05"]), -] -np_timestamps_unsorted = np.array([9, 8, 7, 6, 5], np.dtype(">i8")) -np_tablet_unsorted = NumpyTablet( - "root.sg_test_01.d_02", - measurements_, - data_types_, - np_values_unsorted, - np_timestamps_unsorted, -) - -# insert one numpy tablet into the database. -np_values_ = [ - np.array([False, True, False, True], TSDataType.BOOLEAN.np_dtype()), - np.array([10, 100, 100, 0], TSDataType.INT32.np_dtype()), - np.array([11, 11111, 1, 0], TSDataType.INT64.np_dtype()), - np.array([1.1, 1.25, 188.1, 0], TSDataType.FLOAT.np_dtype()), - np.array([10011.1, 101.0, 688.25, 6.25], TSDataType.DOUBLE.np_dtype()), - np.array(["test01", "test02", "test03", "test04"]), -] -np_timestamps_ = np.array([98, 99, 100, 101], TSDataType.INT64.np_dtype()) -np_bitmaps_ = [] -for i in range(len(measurements_)): - np_bitmaps_.append(BitMap(len(np_timestamps_))) -np_bitmaps_[0].mark(0) -np_bitmaps_[1].mark(1) -np_bitmaps_[2].mark(2) -np_bitmaps_[4].mark(3) -np_bitmaps_[5].mark(3) -np_tablet_with_none = NumpyTablet( - "root.sg_test_01.d_02", - measurements_, - data_types_, - np_values_, - np_timestamps_, - np_bitmaps_, -) -session.insert_tablet(np_tablet_with_none) - - -session.insert_tablet(np_tablet_unsorted) -print(np_tablet_unsorted.get_timestamps()) -for value in np_tablet_unsorted.get_values(): - print(value) - -# insert multiple tablets into database -tablet_01 = Tablet( - "root.sg_test_01.d_01", measurements_, data_types_, values_, [8, 9, 10, 11] -) -tablet_02 = Tablet( - "root.sg_test_01.d_01", measurements_, data_types_, values_, [12, 13, 14, 15] -) -session.insert_tablets([tablet_01, tablet_02]) - -# insert one tablet with empty cells into the database. -values_ = [ - [None, 10, 11, 1.1, 10011.1, "test01"], - [True, None, 11111, 1.25, 101.0, "test02"], - [False, 100, 1, None, 688.25, "test03"], - [True, 0, 0, 0, 6.25, None], -] # Non-ASCII text will cause error since bytes can only hold 0-128 nums. -timestamps_ = [16, 17, 18, 19] -tablet_ = Tablet( - "root.sg_test_01.d_01", measurements_, data_types_, values_, timestamps_ -) -session.insert_tablet(tablet_) - -# insert records of one device -time_list = [1, 2, 3] -measurements_list = [ - ["s_01", "s_02", "s_03"], - ["s_01", "s_02", "s_03"], - ["s_01", "s_02", "s_03"], -] -data_types_list = [ - [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64], - [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64], - [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64], -] -values_list = [[False, 22, 33], [True, 1, 23], [False, 15, 26]] - -session.insert_records_of_one_device( - "root.sg_test_01.d_01", time_list, measurements_list, data_types_list, values_list -) - -# execute non-query sql statement -session.execute_non_query_statement( - "insert into root.sg_test_01.d_01(timestamp, s_02) values(16, 188)" -) - -# execute sql query statement -with session.execute_query_statement( - "select * from root.sg_test_01.d_01" -) as session_data_set: - session_data_set.set_fetch_size(1024) - while session_data_set.has_next(): - print(session_data_set.next()) -# execute sql query statement -with session.execute_query_statement( - "select s_01, s_02, s_03, s_04, s_05, s_06 from root.sg_test_01.d_02" -) as session_data_set: - session_data_set.set_fetch_size(1024) - while session_data_set.has_next(): - print(session_data_set.next()) - -# execute statement -with session.execute_statement( - "select * from root.sg_test_01.d_01" -) as session_data_set: - while session_data_set.has_next(): - print(session_data_set.next()) - -session.execute_statement( - "insert into root.sg_test_01.d_01(timestamp, s_02) values(16, 188)" -) - -# insert string records of one device -time_list = [1, 2, 3] -measurements_list = [ - ["s_01", "s_02", "s_03"], - ["s_01", "s_02", "s_03"], - ["s_01", "s_02", "s_03"], -] -values_list = [["False", "22", "33"], ["True", "1", "23"], ["False", "15", "26"]] - -session.insert_string_records_of_one_device( - "root.sg_test_01.d_03", - time_list, - measurements_list, - values_list, -) - -with session.execute_raw_data_query( - ["root.sg_test_01.d_03.s_01", "root.sg_test_01.d_03.s_02"], 1, 4 -) as session_data_set: - session_data_set.set_fetch_size(1024) - while session_data_set.has_next(): - print(session_data_set.next()) - -with session.execute_last_data_query( - ["root.sg_test_01.d_03.s_01", "root.sg_test_01.d_03.s_02"], 0 -) as session_data_set: - session_data_set.set_fetch_size(1024) - while session_data_set.has_next(): - print(session_data_set.next()) - -# delete database -session.delete_storage_group("root.sg_test_01") - -# create template -template = Template(name="template_python", share_time=False) -m_node_1 = MeasurementNode( - name="s1", - data_type=TSDataType.INT64, - encoding=TSEncoding.RLE, - compression_type=Compressor.SNAPPY, -) -m_node_2 = MeasurementNode( - name="s2", - data_type=TSDataType.INT64, - encoding=TSEncoding.RLE, - compression_type=Compressor.SNAPPY, -) -m_node_3 = MeasurementNode( - name="s3", - data_type=TSDataType.INT64, - encoding=TSEncoding.RLE, - compression_type=Compressor.SNAPPY, -) -template.add_template(m_node_1) -template.add_template(m_node_2) -template.add_template(m_node_3) -session.create_schema_template(template) -print("create template success template_python") - -# close session connection. +session.open() +# startTime = int(datetime.now().timestamp() * 1000) +# with session.execute_query_statement("select ** from root") as data_set: +# data_set.todf() +# +# print("todf cost: " + str(int(datetime.now().timestamp() * 1000) - startTime) + "ms") +startTime = int(datetime.now().timestamp() * 1000) +with session.execute_query_statement("select ** from root") as data_set: + data_set.get_column_names() + data_set.get_column_types() + while data_set.has_next(): + data_set.next() +print("Query cost: " + str(int(datetime.now().timestamp() * 1000) - startTime) + "ms") session.close() - -print("All executions done!!") +exit(0) diff --git a/iotdb-client/client-py/iotdb/utils/Field.py b/iotdb-client/client-py/iotdb/utils/Field.py index 0756b1c49d6..281f005f554 100644 --- a/iotdb-client/client-py/iotdb/utils/Field.py +++ b/iotdb-client/client-py/iotdb/utils/Field.py @@ -21,17 +21,12 @@ from .IoTDBConstants import TSDataType class Field(object): - def __init__(self, data_type): + def __init__(self, data_type, value=None): """ :param data_type: TSDataType """ self.__data_type = data_type - self.__bool_value = None - self.__int_value = None - self.__long_value = None - self.__float_value = None - self.__double_value = None - self.__binary_value = None + self.__value = value @staticmethod def copy(field): @@ -62,70 +57,72 @@ class Field(object): return self.__data_type is None def set_bool_value(self, value): - self.__bool_value = value + self.__value = value def get_bool_value(self): if self.__data_type is None: raise Exception("Null Field Exception!") - return self.__bool_value + if self.__data_type != TSDataType.BOOLEAN: + return None + return self.__value def set_int_value(self, value): - self.__int_value = value + self.__value = value def get_int_value(self): if self.__data_type is None: raise Exception("Null Field Exception!") - return self.__int_value + if self.__data_type != TSDataType.INT32: + return None + return self.__value def set_long_value(self, value): - self.__long_value = value + self.__value = value def get_long_value(self): if self.__data_type is None: raise Exception("Null Field Exception!") - return self.__long_value + if self.__data_type != TSDataType.INT64: + return None + return self.__value def set_float_value(self, value): - self.__float_value = value + self.__value = value def get_float_value(self): if self.__data_type is None: raise Exception("Null Field Exception!") - return self.__float_value + if self.__data_type != TSDataType.FLOAT: + return None + return self.__value def set_double_value(self, value): - self.__double_value = value + self.__value = value def get_double_value(self): if self.__data_type is None: raise Exception("Null Field Exception!") - return self.__double_value + if self.__data_type != TSDataType.DOUBLE: + return None + return self.__value def set_binary_value(self, value): - self.__binary_value = value + self.__value = value def get_binary_value(self): if self.__data_type is None: raise Exception("Null Field Exception!") - return self.__binary_value + if self.__data_type != TSDataType.TEXT: + return None + return self.__value def get_string_value(self): if self.__data_type is None: return "None" - elif self.__data_type == TSDataType.BOOLEAN: - return str(self.__bool_value) - elif self.__data_type == TSDataType.INT64: - return str(self.__long_value) - elif self.__data_type == TSDataType.INT32: - return str(self.__int_value) - elif self.__data_type == TSDataType.FLOAT: - return str(self.__float_value) - elif self.__data_type == TSDataType.DOUBLE: - return str(self.__double_value) elif self.__data_type == TSDataType.TEXT: - return self.__binary_value.decode("utf-8") + return self.__value.decode("utf-8") else: - raise Exception("unsupported data type {}".format(self.__data_type)) + return str(self.__value) def __str__(self): return self.get_string_value() @@ -136,20 +133,10 @@ class Field(object): """ if self.__data_type is None: return None - elif data_type == TSDataType.BOOLEAN: - return self.get_bool_value() - elif data_type == TSDataType.INT32: - return self.get_int_value() - elif data_type == TSDataType.INT64: - return self.get_long_value() - elif data_type == TSDataType.FLOAT: - return self.get_float_value() - elif data_type == TSDataType.DOUBLE: - return self.get_double_value() - elif data_type == TSDataType.TEXT: - return self.get_binary_value() - else: - raise Exception("unsupported data type {}".format(data_type)) + return self.__value + + def set_value(self, value): + self.__value = value @staticmethod def get_field(value, data_type): @@ -159,19 +146,5 @@ class Field(object): """ if value is None: return None - field = Field(data_type) - if data_type == TSDataType.BOOLEAN: - field.set_bool_value(value) - elif data_type == TSDataType.INT32: - field.set_int_value(value) - elif data_type == TSDataType.INT64: - field.set_long_value(value) - elif data_type == TSDataType.FLOAT: - field.set_float_value(value) - elif data_type == TSDataType.DOUBLE: - field.set_double_value(value) - elif data_type == TSDataType.TEXT: - field.set_binary_value(value) - else: - raise Exception("unsupported data type {}".format(data_type)) + field = Field(data_type, value) return field diff --git a/iotdb-client/client-py/iotdb/utils/IoTDBRpcDataSet.py b/iotdb-client/client-py/iotdb/utils/IoTDBRpcDataSet.py index 564a0377341..b060260c8f5 100644 --- a/iotdb-client/client-py/iotdb/utils/IoTDBRpcDataSet.py +++ b/iotdb-client/client-py/iotdb/utils/IoTDBRpcDataSet.py @@ -27,13 +27,17 @@ from iotdb.thrift.rpc.IClientRPCService import TSFetchResultsReq, TSCloseOperati from iotdb.utils.IoTDBConstants import TSDataType logger = logging.getLogger("IoTDB") +shift_table = tuple([0x80 >> i for i in range(8)]) + + +def _to_bitbuffer(b): + return bytes("{:0{}b}".format(int(binascii.hexlify(b), 16), 8 * len(b)), "utf-8") class IoTDBRpcDataSet(object): TIMESTAMP_STR = "Time" # VALUE_IS_NULL = "The value got by %s (column name) is NULL." START_INDEX = 2 - FLAG = 0x80 def __init__( self, @@ -64,7 +68,8 @@ class IoTDBRpcDataSet(object): self.__column_ordinal_dict = {} if not ignore_timestamp: self.__column_name_list.append(IoTDBRpcDataSet.TIMESTAMP_STR) - self.__column_type_list.append(TSDataType.INT64) + # TSDataType.INT64 for time + self.__column_type_list.append(2) self.__column_ordinal_dict[IoTDBRpcDataSet.TIMESTAMP_STR] = 1 if column_name_index is not None: @@ -107,6 +112,9 @@ class IoTDBRpcDataSet(object): self.__empty_resultSet = False self.__has_cached_record = False self.__rows_index = 0 + self.__is_null_info = [ + False for _ in range(len(self.__column_type_deduplicated_list)) + ] def close(self): if self.__is_closed: @@ -156,11 +164,6 @@ class IoTDBRpcDataSet(object): return True return False - def _to_bitbuffer(self, b): - return bytes( - "{:0{}b}".format(int(binascii.hexlify(b), 16), 8 * len(b)), "utf-8" - ) - def resultset_to_pandas(self): result = {} for column_name in self.__column_name_list: @@ -197,25 +200,25 @@ class IoTDBRpcDataSet(object): value_buffer_len = len(value_buffer) data_array = None - if data_type == TSDataType.DOUBLE: + if data_type == 4: data_array = np.frombuffer( value_buffer, np.dtype(np.double).newbyteorder(">") ) - elif data_type == TSDataType.FLOAT: + elif data_type == 3: data_array = np.frombuffer( value_buffer, np.dtype(np.float32).newbyteorder(">") ) - elif data_type == TSDataType.BOOLEAN: + elif data_type == 0: data_array = np.frombuffer(value_buffer, np.dtype("?")) - elif data_type == TSDataType.INT32: + elif data_type == 1: data_array = np.frombuffer( value_buffer, np.dtype(np.int32).newbyteorder(">") ) - elif data_type == TSDataType.INT64: + elif data_type == 2: data_array = np.frombuffer( value_buffer, np.dtype(np.int64).newbyteorder(">") ) - elif data_type == TSDataType.TEXT: + elif data_type == 5: j = 0 offset = 0 data_array = [] @@ -239,29 +242,27 @@ class IoTDBRpcDataSet(object): self.__query_data_set.valueList[location] = None if len(data_array) < total_length: - if data_type == TSDataType.INT32 or data_type == TSDataType.INT64: + if data_type == 1 or data_type == 2: tmp_array = np.full(total_length, np.nan, np.float32) - elif ( - data_type == TSDataType.FLOAT or data_type == TSDataType.DOUBLE - ): + elif data_type == 3 or data_type == 4: tmp_array = np.full(total_length, np.nan, data_array.dtype) - elif data_type == TSDataType.BOOLEAN: + elif data_type == 0: tmp_array = np.full(total_length, np.nan, np.float32) - elif data_type == TSDataType.TEXT: + elif data_type == 5: tmp_array = np.full(total_length, None, dtype=data_array.dtype) bitmap_buffer = self.__query_data_set.bitmapList[location] - buffer = self._to_bitbuffer(bitmap_buffer) + buffer = _to_bitbuffer(bitmap_buffer) bit_mask = (np.frombuffer(buffer, "u1") - ord("0")).astype(bool) if len(bit_mask) != total_length: bit_mask = bit_mask[:total_length] tmp_array[bit_mask] = data_array - if data_type == TSDataType.INT32: + if data_type == 1: tmp_array = pd.Series(tmp_array).astype("Int32") - elif data_type == TSDataType.INT64: + elif data_type == 2: tmp_array = pd.Series(tmp_array).astype("Int64") - elif data_type == TSDataType.BOOLEAN: + elif data_type == 0: tmp_array = pd.Series(tmp_array).astype("boolean") data_array = tmp_array @@ -287,34 +288,27 @@ class IoTDBRpcDataSet(object): # simulating buffer, read 8 bytes from data set and discard first 8 bytes which have been read. self.__time_bytes = self.__query_data_set.time[:8] self.__query_data_set.time = self.__query_data_set.time[8:] - for i in range(len(self.__query_data_set.bitmapList)): - bitmap_buffer = self.__query_data_set.bitmapList[i] - + for i, bitmap_buffer in enumerate(self.__query_data_set.bitmapList): # another 8 new rows, should move the bitmap buffer position to next byte if self.__rows_index % 8 == 0: self.__current_bitmap[i] = bitmap_buffer[0] self.__query_data_set.bitmapList[i] = bitmap_buffer[1:] - if not self.is_null(i, self.__rows_index): + if not self.is_null(self.__current_bitmap[i], self.__rows_index): + self.__is_null_info[i] = False value_buffer = self.__query_data_set.valueList[i] data_type = self.__column_type_deduplicated_list[i] # simulating buffer - if data_type == TSDataType.BOOLEAN: + if data_type == 0: self.__value[i] = value_buffer[:1] self.__query_data_set.valueList[i] = value_buffer[1:] - elif data_type == TSDataType.INT32: - self.__value[i] = value_buffer[:4] - self.__query_data_set.valueList[i] = value_buffer[4:] - elif data_type == TSDataType.INT64: - self.__value[i] = value_buffer[:8] - self.__query_data_set.valueList[i] = value_buffer[8:] - elif data_type == TSDataType.FLOAT: + elif data_type == 1 or data_type == 3: self.__value[i] = value_buffer[:4] self.__query_data_set.valueList[i] = value_buffer[4:] - elif data_type == TSDataType.DOUBLE: + elif data_type == 2 or data_type == 4: self.__value[i] = value_buffer[:8] self.__query_data_set.valueList[i] = value_buffer[8:] - elif data_type == TSDataType.TEXT: + elif data_type == 5: length = int.from_bytes( value_buffer[:4], byteorder="big", signed=False ) @@ -322,6 +316,8 @@ class IoTDBRpcDataSet(object): self.__query_data_set.valueList[i] = value_buffer[4 + length :] else: raise RuntimeError("unsupported data type {}.".format(data_type)) + else: + self.__is_null_info[i] = True self.__rows_index += 1 self.__has_cached_record = True @@ -347,10 +343,10 @@ class IoTDBRpcDataSet(object): "Cannot fetch result from server, because of network connection: ", e ) - def is_null(self, index, row_num): - bitmap = self.__current_bitmap[index] - shift = row_num % 8 - return ((IoTDBRpcDataSet.FLAG >> shift) & (bitmap & 0xFF)) == 0 + @staticmethod + def is_null(bitmap, row_num): + shift = shift_table[row_num % 8] + return shift & bitmap == 0 def is_null_by_index(self, column_index): index = ( @@ -360,14 +356,20 @@ class IoTDBRpcDataSet(object): # time column will never be None if index < 0: return True - return self.is_null(index, self.__rows_index - 1) + return self.is_null(self.__current_bitmap[index], self.__rows_index - 1) def is_null_by_name(self, column_name): index = self.__column_ordinal_dict[column_name] - IoTDBRpcDataSet.START_INDEX # time column will never be None if index < 0: return True - return self.is_null(index, self.__rows_index - 1) + return self.is_null(self.__current_bitmap[index], self.__rows_index - 1) + + def is_null_by_location(self, location): + # time column will never be None + if location < 0: + return True + return self.__is_null_info[location] def find_column_name_by_index(self, column_index): if column_index <= 0: diff --git a/iotdb-client/client-py/iotdb/utils/SessionDataSet.py b/iotdb-client/client-py/iotdb/utils/SessionDataSet.py index 02eef027dfd..162755a3767 100644 --- a/iotdb-client/client-py/iotdb/utils/SessionDataSet.py +++ b/iotdb-client/client-py/iotdb/utils/SessionDataSet.py @@ -57,6 +57,14 @@ class SessionDataSet(object): query_data_set, 1024, ) + self.column_size = self.iotdb_rpc_data_set.get_column_size() + self.is_ignore_timestamp = self.iotdb_rpc_data_set.get_ignore_timestamp() + self.column_names = tuple(self.iotdb_rpc_data_set.get_column_names()) + self.column_ordinal_dict = self.iotdb_rpc_data_set.get_column_ordinal_dict() + self.column_type_deduplicated_list = tuple( + self.iotdb_rpc_data_set.get_column_type_deduplicated_list() + ) + self.__field_list = [Field(data_type) for data_type in self.column_type_deduplicated_list] def __enter__(self): return self @@ -87,50 +95,42 @@ class SessionDataSet(object): return self.construct_row_record_from_value_array() def construct_row_record_from_value_array(self): - out_fields = [] - for i in range(self.iotdb_rpc_data_set.get_column_size()): - index = i + 1 - data_set_column_index = i + IoTDBRpcDataSet.START_INDEX - if self.iotdb_rpc_data_set.get_ignore_timestamp(): - index -= 1 - data_set_column_index -= 1 - column_name = self.iotdb_rpc_data_set.get_column_names()[index] - location = ( - self.iotdb_rpc_data_set.get_column_ordinal_dict()[column_name] - - IoTDBRpcDataSet.START_INDEX - ) - - if not self.iotdb_rpc_data_set.is_null_by_index(data_set_column_index): + if self.is_ignore_timestamp: + start = 0 + end = self.column_size + else: + start = 1 + end = self.column_size + 1 + for index in range(start, end): + column_name = self.column_names[index] + location = self.column_ordinal_dict[column_name] - 2 + if not self.iotdb_rpc_data_set.is_null_by_location(location): value_bytes = self.iotdb_rpc_data_set.get_values()[location] - data_type = self.iotdb_rpc_data_set.get_column_type_deduplicated_list()[ - location - ] - field = Field(data_type) - if data_type == TSDataType.BOOLEAN: + data_type = self.column_type_deduplicated_list[location] + if data_type == 0: value = struct.unpack(">?", value_bytes)[0] - field.set_bool_value(value) - elif data_type == TSDataType.INT32: + self.__field_list[index - 1].set_value(value) + elif data_type == 1: value = struct.unpack(">i", value_bytes)[0] - field.set_int_value(value) - elif data_type == TSDataType.INT64: + self.__field_list[index - 1].set_value(value) + elif data_type == 2: value = struct.unpack(">q", value_bytes)[0] - field.set_long_value(value) - elif data_type == TSDataType.FLOAT: + self.__field_list[index - 1].set_value(value) + elif data_type == 3: value = struct.unpack(">f", value_bytes)[0] - field.set_float_value(value) - elif data_type == TSDataType.DOUBLE: + self.__field_list[index - 1].set_value(value) + elif data_type == 4: value = struct.unpack(">d", value_bytes)[0] - field.set_double_value(value) - elif data_type == TSDataType.TEXT: - field.set_binary_value(value_bytes) + self.__field_list[index - 1].set_value(value) + elif data_type == 5: + self.__field_list[index - 1].set_value(value_bytes) else: raise RuntimeError("unsupported data type {}.".format(data_type)) else: - field = Field(None) - out_fields.append(field) + self.__field_list[index - 1].set_value(None) return RowRecord( - struct.unpack(">q", self.iotdb_rpc_data_set.get_time_bytes())[0], out_fields + struct.unpack(">q", self.iotdb_rpc_data_set.get_time_bytes())[0], self.__field_list ) def close_operation_handle(self):
