This is an automated email from the ASF dual-hosted git repository.
critas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 4af9c59aadb fix: export-tsfile change pull mode (#15306)
4af9c59aadb is described below
commit 4af9c59aadbdd14a2a8951f0fb2b8285c89e8fbf
Author: CritasWang <[email protected]>
AuthorDate: Mon Apr 28 18:10:14 2025 +0800
fix: export-tsfile change pull mode (#15306)
* fix: export-tsfile remove table mode
* fix: export-tsfile remove table mode
* restore table mode
* fix build
* fix build
* restore export-tsfile script
* remove load-tsfile
* temporary removal empty export
---
.../apache/iotdb/tools/it/ExportTsFileTestIT.java | 85 +++++++++++-----------
.../org/apache/iotdb/tool/common/Constants.java | 2 +-
.../org/apache/iotdb/tool/common/OptionsUtil.java | 5 +-
.../org/apache/iotdb/tool/tsfile/ExportTsFile.java | 5 ++
.../subscription/AbstractSubscriptionTsFile.java | 1 +
.../subscription/SubscriptionTableTsFile.java | 26 ++-----
.../subscription/SubscriptionTreeTsFile.java | 29 +++-----
scripts/tools/{load-tsfile.sh => export-tsfile.sh} | 49 ++++++-------
.../windows/{load-tsfile.bat => export-tsfile.bat} | 32 +++++---
9 files changed, 116 insertions(+), 118 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/tools/it/ExportTsFileTestIT.java b/integration-test/src/test/java/org/apache/iotdb/tools/it/ExportTsFileTestIT.java
index 399c64ee87c..7579ecf9ee5 100644
--- a/integration-test/src/test/java/org/apache/iotdb/tools/it/ExportTsFileTestIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/tools/it/ExportTsFileTestIT.java
@@ -68,35 +68,36 @@ public class ExportTsFileTestIT extends AbstractScriptIT {
public void test() throws IOException {
String os = System.getProperty("os.name").toLowerCase();
if (os.startsWith("windows")) {
- // testOnWindows();
+ testOnWindows();
} else {
- // testOnUnix();
+ testOnUnix();
}
}
@Override
protected void testOnWindows() throws IOException {
- final String[] output = {"Export TsFile Count: 0"};
- ProcessBuilder builder =
- new ProcessBuilder(
- "cmd.exe",
- "/c",
- toolsPath + File.separator + "windows" + File.separator + "export-tsfile.bat",
- "-h",
- ip,
- "-p",
- port,
- "-u",
- "root",
- "-pw",
- "root",
- "-path",
- "root.test.t2.**",
- "&",
- "exit",
- "%^errorlevel%");
- builder.environment().put("CLASSPATH", libPath);
- testOutput(builder, output, 0);
+ // Test for empty export, temporary removal
+ // final String[] output = {"Export TsFile Count: 0"};
+ // ProcessBuilder builder =
+ // new ProcessBuilder(
+ // "cmd.exe",
+ // "/c",
+ // toolsPath + File.separator + "windows" + File.separator + "export-tsfile.bat",
+ // "-h",
+ // ip,
+ // "-p",
+ // port,
+ // "-u",
+ // "root",
+ // "-pw",
+ // "root",
+ // "-path",
+ // "root.test.t2.**",
+ // "&",
+ // "exit",
+ // "%^errorlevel%");
+ // builder.environment().put("CLASSPATH", libPath);
+ // testOutput(builder, output, 0);
prepareData();
@@ -125,24 +126,25 @@ public class ExportTsFileTestIT extends AbstractScriptIT {
@Override
protected void testOnUnix() throws IOException {
- final String[] output = {"Export TsFile Count: 0"};
- // -h 127.0.0.1 -p 6667 -u root -pw root -td ./ -q "select * from root.**"
- ProcessBuilder builder =
- new ProcessBuilder(
- "bash",
- toolsPath + File.separator + "export-tsfile.sh",
- "-h",
- ip,
- "-p",
- port,
- "-u",
- "root",
- "-pw",
- "root",
- "-path",
- "root.**");
- builder.environment().put("CLASSPATH", libPath);
- testOutput(builder, output, 0);
+ // Test for empty export, temporary removal
+ // final String[] output = {"Export TsFile Count: 0"};
+ // // -h 127.0.0.1 -p 6667 -u root -pw root -td ./ -q "select * from root.**"
+ // ProcessBuilder builder =
+ // new ProcessBuilder(
+ // "bash",
+ // toolsPath + File.separator + "export-tsfile.sh",
+ // "-h",
+ // ip,
+ // "-p",
+ // port,
+ // "-u",
+ // "root",
+ // "-pw",
+ // "root",
+ // "-path",
+ // "root.**");
+ // builder.environment().put("CLASSPATH", libPath);
+ // testOutput(builder, output, 0);
prepareData();
@@ -181,6 +183,7 @@ public class ExportTsFileTestIT extends AbstractScriptIT {
values.add("bbbbb");
values.add("abbes");
session.insertRecord(deviceId, 1L, measurements, values);
+ session.executeNonQueryStatement("flush");
} catch (IoTDBConnectionException | StatementExecutionException e) {
throw new RuntimeException(e);
}
diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/common/Constants.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/common/Constants.java
index c797bb0d27f..c93308289e6 100644
--- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/common/Constants.java
+++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/common/Constants.java
@@ -285,7 +285,7 @@ public class Constants {
public static final String LOOSE_RANGE = "";
public static final boolean STRICT = false;
public static final String MODE = "snapshot";
- public static final boolean AUTO_COMMIT = false;
+ public static final boolean AUTO_COMMIT = true;
public static final String TABLE_MODEL = "table";
public static final long AUTO_COMMIT_INTERVAL = 5000;
public static final long POLL_MESSAGE_TIMEOUT = 10000;
diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/common/OptionsUtil.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/common/OptionsUtil.java
index b076ed1d114..57ccff5c3e5 100644
--- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/common/OptionsUtil.java
+++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/common/OptionsUtil.java
@@ -968,7 +968,6 @@ public class OptionsUtil extends Constants {
public static Options createSubscriptionTsFileOptions() {
Options options = new Options();
-
Option opSqlDialect =
Option.builder(SQL_DIALECT_ARGS)
.longOpt(SQL_DIALECT_ARGS)
@@ -1036,7 +1035,6 @@ public class OptionsUtil extends Constants {
.desc(TABLE_DESC)
.build();
options.addOption(opTable);
-
Option opStartTime =
Option.builder(START_TIME_ARGS)
.longOpt(START_TIME_ARGS)
@@ -1073,7 +1071,8 @@ public class OptionsUtil extends Constants {
.build();
options.addOption(opThreadNum);
- Option opHelp = Option.builder(HELP_ARGS).longOpt(HELP_ARGS).hasArg().desc(HELP_DESC).build();
+ Option opHelp =
+ Option.builder(HELP_ARGS).longOpt(HELP_ARGS).hasArg(false).desc(HELP_DESC).build();
options.addOption(opHelp);
return options;
}
diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ExportTsFile.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ExportTsFile.java
index 71794ce244c..dac16a00629 100644
--- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ExportTsFile.java
+++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ExportTsFile.java
@@ -74,10 +74,15 @@ public class ExportTsFile {
private static void parseParams(String[] args, Options options) {
HelpFormatter hf = new HelpFormatter();
+ hf.setOptionComparator(null);
CommandLine cli = null;
CommandLineParser cliParser = new DefaultParser();
try {
cli = cliParser.parse(options, args);
+ if (cli.hasOption(Constants.HELP_ARGS) || args.length == 0) {
+ hf.printHelp(Constants.SUBSCRIPTION_CLI_PREFIX, options, true);
+ System.exit(0);
+ }
if (cli.hasOption(Constants.SQL_DIALECT_ARGS)) {
commonParam.setSqlDialect(cli.getOptionValue(Constants.SQL_DIALECT_ARGS));
}
diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/subscription/AbstractSubscriptionTsFile.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/subscription/AbstractSubscriptionTsFile.java
index 60d38af45f2..aa962044cc2 100644
--- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/subscription/AbstractSubscriptionTsFile.java
+++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/subscription/AbstractSubscriptionTsFile.java
@@ -47,6 +47,7 @@ public abstract class AbstractSubscriptionTsFile {
.build());
commonParam.getTableSubs().open();
} else {
+
commonParam.setSubscriptionTsFile(new SubscriptionTreeTsFile());
commonParam.setTreeSubs(
new SubscriptionTreeSessionBuilder()
diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/subscription/SubscriptionTableTsFile.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/subscription/SubscriptionTableTsFile.java
index 9a5e7f7dc9a..edadbf0a7de 100644
--- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/subscription/SubscriptionTableTsFile.java
+++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/subscription/SubscriptionTableTsFile.java
@@ -37,7 +37,6 @@ import org.apache.commons.lang3.StringUtils;
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
-import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
@@ -144,31 +143,22 @@ public class SubscriptionTableTsFile extends AbstractSubscriptionTsFile {
for (int i = commonParam.getStartIndex(); i < pullTableConsumers.size(); i++) {
SubscriptionTablePullConsumer consumer =
(SubscriptionTablePullConsumer) pullTableConsumers.get(i);
- final String consumerGroupId = consumer.getConsumerGroupId();
executor.submit(
new Runnable() {
@Override
public void run() {
- int retryCount = 0;
- while (true) {
+ final String consumerGroupId = consumer.getConsumerGroupId();
+ while (!consumer.allSnapshotTopicMessagesHaveBeenConsumed()) {
try {
- List<SubscriptionMessage> messages =
- consumer.poll(Duration.ofMillis(Constants.POLL_MESSAGE_TIMEOUT));
- consumer.commitSync(messages);
- if (messages.isEmpty()) {
- retryCount++;
- if (retryCount >= Constants.MAX_RETRY_TIMES) {
- break;
- }
- }
- for (final SubscriptionMessage message : messages) {
- SubscriptionTsFileHandler fp = message.getTsFileHandler();
- ioTPrinter.println(fp.getFile().getName());
+ for (final SubscriptionMessage message :
+ consumer.poll(Constants.POLL_MESSAGE_TIMEOUT)) {
+ final SubscriptionTsFileHandler handler = message.getTsFileHandler();
+ ioTPrinter.println(handler.getFile().getName());
try {
- fp.moveFile(
+ handler.moveFile(
Paths.get(
commonParam.getTargetDir() + File.separator + consumerGroupId,
- fp.getPath().getFileName().toString()));
+ handler.getPath().getFileName().toString()));
} catch (IOException e) {
throw new RuntimeException(e);
}
diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/subscription/SubscriptionTreeTsFile.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/subscription/SubscriptionTreeTsFile.java
index 72a50f1a65d..2128e431d5a 100644
--- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/subscription/SubscriptionTreeTsFile.java
+++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/subscription/SubscriptionTreeTsFile.java
@@ -35,7 +35,6 @@ import org.apache.commons.lang3.StringUtils;
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
-import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
@@ -140,32 +139,22 @@ public class SubscriptionTreeTsFile extends AbstractSubscriptionTsFile {
List<SubscriptionTreePullConsumer> pullTreeConsumers = commonParam.getPullTreeConsumers();
for (int i = commonParam.getStartIndex(); i < pullTreeConsumers.size(); i++) {
SubscriptionTreePullConsumer consumer = commonParam.getPullTreeConsumers().get(i);
+ final String consumerGroupId = consumer.getConsumerGroupId();
executor.submit(
new Runnable() {
@Override
public void run() {
- int retryCount = 0;
- while (true) {
+ while (!consumer.allSnapshotTopicMessagesHaveBeenConsumed()) {
try {
- List<SubscriptionMessage> messages =
- consumer.poll(Duration.ofMillis(Constants.POLL_MESSAGE_TIMEOUT));
- consumer.commitSync(messages);
- if (messages.isEmpty()) {
- retryCount++;
- if (retryCount >= Constants.MAX_RETRY_TIMES) {
- break;
- }
- }
- for (final SubscriptionMessage message : messages) {
- SubscriptionTsFileHandler fp = message.getTsFileHandler();
- ioTPrinter.println(fp.getFile().getName());
+ for (final SubscriptionMessage message :
+ consumer.poll(Constants.POLL_MESSAGE_TIMEOUT)) {
+ final SubscriptionTsFileHandler handler = message.getTsFileHandler();
+ ioTPrinter.println(handler.getFile().getName());
try {
- fp.moveFile(
+ handler.moveFile(
Paths.get(
- commonParam.getTargetDir()
- + File.separator
- + consumer.getConsumerGroupId(),
- fp.getPath().getFileName().toString()));
+ commonParam.getTargetDir() + File.separator + consumerGroupId,
+ handler.getPath().getFileName().toString()));
} catch (IOException e) {
throw new RuntimeException(e);
}
diff --git a/scripts/tools/load-tsfile.sh b/scripts/tools/export-tsfile.sh
old mode 100755
new mode 100644
similarity index 62%
rename from scripts/tools/load-tsfile.sh
rename to scripts/tools/export-tsfile.sh
index 8424fc2855c..ccc9df3e3a4
--- a/scripts/tools/load-tsfile.sh
+++ b/scripts/tools/export-tsfile.sh
@@ -18,27 +18,20 @@
# under the License.
#
-echo ---------------------
-echo Start Loading TsFile
-echo ---------------------
-
-source "$(dirname "$0")/../conf/iotdb-common.sh"
-#get_iotdb_include and checkAllVariables is in iotdb-common.sh
-VARS=$(get_iotdb_include "$*")
-checkAllVariables
-export IOTDB_HOME="${IOTDB_HOME}"
-eval set -- "$VARS"
-
-PARAMETERS=$@
-
-IOTDB_CLI_CONF=${IOTDB_HOME}/conf
-
-MAIN_CLASS=org.apache.iotdb.tool.tsfile.ImportTsFile
+echo ------------------------------------------
+echo Starting IoTDB Client Export Script
+echo ------------------------------------------
+
+if [ -z "${IOTDB_INCLUDE}" ]; then
+ #do nothing
+ :
+elif [ -r "$IOTDB_INCLUDE" ]; then
+ . "$IOTDB_INCLUDE"
+fi
-CLASSPATH=""
-for f in ${IOTDB_HOME}/lib/*.jar; do
- CLASSPATH=${CLASSPATH}":"$f
-done
+if [ -z "${IOTDB_HOME}" ]; then
+ export IOTDB_HOME="$(cd "`dirname "$0"`"/..; pwd)"
+fi
if [ -n "$JAVA_HOME" ]; then
for java in "$JAVA_HOME"/bin/amd64/java "$JAVA_HOME"/bin/java; do
@@ -51,10 +44,16 @@ else
JAVA=java
fi
-set -o noglob
-iotdb_cli_params="-Dlogback.configurationFile=${IOTDB_CLI_CONF}/logback-tool.xml"
+if [ -z $JAVA ] ; then
+ echo Unable to find java executable. Check JAVA_HOME and PATH environment variables. > /dev/stderr
+ exit 1;
+fi
+
+for f in ${IOTDB_HOME}/lib/*.jar; do
+ CLASSPATH=${CLASSPATH}":"$f
+done
-echo "Starting..."
-exec "$JAVA" $iotdb_cli_params -cp "$CLASSPATH" "$MAIN_CLASS" $PARAMETERS
+MAIN_CLASS=org.apache.iotdb.tool.tsfile.ExportTsFile
-exit $?
+"$JAVA" -DIOTDB_HOME=${IOTDB_HOME} -cp "$CLASSPATH" "$MAIN_CLASS" "$@"
+exit $?
\ No newline at end of file
diff --git a/scripts/tools/windows/load-tsfile.bat b/scripts/tools/windows/export-tsfile.bat
old mode 100755
new mode 100644
similarity index 65%
rename from scripts/tools/windows/load-tsfile.bat
rename to scripts/tools/windows/export-tsfile.bat
index 8b777cc4503..e67114300e2
--- a/scripts/tools/windows/load-tsfile.bat
+++ b/scripts/tools/windows/export-tsfile.bat
@@ -19,10 +19,11 @@
@echo off
-@REM You can put your env variable here
-@REM set JAVA_HOME=%JAVA_HOME%
+title IoTDB Export
-title IoTDB Load
+echo ````````````````````````````````````````````````
+echo Starting IoTDB Client Export Script
+echo ````````````````````````````````````````````````
if "%OS%" == "Windows_NT" setlocal
@@ -30,7 +31,7 @@ pushd %~dp0..\..
if NOT DEFINED IOTDB_HOME set IOTDB_HOME=%CD%
popd
-if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.tool.tsfile.ImportTsFile
+if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.tool.tsfile.ExportTsFile
if NOT DEFINED JAVA_HOME goto :err
@REM -----------------------------------------------------------------------------
@@ -38,13 +39,24 @@ if NOT DEFINED JAVA_HOME goto :err
set JAVA_OPTS=-ea^
-DIOTDB_HOME="%IOTDB_HOME%"
-REM For each jar in the IOTDB_HOME lib directory call append to build the CLASSPATH variable.
-if EXIST %IOTDB_HOME%\lib (set CLASSPATH="%IOTDB_HOME%\lib\*") else set CLASSPATH="%IOTDB_HOME%\..\lib\*"
+@REM ***** CLASSPATH library setting *****
+set CLASSPATH=%CLASSPATH%;"%IOTDB_HOME%\lib\*"
-set PARAMETERS=%*
+REM -----------------------------------------------------------------------------
-echo Starting...
-"%JAVA_HOME%\bin\java" %JAVA_OPTS% -cp %CLASSPATH% %MAIN_CLASS% %PARAMETERS%
+"%JAVA_HOME%\bin\java" -DIOTDB_HOME="%IOTDB_HOME%" %JAVA_OPTS% -cp %CLASSPATH% %MAIN_CLASS% %*
set ret_code=%ERRORLEVEL%
+goto finally
-EXIT /B %ret_code%
+
+:err
+echo JAVA_HOME environment variable must be set!
+set ret_code=1
+pause
+
+@REM -----------------------------------------------------------------------------
+:finally
+
+ENDLOCAL
+
+EXIT /B %ret_code%
\ No newline at end of file