This is an automated email from the ASF dual-hosted git repository.
critas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new cd1507743fb Modified parameter timeout and mfs in data export (#16252)
cd1507743fb is described below
commit cd1507743fbd6000c393a4c982fd7574825f51b6
Author: LimJiaWenBrenda <[email protected]>
AuthorDate: Wed Aug 27 21:20:58 2025 +0800
Modified parameter timeout and mfs in data export (#16252)
* Modified implementation of exporting schema to sql file using show create
table
* Added double quotation marks to tableName dropped
* Used constant sql + escapeSqlIdentifier in sql sentence
* Changed default timeout + Allowed mfs configuration
* Tracked progress of table data export
* Tracked progress of both tree and table data export
* Modified rowRecord to iterator for csv and sql export
* Adjust updateTimeInterval to 2s
* Used constant process sentence + fixed copilot review
* Fix value null bug
---
.../org/apache/iotdb/tool/common/Constants.java | 6 +
.../org/apache/iotdb/tool/common/OptionsUtil.java | 51 ++-----
.../apache/iotdb/tool/data/AbstractDataTool.java | 4 +-
.../org/apache/iotdb/tool/data/ExportData.java | 4 +
.../apache/iotdb/tool/data/ExportDataTable.java | 124 +++++++++------
.../org/apache/iotdb/tool/data/ExportDataTree.java | 167 ++++++++++++---------
6 files changed, 207 insertions(+), 149 deletions(-)
diff --git
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/common/Constants.java
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/common/Constants.java
index fe354559a3a..08b6264f915 100644
--- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/common/Constants.java
+++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/common/Constants.java
@@ -204,6 +204,11 @@ public class Constants {
public static final String TARGET_FILE_NAME = "prefix_file_name";
public static final String TARGET_FILE_DESC = "Export file name .(optional)";
+ public static final String RPC_MAX_FRAME_SIZE_ARGS = "mfs";
+ public static final String RPC_MAX_FRAME_SIZE_NAME = "rpc_max_frame_size";
+ public static final String RPC_MAX_FRAME_SIZE_DESC =
+ "The max frame size of RPC, default is 536870912 bytes.(optional)";
+
public static final String DATA_TYPE_ARGS = "dt";
public static final String DATA_TYPE_NAME = "datatype";
public static final String DATA_TYPE_DESC =
@@ -306,6 +311,7 @@ public class Constants {
public static final String EXPORT_SCHEMA_COLUMNS_DESC = "desc %s.%s details";
public static final String SHOW_CREATE_TABLE = "SHOW CREATE TABLE %s.%s";
public static final String DROP_TABLE_IF_EXIST = "DROP TABLE IF EXISTS %s";
+ public static final String PROCESSED_PROGRESS = "\rProcessed %d rows";
// import constants
public static final String IMPORT_SCHEMA_CLI_PREFIX = "ImportSchema";
diff --git
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/common/OptionsUtil.java
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/common/OptionsUtil.java
index 57ccff5c3e5..bc95e1e0d8a 100644
---
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/common/OptionsUtil.java
+++
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/common/OptionsUtil.java
@@ -142,7 +142,7 @@ public class OptionsUtil extends Constants {
return options;
}
- public static Options createTreeExportCommonOptions() {
+ public static Options createExportCommonOptions() {
Options options = createImportCommonOptions();
Option opFile =
@@ -181,48 +181,25 @@ public class OptionsUtil extends Constants {
.desc(TIMEOUT_DESC)
.build();
options.addOption(opTimeOut);
- return options;
- }
- public static Options createTableExportCommonOptions() {
- final Options options = createImportCommonOptions();
-
- Option opFile =
- Option.builder(TARGET_DIR_ARGS)
- .required()
- .longOpt(TARGET_DIR_NAME)
- .argName(TARGET_DIR_ARGS_NAME)
+ Option opRpcMaxFrameSize =
+ Option.builder(RPC_MAX_FRAME_SIZE_ARGS)
+ .longOpt(RPC_MAX_FRAME_SIZE_NAME)
+ .argName(RPC_MAX_FRAME_SIZE_NAME)
.hasArg()
- .desc(TARGET_DIR_DESC)
+ .desc(RPC_MAX_FRAME_SIZE_DESC)
.build();
- options.addOption(opFile);
+ options.addOption(opRpcMaxFrameSize);
- Option opOnSuccess =
- Option.builder(TARGET_FILE_ARGS)
- .longOpt(TARGET_FILE_NAME)
- .argName(TARGET_FILE_NAME)
- .hasArg()
- .desc(TARGET_FILE_DESC)
- .build();
- options.addOption(opOnSuccess);
+ return options;
+ }
- Option opQuery =
- Option.builder(QUERY_COMMAND_ARGS)
- .longOpt(QUERY_COMMAND_NAME)
- .argName(QUERY_COMMAND_ARGS_NAME)
- .hasArg()
- .desc(QUERY_COMMAND_DESC)
- .build();
- options.addOption(opQuery);
+ public static Options createTreeExportCommonOptions() {
+ return createExportCommonOptions();
+ }
- Option opTimeOut =
- Option.builder(TIMEOUT_ARGS)
- .longOpt(TIMEOUT_NAME)
- .argName(TIMEOUT_NAME)
- .hasArg()
- .desc(TIMEOUT_DESC)
- .build();
- options.addOption(opTimeOut);
+ public static Options createTableExportCommonOptions() {
+ Options options = createExportCommonOptions();
Option opDatabase =
Option.builder(DB_ARGS)
diff --git
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/AbstractDataTool.java
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/AbstractDataTool.java
index 54d35c4f370..ebaa87bc263 100644
---
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/AbstractDataTool.java
+++
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/AbstractDataTool.java
@@ -90,8 +90,9 @@ public abstract class AbstractDataTool {
protected static String database;
protected static String startTime;
protected static int threadNum = 8;
+ protected static int rpcMaxFrameSize = 536870912;
protected static String targetPath;
- protected static long timeout = -1;
+ protected static long timeout = Long.MAX_VALUE;
protected static String timeZoneID;
protected static String timeFormat;
protected static String exportType;
@@ -114,6 +115,7 @@ public abstract class AbstractDataTool {
protected static ZoneId zoneId = ZoneId.systemDefault();
protected static ImportTsFileOperation successOperation;
protected static String targetFile = Constants.DUMP_FILE_NAME_DEFAULT;
+ protected static final int updateTimeInterval = 2000;
protected static final LongAdder loadFileFailedNum = new LongAdder();
protected static final LongAdder loadFileSuccessfulNum = new LongAdder();
protected static final LongAdder processingLoadFailedFileSuccessfulNum = new
LongAdder();
diff --git
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/ExportData.java
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/ExportData.java
index b31bc8d90b9..01a50d1c7d8 100644
--- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/ExportData.java
+++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/ExportData.java
@@ -270,6 +270,10 @@ public class ExportData extends AbstractDataTool {
if (timeoutString != null) {
timeout = Long.parseLong(timeoutString);
}
+ String rpcMaxFrameSizeString =
commandLine.getOptionValue(Constants.RPC_MAX_FRAME_SIZE_ARGS);
+ if (rpcMaxFrameSizeString != null) {
+ rpcMaxFrameSize = Integer.parseInt(rpcMaxFrameSizeString);
+ }
if (needDataTypePrinted == null) {
needDataTypePrinted = true;
}
diff --git
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/ExportDataTable.java
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/ExportDataTable.java
index 4bb67cb3ea5..45c02fb60eb 100644
---
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/ExportDataTable.java
+++
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/ExportDataTable.java
@@ -58,6 +58,8 @@ public class ExportDataTable extends AbstractExportData {
private static final IoTPrinter ioTPrinter = new IoTPrinter(System.out);
private static ITableSession tableSession;
private static List<String> tables = new ArrayList<>();
+ private static long processedRows;
+ private static long lastPrintTime;
@Override
public void init() throws IoTDBConnectionException,
StatementExecutionException {
@@ -67,6 +69,7 @@ public class ExportDataTable extends AbstractExportData {
.username(username)
.password(password)
.database(database)
+ .thriftMaxFrameSize(rpcMaxFrameSize)
.build();
SessionDataSet sessionDataSet = tableSession.executeQueryStatement("show
databases", timeout);
List<String> databases = new ArrayList<>();
@@ -153,40 +156,50 @@ public class ExportDataTable extends AbstractExportData {
private void exportToSqlFile(SessionDataSet sessionDataSet, String table,
String filePath)
throws IOException, IoTDBConnectionException,
StatementExecutionException {
+ processedRows = 0;
+ lastPrintTime = 0;
StringBuilder sqlBuilder;
List<String> headers = sessionDataSet.getColumnNames();
String prevSql = "insert into " + table + "(" + StringUtils.join(headers,
",") + ") values(";
- final List<String> columnTypes = sessionDataSet.getColumnTypes();
- boolean hasNext = sessionDataSet.hasNext();
+ SessionDataSet.DataIterator iterator = sessionDataSet.iterator();
+ List<String> columnTypeList = iterator.getColumnTypeList();
+ int totalColumns = columnTypeList.size();
int fileIndex = 0;
- while (hasNext) {
+ boolean fromOuterLoop = false;
+ while (iterator.next()) {
final String finalFilePath = filePath + "_" + fileIndex + ".sql";
int countLine = 0;
+ fromOuterLoop = true;
try (FileWriter writer = new FileWriter(finalFilePath)) {
- while (countLine++ < linesPerFile && hasNext) {
- RowRecord rowRecord = sessionDataSet.next();
+ while (countLine++ < linesPerFile && (fromOuterLoop ||
iterator.next())) {
+ fromOuterLoop = false;
sqlBuilder = new StringBuilder();
- List<Field> fields = rowRecord.getFields();
sqlBuilder.append(prevSql);
- for (int i = 0; i < fields.size(); i++) {
- if (i > 0) {
+ for (int curColumnIndex = 0; curColumnIndex < totalColumns;
curColumnIndex++) {
+ if (curColumnIndex > 0) {
sqlBuilder.append(",");
}
- final TSDataType type = getType(columnTypes.get(i));
- if (TSDataType.TEXT.equals(type) ||
TSDataType.STRING.equals(type)) {
-
sqlBuilder.append("\'").append(fields.get(i).getObjectValue(type)).append("\'");
+ String columnType = columnTypeList.get(curColumnIndex);
+ String columnValue = iterator.getString(curColumnIndex + 1);
+ if (columnType.equals("TEXT") || columnType.equals("STRING")) {
+ sqlBuilder.append("'").append(columnValue).append("'");
} else {
- sqlBuilder.append(fields.get(i).getObjectValue(type));
+ sqlBuilder.append(columnValue);
}
}
sqlBuilder.append(");\n");
writer.write(sqlBuilder.toString());
- hasNext = sessionDataSet.hasNext();
+ processedRows += 1;
+ if (System.currentTimeMillis() - lastPrintTime > updateTimeInterval)
{
+ ioTPrinter.printf(Constants.PROCESSED_PROGRESS, processedRows);
+ lastPrintTime = System.currentTimeMillis();
+ }
}
writer.flush();
fileIndex++;
}
}
+ ioTPrinter.print("\n");
}
private Boolean exportToTsFile(SessionDataSet sessionDataSet, String
filePath, String table)
@@ -194,6 +207,8 @@ public class ExportDataTable extends AbstractExportData {
IoTDBConnectionException,
StatementExecutionException,
WriteProcessException {
+ processedRows = 0;
+ lastPrintTime = 0;
List<String> columnNamesRaw = sessionDataSet.getColumnNames();
List<TSDataType> columnTypesRaw =
sessionDataSet.getColumnTypes().stream().map(t ->
getType(t)).collect(Collectors.toList());
@@ -233,44 +248,54 @@ public class ExportDataTable extends AbstractExportData {
private void exportToCsvFile(SessionDataSet sessionDataSet, String filePath)
throws IOException, IoTDBConnectionException,
StatementExecutionException {
+ processedRows = 0;
+ lastPrintTime = 0;
List<String> headers = sessionDataSet.getColumnNames();
int fileIndex = 0;
- boolean hasNext = sessionDataSet.hasNext();
- while (hasNext) {
+ SessionDataSet.DataIterator iterator = sessionDataSet.iterator();
+ List<String> columnTypeList = iterator.getColumnTypeList();
+ int totalColumns = columnTypeList.size();
+ boolean fromOuterloop = false;
+ while (iterator.next()) {
final String finalFilePath = filePath + "_" + fileIndex + ".csv";
final CSVPrinterWrapper csvPrinterWrapper = new
CSVPrinterWrapper(finalFilePath);
- csvPrinterWrapper.printRecord(headers);
- int i = 0;
- while (i++ < linesPerFile) {
- RowRecord rowRecord = sessionDataSet.next();
- rowRecord
- .getFields()
- .forEach(
- field -> {
- String fieldStringValue = field.getStringValue();
- if (!"null".equals(field.getStringValue())) {
- if ((field.getDataType() == TSDataType.TEXT
- || field.getDataType() == TSDataType.STRING)) {
- fieldStringValue = "\"" + fieldStringValue + "\"";
- } else if (field.getDataType() == TSDataType.TIMESTAMP) {
- fieldStringValue = timeTrans(field.getLongV());
- }
- csvPrinterWrapper.print(fieldStringValue);
- } else {
- csvPrinterWrapper.print("");
- }
- });
- csvPrinterWrapper.println();
- // 检查下一行是否存在
- hasNext = sessionDataSet.hasNext();
- if (!hasNext) {
- break;
+ try {
+ csvPrinterWrapper.printRecord(headers);
+ fromOuterloop = true;
+ int countLine = 0;
+ while (countLine++ < linesPerFile && (fromOuterloop ||
iterator.next())) {
+ fromOuterloop = false;
+ for (int curColumnIndex = 0; curColumnIndex < totalColumns;
curColumnIndex++) {
+ String curType = columnTypeList.get(curColumnIndex);
+ if (curType.equalsIgnoreCase("TIMESTAMP")) {
+
csvPrinterWrapper.print(timeTrans(iterator.getLong(curColumnIndex + 1)));
+ } else {
+ String columnValue = iterator.getString(curColumnIndex + 1);
+ if (StringUtils.isEmpty(columnValue)) {
+ csvPrinterWrapper.print("");
+ } else {
+ if (curType.equalsIgnoreCase("TEXT") ||
curType.equalsIgnoreCase("STRING")) {
+ csvPrinterWrapper.print("\"" + columnValue + "\"");
+ } else {
+ csvPrinterWrapper.print(columnValue);
+ }
+ }
+ }
+ }
+ csvPrinterWrapper.println();
+ processedRows += 1;
+ if (System.currentTimeMillis() - lastPrintTime > updateTimeInterval)
{
+ ioTPrinter.printf(Constants.PROCESSED_PROGRESS, processedRows);
+ lastPrintTime = System.currentTimeMillis();
+ }
}
+ fileIndex++;
+ csvPrinterWrapper.flush();
+ } finally {
+ csvPrinterWrapper.close();
}
- fileIndex++;
- csvPrinterWrapper.flush();
- csvPrinterWrapper.close();
}
+ ioTPrinter.print("\n");
}
private static void writeWithTablets(
@@ -305,13 +330,24 @@ public class ExportDataTable extends AbstractExportData {
if (tablet.getRowSize() == tablet.getMaxRowNumber()) {
writeToTsFile(tsFileWriter, tablet);
+ processedRows += tablet.getRowSize();
tablet.initBitMaps();
tablet.reset();
}
+ if (System.currentTimeMillis() - lastPrintTime > updateTimeInterval) {
+ ioTPrinter.printf(Constants.PROCESSED_PROGRESS, processedRows);
+ lastPrintTime = System.currentTimeMillis();
+ }
}
if (tablet.getRowSize() != 0) {
writeToTsFile(tsFileWriter, tablet);
+ processedRows += tablet.getRowSize();
+ if (System.currentTimeMillis() - lastPrintTime > updateTimeInterval) {
+ ioTPrinter.printf(Constants.PROCESSED_PROGRESS, processedRows);
+ lastPrintTime = System.currentTimeMillis();
+ }
}
+ ioTPrinter.print("\n");
}
private static void writeToTsFile(ITsFileWriter tsFileWriter, Tablet tablet)
diff --git
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/ExportDataTree.java
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/ExportDataTree.java
index c9ff2d28dd1..0f330bc0831 100644
---
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/ExportDataTree.java
+++
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/ExportDataTree.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.tool.data;
import org.apache.iotdb.cli.utils.IoTPrinter;
+import org.apache.iotdb.isession.SessionConfig;
import org.apache.iotdb.isession.SessionDataSet;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
@@ -27,7 +28,7 @@ import org.apache.iotdb.session.Session;
import org.apache.iotdb.tool.common.Constants;
import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.ObjectUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.thrift.TException;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.exception.write.WriteProcessException;
@@ -61,10 +62,23 @@ public class ExportDataTree extends AbstractExportData {
private static final IoTPrinter ioTPrinter = new IoTPrinter(System.out);
private static Session session;
+ private static long processedRows;
+ private static long lastPrintTime;
@Override
public void init() throws IoTDBConnectionException,
StatementExecutionException, TException {
- session = new Session(host, Integer.parseInt(port), username, password);
+ session =
+ new Session(
+ host,
+ Integer.parseInt(port),
+ username,
+ password,
+ SessionConfig.DEFAULT_FETCH_SIZE,
+ null,
+ SessionConfig.DEFAULT_INITIAL_BUFFER_CAPACITY,
+ rpcMaxFrameSize,
+ SessionConfig.DEFAULT_REDIRECTION_MODE,
+ SessionConfig.DEFAULT_VERSION);
session.open(false);
timestampPrecision = session.getTimestampPrecision();
if (timeZoneID != null) {
@@ -119,6 +133,8 @@ public class ExportDataTree extends AbstractExportData {
private void exportToSqlFile(SessionDataSet sessionDataSet, String filePath)
throws IoTDBConnectionException, StatementExecutionException,
IOException {
+ processedRows = 0;
+ lastPrintTime = 0;
List<String> headers = sessionDataSet.getColumnNames();
int fileIndex = 0;
String deviceName = null;
@@ -141,21 +157,24 @@ public class ExportDataTree extends AbstractExportData {
}
}
}
- boolean hasNext = sessionDataSet.hasNext();
- while (hasNext) {
+ SessionDataSet.DataIterator iterator = sessionDataSet.iterator();
+ List<String> columnTypeList = iterator.getColumnTypeList();
+ int totalColumns = columnTypeList.size();
+ boolean fromOuterloop = false;
+ while (iterator.next()) {
+ fromOuterloop = true;
final String finalFilePath = filePath + "_" + fileIndex + ".sql";
try (FileWriter writer = new FileWriter(finalFilePath)) {
if (writeNull) {
break;
}
int i = 0;
- while (i++ < linesPerFile && hasNext) {
- RowRecord rowRecord = sessionDataSet.next();
- List<Field> fields = rowRecord.getFields();
+ while (i++ < linesPerFile && (fromOuterloop || iterator.next())) {
+ fromOuterloop = false;
List<String> headersTemp = new ArrayList<>(seriesList);
List<String> timeseries = new ArrayList<>();
if (headers.contains("Device")) {
- deviceName = fields.get(0).toString();
+ deviceName = iterator.getString(2);
if (deviceName.startsWith(SYSTEM_DATABASE + ".")) {
continue;
}
@@ -169,33 +188,31 @@ public class ExportDataTree extends AbstractExportData {
timeseries.addAll(headers);
timeseries.remove(0);
}
+ long timestamp = iterator.getLong(1);
String sqlMiddle =
Boolean.TRUE.equals(aligned)
- ? " ALIGNED VALUES (" + rowRecord.getTimestamp() + ","
- : " VALUES (" + rowRecord.getTimestamp() + ",";
+ ? " ALIGNED VALUES (" + timestamp + ","
+ : " VALUES (" + timestamp + ",";
List<String> values = new ArrayList<>();
- if (headers.contains("Device")) {
- fields.remove(0);
- }
- for (int index = 0; index < fields.size(); index++) {
- RowRecord next =
- session
- .executeQueryStatement("SHOW TIMESERIES " +
timeseries.get(index), timeout)
- .next();
- if (ObjectUtils.isNotEmpty(next)) {
- List<Field> timeseriesList = next.getFields();
- String value = fields.get(index).toString();
- if (value.equals("null")) {
- headersTemp.remove(seriesList.get(index));
+ int startIndex = headers.contains("Device") ? 2 : 1;
+ for (int index = startIndex; index < totalColumns; index++) {
+ SessionDataSet sessionDataSet2 =
+ session.executeQueryStatement(
+ "SHOW TIMESERIES " + timeseries.get(index - startIndex),
timeout);
+ SessionDataSet.DataIterator iterator2 = sessionDataSet2.iterator();
+ if (iterator2.next()) {
+ String value = iterator.getString(index + 1);
+ if (StringUtils.isEmpty(value)) {
+ headersTemp.remove(seriesList.get(index - startIndex));
continue;
}
- if
("TEXT".equalsIgnoreCase(timeseriesList.get(3).getStringValue())) {
+ if ("TEXT".equalsIgnoreCase(iterator2.getString(4))) {
values.add("\"" + value + "\"");
} else {
values.add(value);
}
} else {
- headersTemp.remove(seriesList.get(index));
+ headersTemp.remove(seriesList.get(index - startIndex));
}
}
if (CollectionUtils.isNotEmpty(headersTemp)) {
@@ -208,20 +225,18 @@ public class ExportDataTree extends AbstractExportData {
+ sqlMiddle
+ String.join(",", values)
+ ");\n");
- }
- hasNext = sessionDataSet.hasNext();
- if (!hasNext) {
- break;
+ processedRows += 1;
+ if (System.currentTimeMillis() - lastPrintTime >
updateTimeInterval) {
+ ioTPrinter.printf(Constants.PROCESSED_PROGRESS, processedRows);
+ lastPrintTime = System.currentTimeMillis();
+ }
}
}
writer.flush();
}
- // 如果没有更多数据,退出循环
- if (!hasNext) {
- break;
- }
fileIndex++;
}
+ ioTPrinter.print("\n");
}
private static Boolean exportToTsFile(SessionDataSet sessionDataSet, String
filePath)
@@ -264,47 +279,52 @@ public class ExportDataTree extends AbstractExportData {
private void exportToCsvFile(SessionDataSet sessionDataSet, String filePath)
throws IOException, IoTDBConnectionException,
StatementExecutionException {
+ processedRows = 0;
+ lastPrintTime = 0;
List<String> headers = sessionDataSet.getColumnNames();
int fileIndex = 0;
- boolean hasNext = sessionDataSet.hasNext();
- while (hasNext) {
+ SessionDataSet.DataIterator iterator = sessionDataSet.iterator();
+ List<String> columnTypeList = iterator.getColumnTypeList();
+ int totalColumns = columnTypeList.size();
+ boolean fromOuterloop = false;
+ while (iterator.next()) {
final String finalFilePath = filePath + "_" + fileIndex + ".csv";
CSVPrinterWrapper csvPrinterWrapper = new
CSVPrinterWrapper(finalFilePath);
- csvPrinterWrapper.printRecord(headers);
- int i = 0;
- while (i++ < linesPerFile && hasNext) {
- RowRecord rowRecord = sessionDataSet.next();
- if (rowRecord.getTimestamp() != 0) {
- csvPrinterWrapper.print(timeTrans(rowRecord.getTimestamp()));
- }
- rowRecord
- .getFields()
- .forEach(
- field -> {
- String fieldStringValue = field.getStringValue();
- if (!"null".equals(field.getStringValue())) {
- if ((field.getDataType() == TSDataType.TEXT
- || field.getDataType() == TSDataType.STRING)
- && !fieldStringValue.startsWith("root.")) {
- fieldStringValue = "\"" + fieldStringValue + "\"";
- }
- csvPrinterWrapper.print(fieldStringValue);
- } else {
- csvPrinterWrapper.print("");
- }
- });
- csvPrinterWrapper.println();
- hasNext = sessionDataSet.hasNext();
- if (!hasNext) {
- break;
+ try {
+ csvPrinterWrapper.printRecord(headers);
+ fromOuterloop = true;
+ int i = 0;
+ while (i++ < linesPerFile && (fromOuterloop || iterator.next())) {
+ fromOuterloop = false;
+ csvPrinterWrapper.print(timeTrans(iterator.getLong(1)));
+ for (int curColumnIndex = 1; curColumnIndex < totalColumns;
curColumnIndex++) {
+ String columnValue = iterator.getString(curColumnIndex + 1);
+ if (StringUtils.isEmpty(columnValue)) {
+ csvPrinterWrapper.print("");
+ } else {
+ String curType = columnTypeList.get(curColumnIndex);
+ if ((curType.equalsIgnoreCase("TEXT") ||
curType.equalsIgnoreCase("STRING"))
+ && !columnValue.startsWith("root.")) {
+ csvPrinterWrapper.print("\"" + columnValue + "\"");
+ } else {
+ csvPrinterWrapper.print(columnValue);
+ }
+ }
+ }
+ csvPrinterWrapper.println();
+ processedRows += 1;
+ if (System.currentTimeMillis() - lastPrintTime > updateTimeInterval)
{
+ ioTPrinter.printf(Constants.PROCESSED_PROGRESS, processedRows);
+ lastPrintTime = System.currentTimeMillis();
+ }
}
+ fileIndex++;
+ csvPrinterWrapper.flush();
+ } finally {
+ csvPrinterWrapper.close();
}
- csvPrinterWrapper.flush();
- if (!hasNext) {
- break;
- }
- fileIndex++;
}
+ ioTPrinter.print("\n");
}
private static void writeWithTablets(
@@ -317,6 +337,8 @@ public class ExportDataTree extends AbstractExportData {
StatementExecutionException,
IOException,
WriteProcessException {
+ processedRows = 0;
+ lastPrintTime = 0;
while (sessionDataSet.hasNext()) {
RowRecord rowRecord = sessionDataSet.next();
List<Field> fields = rowRecord.getFields();
@@ -337,16 +359,27 @@ public class ExportDataTree extends AbstractExportData {
if (tablet.getRowSize() == tablet.getMaxRowNumber()) {
writeToTsFile(alignedDevices, tsFileWriter, tablet);
+ processedRows += tablet.getRowSize();
tablet.reset();
}
}
+ if (System.currentTimeMillis() - lastPrintTime > updateTimeInterval) {
+ ioTPrinter.printf(Constants.PROCESSED_PROGRESS, processedRows);
+ lastPrintTime = System.currentTimeMillis();
+ }
}
for (Tablet tablet : tabletList) {
if (tablet.getRowSize() != 0) {
writeToTsFile(alignedDevices, tsFileWriter, tablet);
+ processedRows += tablet.getRowSize();
+ }
+ if (System.currentTimeMillis() - lastPrintTime > updateTimeInterval) {
+ ioTPrinter.printf(Constants.PROCESSED_PROGRESS, processedRows);
+ lastPrintTime = System.currentTimeMillis();
}
}
+ ioTPrinter.print("\n");
}
private static void writeToTsFile(