This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new e97cdc615f8 branch-3.1 [fix](Export) change the export logical and more info in show export(#48022 #48144 #48488 ) (#51937)
e97cdc615f8 is described below
commit e97cdc615f861056e39d807a2c980ee20d28aead
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Thu Jun 19 18:17:47 2025 +0800
branch-3.1 [fix](Export) change the export logical and more info in show export(#48022 #48144 #48488 ) (#51937)
bp #48022 #48144 #48488
---------
Co-authored-by: Tiewei Fang <[email protected]>
---
be/src/vec/sink/writer/vfile_result_writer.cpp | 78 ++++++++---
be/src/vec/sink/writer/vfile_result_writer.h | 2 +
.../main/java/org/apache/doris/common/Config.java | 5 -
.../org/apache/doris/analysis/OutFileClause.java | 6 +
.../main/java/org/apache/doris/load/ExportJob.java | 142 ++++++++-------------
.../org/apache/doris/load/ExportTaskExecutor.java | 51 ++++----
.../java/org/apache/doris/load/OutfileInfo.java | 6 +
.../org/apache/doris/planner/ResultFileSink.java | 45 +++----
.../java/org/apache/doris/qe/ConnectContext.java | 8 +-
.../java/org/apache/doris/qe/StmtExecutor.java | 2 +-
.../analysis/ExportToOutfileLogicalPlanTest.java | 127 +++++++-----------
.../data/export_p0/outfile/test_outfile_result.out | Bin 0 -> 225 bytes
.../export_p0/outfile/test_outfile_result.groovy | 89 +++++++++++++
13 files changed, 299 insertions(+), 262 deletions(-)
diff --git a/be/src/vec/sink/writer/vfile_result_writer.cpp b/be/src/vec/sink/writer/vfile_result_writer.cpp
index 5161cf2928d..bb6a54c4693 100644
--- a/be/src/vec/sink/writer/vfile_result_writer.cpp
+++ b/be/src/vec/sink/writer/vfile_result_writer.cpp
@@ -17,6 +17,7 @@
#include "vfile_result_writer.h"
+#include <fmt/format.h>
#include <gen_cpp/Data_types.h>
#include <gen_cpp/Metrics_types.h>
#include <gen_cpp/PaloInternalService_types.h>
@@ -39,6 +40,7 @@
#include "io/hdfs_builder.h"
#include "pipeline/exec/result_sink_operator.h"
#include "runtime/buffer_control_block.h"
+#include "runtime/decimalv2_value.h"
#include "runtime/define_primitive_type.h"
#include "runtime/descriptors.h"
#include "runtime/large_int_value.h"
@@ -60,6 +62,8 @@
namespace doris::vectorized {
+static double nons_to_second = 1000000000.00;
+
VFileResultWriter::VFileResultWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs,
std::shared_ptr<pipeline::Dependency> dep,
std::shared_ptr<pipeline::Dependency> fin_dep)
@@ -271,19 +275,32 @@ Status VFileResultWriter::_send_result() {
_is_result_sent = true;
// The final stat result include:
- // FileNumber, TotalRows, FileSize and URL
- // The type of these field should be consistent with types defined
- // in OutFileClause.java of FE.
+ // | FileNumber | Int |
+ // | TotalRows | Bigint |
+ // | FileSize | Bigint |
+ // | URL | Varchar |
+ // | WriteTimeSec | Varchar |
+ // | WriteSpeedKB | Varchar |
+ // The type of these field should be consistent with types defined in OutFileClause.java of FE.
MysqlRowBuffer<> row_buffer;
- row_buffer.push_int(_file_idx); // file number
- row_buffer.push_bigint(_written_rows_counter->value()); // total rows
- row_buffer.push_bigint(_written_data_bytes->value()); // file size
+ row_buffer.push_int(_file_idx); // FileNumber
+ row_buffer.push_bigint(_written_rows_counter->value()); // TotalRows
+ row_buffer.push_bigint(_written_data_bytes->value()); // FileSize
std::string file_url;
_get_file_url(&file_url);
std::stringstream ss;
ss << file_url << "*";
file_url = ss.str();
- row_buffer.push_string(file_url.c_str(), file_url.length()); // url
+ row_buffer.push_string(file_url.c_str(), file_url.length()); // URL
+ double write_time = _file_write_timer->value() / nons_to_second;
+ std::string formatted_write_time = fmt::format("{:.3f}", write_time);
+ row_buffer.push_string(formatted_write_time.c_str(), formatted_write_time.length()); // WriteTimeSec
+
+ double write_speed = _get_write_speed(_written_data_bytes->value(), _file_write_timer->value());
+ std::string formatted_write_speed = fmt::format("{:.2f}", write_speed);
+ row_buffer.push_string(formatted_write_speed.c_str(), formatted_write_speed.length()); // WriteSpeedKB
std::unique_ptr<TFetchDataResult> result = std::make_unique<TFetchDataResult>();
result->result_batch.rows.resize(1);
@@ -295,6 +312,8 @@ Status VFileResultWriter::_send_result() {
std::make_pair("TotalRows",
std::to_string(_written_rows_counter->value())));
attach_infos.insert(std::make_pair("FileSize",
std::to_string(_written_data_bytes->value())));
attach_infos.insert(std::make_pair("URL", file_url));
+ attach_infos.insert(std::make_pair("WriteTimeSec", formatted_write_time));
+ attach_infos.insert(std::make_pair("WriteSpeedKB", formatted_write_speed));
result->result_batch.__set_attached_infos(attach_infos);
RETURN_NOT_OK_STATUS_WITH_WARN(_sinker->add_batch(_state, result),
@@ -309,20 +328,29 @@ Status VFileResultWriter::_fill_result_block() {
_is_result_sent = true;
#ifndef INSERT_TO_COLUMN
-#define INSERT_TO_COLUMN \
- if (i == 0) { \
- column->insert_data(reinterpret_cast<const char*>(&_file_idx), 0); \
- } else if (i == 1) { \
- int64_t written_rows = _written_rows_counter->value(); \
- column->insert_data(reinterpret_cast<const char*>(&written_rows), 0); \
- } else if (i == 2) { \
- int64_t written_data_bytes = _written_data_bytes->value(); \
- column->insert_data(reinterpret_cast<const char*>(&written_data_bytes), 0); \
- } else if (i == 3) { \
- std::string file_url; \
- static_cast<void>(_get_file_url(&file_url)); \
- column->insert_data(file_url.c_str(), file_url.size()); \
- } \
+#define INSERT_TO_COLUMN \
+ if (i == 0) { \
+ column->insert_data(reinterpret_cast<const char*>(&_file_idx), 0); \
+ } else if (i == 1) { \
+ int64_t written_rows = _written_rows_counter->value(); \
+ column->insert_data(reinterpret_cast<const char*>(&written_rows), 0); \
+ } else if (i == 2) { \
+ int64_t written_data_bytes = _written_data_bytes->value(); \
+ column->insert_data(reinterpret_cast<const char*>(&written_data_bytes), 0); \
+ } else if (i == 3) { \
+ std::string file_url; \
+ static_cast<void>(_get_file_url(&file_url)); \
+ column->insert_data(file_url.c_str(), file_url.size()); \
+ } else if (i == 4) { \
+ double write_time = _file_write_timer->value() / nons_to_second; \
+ std::string formatted_write_time = fmt::format("{:.3f}", write_time); \
+ column->insert_data(formatted_write_time.c_str(), formatted_write_time.size()); \
+ } else if (i == 5) { \
+ double write_speed = _get_write_speed(_written_data_bytes->value(), _file_write_timer->value()); \
+ std::string formatted_write_speed = fmt::format("{:.2f}", write_speed); \
+ column->insert_data(formatted_write_speed.c_str(), formatted_write_speed.size()); \
+ } \
_output_block->replace_by_position(i, std::move(column));
#endif
@@ -387,6 +415,14 @@ Status VFileResultWriter::_delete_dir() {
}
}
+double VFileResultWriter::_get_write_speed(int64_t write_bytes, int64_t write_time) {
+ if (write_time <= 0) {
+ return 0;
+ }
+ // KB / s
+ return ((write_bytes * nons_to_second) / (write_time)) / 1024;
+}
+
Status VFileResultWriter::close(Status exec_status) {
Status st = exec_status;
if (st.ok()) {
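
For reference, the arithmetic behind the two new result columns, transcribed into a small self-contained Java sketch. The class and method names below are illustrative only, not part of the patch; just the formulas mirror the C++ above (nanoseconds to seconds for WriteTimeSec, bytes over nanoseconds scaled to KB/s for WriteSpeedKB).

    // Hypothetical helper; only the arithmetic is taken from the patch.
    final class WriteStatsSketch {
        private static final double NANOS_PER_SECOND = 1_000_000_000.0;

        // WriteTimeSec: write time in seconds, three decimals.
        static String writeTimeSec(long writeTimeNanos) {
            return String.format("%.3f", writeTimeNanos / NANOS_PER_SECOND);
        }

        // WriteSpeedKB: KB per second, two decimals; 0 when no time was measured.
        static String writeSpeedKb(long writeBytes, long writeTimeNanos) {
            if (writeTimeNanos <= 0) {
                return "0.00";
            }
            return String.format("%.2f", (writeBytes * NANOS_PER_SECOND / writeTimeNanos) / 1024);
        }
    }

For example, 10 MiB written in 2 seconds comes out as "5120.00".
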
diff --git a/be/src/vec/sink/writer/vfile_result_writer.h b/be/src/vec/sink/writer/vfile_result_writer.h
index bf0a5d3e9e2..8b611d7ceef 100644
--- a/be/src/vec/sink/writer/vfile_result_writer.h
+++ b/be/src/vec/sink/writer/vfile_result_writer.h
@@ -21,6 +21,7 @@
#include <stddef.h>
#include <stdint.h>
+#include <cstdint>
#include <iosfwd>
#include <memory>
#include <string>
@@ -99,6 +100,7 @@ private:
Status _fill_result_block();
// delete the dir of file_path
Status _delete_dir();
+ double _get_write_speed(int64_t write_bytes, int64_t write_time);
RuntimeState* _state; // not owned, set when init
const pipeline::ResultFileOptions* _file_opts = nullptr;
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 6925a2d59c1..e6eaf35b3ab 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2571,11 +2571,6 @@ public class Config extends ConfigBase {
"The maximum parallelism allowed by Export job"})
public static int maximum_parallelism_of_export_job = 50;
- @ConfField(mutable = true, description = {
- "ExportExecutorTask任务中一个OutFile语句允许的最大tablets数量",
- "The maximum number of tablets allowed by an OutfileStatement in an ExportExecutorTask"})
- public static int maximum_tablets_of_outfile_in_export = 10;
-
@ConfField(mutable = true, description = {
"是否用 mysql 的 bigint 类型来返回 Doris 的 largeint 类型",
"Whether to use mysql's bigint type to return Doris's largeint
type"})
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
index d4a7b25ed5a..adf601da1ea 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
@@ -79,17 +79,23 @@ public class OutFileClause {
public static final String TOTAL_ROWS = "TotalRows";
public static final String FILE_SIZE = "FileSize";
public static final String URL = "URL";
+ public static final String WRITE_TIME_SEC = "WriteTimeSec";
+ public static final String WRITE_SPEED_KB = "WriteSpeedKB";
static {
RESULT_COL_NAMES.add(FILE_NUMBER);
RESULT_COL_NAMES.add(TOTAL_ROWS);
RESULT_COL_NAMES.add(FILE_SIZE);
RESULT_COL_NAMES.add(URL);
+ RESULT_COL_NAMES.add(WRITE_TIME_SEC);
+ RESULT_COL_NAMES.add(WRITE_SPEED_KB);
RESULT_COL_TYPES.add(ScalarType.createType(PrimitiveType.INT));
RESULT_COL_TYPES.add(ScalarType.createType(PrimitiveType.BIGINT));
RESULT_COL_TYPES.add(ScalarType.createType(PrimitiveType.BIGINT));
RESULT_COL_TYPES.add(ScalarType.createType(PrimitiveType.VARCHAR));
+ RESULT_COL_TYPES.add(ScalarType.createType(PrimitiveType.VARCHAR));
+ RESULT_COL_TYPES.add(ScalarType.createType(PrimitiveType.VARCHAR));
PARQUET_REPETITION_TYPE_MAP.put("required",
TParquetRepetitionType.REQUIRED);
PARQUET_REPETITION_TYPE_MAP.put("repeated",
TParquetRepetitionType.REPEATED);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
index ecde36f7fd5..b9e1ca2ead3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
@@ -106,8 +106,6 @@ public class ExportJob implements Writable {
private static final String BROKER_PROPERTY_PREFIXES = "broker.";
- private static final int MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT = Config.maximum_tablets_of_outfile_in_export;
-
public static final String CONSISTENT_NONE = "none";
public static final String CONSISTENT_PARTITION = "partition";
@@ -189,10 +187,9 @@ public class ExportJob implements Writable {
/**
* Each parallel has an associated Outfile list
* which are organized into a two-dimensional list.
- * Therefore, we can access the selectStmtListPerParallel
* to get the outfile logical plans list responsible for each parallel task.
*/
- private List<List<StatementBase>> selectStmtListPerParallel = Lists.newArrayList();
+ private List<Optional<StatementBase>> selectStmtPerParallel = Lists.newArrayList();
private List<String> exportColumns = Lists.newArrayList();
@@ -255,8 +252,6 @@ public class ExportJob implements Writable {
* according to the 'parallelism' set by the user.
* The tablets which will be exported by this ExportJob are divided into 'parallelism' copies,
* and each ExportTaskExecutor is responsible for a list of tablets.
- * The tablets responsible for an ExportTaskExecutor will be assigned to multiple OutfileStmt
- * according to the 'TABLETS_NUM_PER_OUTFILE_IN_EXPORT'.
*
* @throws UserException
*/
@@ -282,14 +277,10 @@ public class ExportJob implements Writable {
// debug LOG output
if (LOG.isDebugEnabled()) {
- for (int i = 0; i < selectStmtListPerParallel.size(); ++i) {
+ for (int i = 0; i < selectStmtPerParallel.size(); ++i) {
if (LOG.isDebugEnabled()) {
LOG.debug("ExportTaskExecutor {} is responsible for
outfile:", i);
- }
- for (StatementBase outfile :
selectStmtListPerParallel.get(i)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("outfile sql: [{}]", outfile.toSql());
- }
+ LOG.debug("outfile sql: [{}]",
selectStmtPerParallel.get(i).get().toSql());
}
}
}
@@ -312,21 +303,16 @@ public class ExportJob implements Writable {
}
// get all tablets
- List<List<List<Long>>> tabletsListPerParallel = splitTablets();
+ List<List<Long>> tabletsListPerParallel = splitTablets();
// Each Outfile clause responsible for MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT tablets
- for (List<List<Long>> tabletsList : tabletsListPerParallel) {
- List<StatementBase> logicalPlanAdapters = Lists.newArrayList();
- for (List<Long> tabletIds : tabletsList) {
- // generate LogicalPlan
- LogicalPlan plan = generateOneLogicalPlan(qualifiedTableName, tabletIds,
- this.partitionNames, selectLists);
- // generate LogicalPlanAdapter
- StatementBase statementBase = generateLogicalPlanAdapter(plan);
-
- logicalPlanAdapters.add(statementBase);
- }
- selectStmtListPerParallel.add(logicalPlanAdapters);
+ for (List<Long> tabletsList : tabletsListPerParallel) {
+ // generate LogicalPlan
+ LogicalPlan plan = generateOneLogicalPlan(qualifiedTableName, tabletsList,
+ this.partitionNames, selectLists);
+ // generate LogicalPlanAdapter
+ StatementBase statementBase = generateLogicalPlanAdapter(plan);
+ selectStmtPerParallel.add(Optional.of(statementBase));
}
}
@@ -352,16 +338,13 @@ public class ExportJob implements Writable {
});
}
- List<StatementBase> logicalPlanAdapters = Lists.newArrayList();
-
// generate LogicalPlan
LogicalPlan plan = generateOneLogicalPlan(qualifiedTableName, ImmutableList.of(),
ImmutableList.of(), selectLists);
// generate LogicalPlanAdapter
StatementBase statementBase = generateLogicalPlanAdapter(plan);
- logicalPlanAdapters.add(statementBase);
- selectStmtListPerParallel.add(logicalPlanAdapters);
+ selectStmtPerParallel.add(Optional.of(statementBase));
}
private LogicalPlan generateOneLogicalPlan(List<String> qualifiedTableName, List<Long> tabletIds,
@@ -401,15 +384,15 @@ private void generateExportJobExecutor() {
jobExecutorList = Lists.newArrayList();
- for (List<StatementBase> selectStmts : selectStmtListPerParallel) {
- ExportTaskExecutor executor = new ExportTaskExecutor(selectStmts,
this);
+ for (Optional<StatementBase> selectStmt : selectStmtPerParallel) {
+ ExportTaskExecutor executor = new ExportTaskExecutor(selectStmt,
this);
jobExecutorList.add(executor);
}
// add empty task to make export job could be finished finally if jobExecutorList is empty
// which means that export table without data
if (jobExecutorList.isEmpty()) {
- ExportTaskExecutor executor = new ExportTaskExecutor(Lists.newArrayList(), this);
+ ExportTaskExecutor executor = new ExportTaskExecutor(Optional.empty(), this);
jobExecutorList.add(executor);
}
}
@@ -433,77 +416,59 @@ public class ExportJob implements Writable {
}
}
- List<List<TableRef>> tableRefListPerParallel =
getTableRefListPerParallel();
- LOG.info("Export Job [{}] is split into {} Export Task Executor.", id,
tableRefListPerParallel.size());
+ List<TableRef> tableRefPerParallel = getTableRefListPerParallel();
+ LOG.info("Export Job [{}] is split into {} Export Task Executor.", id,
tableRefPerParallel.size());
// debug LOG output
if (LOG.isDebugEnabled()) {
- for (int i = 0; i < tableRefListPerParallel.size(); i++) {
+ for (int i = 0; i < tableRefPerParallel.size(); i++) {
if (LOG.isDebugEnabled()) {
LOG.debug("ExportTaskExecutor {} is responsible for
tablets:", i);
- }
- for (TableRef tableRef : tableRefListPerParallel.get(i)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Tablet id: [{}]",
tableRef.getSampleTabletIds());
- }
+ LOG.debug("Tablet id: [{}]",
tableRefPerParallel.get(i).getSampleTabletIds());
}
}
}
// generate 'select..outfile..' statement
- for (List<TableRef> tableRefList : tableRefListPerParallel) {
- List<StatementBase> selectStmtLists = Lists.newArrayList();
- for (TableRef tableRef : tableRefList) {
- List<TableRef> tmpTableRefList = Lists.newArrayList(tableRef);
- FromClause fromClause = new FromClause(tmpTableRefList);
- // generate outfile clause
- OutFileClause outfile = new OutFileClause(this.exportPath, this.format, convertOutfileProperties());
- SelectStmt selectStmt = new SelectStmt(list, fromClause, this.whereExpr, null,
- null, null, LimitElement.NO_LIMIT);
- selectStmt.setOutFileClause(outfile);
- selectStmt.setOrigStmt(new OriginStatement(selectStmt.toSql(), 0));
- selectStmtLists.add(selectStmt);
- }
- selectStmtListPerParallel.add(selectStmtLists);
+ for (TableRef tableReferences : tableRefPerParallel) {
+ FromClause fromClause = new FromClause(Lists.newArrayList(tableReferences));
+ // generate outfile clause
+ OutFileClause outfile = new OutFileClause(this.exportPath, this.format, convertOutfileProperties());
+ SelectStmt selectStmt = new SelectStmt(list, fromClause, this.whereExpr, null,
+ null, null, LimitElement.NO_LIMIT);
+ selectStmt.setOutFileClause(outfile);
+ selectStmt.setOrigStmt(new OriginStatement(selectStmt.toSql(), 0));
+ selectStmtPerParallel.add(Optional.of(selectStmt));
}
// debug LOG output
if (LOG.isDebugEnabled()) {
- for (int i = 0; i < selectStmtListPerParallel.size(); ++i) {
+ for (int i = 0; i < selectStmtPerParallel.size(); ++i) {
if (LOG.isDebugEnabled()) {
LOG.debug("ExportTaskExecutor {} is responsible for
outfile:", i);
- }
- for (StatementBase outfile : selectStmtListPerParallel.get(i))
{
- if (LOG.isDebugEnabled()) {
- LOG.debug("outfile sql: [{}]", outfile.toSql());
- }
+ LOG.debug("outfile sql: [{}]",
selectStmtPerParallel.get(i).get().toSql());
}
}
}
}
- private List<List<TableRef>> getTableRefListPerParallel() throws
UserException {
- List<List<List<Long>>> tabletsListPerParallel = splitTablets();
-
- List<List<TableRef>> tableRefListPerParallel = Lists.newArrayList();
- for (List<List<Long>> tabletsList : tabletsListPerParallel) {
- List<TableRef> tableRefList = Lists.newArrayList();
- for (List<Long> tablets : tabletsList) {
- // Since export does not support the alias, here we pass the
null value.
- // we can not use this.tableRef.getAlias(),
- // because the constructor of `Tableref` will convert
this.tableRef.getAlias()
- // into lower case when lower_case_table_names = 1
- TableRef tblRef = new TableRef(this.tableRef.getName(), null,
- this.tableRef.getPartitionNames(), (ArrayList) tablets,
- this.tableRef.getTableSample(),
this.tableRef.getCommonHints());
- tableRefList.add(tblRef);
- }
- tableRefListPerParallel.add(tableRefList);
+ private List<TableRef> getTableRefListPerParallel() throws UserException {
+ List<List<Long>> tabletsListPerParallel = splitTablets();
+ List<TableRef> tableRefPerParallel = Lists.newArrayList();
+ for (List<Long> tabletsList : tabletsListPerParallel) {
+ // Since export does not support the alias, here we pass the null
value.
+ // we can not use this.tableRef.getAlias(),
+ // because the constructor of `Tableref` will convert
this.tableRef.getAlias()
+ // into lower case when lower_case_table_names = 1
+ TableRef tblRef = new TableRef(this.tableRef.getName(), null,
+ this.tableRef.getPartitionNames(), (ArrayList) tabletsList,
+ this.tableRef.getTableSample(),
this.tableRef.getCommonHints());
+ tableRefPerParallel.add(tblRef);
}
- return tableRefListPerParallel;
+ return tableRefPerParallel;
}
- private List<List<List<Long>>> splitTablets() throws UserException {
+ private List<List<Long>> splitTablets() throws UserException {
// get tablets
Database db = Env.getCurrentEnv().getInternalCatalog().getDbOrAnalysisException(this.tableName.getDb());
OlapTable table = db.getOlapTableOrAnalysisException(this.tableName.getTbl());
@@ -560,7 +525,7 @@ public class ExportJob implements Writable {
+ "set parallelism to partition num.", id,
totalPartitions, this.parallelism);
}
int start = 0;
- List<List<List<Long>>> tabletsListPerParallel = new ArrayList<>();
+ List<List<Long>> tabletsListPerParallel = new ArrayList<>();
for (int i = 0; i < realParallelism; ++i) {
int partitionNum = numPerParallel;
if (numPerQueryRemainder > 0) {
@@ -568,8 +533,9 @@ public class ExportJob implements Writable {
--numPerQueryRemainder;
}
List<List<Long>> tablets = new
ArrayList<>(tabletIdList.subList(start, start + partitionNum));
+ List<Long> flatTablets =
tablets.stream().flatMap(List::stream).collect(Collectors.toList());
start += partitionNum;
- tabletsListPerParallel.add(tablets);
+ tabletsListPerParallel.add(flatTablets);
}
return tabletsListPerParallel;
}
@@ -583,7 +549,7 @@ public class ExportJob implements Writable {
Integer tabletsNumPerParallel = tabletsAllNum / this.parallelism;
Integer tabletsNumPerQueryRemainder = tabletsAllNum - tabletsNumPerParallel * this.parallelism;
- List<List<List<Long>>> tabletsListPerParallel = Lists.newArrayList();
+ List<List<Long>> tabletsListPerParallel = Lists.newArrayList();
Integer realParallelism = this.parallelism;
if (tabletsAllNum < this.parallelism) {
realParallelism = tabletsAllNum;
@@ -599,14 +565,8 @@ public class ExportJob implements Writable {
--tabletsNumPerQueryRemainder;
}
List<Long> tabletsList = new
ArrayList<>(flatTabletIdList.subList(start, start + tabletsNum));
- List<List<Long>> tablets = new ArrayList<>();
- for (int i = 0; i < tabletsList.size(); i +=
MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT) {
- int end = Math.min(i + MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT,
tabletsList.size());
- tablets.add(new ArrayList<>(tabletsList.subList(i, end)));
- }
-
start += tabletsNum;
- tabletsListPerParallel.add(tablets);
+ tabletsListPerParallel.add(tabletsList);
}
return tabletsListPerParallel;
}
@@ -702,7 +662,7 @@ public class ExportJob implements Writable {
finishTimeMs = System.currentTimeMillis();
failMsg = new ExportFailMsg(type, msg);
jobExecutorList.clear();
- selectStmtListPerParallel.clear();
+ selectStmtPerParallel.clear();
allOutfileInfo.clear();
partitionToVersion.clear();
if (FeConstants.runningUnitTest) {
@@ -755,7 +715,7 @@ public class ExportJob implements Writable {
outfileInfo = GsonUtils.GSON.toJson(allOutfileInfo);
// Clear the jobExecutorList to release memory.
jobExecutorList.clear();
- selectStmtListPerParallel.clear();
+ selectStmtPerParallel.clear();
allOutfileInfo.clear();
partitionToVersion.clear();
Env.getCurrentEnv().getEditLog().logExportUpdateState(this, ExportJobState.FINISHED);
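
Net effect of the ExportJob changes above: splitTablets() now returns one flat tablet list per parallel task instead of a two-dimensional per-outfile split, so each ExportTaskExecutor runs exactly one outfile statement. A standalone sketch of that distribution, with hypothetical names; only the chunking logic mirrors the patch:

    import java.util.ArrayList;
    import java.util.List;

    final class TabletSplitSketch {
        // Earlier tasks absorb the remainder one tablet at a time, as in the
        // patched splitTablets().
        static List<List<Long>> split(List<Long> flatTabletIds, int parallelism) {
            int total = flatTabletIds.size();
            int per = total / parallelism;
            int remainder = total - per * parallelism;
            int realParallelism = Math.min(parallelism, total);
            List<List<Long>> perTask = new ArrayList<>();
            int start = 0;
            for (int i = 0; i < realParallelism; ++i) {
                int take = per + (remainder > 0 ? 1 : 0);
                if (remainder > 0) {
                    --remainder;
                }
                perTask.add(new ArrayList<>(flatTabletIds.subList(start, start + take)));
                start += take;
            }
            return perTask;
        }
    }

With 40 tablets and parallelism 3 this yields chunks of 14, 13, and 13, which is what the rebased test expectations further below assert.
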
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java
index ae84068b20d..3d321cfc9fa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java
@@ -55,7 +55,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class ExportTaskExecutor implements TransientTaskExecutor {
private static final Logger LOG = LogManager.getLogger(ExportTaskExecutor.class);
- List<StatementBase> selectStmtLists;
+ Optional<StatementBase> selectStmt;
ExportJob exportJob;
@@ -67,9 +67,9 @@ public class ExportTaskExecutor implements TransientTaskExecutor {
private AtomicBoolean isFinished;
- ExportTaskExecutor(List<StatementBase> selectStmtLists, ExportJob
exportJob) {
+ ExportTaskExecutor(Optional<StatementBase> selectStmt, ExportJob
exportJob) {
this.taskId = UUID.randomUUID().getMostSignificantBits();
- this.selectStmtLists = selectStmtLists;
+ this.selectStmt = selectStmt;
this.exportJob = exportJob;
this.isCanceled = new AtomicBoolean(false);
this.isFinished = new AtomicBoolean(false);
@@ -90,16 +90,14 @@ public class ExportTaskExecutor implements TransientTaskExecutor {
LOG.debug("[Export Task] taskId: {} updating state to EXPORTING", taskId);
exportJob.updateExportJobState(ExportJobState.EXPORTING, taskId, null, null, null);
List<OutfileInfo> outfileInfoList = Lists.newArrayList();
- for (int idx = 0; idx < selectStmtLists.size(); ++idx) {
- LOG.debug("[Export Task] taskId: {} processing statement {}/{}",
- taskId, idx + 1, selectStmtLists.size());
+ if (selectStmt.isPresent()) {
if (isCanceled.get()) {
- LOG.debug("[Export Task] taskId: {} canceled during execution
at statement {}", taskId, idx + 1);
+ LOG.debug("[Export Task] taskId: {} canceled during
execution", taskId);
throw new JobException("Export executor has been canceled,
task id: {}", taskId);
}
// check the version of tablets, skip if the consistency is in partition level.
if (exportJob.getExportTable().isManagedTable() && !exportJob.isPartitionConsistency()) {
- LOG.debug("[Export Task] taskId: {} checking tablet versions for statement {}", taskId, idx + 1);
+ LOG.debug("[Export Task] taskId: {} checking tablet versions", taskId);
try {
Database db = Env.getCurrentEnv().getInternalCatalog().getDbOrAnalysisException(
exportJob.getTableName().getDb());
@@ -110,7 +108,7 @@ public class ExportTaskExecutor implements TransientTaskExecutor {
LOG.debug("[Export Lock] taskId: {}, table: {} acquired readLock", taskId, table.getName());
try {
List<Long> tabletIds;
- LogicalPlanAdapter logicalPlanAdapter = (LogicalPlanAdapter) selectStmtLists.get(idx);
+ LogicalPlanAdapter logicalPlanAdapter = (LogicalPlanAdapter) selectStmt.get();
Optional<UnboundRelation> unboundRelation = findUnboundRelation(
logicalPlanAdapter.getLogicalPlan());
tabletIds = unboundRelation.get().getTabletIds();
@@ -151,8 +149,8 @@ public class ExportTaskExecutor implements TransientTaskExecutor {
}
try (AutoCloseConnectContext r = buildConnectContext()) {
- LOG.debug("[Export Task] taskId: {} executing statement {}",
taskId, idx + 1);
- stmtExecutor = new StmtExecutor(r.connectContext,
selectStmtLists.get(idx));
+ LOG.debug("[Export Task] taskId: {} executing", taskId);
+ stmtExecutor = new StmtExecutor(r.connectContext,
selectStmt.get());
stmtExecutor.execute();
if (r.connectContext.getState().getStateType() ==
MysqlStateType.ERR) {
LOG.debug("[Export Task] taskId: {} failed with MySQL
error: {}", taskId,
@@ -161,16 +159,11 @@ public class ExportTaskExecutor implements TransientTaskExecutor {
ExportFailMsg.CancelType.RUN_FAIL, r.connectContext.getState().getErrorMessage());
return;
}
- LOG.debug("[Export Task] taskId: {} statement {} executed
successfully", taskId, idx + 1);
- OutfileInfo outfileInfo =
getOutFileInfo(r.connectContext.getResultAttachedInfo());
- LOG.debug("[Export Task] taskId: {} got outfile info for
statement {}:"
- + "fileNumber={}, totalRows={}, fileSize={}",
- taskId, idx + 1, outfileInfo.getFileNumber(),
- outfileInfo.getTotalRows(), outfileInfo.getFileSize());
- outfileInfoList.add(outfileInfo);
+ LOG.debug("[Export Task] taskId: {} executed successfully",
taskId);
+ outfileInfoList =
getOutFileInfo(r.connectContext.getResultAttachedInfo());
} catch (Exception e) {
- LOG.debug("[Export Task] taskId: {} failed with exception
during statement {}: {}",
- taskId, idx + 1, e.getMessage(), e);
+ LOG.debug("[Export Task] taskId: {} failed with exception: {}",
+ taskId, e.getMessage(), e);
exportJob.updateExportJobState(ExportJobState.CANCELLED, taskId, null,
ExportFailMsg.CancelType.RUN_FAIL, e.getMessage());
throw new JobException(e);
@@ -214,12 +207,18 @@ public class ExportTaskExecutor implements TransientTaskExecutor {
return new AutoCloseConnectContext(connectContext);
}
- private OutfileInfo getOutFileInfo(Map<String, String> resultAttachedInfo)
{
- OutfileInfo outfileInfo = new OutfileInfo();
-
outfileInfo.setFileNumber(resultAttachedInfo.get(OutFileClause.FILE_NUMBER));
-
outfileInfo.setTotalRows(resultAttachedInfo.get(OutFileClause.TOTAL_ROWS));
-
outfileInfo.setFileSize(resultAttachedInfo.get(OutFileClause.FILE_SIZE));
- outfileInfo.setUrl(resultAttachedInfo.get(OutFileClause.URL));
+ private List<OutfileInfo> getOutFileInfo(List<Map<String, String>>
resultAttachedInfo) {
+ List<OutfileInfo> outfileInfo = Lists.newArrayList();
+ for (Map<String, String> row : resultAttachedInfo) {
+ OutfileInfo outfileInfoOneRow = new OutfileInfo();
+
outfileInfoOneRow.setFileNumber(row.get(OutFileClause.FILE_NUMBER));
+ outfileInfoOneRow.setTotalRows(row.get(OutFileClause.TOTAL_ROWS));
+ outfileInfoOneRow.setFileSize(row.get(OutFileClause.FILE_SIZE));
+ outfileInfoOneRow.setUrl(row.get(OutFileClause.URL));
+
outfileInfoOneRow.setWriteTime(row.get(OutFileClause.WRITE_TIME_SEC));
+
outfileInfoOneRow.setWriteSpeed(row.get(OutFileClause.WRITE_SPEED_KB));
+ outfileInfo.add(outfileInfoOneRow);
+ }
return outfileInfo;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/OutfileInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/OutfileInfo.java
index b9befd9d326..262bf98adef 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/OutfileInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/OutfileInfo.java
@@ -34,4 +34,10 @@ public class OutfileInfo {
@SerializedName("url")
private String url;
+
+ @SerializedName("writeTime")
+ private String writeTime;
+
+ @SerializedName("writeSpeed")
+ private String writeSpeed;
}
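
With the two extra fields, the outfileInfo JSON that ExportJob persists and SHOW EXPORT surfaces carries the write statistics as well. A hedged Gson round-trip sketch, reduced to the serialized names visible in this hunk; the values are invented for illustration:

    import com.google.gson.Gson;
    import com.google.gson.annotations.SerializedName;

    // Hypothetical stand-in for the OutfileInfo fields shown above.
    final class OutfileInfoSketch {
        @SerializedName("url")
        String url = "s3://bucket/exp_0*";
        @SerializedName("writeTime")
        String writeTime = "0.117";
        @SerializedName("writeSpeed")
        String writeSpeed = "5120.00";

        public static void main(String[] args) {
            // Prints: {"url":"s3://bucket/exp_0*","writeTime":"0.117","writeSpeed":"5120.00"}
            System.out.println(new Gson().toJson(new OutfileInfoSketch()));
        }
    }
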
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java
index 631339c3732..3dbd5bc2115 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java
@@ -24,8 +24,6 @@ import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.TupleId;
import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.PrimitiveType;
-import org.apache.doris.catalog.ScalarType;
import org.apache.doris.common.util.FileFormatConstants;
import org.apache.doris.thrift.TDataSink;
import org.apache.doris.thrift.TDataSinkType;
@@ -145,39 +143,26 @@ public class ResultFileSink extends DataSink {
/**
* Construct a tuple for file status, the tuple schema as following:
- * | FileNumber | Int |
- * | TotalRows | Bigint |
- * | FileSize | Bigint |
- * | URL | Varchar |
+ * | FileNumber | Int |
+ * | TotalRows | Bigint |
+ * | FileSize | Bigint |
+ * | URL | Varchar |
+ * | WriteTimeSec | Varchar |
+ * | WriteSpeedKB | Varchar |
*/
public static TupleDescriptor constructFileStatusTupleDesc(DescriptorTable descriptorTable) {
TupleDescriptor resultFileStatusTupleDesc = descriptorTable.createTupleDescriptor("result_file_status");
resultFileStatusTupleDesc.setIsMaterialized(true);
- SlotDescriptor fileNumber = descriptorTable.addSlotDescriptor(resultFileStatusTupleDesc);
- fileNumber.setLabel("FileNumber");
- fileNumber.setType(ScalarType.createType(PrimitiveType.INT));
- fileNumber.setColumn(new Column("FileNumber", ScalarType.createType(PrimitiveType.INT)));
- fileNumber.setIsMaterialized(true);
- fileNumber.setIsNullable(false);
- SlotDescriptor totalRows = descriptorTable.addSlotDescriptor(resultFileStatusTupleDesc);
- totalRows.setLabel("TotalRows");
- totalRows.setType(ScalarType.createType(PrimitiveType.BIGINT));
- totalRows.setColumn(new Column("TotalRows", ScalarType.createType(PrimitiveType.BIGINT)));
- totalRows.setIsMaterialized(true);
- totalRows.setIsNullable(false);
- SlotDescriptor fileSize = descriptorTable.addSlotDescriptor(resultFileStatusTupleDesc);
- fileSize.setLabel("FileSize");
- fileSize.setType(ScalarType.createType(PrimitiveType.BIGINT));
- fileSize.setColumn(new Column("FileSize", ScalarType.createType(PrimitiveType.BIGINT)));
- fileSize.setIsMaterialized(true);
- fileSize.setIsNullable(false);
- SlotDescriptor url = descriptorTable.addSlotDescriptor(resultFileStatusTupleDesc);
- url.setLabel("URL");
- url.setType(ScalarType.createType(PrimitiveType.VARCHAR));
- url.setColumn(new Column("URL", ScalarType.createType(PrimitiveType.VARCHAR)));
- url.setIsMaterialized(true);
- url.setIsNullable(false);
+ for (int i = 0; i < OutFileClause.RESULT_COL_NAMES.size(); ++i) {
+ SlotDescriptor slotDescriptor = descriptorTable.addSlotDescriptor(resultFileStatusTupleDesc);
+ slotDescriptor.setLabel(OutFileClause.RESULT_COL_NAMES.get(i));
+ slotDescriptor.setType(OutFileClause.RESULT_COL_TYPES.get(i));
+ slotDescriptor.setColumn(new Column(OutFileClause.RESULT_COL_NAMES.get(i),
+ OutFileClause.RESULT_COL_TYPES.get(i)));
+ slotDescriptor.setIsMaterialized(true);
+ slotDescriptor.setIsNullable(false);
+ }
resultFileStatusTupleDesc.computeStatAndMemLayout();
return resultFileStatusTupleDesc;
}
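
The loop above replaces four hand-rolled slot blocks with the name/type pairs declared in OutFileClause, which means RESULT_COL_NAMES and RESULT_COL_TYPES must stay index-aligned. A hypothetical guard (not part of the patch) stating that invariant:

    import java.util.List;

    // constructFileStatusTupleDesc() consumes the two lists by shared index.
    final class ResultSchemaInvariant {
        static void assertAligned(List<String> names, List<?> types) {
            if (names.size() != types.size()) {
                throw new IllegalStateException("result column names/types out of sync: "
                        + names.size() + " vs " + types.size());
            }
        }
    }
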
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index 54de7ed1339..c5782466e88 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -229,7 +229,7 @@ public class ConnectContext {
private StatsErrorEstimator statsErrorEstimator;
- private Map<String, String> resultAttachedInfo = Maps.newHashMap();
+ private List<Map<String, String>> resultAttachedInfo =
Lists.newArrayList();
private String workloadGroupName = "";
private boolean isGroupCommit;
@@ -1098,11 +1098,11 @@ public class ConnectContext {
return env.getAuth().getExecMemLimit(getQualifiedUser());
}
- public void setResultAttachedInfo(Map<String, String> resultAttachedInfo) {
- this.resultAttachedInfo = resultAttachedInfo;
+ public void addResultAttachedInfo(Map<String, String> resultAttachedInfo) {
+ this.resultAttachedInfo.add(resultAttachedInfo);
}
- public Map<String, String> getResultAttachedInfo() {
+ public List<Map<String, String>> getResultAttachedInfo() {
return resultAttachedInfo;
}
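
This ConnectContext change is what lets the per-row statistics survive: attached info is now appended once per fetched result batch rather than overwritten, so getOutFileInfo() in ExportTaskExecutor can build one OutfileInfo per result row. A minimal sketch of the before/after semantics, with hypothetical names:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    final class AttachedInfoSketch {
        // With a single Map a second batch used to replace the first
        // ("last batch wins"); with a List every batch's info is kept.
        private final List<Map<String, String>> rows = new ArrayList<>();

        void addResultAttachedInfo(Map<String, String> row) {
            rows.add(row);
        }

        public static void main(String[] args) {
            AttachedInfoSketch ctx = new AttachedInfoSketch();
            Map<String, String> first = new HashMap<>();
            first.put("FileNumber", "1");
            Map<String, String> second = new HashMap<>();
            second.put("FileNumber", "2");
            ctx.addResultAttachedInfo(first);
            ctx.addResultAttachedInfo(second);
            System.out.println(ctx.rows.size()); // prints 2, not 1
        }
    }
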
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 75553377563..139abb74597 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -2003,7 +2003,7 @@ public class StmtExecutor {
}
profile.getSummaryProfile().freshWriteResultConsumeTime();
context.updateReturnRows(batch.getBatch().getRows().size());
- context.setResultAttachedInfo(batch.getBatch().getAttachedInfos());
+ context.addResultAttachedInfo(batch.getBatch().getAttachedInfos());
}
if (batch.isEos()) {
break;
diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/ExportToOutfileLogicalPlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/ExportToOutfileLogicalPlanTest.java
index 096868276fd..7605b772c57 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/analysis/ExportToOutfileLogicalPlanTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/ExportToOutfileLogicalPlanTest.java
@@ -39,6 +39,7 @@ import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
/**
* The `Export` sql finally generates the `Outfile` sql.
@@ -93,31 +94,18 @@ public class ExportToOutfileLogicalPlanTest extends TestWithFeService {
+ ");";
List<Long> currentTablets1 = Arrays.asList(10010L, 10012L, 10014L,
10016L, 10018L, 10020L, 10022L, 10024L,
- 10026L, 10028L);
- List<Long> currentTablets2 = Arrays.asList(10030L, 10032L, 10034L,
10036L, 10038L, 10040L, 10042L, 10044L,
- 10046L, 10048L);
- List<Long> currentTablets3 = Arrays.asList(10050L, 10052L, 10054L,
10056L, 10058L, 10060L, 10062L, 10064L,
- 10066L, 10068L);
- List<Long> currentTablets4 = Arrays.asList(10070L, 10072L, 10074L,
10076L, 10078L, 10080L, 10082L, 10084L,
+ 10026L, 10028L, 10030L, 10032L, 10034L, 10036L, 10038L,
10040L, 10042L, 10044L,
+ 10046L, 10048L, 10050L, 10052L, 10054L, 10056L, 10058L,
10060L, 10062L, 10064L,
+ 10066L, 10068L, 10070L, 10072L, 10074L, 10076L, 10078L,
10080L, 10082L, 10084L,
10086L, 10088L);
// generate outfile
- List<List<StatementBase>> outfileSqlPerParallel =
getOutfileSqlPerParallel(exportSql);
+ List<Optional<StatementBase>> outfileSqlPerParallel =
getOutfileSqlPerParallel(exportSql);
// check
Assert.assertEquals(1, outfileSqlPerParallel.size());
- Assert.assertEquals(4, outfileSqlPerParallel.get(0).size());
- LogicalPlan plan1 = ((LogicalPlanAdapter)
outfileSqlPerParallel.get(0).get(0)).getLogicalPlan();
+ LogicalPlan plan1 = ((LogicalPlanAdapter)
outfileSqlPerParallel.get(0).get()).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan1, false),
Lists.newArrayList(), currentTablets1);
-
- LogicalPlan plan2 = ((LogicalPlanAdapter)
outfileSqlPerParallel.get(0).get(1)).getLogicalPlan();
- checkPartitionsAndTablets(getUnboundRelation(plan2, false),
Lists.newArrayList(), currentTablets2);
-
- LogicalPlan plan3 = ((LogicalPlanAdapter)
outfileSqlPerParallel.get(0).get(2)).getLogicalPlan();
- checkPartitionsAndTablets(getUnboundRelation(plan3, false),
Lists.newArrayList(), currentTablets3);
-
- LogicalPlan plan4 = ((LogicalPlanAdapter)
outfileSqlPerParallel.get(0).get(3)).getLogicalPlan();
- checkPartitionsAndTablets(getUnboundRelation(plan4, false),
Lists.newArrayList(), currentTablets4);
}
/**
@@ -153,25 +141,21 @@ public class ExportToOutfileLogicalPlanTest extends TestWithFeService {
10086L, 10088L);
// generate outfile
- List<List<StatementBase>> outfileSqlPerParallel =
getOutfileSqlPerParallel(exportSql);
+ List<Optional<StatementBase>> outfileSqlPerParallel =
getOutfileSqlPerParallel(exportSql);
// check
Assert.assertEquals(4, outfileSqlPerParallel.size());
- Assert.assertEquals(1, outfileSqlPerParallel.get(0).size());
- Assert.assertEquals(1, outfileSqlPerParallel.get(1).size());
- Assert.assertEquals(1, outfileSqlPerParallel.get(2).size());
- Assert.assertEquals(1, outfileSqlPerParallel.get(3).size());
- LogicalPlan plan1 = ((LogicalPlanAdapter)
outfileSqlPerParallel.get(0).get(0)).getLogicalPlan();
+ LogicalPlan plan1 = ((LogicalPlanAdapter)
outfileSqlPerParallel.get(0).get()).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan1, false),
Lists.newArrayList(), currentTablets1);
- LogicalPlan plan2 = ((LogicalPlanAdapter)
outfileSqlPerParallel.get(1).get(0)).getLogicalPlan();
+ LogicalPlan plan2 = ((LogicalPlanAdapter)
outfileSqlPerParallel.get(1).get()).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan2, false),
Lists.newArrayList(), currentTablets2);
- LogicalPlan plan3 = ((LogicalPlanAdapter)
outfileSqlPerParallel.get(2).get(0)).getLogicalPlan();
+ LogicalPlan plan3 = ((LogicalPlanAdapter)
outfileSqlPerParallel.get(2).get()).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan3, false),
Lists.newArrayList(), currentTablets3);
- LogicalPlan plan4 = ((LogicalPlanAdapter)
outfileSqlPerParallel.get(3).get(0)).getLogicalPlan();
+ LogicalPlan plan4 = ((LogicalPlanAdapter)
outfileSqlPerParallel.get(3).get()).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan4, false),
Lists.newArrayList(), currentTablets4);
}
@@ -199,41 +183,27 @@ public class ExportToOutfileLogicalPlanTest extends TestWithFeService {
// This export sql should generate 4 array, and there should be 1 outfile sql in per array.
// The only difference between them is the TABLET(). They are:
List<Long> currentTablets1 = Arrays.asList(10010L, 10012L, 10014L, 10016L, 10018L, 10020L, 10022L, 10024L,
- 10026L, 10028L);
- List<Long> currentTablets12 = Arrays.asList(10030L, 10032L, 10034L, 10036L);
+ 10026L, 10028L, 10030L, 10032L, 10034L, 10036L);
List<Long> currentTablets2 = Arrays.asList(10038L, 10040L, 10042L, 10044L, 10046L, 10048L, 10050L, 10052L,
- 10054L, 10056L);
- List<Long> currentTablets22 = Arrays.asList(10058L, 10060L, 10062L);
+ 10054L, 10056L, 10058L, 10060L, 10062L);
List<Long> currentTablets3 = Arrays.asList(10064L, 10066L, 10068L, 10070L, 10072L, 10074L, 10076L, 10078L,
- 10080L, 10082L);
- List<Long> currentTablets32 = Arrays.asList(10084L, 10086L, 10088L);
+ 10080L, 10082L, 10084L, 10086L, 10088L);
// generate outfile
- List<List<StatementBase>> outfileSqlPerParallel = getOutfileSqlPerParallel(exportSql);
+ List<Optional<StatementBase>> outfileSqlPerParallel = getOutfileSqlPerParallel(exportSql);
// check
Assert.assertEquals(3, outfileSqlPerParallel.size());
- Assert.assertEquals(2, outfileSqlPerParallel.get(0).size());
- Assert.assertEquals(2, outfileSqlPerParallel.get(1).size());
- Assert.assertEquals(2, outfileSqlPerParallel.get(2).size());
- LogicalPlan plan1 = ((LogicalPlanAdapter) outfileSqlPerParallel.get(0).get(0)).getLogicalPlan();
+ LogicalPlan plan1 = ((LogicalPlanAdapter) outfileSqlPerParallel.get(0).get()).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan1, false), Lists.newArrayList(), currentTablets1);
- LogicalPlan plan12 = ((LogicalPlanAdapter) outfileSqlPerParallel.get(0).get(1)).getLogicalPlan();
- checkPartitionsAndTablets(getUnboundRelation(plan12, false), Lists.newArrayList(), currentTablets12);
-
- LogicalPlan plan2 = ((LogicalPlanAdapter) outfileSqlPerParallel.get(1).get(0)).getLogicalPlan();
+ LogicalPlan plan2 = ((LogicalPlanAdapter) outfileSqlPerParallel.get(1).get()).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan2, false), Lists.newArrayList(), currentTablets2);
- LogicalPlan plan22 = ((LogicalPlanAdapter) outfileSqlPerParallel.get(1).get(1)).getLogicalPlan();
- checkPartitionsAndTablets(getUnboundRelation(plan22, false), Lists.newArrayList(), currentTablets22);
- LogicalPlan plan3 = ((LogicalPlanAdapter) outfileSqlPerParallel.get(2).get(0)).getLogicalPlan();
+ LogicalPlan plan3 = ((LogicalPlanAdapter) outfileSqlPerParallel.get(2).get()).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan3, false), Lists.newArrayList(), currentTablets3);
-
- LogicalPlan plan32 = ((LogicalPlanAdapter) outfileSqlPerParallel.get(2).get(1)).getLogicalPlan();
- checkPartitionsAndTablets(getUnboundRelation(plan32, false), Lists.newArrayList(), currentTablets32);
}
/**
@@ -267,26 +237,22 @@ public class ExportToOutfileLogicalPlanTest extends TestWithFeService {
List<String> currentPartitions = Arrays.asList("p1");
// generate outfile
- List<List<StatementBase>> outfileSqlPerParallel =
getOutfileSqlPerParallel(exportSql);
+ List<Optional<StatementBase>> outfileSqlPerParallel =
getOutfileSqlPerParallel(exportSql);
// check
Assert.assertEquals(4, outfileSqlPerParallel.size());
- Assert.assertEquals(1, outfileSqlPerParallel.get(0).size());
- Assert.assertEquals(1, outfileSqlPerParallel.get(1).size());
- Assert.assertEquals(1, outfileSqlPerParallel.get(2).size());
- Assert.assertEquals(1, outfileSqlPerParallel.get(3).size());
- LogicalPlan plan1 = ((LogicalPlanAdapter)
outfileSqlPerParallel.get(0).get(0)).getLogicalPlan();
+ LogicalPlan plan1 = ((LogicalPlanAdapter)
outfileSqlPerParallel.get(0).get()).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan1, false),
currentPartitions, currentTablets1);
- LogicalPlan plan2 = ((LogicalPlanAdapter)
outfileSqlPerParallel.get(1).get(0)).getLogicalPlan();
+ LogicalPlan plan2 = ((LogicalPlanAdapter)
outfileSqlPerParallel.get(1).get()).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan2, false),
currentPartitions, currentTablets2);
- LogicalPlan plan3 = ((LogicalPlanAdapter)
outfileSqlPerParallel.get(2).get(0)).getLogicalPlan();
+ LogicalPlan plan3 = ((LogicalPlanAdapter)
outfileSqlPerParallel.get(2).get()).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan3, false),
currentPartitions, currentTablets3);
- LogicalPlan plan4 = ((LogicalPlanAdapter)
outfileSqlPerParallel.get(3).get(0)).getLogicalPlan();
+ LogicalPlan plan4 = ((LogicalPlanAdapter)
outfileSqlPerParallel.get(3).get()).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan4, false),
currentPartitions, currentTablets4);
}
@@ -320,25 +286,21 @@ public class ExportToOutfileLogicalPlanTest extends TestWithFeService {
List<String> currentPartitions = Arrays.asList("p1", "p4");
// generate outfile
- List<List<StatementBase>> outfileSqlPerParallel =
getOutfileSqlPerParallel(exportSql);
+ List<Optional<StatementBase>> outfileSqlPerParallel =
getOutfileSqlPerParallel(exportSql);
// check
Assert.assertEquals(4, outfileSqlPerParallel.size());
- Assert.assertEquals(1, outfileSqlPerParallel.get(0).size());
- Assert.assertEquals(1, outfileSqlPerParallel.get(1).size());
- Assert.assertEquals(1, outfileSqlPerParallel.get(2).size());
- Assert.assertEquals(1, outfileSqlPerParallel.get(3).size());
- LogicalPlan plan1 = ((LogicalPlanAdapter)
outfileSqlPerParallel.get(0).get(0)).getLogicalPlan();
+ LogicalPlan plan1 = ((LogicalPlanAdapter)
outfileSqlPerParallel.get(0).get()).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan1, false),
currentPartitions, currentTablets1);
- LogicalPlan plan2 = ((LogicalPlanAdapter)
outfileSqlPerParallel.get(1).get(0)).getLogicalPlan();
+ LogicalPlan plan2 = ((LogicalPlanAdapter)
outfileSqlPerParallel.get(1).get()).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan2, false),
currentPartitions, currentTablets2);
- LogicalPlan plan3 = ((LogicalPlanAdapter)
outfileSqlPerParallel.get(2).get(0)).getLogicalPlan();
+ LogicalPlan plan3 = ((LogicalPlanAdapter)
outfileSqlPerParallel.get(2).get()).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan3, false),
currentPartitions, currentTablets3);
- LogicalPlan plan4 = ((LogicalPlanAdapter)
outfileSqlPerParallel.get(3).get(0)).getLogicalPlan();
+ LogicalPlan plan4 = ((LogicalPlanAdapter)
outfileSqlPerParallel.get(3).get()).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan4, false),
currentPartitions, currentTablets4);
}
@@ -380,42 +342,39 @@ public class ExportToOutfileLogicalPlanTest extends TestWithFeService {
List<String> currentPartitions = Arrays.asList("p1");
// generate outfile
- List<List<StatementBase>> outfileSqlPerParallel =
getOutfileSqlPerParallel(exportSql);
+ List<Optional<StatementBase>> outfileSqlPerParallel =
getOutfileSqlPerParallel(exportSql);
// check
Assert.assertEquals(10, outfileSqlPerParallel.size());
- for (int i = 0; i < 10; ++i) {
- Assert.assertEquals(1, outfileSqlPerParallel.get(i).size());
- }
- LogicalPlan plan1 = ((LogicalPlanAdapter)
outfileSqlPerParallel.get(0).get(0)).getLogicalPlan();
+ LogicalPlan plan1 = ((LogicalPlanAdapter)
outfileSqlPerParallel.get(0).get()).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan1, false),
currentPartitions, currentTablets1);
- LogicalPlan plan2 = ((LogicalPlanAdapter)
outfileSqlPerParallel.get(1).get(0)).getLogicalPlan();
+ LogicalPlan plan2 = ((LogicalPlanAdapter)
outfileSqlPerParallel.get(1).get()).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan2, false),
currentPartitions, currentTablets2);
- LogicalPlan plan3 = ((LogicalPlanAdapter)
outfileSqlPerParallel.get(2).get(0)).getLogicalPlan();
+ LogicalPlan plan3 = ((LogicalPlanAdapter)
outfileSqlPerParallel.get(2).get()).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan3, false),
currentPartitions, currentTablets3);
- LogicalPlan plan4 = ((LogicalPlanAdapter)
outfileSqlPerParallel.get(3).get(0)).getLogicalPlan();
+ LogicalPlan plan4 = ((LogicalPlanAdapter)
outfileSqlPerParallel.get(3).get()).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan4, false),
currentPartitions, currentTablets4);
- LogicalPlan plan5 = ((LogicalPlanAdapter)
outfileSqlPerParallel.get(4).get(0)).getLogicalPlan();
+ LogicalPlan plan5 = ((LogicalPlanAdapter)
outfileSqlPerParallel.get(4).get()).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan5, false),
currentPartitions, currentTablets5);
- LogicalPlan plan6 = ((LogicalPlanAdapter)
outfileSqlPerParallel.get(5).get(0)).getLogicalPlan();
+ LogicalPlan plan6 = ((LogicalPlanAdapter)
outfileSqlPerParallel.get(5).get()).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan6, false),
currentPartitions, currentTablets6);
- LogicalPlan plan7 = ((LogicalPlanAdapter)
outfileSqlPerParallel.get(6).get(0)).getLogicalPlan();
+ LogicalPlan plan7 = ((LogicalPlanAdapter)
outfileSqlPerParallel.get(6).get()).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan7, false),
currentPartitions, currentTablets7);
- LogicalPlan plan8 = ((LogicalPlanAdapter)
outfileSqlPerParallel.get(7).get(0)).getLogicalPlan();
+ LogicalPlan plan8 = ((LogicalPlanAdapter)
outfileSqlPerParallel.get(7).get()).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan8, false),
currentPartitions, currentTablets8);
- LogicalPlan plan9 = ((LogicalPlanAdapter)
outfileSqlPerParallel.get(8).get(0)).getLogicalPlan();
+ LogicalPlan plan9 = ((LogicalPlanAdapter)
outfileSqlPerParallel.get(8).get()).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan9, false),
currentPartitions, currentTablets9);
- LogicalPlan plan10 = ((LogicalPlanAdapter)
outfileSqlPerParallel.get(9).get(0)).getLogicalPlan();
+ LogicalPlan plan10 = ((LogicalPlanAdapter)
outfileSqlPerParallel.get(9).get()).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan10, false),
currentPartitions, currentTablets10);
}
@@ -425,9 +384,9 @@ public class ExportToOutfileLogicalPlanTest extends TestWithFeService {
}
// need open EnableNereidsPlanner
- private List<List<StatementBase>> getOutfileSqlPerParallel(String
exportSql) throws UserException {
+ private List<Optional<StatementBase>> getOutfileSqlPerParallel(String
exportSql) throws UserException {
ExportCommand exportCommand = (ExportCommand) parseSql(exportSql);
- List<List<StatementBase>> selectStmtListPerParallel =
Lists.newArrayList();
+ List<Optional<StatementBase>> selectStmtPerParallel =
Lists.newArrayList();
try {
Method checkAllParameters = exportCommand.getClass().getDeclaredMethod("checkAllParameters",
ConnectContext.class, TableName.class, Map.class);
@@ -445,7 +404,7 @@ public class ExportToOutfileLogicalPlanTest extends TestWithFeService {
ExportJob job = (ExportJob) generateExportJob.invoke(
exportCommand, connectContext, exportCommand.getFileProperties(), tblName);
- selectStmtListPerParallel = job.getSelectStmtListPerParallel();
+ selectStmtPerParallel = job.getSelectStmtPerParallel();
} catch (NoSuchMethodException e) {
throw new UserException(e);
} catch (InvocationTargetException e) {
@@ -453,7 +412,7 @@ public class ExportToOutfileLogicalPlanTest extends TestWithFeService {
} catch (IllegalAccessException e) {
throw new UserException(e);
}
- return selectStmtListPerParallel;
+ return selectStmtPerParallel;
}
private void checkPartitionsAndTablets(UnboundRelation relation, List<String> currentPartitionNames,
diff --git a/regression-test/data/export_p0/outfile/test_outfile_result.out b/regression-test/data/export_p0/outfile/test_outfile_result.out
new file mode 100644
index 00000000000..cda7d5ce1e6
Binary files /dev/null and b/regression-test/data/export_p0/outfile/test_outfile_result.out differ
diff --git a/regression-test/suites/export_p0/outfile/test_outfile_result.groovy b/regression-test/suites/export_p0/outfile/test_outfile_result.groovy
new file mode 100644
index 00000000000..a4cef1c6ada
--- /dev/null
+++ b/regression-test/suites/export_p0/outfile/test_outfile_result.groovy
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_outfile_result", "p0") {
+ def export_table_name = "test_outfile_result"
+
+ sql """ DROP TABLE IF EXISTS ${export_table_name} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${export_table_name} (
+ `user_id` LARGEINT NOT NULL COMMENT "用户id",
+ `Name` STRING COMMENT "用户年龄",
+ `Age` int(11) NULL
+ )
+ DISTRIBUTED BY HASH(user_id) BUCKETS 3
+ PROPERTIES("replication_num" = "1");
+ """
+ StringBuilder sb = new StringBuilder()
+ int i = 1
+ for (; i < 10; i ++) {
+ sb.append("""
+ (${i}, 'ftw-${i}', ${i + 18}),
+ """)
+ }
+ sb.append("""
+ (${i}, NULL, NULL)
+ """)
+ sql """ INSERT INTO ${export_table_name} VALUES
+ ${sb.toString()}
+ """
+ qt_select_export """ SELECT * FROM ${export_table_name} t ORDER BY user_id; """
+
+ def isNumber = {String str ->
+ logger.info("str = " + str)
+ if (str == null || str.trim().isEmpty()) {
+ throw exception("result is null or empty")
+ }
+ try {
+ double num = Double.parseDouble(str);
+ if (num < 0) {
+ throw exception("result can not be less than 0")
+ }
+ } catch (NumberFormatException e) {
+ throw exception("NumberFormatException: " + e.getMessage())
+ }
+ return true
+ }
+
+
+ // 1. test s3
+ try {
+ String ak = getS3AK()
+ String sk = getS3SK()
+ String s3_endpoint = getS3Endpoint()
+ String region = getS3Region()
+ String bucket = context.config.otherConfigs.get("s3BucketName");
+
+ // http schema
+ def outFilePath = "${bucket}/outfile_different_s3/exp_"
+ List<List<Object>> outfile_res = sql """ SELECT * FROM
${export_table_name} t ORDER BY user_id
+ INTO OUTFILE
"s3://${outFilePath}"
+ FORMAT AS parquet
+ PROPERTIES (
+ "s3.endpoint" =
"${s3_endpoint}",
+ "s3.region" = "${region}",
+ "s3.secret_key"="${sk}",
+ "s3.access_key" = "${ak}"
+ );
+ """
+
+ assertEquals(6, outfile_res[0].size())
+ assertEquals(true, isNumber(outfile_res[0][4]))
+ assertEquals(true, isNumber(outfile_res[0][5]))
+ } finally {
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]