This is an automated email from the ASF dual-hosted git repository.
critas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 56097ac5b2f Fix data export logic to avoid redundant high frequency
query (#17049)
56097ac5b2f is described below
commit 56097ac5b2fc7dba6fe74e8a9e95cf59e9a64cdd
Author: LimJiaWenBrenda <[email protected]>
AuthorDate: Wed Jan 21 16:13:58 2026 +0800
Fix data export logic to avoid redundant high frequency query (#17049)
* Fix data export logic to avoid redundant high frequency query
* Add quotation marks to values with data type STRING
* Reset the current line count and close the writer properly
---
.../org/apache/iotdb/tool/data/ExportDataTree.java | 221 ++++++++++++---------
1 file changed, 131 insertions(+), 90 deletions(-)
diff --git
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/ExportDataTree.java
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/ExportDataTree.java
index beee7f49c92..9d7d0ca521d 100644
---
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/ExportDataTree.java
+++
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/ExportDataTree.java
@@ -136,116 +136,157 @@ public class ExportDataTree extends AbstractExportData {
}
}
- private void exportToSqlFile(SessionDataSet sessionDataSet, String filePath)
+ private void exportToSqlFileWithAlignDevice(SessionDataSet sessionDataSet,
String filePath)
+ throws IOException, IoTDBConnectionException,
StatementExecutionException {
+ processedRows = 0;
+ lastPrintTime = 0;
+
+ List<String> measurementNames = sessionDataSet.getColumnNames();
+ if (CollectionUtils.isEmpty(measurementNames) || measurementNames.size()
<= 1) {
+ return;
+ } else {
+ measurementNames.remove("Time");
+ measurementNames.remove("Device");
+ }
+ String sqlPrefix =
+ String.format(
+ "INSERT INTO %s(TIMESTAMP,%s) ALIGNED VALUES (%s);\n",
+ "%s", String.join(",", measurementNames), "%d,%s");
+
+ SessionDataSet.DataIterator iterator = sessionDataSet.iterator();
+ List<String> columnTypeList = iterator.getColumnTypeList();
+ int totalColumns = columnTypeList.size();
+ String deviceName = null;
+ int fileIndex = 0;
+ int currentLines = 0;
+ String filePathTemplate = filePath + "_%d" + ".sql";
+ FileWriter writer = null;
+ while (iterator.next()) {
+ if (writer == null) {
+ writer = new FileWriter(String.format(filePathTemplate, fileIndex));
+ }
+ deviceName = iterator.getString(2);
+ if (deviceName.startsWith(SYSTEM_DATABASE + ".")
+ || deviceName.startsWith(AUDIT_DATABASE + ".")) {
+ continue;
+ }
+ List<String> values = new ArrayList<>();
+ for (int index = 2; index < totalColumns; index++) {
+ if ("TEXT".equalsIgnoreCase(columnTypeList.get(index))
+ || "STRING".equalsIgnoreCase(columnTypeList.get(index))) {
+ values.add(String.format("\"%s\"", iterator.getString(index + 1)));
+ } else {
+ values.add(iterator.getString(index + 1));
+ }
+ }
+ long timestamp = iterator.getLong(1);
+ writer.write(String.format(sqlPrefix, deviceName, timestamp,
String.join(",", values)));
+ processedRows += 1;
+ currentLines += 1;
+ if (System.currentTimeMillis() - lastPrintTime > updateTimeInterval) {
+ ioTPrinter.printf(Constants.PROCESSED_PROGRESS, processedRows);
+ lastPrintTime = System.currentTimeMillis();
+ }
+ if (currentLines >= linesPerFile) {
+ writer.flush();
+ writer.close();
+ fileIndex += 1;
+ writer = null;
+ currentLines = 0;
+ }
+ }
+ if (writer != null) {
+ writer.flush();
+ writer.close();
+ }
+ ioTPrinter.print("\n");
+ }
+
+ private void exportToSqlFileWithoutAlign(SessionDataSet sessionDataSet,
String filePath)
throws IoTDBConnectionException, StatementExecutionException,
IOException {
processedRows = 0;
lastPrintTime = 0;
+
List<String> headers = sessionDataSet.getColumnNames();
- int fileIndex = 0;
+ List<String> measurementNames = new ArrayList<>();
String deviceName = null;
- boolean writeNull = false;
- List<String> seriesList = new ArrayList<>(headers);
if (CollectionUtils.isEmpty(headers) || headers.size() <= 1) {
- writeNull = true;
+ return;
} else {
- if (headers.contains("Device")) {
- seriesList.remove("Time");
- seriesList.remove("Device");
- } else {
- Path path = new Path(seriesList.get(1), true);
- deviceName = path.getDeviceString();
- seriesList.remove("Time");
- for (int i = 0; i < seriesList.size(); i++) {
- String series = seriesList.get(i);
- path = new Path(series, true);
- seriesList.set(i, path.getMeasurement());
+ headers.remove("Time");
+ Path path = new Path(headers.get(0), true);
+ deviceName = path.getDeviceString();
+ for (String header : headers) {
+ path = new Path(header, true);
+ String meas = path.getMeasurement();
+ if (path.getDeviceString().equals(deviceName)) {
+ measurementNames.add(meas);
}
}
}
+ if (deviceName.startsWith(SYSTEM_DATABASE + ".")
+ || deviceName.startsWith(AUDIT_DATABASE + ".")) {
+ return;
+ }
+ String sqlPrefix =
+ String.format(
+ "INSERT INTO %s(TIMESTAMP,%s) VALUES (%s);\n",
+ "%s", String.join(",", measurementNames), "%d,%s");
+
SessionDataSet.DataIterator iterator = sessionDataSet.iterator();
List<String> columnTypeList = iterator.getColumnTypeList();
- int totalColumns = columnTypeList.size();
- boolean fromOuterloop = false;
+ int totalColumns = measurementNames.size();
+ int fileIndex = 0;
+ int currentLines = 0;
+ String filePathTemplate = filePath + "_%d" + ".sql";
+ FileWriter writer = null;
while (iterator.next()) {
- fromOuterloop = true;
- final String finalFilePath = filePath + "_" + fileIndex + ".sql";
- try (FileWriter writer = new FileWriter(finalFilePath)) {
- if (writeNull) {
- break;
- }
- int i = 0;
- while (i++ < linesPerFile && (fromOuterloop || iterator.next())) {
- fromOuterloop = false;
- List<String> headersTemp = new ArrayList<>(seriesList);
- List<String> timeseries = new ArrayList<>();
- if (headers.contains("Device")) {
- deviceName = iterator.getString(2);
- if (deviceName.startsWith(SYSTEM_DATABASE + ".")
- || deviceName.startsWith(AUDIT_DATABASE + ".")) {
- continue;
- }
- for (String header : headersTemp) {
- timeseries.add(deviceName + "." + header);
- }
- } else {
- if (headers.get(1).startsWith(SYSTEM_DATABASE + ".")
- || headers.get(1).startsWith(AUDIT_DATABASE + ".")) {
- continue;
- }
- timeseries.addAll(headers);
- timeseries.remove(0);
- }
- long timestamp = iterator.getLong(1);
- String sqlMiddle =
- Boolean.TRUE.equals(aligned)
- ? " ALIGNED VALUES (" + timestamp + ","
- : " VALUES (" + timestamp + ",";
- List<String> values = new ArrayList<>();
- int startIndex = headers.contains("Device") ? 2 : 1;
- for (int index = startIndex; index < totalColumns; index++) {
- SessionDataSet sessionDataSet2 =
- session.executeQueryStatement(
- "SHOW TIMESERIES " + timeseries.get(index - startIndex),
timeout);
- SessionDataSet.DataIterator iterator2 = sessionDataSet2.iterator();
- if (iterator2.next()) {
- if (iterator.isNull(index + 1)) {
- headersTemp.remove(seriesList.get(index - startIndex));
- continue;
- }
- String value = iterator.getString(index + 1);
- if ("TEXT".equalsIgnoreCase(iterator2.getString(4))) {
- values.add("\"" + value + "\"");
- } else {
- values.add(value);
- }
- } else {
- headersTemp.remove(seriesList.get(index - startIndex));
- }
- }
- if (CollectionUtils.isNotEmpty(headersTemp)) {
- writer.write(
- "INSERT INTO "
- + deviceName
- + "(TIMESTAMP,"
- + String.join(",", headersTemp)
- + ")"
- + sqlMiddle
- + String.join(",", values)
- + ");\n");
- processedRows += 1;
- if (System.currentTimeMillis() - lastPrintTime >
updateTimeInterval) {
- ioTPrinter.printf(Constants.PROCESSED_PROGRESS, processedRows);
- lastPrintTime = System.currentTimeMillis();
- }
- }
+ if (writer == null) {
+ writer = new FileWriter(String.format(filePathTemplate, fileIndex));
+ }
+ List<String> values = new ArrayList<>();
+ for (int index = 0; index < totalColumns; index++) {
+ if ("TEXT".equalsIgnoreCase(columnTypeList.get(index + 1))
+ || "STRING".equalsIgnoreCase(columnTypeList.get(index + 1))) {
+ values.add(String.format("\"%s\"", iterator.getString(index + 2)));
+ } else {
+ values.add(iterator.getString(index + 2));
}
+ }
+ long timestamp = iterator.getLong(1);
+ writer.write(String.format(sqlPrefix, deviceName, timestamp,
String.join(",", values)));
+ processedRows += 1;
+ currentLines += 1;
+ if (System.currentTimeMillis() - lastPrintTime > updateTimeInterval) {
+ ioTPrinter.printf(Constants.PROCESSED_PROGRESS, processedRows);
+ lastPrintTime = System.currentTimeMillis();
+ }
+ if (currentLines >= linesPerFile) {
writer.flush();
+ writer.close();
+ fileIndex += 1;
+ writer = null;
+ currentLines = 0;
}
- fileIndex++;
+ }
+ if (writer != null) {
+ writer.flush();
+ writer.close();
}
ioTPrinter.print("\n");
}
+ private void exportToSqlFile(SessionDataSet sessionDataSet, String filePath)
+ throws IOException, IoTDBConnectionException,
StatementExecutionException {
+ List<String> headers = sessionDataSet.getColumnNames();
+ boolean alignByDeviceQuery = headers.contains("Device");
+ if (alignByDeviceQuery) {
+ exportToSqlFileWithAlignDevice(sessionDataSet, filePath);
+ } else {
+ exportToSqlFileWithoutAlign(sessionDataSet, filePath);
+ }
+ }
+
private static Boolean exportToTsFile(SessionDataSet sessionDataSet, String
filePath)
throws IOException,
IoTDBConnectionException,