This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 780900a [Feature] Support preceding filter original data when loading
(#5338)
780900a is described below
commit 780900ac9ca7ef7ee91595557ca4a80c1e36b206
Author: Mingyu Chen <[email protected]>
AuthorDate: Sun Feb 7 22:37:48 2021 +0800
[Feature] Support preceding filter original data when loading (#5338)
Support conditional filtering of original data in broker load and routine
load
eg:
```
LOAD LABEL `label1`
(
DATA INFILE ('bos://cmy-repo/1.csv')
INTO TABLE tbl2
COLUMNS TERMINATED BY '\t'
(event_day, product_id, ocpc_stage, user_id)
SET (
ocpc_stage = ocpc_stage + 100
)
PRECEDING FILTER user_id = 1381035
WHERE ocpc_stage > 30
)
...
```
---
be/src/exec/base_scanner.cpp | 13 ++-
be/src/exec/base_scanner.h | 5 +-
be/src/exec/broker_scan_node.cpp | 126 +++++----------------
be/src/exec/broker_scan_node.h | 18 +--
be/src/exec/broker_scanner.cpp | 5 +-
be/src/exec/broker_scanner.h | 7 +-
be/src/exec/json_scanner.cpp | 3 +-
be/src/exec/json_scanner.h | 4 +-
be/src/exec/orc_scanner.cpp | 3 +-
be/src/exec/orc_scanner.h | 4 +-
be/src/exec/parquet_scanner.cpp | 3 +-
be/src/exec/parquet_scanner.h | 5 +-
be/src/olap/push_handler.cpp | 2 +-
be/src/olap/push_handler.h | 2 +
be/test/exec/broker_scanner_test.cpp | 19 ++--
be/test/exec/orc_scanner_test.cpp | 7 +-
.../load-data/broker-load-manual.md | 5 +
.../Data Manipulation/BROKER LOAD.md | 19 ++++
.../Data Manipulation/ROUTINE LOAD.md | 28 ++++-
.../load-data/broker-load-manual.md | 5 +
.../Data Manipulation/BROKER LOAD.md | 19 ++++
.../Data Manipulation/ROUTINE LOAD.md | 29 ++++-
fe/fe-core/src/main/cup/sql_parser.cup | 46 ++++++--
.../doris/analysis/CreateRoutineLoadStmt.java | 17 ++-
.../org/apache/doris/analysis/DataDescription.java | 25 ++--
.../org/apache/doris/analysis/ImportWhereStmt.java | 9 +-
.../java/org/apache/doris/backup/RestoreJob.java | 4 +-
.../org/apache/doris/load/BrokerFileGroup.java | 15 ++-
.../src/main/java/org/apache/doris/load/Load.java | 25 ++--
.../org/apache/doris/load/RoutineLoadDesc.java | 14 ++-
.../doris/load/loadv2/LoadingTaskPlanner.java | 10 +-
.../doris/load/routineload/RoutineLoadJob.java | 11 ++
.../org/apache/doris/planner/BrokerScanNode.java | 17 +--
.../org/apache/doris/planner/LoadScanNode.java | 40 +++++--
.../java/org/apache/doris/planner/PlanNode.java | 27 ++++-
.../apache/doris/planner/StreamLoadScanNode.java | 9 +-
.../java/org/apache/doris/qe/MultiLoadMgr.java | 2 +-
.../java/org/apache/doris/task/LoadTaskInfo.java | 2 +
.../java/org/apache/doris/task/StreamLoadTask.java | 11 +-
fe/fe-core/src/main/jflex/sql_scanner.flex | 1 +
.../apache/doris/analysis/DataDescriptionTest.java | 14 +--
.../load/routineload/KafkaRoutineLoadJobTest.java | 12 +-
gensrc/thrift/PlanNodes.thrift | 1 +
43 files changed, 411 insertions(+), 232 deletions(-)
diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp
index 8ec57d4..4c4dab2 100644
--- a/be/src/exec/base_scanner.cpp
+++ b/be/src/exec/base_scanner.cpp
@@ -18,6 +18,7 @@
#include "base_scanner.h"
#include "common/logging.h"
+#include "exec/exec_node.h"
#include "runtime/descriptors.h"
#include "runtime/mem_tracker.h"
#include "runtime/raw_value.h"
@@ -27,7 +28,9 @@
namespace doris {
BaseScanner::BaseScanner(RuntimeState* state, RuntimeProfile* profile,
- const TBrokerScanRangeParams& params, ScannerCounter*
counter)
+ const TBrokerScanRangeParams& params,
+ const std::vector<ExprContext*>& pre_filter_ctxs,
+ ScannerCounter* counter)
: _state(state),
_params(params),
_counter(counter),
@@ -41,6 +44,7 @@ BaseScanner::BaseScanner(RuntimeState* state, RuntimeProfile*
profile,
#endif
_mem_pool(_mem_tracker.get()),
_dest_tuple_desc(nullptr),
+ _pre_filter_ctxs(pre_filter_ctxs),
_strict_mode(false),
_profile(profile),
_rows_read_counter(nullptr),
@@ -137,6 +141,13 @@ Status BaseScanner::init_expr_ctxes() {
}
bool BaseScanner::fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool) {
+ // filter src tuple by preceding filter first
+ if (!ExecNode::eval_conjuncts(&_pre_filter_ctxs[0],
_pre_filter_ctxs.size(), _src_tuple_row)) {
+ _counter->num_rows_unselected++;
+ return false;
+ }
+
+ // convert and fill dest tuple
int ctx_idx = 0;
for (auto slot_desc : _dest_tuple_desc->slots()) {
if (!slot_desc->is_materialized()) {
diff --git a/be/src/exec/base_scanner.h b/be/src/exec/base_scanner.h
index 1fc2949..c88ff76 100644
--- a/be/src/exec/base_scanner.h
+++ b/be/src/exec/base_scanner.h
@@ -43,7 +43,7 @@ struct ScannerCounter {
class BaseScanner {
public:
BaseScanner(RuntimeState* state, RuntimeProfile* profile, const
TBrokerScanRangeParams& params,
- ScannerCounter* counter);
+ const std::vector<ExprContext*>& pre_filter_ctxs,
ScannerCounter* counter);
virtual ~BaseScanner() { Expr::close(_dest_expr_ctx, _state); };
virtual Status init_expr_ctxes();
@@ -84,6 +84,9 @@ protected:
// if there is not key of dest slot id in
dest_sid_to_src_sid_without_trans, it will be set to nullptr
std::vector<SlotDescriptor*> _src_slot_descs_order_by_dest;
+ // to filter src tuple directly
+ const std::vector<ExprContext*>& _pre_filter_ctxs;
+
bool _strict_mode;
// Profile
RuntimeProfile* _profile;
diff --git a/be/src/exec/broker_scan_node.cpp b/be/src/exec/broker_scan_node.cpp
index 233c6c6..a50f2eb 100644
--- a/be/src/exec/broker_scan_node.cpp
+++ b/be/src/exec/broker_scan_node.cpp
@@ -45,27 +45,15 @@ BrokerScanNode::BrokerScanNode(ObjectPool* pool, const
TPlanNode& tnode, const D
BrokerScanNode::~BrokerScanNode() {}
-// We use the PartitionRange to compare here. It should not be a member
function of PartitionInfo
-// class because there are some other member in it.
-static bool compare_part_use_range(const PartitionInfo* v1, const
PartitionInfo* v2) {
- return v1->range() < v2->range();
-}
-
Status BrokerScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(ScanNode::init(tnode));
auto& broker_scan_node = tnode.broker_scan_node;
- if (broker_scan_node.__isset.partition_exprs) {
- // ASSERT broker_scan_node.__isset.partition_infos == true
- RETURN_IF_ERROR(Expr::create_expr_trees(_pool,
broker_scan_node.partition_exprs,
- &_partition_expr_ctxs));
- for (auto& t_partition_info : broker_scan_node.partition_infos) {
- PartitionInfo* info = _pool->add(new PartitionInfo());
- RETURN_IF_ERROR(PartitionInfo::from_thrift(_pool,
t_partition_info, info));
- _partition_infos.emplace_back(info);
- }
- // partitions should be in ascending order
- std::sort(_partition_infos.begin(), _partition_infos.end(),
compare_part_use_range);
+
+ if (broker_scan_node.__isset.pre_filter_exprs) {
+ RETURN_IF_ERROR(Expr::create_expr_trees(_pool,
broker_scan_node.pre_filter_exprs,
+ &_pre_filter_ctxs));
}
+
return Status::OK();
}
@@ -91,12 +79,8 @@ Status BrokerScanNode::prepare(RuntimeState* state) {
}
}
- // prepare partition
- if (_partition_expr_ctxs.size() > 0) {
- RETURN_IF_ERROR(Expr::prepare(_partition_expr_ctxs, state, row_desc(),
expr_mem_tracker()));
- for (auto iter : _partition_infos) {
- RETURN_IF_ERROR(iter->prepare(state, row_desc(),
expr_mem_tracker()));
- }
+ if (_pre_filter_ctxs.size() > 0) {
+ RETURN_IF_ERROR(Expr::prepare(_pre_filter_ctxs, state, row_desc(),
expr_mem_tracker()));
}
// Profile
@@ -111,12 +95,8 @@ Status BrokerScanNode::open(RuntimeState* state) {
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
RETURN_IF_CANCELLED(state);
- // Open partition
- if (_partition_expr_ctxs.size() > 0) {
- RETURN_IF_ERROR(Expr::open(_partition_expr_ctxs, state));
- for (auto iter : _partition_infos) {
- RETURN_IF_ERROR(iter->open(state));
- }
+ if (_pre_filter_ctxs.size() > 0) {
+ RETURN_IF_ERROR(Expr::open(_pre_filter_ctxs, state));
}
RETURN_IF_ERROR(start_scanners());
@@ -227,12 +207,8 @@ Status BrokerScanNode::close(RuntimeState* state) {
_scanner_threads[i].join();
}
- // Close partition
- if (_partition_expr_ctxs.size() > 0) {
- Expr::close(_partition_expr_ctxs, state);
- for (auto iter : _partition_infos) {
- iter->close(state);
- }
+ if (_pre_filter_ctxs.size() > 0) {
+ Expr::close(_pre_filter_ctxs, state);
}
// Close
@@ -244,17 +220,6 @@ Status BrokerScanNode::close(RuntimeState* state) {
// This function is called after plan node has been prepared.
Status BrokerScanNode::set_scan_ranges(const std::vector<TScanRangeParams>&
scan_ranges) {
_scan_ranges = scan_ranges;
-
- // Now we initialize partition information
- if (_partition_expr_ctxs.size() > 0) {
- for (auto& range : _scan_ranges) {
- auto& params = range.scan_range.broker_scan_range.params;
- if (params.__isset.partition_ids) {
- std::sort(params.partition_ids.begin(),
params.partition_ids.end());
- }
- }
- }
-
return Status::OK();
}
@@ -263,35 +228,40 @@ void BrokerScanNode::debug_string(int ident_level,
std::stringstream* out) const
}
std::unique_ptr<BaseScanner> BrokerScanNode::create_scanner(const
TBrokerScanRange& scan_range,
+ const
std::vector<ExprContext*>& pre_filter_ctxs,
ScannerCounter*
counter) {
BaseScanner* scan = nullptr;
switch (scan_range.ranges[0].format_type) {
case TFileFormatType::FORMAT_PARQUET:
scan = new ParquetScanner(_runtime_state, runtime_profile(),
scan_range.params,
- scan_range.ranges,
scan_range.broker_addresses, counter);
+ scan_range.ranges,
scan_range.broker_addresses,
+ pre_filter_ctxs, counter);
break;
case TFileFormatType::FORMAT_ORC:
scan = new ORCScanner(_runtime_state, runtime_profile(),
scan_range.params,
- scan_range.ranges, scan_range.broker_addresses,
counter);
+ scan_range.ranges, scan_range.broker_addresses,
+ pre_filter_ctxs, counter);
break;
case TFileFormatType::FORMAT_JSON:
scan = new JsonScanner(_runtime_state, runtime_profile(),
scan_range.params,
- scan_range.ranges, scan_range.broker_addresses,
counter);
+ scan_range.ranges, scan_range.broker_addresses,
+ pre_filter_ctxs, counter);
break;
default:
scan = new BrokerScanner(_runtime_state, runtime_profile(),
scan_range.params,
- scan_range.ranges,
scan_range.broker_addresses, counter);
+ scan_range.ranges,
scan_range.broker_addresses,
+ pre_filter_ctxs, counter);
}
std::unique_ptr<BaseScanner> scanner(scan);
return scanner;
}
Status BrokerScanNode::scanner_scan(const TBrokerScanRange& scan_range,
+ const std::vector<ExprContext*>&
pre_filter_ctxs,
const std::vector<ExprContext*>&
conjunct_ctxs,
- const std::vector<ExprContext*>&
partition_expr_ctxs,
ScannerCounter* counter) {
//create scanner object and open
- std::unique_ptr<BaseScanner> scanner = create_scanner(scan_range, counter);
+ std::unique_ptr<BaseScanner> scanner = create_scanner(scan_range,
pre_filter_ctxs, counter);
RETURN_IF_ERROR(scanner->open());
bool scanner_eof = false;
@@ -387,20 +357,17 @@ void BrokerScanNode::scanner_worker(int start_idx, int
length) {
if (!status.ok()) {
LOG(WARNING) << "Clone conjuncts failed.";
}
- std::vector<ExprContext*> partition_expr_ctxs;
- ;
+
+ std::vector<ExprContext*> pre_filter_ctxs;
if (status.ok()) {
- status = Expr::clone_if_not_exists(_partition_expr_ctxs,
_runtime_state,
- &partition_expr_ctxs);
- if (!status.ok()) {
- LOG(WARNING) << "Clone conjuncts failed.";
- }
+ status = Expr::clone_if_not_exists(_pre_filter_ctxs, _runtime_state,
&pre_filter_ctxs);
}
+
ScannerCounter counter;
for (int i = 0; i < length && status.ok(); ++i) {
const TBrokerScanRange& scan_range =
_scan_ranges[start_idx + i].scan_range.broker_scan_range;
- status = scanner_scan(scan_range, scanner_expr_ctxs,
partition_expr_ctxs, &counter);
+ status = scanner_scan(scan_range, pre_filter_ctxs, scanner_expr_ctxs,
&counter);
if (!status.ok()) {
LOG(WARNING) << "Scanner[" << start_idx + i
<< "] process failed. status=" <<
status.get_error_msg();
@@ -426,45 +393,6 @@ void BrokerScanNode::scanner_worker(int start_idx, int
length) {
_queue_writer_cond.notify_all();
}
Expr::close(scanner_expr_ctxs, _runtime_state);
- Expr::close(partition_expr_ctxs, _runtime_state);
-}
-
-int64_t BrokerScanNode::binary_find_partition_id(const PartRangeKey& key)
const {
- int low = 0;
- int high = _partition_infos.size() - 1;
-
- while (low <= high) {
- int mid = low + (high - low) / 2;
- int cmp = _partition_infos[mid]->range().compare_key(key);
- if (cmp == 0) {
- return _partition_infos[mid]->id();
- } else if (cmp < 0) { // current < partition[mid]
- low = mid + 1;
- } else {
- high = mid - 1;
- }
- }
-
- return -1;
-}
-
-int64_t BrokerScanNode::get_partition_id(const std::vector<ExprContext*>&
partition_expr_ctxs,
- TupleRow* row) const {
- if (_partition_infos.size() == 0) {
- return -1;
- }
- // construct a PartRangeKey
- PartRangeKey part_key;
- // use binary search to get the right partition.
- ExprContext* ctx = partition_expr_ctxs[0];
- void* partition_val = ctx->get_value(row);
- if (partition_val != nullptr) {
- PartRangeKey::from_value(ctx->root()->type().type, partition_val,
&part_key);
- } else {
- part_key = PartRangeKey::neg_infinite();
- }
-
- return binary_find_partition_id(part_key);
}
} // namespace doris
diff --git a/be/src/exec/broker_scan_node.h b/be/src/exec/broker_scan_node.h
index dedd358..6c4f40e 100644
--- a/be/src/exec/broker_scan_node.h
+++ b/be/src/exec/broker_scan_node.h
@@ -60,12 +60,6 @@ public:
// No use
virtual Status set_scan_ranges(const std::vector<TScanRangeParams>&
scan_ranges) override;
- // Called by broker scanners to get_partition_id
- // If there is no partition information, return -1
- // Return partition id if we find the partition match this row,
- // return -1, if there is no such partition.
- int64_t get_partition_id(const std::vector<ExprContext*>& partition_exprs,
TupleRow* row) const;
-
protected:
// Write debug string of this into out.
virtual void debug_string(int indentation_level, std::stringstream* out)
const override;
@@ -89,14 +83,12 @@ private:
// Scan one range
Status scanner_scan(const TBrokerScanRange& scan_range,
+ const std::vector<ExprContext*>& pre_filter_ctxs,
const std::vector<ExprContext*>& conjunct_ctxs,
- const std::vector<ExprContext*>& partition_expr_ctxs,
ScannerCounter* counter);
- // Find partition id with PartRangeKey
- int64_t binary_find_partition_id(const PartRangeKey& key) const;
-
std::unique_ptr<BaseScanner> create_scanner(const TBrokerScanRange&
scan_range,
+ const
std::vector<ExprContext*>& pre_filter_ctxs,
ScannerCounter* counter);
private:
@@ -123,12 +115,8 @@ private:
int _max_buffered_batches;
- // Partition information
- std::vector<ExprContext*> _partition_expr_ctxs;
- std::vector<PartitionInfo*> _partition_infos;
+ std::vector<ExprContext*> _pre_filter_ctxs;
- // Profile information
- //
RuntimeProfile::Counter* _wait_scanner_timer;
};
diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp
index 5b67f58..c268275 100644
--- a/be/src/exec/broker_scanner.cpp
+++ b/be/src/exec/broker_scanner.cpp
@@ -22,6 +22,7 @@
#include "exec/broker_reader.h"
#include "exec/decompressor.h"
+#include "exec/exec_node.h"
#include "exec/local_file_reader.h"
#include "exec/plain_text_line_reader.h"
#include "exec/text_converter.h"
@@ -42,8 +43,9 @@ BrokerScanner::BrokerScanner(RuntimeState* state,
RuntimeProfile* profile,
const TBrokerScanRangeParams& params,
const std::vector<TBrokerRangeDesc>& ranges,
const std::vector<TNetworkAddress>&
broker_addresses,
+ const
std::vector<ExprContext*>& pre_filter_ctxs,
ScannerCounter* counter)
- : BaseScanner(state, profile, params, counter),
+ : BaseScanner(state, profile, params, pre_filter_ctxs, counter),
_ranges(ranges),
_broker_addresses(broker_addresses),
// _splittable(params.splittable),
@@ -381,6 +383,7 @@ bool BrokerScanner::convert_one_row(const Slice& line,
Tuple* tuple, MemPool* tu
if (!line_to_src_tuple(line)) {
return false;
}
+
return fill_dest_tuple(tuple, tuple_pool);
}
diff --git a/be/src/exec/broker_scanner.h b/be/src/exec/broker_scanner.h
index 0c02baf..2a2563e 100644
--- a/be/src/exec/broker_scanner.h
+++ b/be/src/exec/broker_scanner.h
@@ -54,7 +54,9 @@ class BrokerScanner : public BaseScanner {
public:
BrokerScanner(RuntimeState* state, RuntimeProfile* profile,
const TBrokerScanRangeParams& params, const
std::vector<TBrokerRangeDesc>& ranges,
- const std::vector<TNetworkAddress>& broker_addresses,
ScannerCounter* counter);
+ const std::vector<TNetworkAddress>& broker_addresses,
+ const std::vector<ExprContext*>&
pre_filter_ctxs,
+ ScannerCounter* counter);
~BrokerScanner();
// Open this scanner, will initialize information need to
@@ -87,13 +89,10 @@ private:
// output is tuple
bool convert_one_row(const Slice& line, Tuple* tuple, MemPool* tuple_pool);
- //Status init_expr_ctxes();
-
Status line_to_src_tuple();
bool line_to_src_tuple(const Slice& line);
private:
- ;
const std::vector<TBrokerRangeDesc>& _ranges;
const std::vector<TNetworkAddress>& _broker_addresses;
diff --git a/be/src/exec/json_scanner.cpp b/be/src/exec/json_scanner.cpp
index 3431be4..faba55d 100644
--- a/be/src/exec/json_scanner.cpp
+++ b/be/src/exec/json_scanner.cpp
@@ -36,8 +36,9 @@ JsonScanner::JsonScanner(RuntimeState* state, RuntimeProfile*
profile,
const TBrokerScanRangeParams& params,
const std::vector<TBrokerRangeDesc>& ranges,
const std::vector<TNetworkAddress>& broker_addresses,
+ const std::vector<ExprContext*>& pre_filter_ctxs,
ScannerCounter* counter)
- : BaseScanner(state, profile, params, counter),
+ : BaseScanner(state, profile, params, pre_filter_ctxs, counter),
_ranges(ranges),
_broker_addresses(broker_addresses),
_cur_file_reader(nullptr),
diff --git a/be/src/exec/json_scanner.h b/be/src/exec/json_scanner.h
index 86bc2dd..c61cbbe 100644
--- a/be/src/exec/json_scanner.h
+++ b/be/src/exec/json_scanner.h
@@ -54,7 +54,9 @@ class JsonScanner : public BaseScanner {
public:
JsonScanner(RuntimeState* state, RuntimeProfile* profile, const
TBrokerScanRangeParams& params,
const std::vector<TBrokerRangeDesc>& ranges,
- const std::vector<TNetworkAddress>& broker_addresses,
ScannerCounter* counter);
+ const std::vector<TNetworkAddress>& broker_addresses,
+ const std::vector<ExprContext*>& pre_filter_ctxs,
+ ScannerCounter* counter);
~JsonScanner();
// Open this scanner, will initialize information needed
diff --git a/be/src/exec/orc_scanner.cpp b/be/src/exec/orc_scanner.cpp
index bdd7eb4..c8d1d7c 100644
--- a/be/src/exec/orc_scanner.cpp
+++ b/be/src/exec/orc_scanner.cpp
@@ -114,8 +114,9 @@ ORCScanner::ORCScanner(RuntimeState* state, RuntimeProfile*
profile,
const TBrokerScanRangeParams& params,
const std::vector<TBrokerRangeDesc>& ranges,
const std::vector<TNetworkAddress>& broker_addresses,
+ const std::vector<ExprContext*>& pre_filter_ctxs,
ScannerCounter* counter)
- : BaseScanner(state, profile, params, counter),
+ : BaseScanner(state, profile, params, pre_filter_ctxs, counter),
_ranges(ranges),
_broker_addresses(broker_addresses),
// _splittable(params.splittable),
diff --git a/be/src/exec/orc_scanner.h b/be/src/exec/orc_scanner.h
index e1eb21d..c06dc0b 100644
--- a/be/src/exec/orc_scanner.h
+++ b/be/src/exec/orc_scanner.h
@@ -29,7 +29,9 @@ class ORCScanner : public BaseScanner {
public:
ORCScanner(RuntimeState* state, RuntimeProfile* profile, const
TBrokerScanRangeParams& params,
const std::vector<TBrokerRangeDesc>& ranges,
- const std::vector<TNetworkAddress>& broker_addresses,
ScannerCounter* counter);
+ const std::vector<TNetworkAddress>& broker_addresses,
+ const std::vector<ExprContext*>& pre_filter_ctxs,
+ ScannerCounter* counter);
~ORCScanner() override;
diff --git a/be/src/exec/parquet_scanner.cpp b/be/src/exec/parquet_scanner.cpp
index 119e162..a3a6656 100644
--- a/be/src/exec/parquet_scanner.cpp
+++ b/be/src/exec/parquet_scanner.cpp
@@ -38,8 +38,9 @@ ParquetScanner::ParquetScanner(RuntimeState* state,
RuntimeProfile* profile,
const TBrokerScanRangeParams& params,
const std::vector<TBrokerRangeDesc>& ranges,
const std::vector<TNetworkAddress>&
broker_addresses,
+ const std::vector<ExprContext*>&
pre_filter_ctxs,
ScannerCounter* counter)
- : BaseScanner(state, profile, params, counter),
+ : BaseScanner(state, profile, params, pre_filter_ctxs, counter),
_ranges(ranges),
_broker_addresses(broker_addresses),
// _splittable(params.splittable),
diff --git a/be/src/exec/parquet_scanner.h b/be/src/exec/parquet_scanner.h
index 3e7bdff..a23b91f 100644
--- a/be/src/exec/parquet_scanner.h
+++ b/be/src/exec/parquet_scanner.h
@@ -51,7 +51,10 @@ public:
ParquetScanner(RuntimeState* state, RuntimeProfile* profile,
const TBrokerScanRangeParams& params,
const std::vector<TBrokerRangeDesc>& ranges,
- const std::vector<TNetworkAddress>& broker_addresses,
ScannerCounter* counter);
+ const std::vector<TNetworkAddress>& broker_addresses,
+ const std::vector<ExprContext*>& pre_filter_ctxs,
+ ScannerCounter* counter);
+
~ParquetScanner();
// Open this scanner, will initialize information need to
diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp
index fe54ef4..a740176 100644
--- a/be/src/olap/push_handler.cpp
+++ b/be/src/olap/push_handler.cpp
@@ -937,7 +937,7 @@ OLAPStatus PushBrokerReader::init(const Schema* schema,
const TBrokerScanRange&
case TFileFormatType::FORMAT_PARQUET:
scanner = new ParquetScanner(_runtime_state.get(), _runtime_profile,
t_scan_range.params,
t_scan_range.ranges,
t_scan_range.broker_addresses,
- _counter.get());
+ _pre_filter_ctxs, _counter.get());
break;
default:
LOG(WARNING) << "Unsupported file format type: " <<
t_scan_range.ranges[0].format_type;
diff --git a/be/src/olap/push_handler.h b/be/src/olap/push_handler.h
index dcac40e..7f8d0aa 100644
--- a/be/src/olap/push_handler.h
+++ b/be/src/olap/push_handler.h
@@ -211,6 +211,8 @@ private:
std::unique_ptr<MemPool> _mem_pool;
std::unique_ptr<ScannerCounter> _counter;
std::unique_ptr<BaseScanner> _scanner;
+ // Not used, just for placeholding
+ std::vector<ExprContext*> _pre_filter_ctxs;
};
} // namespace doris
diff --git a/be/test/exec/broker_scanner_test.cpp
b/be/test/exec/broker_scanner_test.cpp
index f373f9f..98f163b 100644
--- a/be/test/exec/broker_scanner_test.cpp
+++ b/be/test/exec/broker_scanner_test.cpp
@@ -68,6 +68,7 @@ private:
DescriptorTbl* _desc_tbl;
std::vector<TNetworkAddress> _addresses;
ScannerCounter _counter;
+ std::vector<doris::ExprContext*> _pre_filter;
};
void BrokerScannerTest::init_desc_table() {
@@ -357,7 +358,7 @@ TEST_F(BrokerScannerTest, normal) {
range.format_type = TFileFormatType::FORMAT_CSV_PLAIN;
ranges.push_back(range);
- BrokerScanner scanner(&_runtime_state, _profile, _params, ranges,
_addresses, &_counter);
+ BrokerScanner scanner(&_runtime_state, _profile, _params, ranges,
_addresses, _pre_filter, &_counter);
auto st = scanner.open();
ASSERT_TRUE(st.ok());
@@ -409,7 +410,7 @@ TEST_F(BrokerScannerTest, normal2) {
range.size = 4;
ranges.push_back(range);
- BrokerScanner scanner(&_runtime_state, _profile, _params, ranges,
_addresses, &_counter);
+ BrokerScanner scanner(&_runtime_state, _profile, _params, ranges,
_addresses, _pre_filter, &_counter);
auto st = scanner.open();
ASSERT_TRUE(st.ok());
@@ -455,7 +456,7 @@ TEST_F(BrokerScannerTest, normal3) {
range.size = 5;
ranges.push_back(range);
- BrokerScanner scanner(&_runtime_state, _profile, _params, ranges,
_addresses, &_counter);
+ BrokerScanner scanner(&_runtime_state, _profile, _params, ranges,
_addresses, _pre_filter, &_counter);
auto st = scanner.open();
ASSERT_TRUE(st.ok());
@@ -502,7 +503,7 @@ TEST_F(BrokerScannerTest, normal4) {
range.format_type = TFileFormatType::FORMAT_CSV_PLAIN;
ranges.push_back(range);
- BrokerScanner scanner(&_runtime_state, _profile, _params, ranges,
_addresses, &_counter);
+ BrokerScanner scanner(&_runtime_state, _profile, _params, ranges,
_addresses, _pre_filter, &_counter);
auto st = scanner.open();
ASSERT_TRUE(st.ok());
@@ -533,7 +534,7 @@ TEST_F(BrokerScannerTest, normal5) {
range.format_type = TFileFormatType::FORMAT_CSV_PLAIN;
ranges.push_back(range);
- BrokerScanner scanner(&_runtime_state, _profile, _params, ranges,
_addresses, &_counter);
+ BrokerScanner scanner(&_runtime_state, _profile, _params, ranges,
_addresses, _pre_filter, &_counter);
auto st = scanner.open();
ASSERT_TRUE(st.ok());
@@ -557,7 +558,7 @@ TEST_F(BrokerScannerTest, normal6) {
range.format_type = TFileFormatType::FORMAT_CSV_PLAIN;
ranges.push_back(range);
- BrokerScanner scanner(&_runtime_state, _profile, _params, ranges,
_addresses, &_counter);
+ BrokerScanner scanner(&_runtime_state, _profile, _params, ranges,
_addresses, _pre_filter, &_counter);
auto st = scanner.open();
ASSERT_TRUE(st.ok());
@@ -588,7 +589,7 @@ TEST_F(BrokerScannerTest, normal7) {
range.format_type = TFileFormatType::FORMAT_CSV_PLAIN;
ranges.push_back(range);
- BrokerScanner scanner(&_runtime_state, _profile, _params, ranges,
_addresses, &_counter);
+ BrokerScanner scanner(&_runtime_state, _profile, _params, ranges,
_addresses, _pre_filter, &_counter);
auto st = scanner.open();
ASSERT_TRUE(st.ok());
@@ -612,7 +613,7 @@ TEST_F(BrokerScannerTest, normal8) {
range.format_type = TFileFormatType::FORMAT_CSV_PLAIN;
ranges.push_back(range);
- BrokerScanner scanner(&_runtime_state, _profile, _params, ranges,
_addresses, &_counter);
+ BrokerScanner scanner(&_runtime_state, _profile, _params, ranges,
_addresses, _pre_filter, &_counter);
auto st = scanner.open();
ASSERT_TRUE(st.ok());
@@ -643,7 +644,7 @@ TEST_F(BrokerScannerTest, normal9) {
range.format_type = TFileFormatType::FORMAT_CSV_PLAIN;
ranges.push_back(range);
- BrokerScanner scanner(&_runtime_state, _profile, _params, ranges,
_addresses, &_counter);
+ BrokerScanner scanner(&_runtime_state, _profile, _params, ranges,
_addresses, _pre_filter, &_counter);
auto st = scanner.open();
ASSERT_TRUE(st.ok());
diff --git a/be/test/exec/orc_scanner_test.cpp
b/be/test/exec/orc_scanner_test.cpp
index 6e70e77..dbfcc18 100644
--- a/be/test/exec/orc_scanner_test.cpp
+++ b/be/test/exec/orc_scanner_test.cpp
@@ -66,6 +66,7 @@ private:
DescriptorTbl* _desc_tbl;
std::vector<TNetworkAddress> _addresses;
ScannerCounter _counter;
+ std::vector<doris::ExprContext*> _pre_filter;
};
TEST_F(OrcScannerTest, normal) {
@@ -405,7 +406,7 @@ TEST_F(OrcScannerTest, normal) {
rangeDesc.file_type = TFileType::FILE_LOCAL;
ranges.push_back(rangeDesc);
- ORCScanner scanner(&_runtime_state, _profile, params, ranges, _addresses,
&_counter);
+ ORCScanner scanner(&_runtime_state, _profile, params, ranges, _addresses,
_pre_filter, &_counter);
ASSERT_TRUE(scanner.open().ok());
auto tracker = std::make_shared<MemTracker>();
@@ -528,7 +529,7 @@ TEST_F(OrcScannerTest, normal2) {
rangeDesc.file_type = TFileType::FILE_LOCAL;
ranges.push_back(rangeDesc);
- ORCScanner scanner(&_runtime_state, _profile, params, ranges, _addresses,
&_counter);
+ ORCScanner scanner(&_runtime_state, _profile, params, ranges, _addresses,
_pre_filter, &_counter);
ASSERT_TRUE(scanner.open().ok());
auto tracker = std::make_shared<MemTracker>();
@@ -877,7 +878,7 @@ TEST_F(OrcScannerTest, normal3) {
rangeDesc.file_type = TFileType::FILE_LOCAL;
ranges.push_back(rangeDesc);
- ORCScanner scanner(&_runtime_state, _profile, params, ranges, _addresses,
&_counter);
+ ORCScanner scanner(&_runtime_state, _profile, params, ranges, _addresses,
_pre_filter, &_counter);
ASSERT_TRUE(scanner.open().ok());
auto tracker = std::make_shared<MemTracker>();
diff --git a/docs/en/administrator-guide/load-data/broker-load-manual.md
b/docs/en/administrator-guide/load-data/broker-load-manual.md
index f6dd7e1..29b9fb3 100644
--- a/docs/en/administrator-guide/load-data/broker-load-manual.md
+++ b/docs/en/administrator-guide/load-data/broker-load-manual.md
@@ -103,6 +103,7 @@ WITH BROKER broker_name broker_properties
[PARTITION (p1, p2)]
[COLUMNS TERMINATED BY separator ]
[(col1, ...)]
+ [PRECEDING FILTER predicate]
[SET (k1=f1(xx), k2=f2(xx))]
[WHERE predicate]
@@ -170,6 +171,10 @@ The following is a detailed explanation of some parameters
of the data descripti
In `data_desc`, you can specify the partition information of the table
to be imported, but it will not be imported if the data to be imported does not
belong to the specified partition. At the same time, data that does not specify
a Partition is considered error data.
++ preceding filter predicate
+
+ Used to filter original data. The original data is the data without column
mapping and transformation. The user can filter the data before conversion,
select the desired data, and then perform the conversion.
+
+ where predicate
The where statement in ```data_desc``` is responsible for filtering
the data that has been transformed. The unselected rows which are filtered by the
where predicate will not be calculated in ```max_filter_ratio```. If there is
more than one where predicate for the same table, the multiple where predicates
will be merged from different ```data_desc``` and the policy is AND.
diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/BROKER
LOAD.md b/docs/en/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md
index cc4b3af..ba98120 100644
--- a/docs/en/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md
+++ b/docs/en/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md
@@ -68,6 +68,7 @@ under the License.
[COLUMNS TERMINATED BY "column_separator"]
[FORMAT AS "file_type"]
[(column_list)]
+ [PRECEDING FILTER predicate]
[SET (k1 = func(k2))]
[WHERE predicate]
[DELETE ON label=true]
@@ -106,6 +107,10 @@ under the License.
syntax:
(col_name1, col_name2, ...)
+
+ PRECEDING FILTER predicate:
+
+ Used to filter original data. The original data is the data
without column mapping and transformation. The user can filter the data before
conversion, select the desired data, and then perform the conversion.
SET:
@@ -454,6 +459,20 @@ under the License.
"timeout" = "3600",
"max_filter_ratio" = "0.1"
);
+
+ 14. Filter the original data first, and perform column mapping, conversion
and filtering operations
+
+ LOAD LABEL example_db.label_filter
+ (
+ DATA INFILE("hdfs://host:port/user/data/*/test.txt")
+ INTO TABLE `tbl1`
+ COLUMNS TERMINATED BY ","
+ (k1,k2,v1,v2)
+ PRECEDING FILTER k1 > 2
+ SET (k1 = k1 +1)
+ WHERE k1 > 3
+ )
+ with BROKER "hdfs" ("username"="user", "password"="pass");
## keyword
diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/ROUTINE
LOAD.md b/docs/en/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md
index 60d9b22..1d8bb4d 100644
--- a/docs/en/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md
+++ b/docs/en/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md
@@ -61,7 +61,8 @@ FROM data_source
[columns_mapping],
[where_predicates],
[delete_on_predicates]
- [partitions]
+ [partitions],
+ [preceding_predicates]
```
1. column_separator:
@@ -116,6 +117,10 @@ FROM data_source
Only used when merge type is MERGE
+ 6. preceding_predicates
+
+ Used to filter original data. The original data is the data without
column mapping and transformation. The user can filter the data before
conversion, select the desired data, and then perform the conversion.
+
5. job_properties
A generic parameter that specifies a routine load job.
@@ -527,6 +532,27 @@ FROM data_source
"kafka_partitions" = "0,1,2,3",
"kafka_offsets" = "101,0,0,200"
);
+
+ 8. Filter original data
+
+ CREATE ROUTINE LOAD example_db.test_job ON example_tbl
+ COLUMNS TERMINATED BY ",",
+ COLUMNS(k1,k2,source_sequence,v1,v2),
+ PRECEDING FILTER k1 > 2
+ PROPERTIES
+ (
+ "desired_concurrent_number"="3",
+ "max_batch_interval" = "30",
+ "max_batch_rows" = "300000",
+ "max_batch_size" = "209715200"
+ ) FROM KAFKA
+ (
+ "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
+ "kafka_topic" = "my_topic",
+ "kafka_partitions" = "0,1,2,3",
+ "kafka_offsets" = "101,0,0,200"
+ );
+
## keyword
CREATE, ROUTINE, LOAD
diff --git a/docs/zh-CN/administrator-guide/load-data/broker-load-manual.md
b/docs/zh-CN/administrator-guide/load-data/broker-load-manual.md
index 33969fd..2ba0b63 100644
--- a/docs/zh-CN/administrator-guide/load-data/broker-load-manual.md
+++ b/docs/zh-CN/administrator-guide/load-data/broker-load-manual.md
@@ -103,6 +103,7 @@ WITH BROKER broker_name broker_properties
[PARTITION (p1, p2)]
[COLUMNS TERMINATED BY separator ]
[(col1, ...)]
+ [PRECEDING FILTER predicate]
[SET (k1=f1(xx), k2=f2(xx))]
[WHERE predicate]
@@ -174,6 +175,10 @@ Label 的另一个作用,是防止用户重复导入相同的数据。**强烈
在 ```data_desc``` 中的 SET
语句负责设置列函数变换,这里的列函数变换支持所有查询的等值表达式变换。如果原始数据的列和表中的列不一一对应,就需要用到这个属性。
++ preceding filter predicate
+
+ 用于过滤原始数据。原始数据是未经列映射、转换的数据。用户可以对转换前的数据进行一次过滤,选取期望的数据,再进行转换。
+
+ where predicate
在 ```data_desc``` 中的 WHERE 语句中负责过滤已经完成 transform 的数据,被 filter
的数据不会进入容忍率的统计中。如果多个 data_desc 中声明了同一张表的多个条件的话,则会 merge 同一张表的多个条件,merge 策略是 AND 。
diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BROKER
LOAD.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BROKER
LOAD.md
index e785877..57f1bff 100644
--- a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md
+++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md
@@ -68,6 +68,7 @@ under the License.
[COLUMNS TERMINATED BY "column_separator"]
[FORMAT AS "file_type"]
[(column_list)]
+ [PRECEDING FILTER predicate]
[SET (k1 = func(k2))]
[WHERE predicate]
[DELETE ON label=true]
@@ -103,6 +104,10 @@ under the License.
当需要跳过导入文件中的某一列时,将该列指定为 table 中不存在的列名即可。
语法:
(col_name1, col_name2, ...)
+
+ PRECEDING FILTER predicate:
+
+ 用于过滤原始数据。原始数据是未经列映射、转换的数据。用户可以对转换前的数据进行一次过滤,选取期望的数据,再进行转换。
SET:
@@ -478,6 +483,20 @@ under the License.
ORDER BY source_sequence
)
with BROKER "hdfs" ("username"="user", "password"="pass");
+
+ 14. 先过滤原始数据,再进行列的映射、转换和过滤操作
+
+ LOAD LABEL example_db.label_filter
+ (
+ DATA INFILE("hdfs://host:port/user/data/*/test.txt")
+ INTO TABLE `tbl1`
+ COLUMNS TERMINATED BY ","
+ (k1,k2,v1,v2)
+ PRECEDING FILTER k1 > 2
+ SET (k1 = k1 +1)
+ WHERE k1 > 3
+ )
+ with BROKER "hdfs" ("username"="user", "password"="pass");
## keyword
diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/ROUTINE
LOAD.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/ROUTINE
LOAD.md
index 648113e..f984bfe 100644
--- a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md
+++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md
@@ -58,7 +58,8 @@ under the License.
[where_predicates],
[delete_on_predicates],
[source_sequence],
- [partitions]
+ [partitions],
+ [preceding_predicates]
1. column_separator:
@@ -110,6 +111,12 @@ under the License.
只适用于UNIQUE_KEYS,相同key列下,保证value列按照source_sequence列进行REPLACE,
source_sequence可以是数据源中的列,也可以是表结构中的一列。
+ 7. preceding_predicates
+
+ PRECEDING FILTER predicate
+
+ 用于过滤原始数据。原始数据是未经列映射、转换的数据。用户可以对转换前的数据进行一次过滤,选取期望的数据,再进行转换。
+
5. job_properties
用于指定例行导入作业的通用参数。
@@ -481,6 +488,26 @@ under the License.
"kafka_offsets" = "101,0,0,200"
);
+ 8. 过滤原始数据
+
+ CREATE ROUTINE LOAD example_db.test_job ON example_tbl
+ COLUMNS TERMINATED BY ",",
+ COLUMNS(k1,k2,source_sequence,v1,v2),
+ PRECEDING FILTER k1 > 2
+ PROPERTIES
+ (
+ "desired_concurrent_number"="3",
+ "max_batch_interval" = "30",
+ "max_batch_rows" = "300000",
+ "max_batch_size" = "209715200"
+ ) FROM KAFKA
+ (
+ "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
+ "kafka_topic" = "my_topic",
+ "kafka_partitions" = "0,1,2,3",
+ "kafka_offsets" = "101,0,0,200"
+ );
+
## keyword
CREATE,ROUTINE,LOAD
diff --git a/fe/fe-core/src/main/cup/sql_parser.cup
b/fe/fe-core/src/main/cup/sql_parser.cup
index 1f43b43..41410d1 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -237,7 +237,7 @@ terminal String KW_ADD, KW_ADMIN, KW_AFTER, KW_AGGREGATE,
KW_ALL, KW_ALTER, KW_A
KW_DELETE, KW_DISTINCT, KW_DISTINCTPC, KW_DISTINCTPCSA, KW_DISTRIBUTED,
KW_DISTRIBUTION, KW_DYNAMIC, KW_BUCKETS, KW_DIV, KW_DOUBLE, KW_DROP, KW_DROPP,
KW_DUPLICATE,
KW_ELSE, KW_ENABLE, KW_END, KW_ENGINE, KW_ENGINES, KW_ENTER, KW_ERRORS,
KW_EVENTS, KW_EXCEPT, KW_EXCLUDE,
KW_EXISTS, KW_EXPORT, KW_EXTERNAL, KW_EXTRACT,
- KW_FALSE, KW_FEATURE, KW_FOLLOWER, KW_FOLLOWING, KW_FREE, KW_FROM,
KW_FILE, KW_FIRST, KW_FLOAT, KW_FOR, KW_FORCE, KW_FORMAT, KW_FRONTEND,
KW_FRONTENDS, KW_FULL, KW_FUNCTION, KW_FUNCTIONS,
+ KW_FALSE, KW_FEATURE, KW_FOLLOWER, KW_FOLLOWING, KW_FREE, KW_FROM,
KW_FILE, KW_FILTER, KW_FIRST, KW_FLOAT, KW_FOR, KW_FORCE, KW_FORMAT,
KW_FRONTEND, KW_FRONTENDS, KW_FULL, KW_FUNCTION, KW_FUNCTIONS,
KW_GLOBAL, KW_GRANT, KW_GRANTS, KW_GROUP, KW_GROUPING,
KW_HASH, KW_HAVING, KW_HELP,KW_HLL, KW_HLL_UNION, KW_HOUR, KW_HUB,
KW_IDENTIFIED, KW_IF, KW_IN, KW_INDEX, KW_INDEXES, KW_INFILE, KW_INSTALL,
@@ -289,7 +289,8 @@ nonterminal StatementBase stmt, show_stmt, show_param,
help_stmt, load_stmt,
describe_stmt, alter_stmt,
use_stmt, kill_stmt, drop_stmt, recover_stmt, grant_stmt, revoke_stmt,
create_stmt, set_stmt, sync_stmt, cancel_stmt, cancel_param, delete_stmt,
link_stmt, migrate_stmt, enter_stmt, unsupported_stmt, export_stmt,
admin_stmt, truncate_stmt,
- import_columns_stmt, import_delete_on_stmt, import_sequence_stmt,
import_where_stmt, install_plugin_stmt, uninstall_plugin_stmt;
+ import_columns_stmt, import_delete_on_stmt, import_sequence_stmt,
import_where_stmt, install_plugin_stmt, uninstall_plugin_stmt,
+ import_preceding_filter_stmt;
nonterminal ImportColumnDesc import_column_desc;
nonterminal List<ImportColumnDesc> import_column_descs;
@@ -345,6 +346,7 @@ nonterminal ClusterName cluster_name;
nonterminal ClusterName des_cluster_name;
nonterminal TableName table_name;
nonterminal FunctionName function_name;
+nonterminal Expr pre_filter_clause;
nonterminal Expr where_clause;
nonterminal Expr delete_on_clause;
nonterminal Expr where_clause_without_null;
@@ -545,6 +547,10 @@ stmts ::=
{:
RESULT = Lists.newArrayList(stmt);
:}
+ | import_preceding_filter_stmt:stmt
+ {:
+ RESULT = Lists.newArrayList(stmt);
+ :}
;
import_columns_stmt ::=
@@ -580,7 +586,7 @@ import_column_desc ::=
import_where_stmt ::=
KW_WHERE expr:expr
{:
- RESULT = new ImportWhereStmt(expr);
+ RESULT = new ImportWhereStmt(expr, false);
:}
;
@@ -598,6 +604,13 @@ import_sequence_stmt ::=
:}
;
+import_preceding_filter_stmt ::=
+ KW_PRECEDING KW_FILTER expr:expr
+ {:
+ RESULT = new ImportWhereStmt(expr, true);
+ :}
+ ;
+
stmt ::=
alter_stmt:stmt
{: RESULT = stmt; :}
@@ -1378,12 +1391,13 @@ data_desc ::=
opt_col_list:colList
opt_columns_from_path:columnsFromPath
opt_col_mapping_list:colMappingList
+ pre_filter_clause:preFilterExpr
where_clause:whereExpr
delete_on_clause:deleteExpr
sequence_col_clause:sequenceColName
{:
RESULT = new DataDescription(tableName, partitionNames, files,
colList, colSep, fileFormat,
- columnsFromPath, isNeg, colMappingList, whereExpr, mergeType,
deleteExpr, sequenceColName);
+ columnsFromPath, isNeg, colMappingList, preFilterExpr, whereExpr,
mergeType, deleteExpr, sequenceColName);
:}
| opt_merge_type:mergeType KW_DATA KW_FROM KW_TABLE ident:srcTableName
opt_negative:isNeg
@@ -1571,6 +1585,10 @@ load_property ::=
{:
RESULT = columnsInfo;
:}
+ | import_preceding_filter_stmt:preFilter
+ {:
+ RESULT = preFilter;
+ :}
| import_where_stmt:wherePredicate
{:
RESULT = wherePredicate;
@@ -3744,6 +3762,7 @@ where_clause ::=
| KW_WHERE expr:e
{: RESULT = e; :}
;
+
delete_on_clause ::=
/* empty */
{: RESULT = null; :}
@@ -3752,11 +3771,18 @@ delete_on_clause ::=
;
sequence_col_clause ::=
-/* empty */
-{: RESULT = null; :}
-| KW_ORDER KW_BY ident:s
-{: RESULT = s; :}
-;
+ /* empty */
+ {: RESULT = null; :}
+ | KW_ORDER KW_BY ident:s
+ {: RESULT = s; :}
+ ;
+
+pre_filter_clause ::=
+ /* empty */
+ {: RESULT = null; :}
+ | KW_PRECEDING KW_FILTER expr:e
+ {: RESULT = e; :}
+ ;
where_clause_without_null ::=
KW_WHERE expr:e
@@ -4684,6 +4710,8 @@ keyword ::=
{: RESULT = id; :}
| KW_FILE:id
{: RESULT = id; :}
+ | KW_FILTER:id
+ {: RESULT = id; :}
| KW_FIRST:id
{: RESULT = id; :}
| KW_FORMAT:id
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
index ee629be..1960c28 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
@@ -347,6 +347,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
public void checkLoadProperties() throws UserException {
ColumnSeparator columnSeparator = null;
ImportColumnsStmt importColumnsStmt = null;
+ ImportWhereStmt precedingImportWhereStmt = null;
ImportWhereStmt importWhereStmt = null;
ImportSequenceStmt importSequenceStmt = null;
PartitionNames partitionNames = null;
@@ -368,10 +369,18 @@ public class CreateRoutineLoadStmt extends DdlStmt {
importColumnsStmt = (ImportColumnsStmt) parseNode;
} else if (parseNode instanceof ImportWhereStmt) {
// check where expr
- if (importWhereStmt != null) {
- throw new AnalysisException("repeat setting of where
predicate");
+ ImportWhereStmt node = (ImportWhereStmt) parseNode;
+ if (node.isPreceding()) {
+ if (precedingImportWhereStmt != null) {
+ throw new AnalysisException("repeat setting of
preceding where predicate");
+ }
+ precedingImportWhereStmt = node;
+ } else {
+ if (importWhereStmt != null) {
+ throw new AnalysisException("repeat setting of
where predicate");
+ }
+ importWhereStmt = node;
}
- importWhereStmt = (ImportWhereStmt) parseNode;
} else if (parseNode instanceof PartitionNames) {
// check partition names
if (partitionNames != null) {
@@ -394,7 +403,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
}
}
}
- routineLoadDesc = new RoutineLoadDesc(columnSeparator,
importColumnsStmt, importWhereStmt,
+ routineLoadDesc = new RoutineLoadDesc(columnSeparator,
importColumnsStmt, precedingImportWhereStmt, importWhereStmt,
partitionNames, importDeleteOnStmt == null ? null :
importDeleteOnStmt.getExpr(), mergeType,
importSequenceStmt == null ? null :
importSequenceStmt.getSequenceColName());
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
index ccb2217..7e31c2c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
@@ -34,16 +34,10 @@ import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TNetworkAddress;
+
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import java.io.StringReader;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
@@ -52,6 +46,13 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import java.io.StringReader;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
// used to describe data info which is needed to import.
//
// data_desc:
@@ -102,6 +103,7 @@ public class DataDescription {
private final List<String> columnsFromPath;
// save column mapping in SET(xxx = xxx) clause
private final List<Expr> columnMappingList;
+ private final Expr precedingFilterExpr;
private final Expr whereExpr;
private final String srcTableName;
// this only used in multi load, all filePaths is file not dir
@@ -144,7 +146,7 @@ public class DataDescription {
boolean isNegative,
List<Expr> columnMappingList) {
this(tableName, partitionNames, filePaths, columns, columnSeparator,
fileFormat, null,
- isNegative, columnMappingList, null,
LoadTask.MergeType.APPEND, null, null);
+ isNegative, columnMappingList, null, null,
LoadTask.MergeType.APPEND, null, null);
}
public DataDescription(String tableName,
@@ -156,6 +158,7 @@ public class DataDescription {
List<String> columnsFromPath,
boolean isNegative,
List<Expr> columnMappingList,
+ Expr fileFilterExpr,
Expr whereExpr,
LoadTask.MergeType mergeType,
Expr deleteCondition,
@@ -169,6 +172,7 @@ public class DataDescription {
this.columnsFromPath = columnsFromPath;
this.isNegative = isNegative;
this.columnMappingList = columnMappingList;
+ this.precedingFilterExpr = fileFilterExpr;
this.whereExpr = whereExpr;
this.srcTableName = null;
this.mergeType = mergeType;
@@ -194,6 +198,7 @@ public class DataDescription {
this.columnsFromPath = null;
this.isNegative = isNegative;
this.columnMappingList = columnMappingList;
+ this.precedingFilterExpr = null; // external hive table does not
support file filter expr
this.whereExpr = whereExpr;
this.srcTableName = srcTableName;
this.mergeType = mergeType;
@@ -381,6 +386,10 @@ public class DataDescription {
return partitionNames;
}
+ public Expr getPrecdingFilterExpr() {
+ return precedingFilterExpr;
+ }
+
public Expr getWhereExpr() {
return whereExpr;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ImportWhereStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ImportWhereStmt.java
index 9d60786..c4cfaa3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ImportWhereStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ImportWhereStmt.java
@@ -19,15 +19,22 @@ package org.apache.doris.analysis;
public class ImportWhereStmt extends StatementBase {
private Expr expr;
+ // Only used in load process to define a "preceding filter" expr
+ private boolean isPreceding;
- public ImportWhereStmt(Expr expr) {
+ public ImportWhereStmt(Expr expr, boolean isPreceding) {
this.expr = expr;
+ this.isPreceding = isPreceding;
}
public Expr getExpr() {
return expr;
}
+ public boolean isPreceding() {
+ return isPreceding;
+ }
+
@Override
public RedirectStatus getRedirectStatus() {
return null;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index 2c8c409..64ac57b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -539,8 +539,8 @@ public class RestoreJob extends AbstractJob {
return;
}
LOG.debug("get intersect part names: {}, job: {}",
intersectPartNames, this);
- if
(localOlapTbl.getSignature(BackupHandler.SIGNATURE_VERSION, intersectPartNames)
- !=
remoteOlapTbl.getSignature(BackupHandler.SIGNATURE_VERSION,
intersectPartNames)) {
+ if
(!localOlapTbl.getSignature(BackupHandler.SIGNATURE_VERSION, intersectPartNames)
+
.equals(remoteOlapTbl.getSignature(BackupHandler.SIGNATURE_VERSION,
intersectPartNames))) {
status = new Status(ErrCode.COMMON_ERROR, "Table "
+ jobInfo.getAliasByOriginNameIfSet(tableName)
+ " already exist but with different
schema");
return;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
index 6bfdec4..e608887 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
@@ -42,13 +42,13 @@ import org.apache.doris.common.io.Writable;
import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@@ -82,7 +82,9 @@ public class BrokerFileGroup implements Writable {
private List<ImportColumnDesc> columnExprList;
// this is only for hadoop function check
private Map<String, Pair<String, List<String>>> columnToHadoopFunction;
- // filter the data which has been conformed
+ // filter the data from source directly
+ private Expr precedingFilterExpr;
+ // filter the data which has been mapped and transformed
private Expr whereExpr;
private Expr deleteCondition;
private LoadTask.MergeType mergeType;
@@ -120,6 +122,7 @@ public class BrokerFileGroup implements Writable {
this.columnsFromPath = dataDescription.getColumnsFromPath();
this.columnExprList = dataDescription.getParsedColumnExprList();
this.columnToHadoopFunction =
dataDescription.getColumnToHadoopFunction();
+ this.precedingFilterExpr = dataDescription.getPrecdingFilterExpr();
this.whereExpr = dataDescription.getWhereExpr();
this.deleteCondition = dataDescription.getDeleteCondition();
this.mergeType = dataDescription.getMergeType();
@@ -260,6 +263,10 @@ public class BrokerFileGroup implements Writable {
return partitionIds;
}
+ public Expr getPrecedingFilterExpr() {
+ return precedingFilterExpr;
+ }
+
public Expr getWhereExpr() {
return whereExpr;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
index 315ba25..ef38979 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
@@ -95,11 +95,6 @@ import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPriority;
import org.apache.doris.transaction.TransactionNotFoundException;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
@@ -107,6 +102,11 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.gson.Gson;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -929,14 +929,19 @@ public class Load {
* 4. validate hadoop functions
* 5. init slot descs and expr map for load plan
*/
- public static void initColumns(Table tbl, List<ImportColumnDesc>
columnExprs,
- Map<String, Pair<String, List<String>>> columnToHadoopFunction,
- Map<String, Expr> exprsByName, Analyzer analyzer, TupleDescriptor
srcTupleDesc,
- Map<String, SlotDescriptor> slotDescByName, TBrokerScanRangeParams
params,
- boolean needInitSlotAndAnalyzeExprs) throws UserException {
+ private static void initColumns(Table tbl, List<ImportColumnDesc>
columnExprs,
+ Map<String, Pair<String, List<String>>>
columnToHadoopFunction,
+ Map<String, Expr> exprsByName, Analyzer
analyzer, TupleDescriptor srcTupleDesc,
+ Map<String, SlotDescriptor>
slotDescByName, TBrokerScanRangeParams params,
+ boolean needInitSlotAndAnalyzeExprs)
throws UserException {
// We make a copy of the columnExprs so that our subsequent changes
// to the columnExprs will not affect the original columnExprs.
// skip the mapping columns not exist in schema
+ // eg: the origin column list is:
+ // (k1, k2, tmpk3 = k1 + k2, k3 = tmpk3)
+ // after calling rewriteColumns(), it will become
+ // (k1, k2, tmpk3 = k1 + k2, k3 = k1 + k2)
+ // so "tmpk3 = k1 + k2" is not needed anymore, we can skip it.
List<ImportColumnDesc> copiedColumnExprs = new ArrayList<>();
for (ImportColumnDesc importColumnDesc : columnExprs) {
String mappingColumnName = importColumnDesc.getColumnName();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/RoutineLoadDesc.java
b/fe/fe-core/src/main/java/org/apache/doris/load/RoutineLoadDesc.java
index 4c08c92..bb7de5c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/RoutineLoadDesc.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/RoutineLoadDesc.java
@@ -17,8 +17,6 @@
package org.apache.doris.load;
-import com.google.common.base.Strings;
-
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.ColumnSeparator;
import org.apache.doris.analysis.Expr;
@@ -29,9 +27,12 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.load.loadv2.LoadTask;
+import com.google.common.base.Strings;
+
public class RoutineLoadDesc {
private final ColumnSeparator columnSeparator;
private final ImportColumnsStmt columnsInfo;
+ private final ImportWhereStmt precedingFilter;
private final ImportWhereStmt wherePredicate;
private final Expr deleteCondition;
private LoadTask.MergeType mergeType;
@@ -40,11 +41,12 @@ public class RoutineLoadDesc {
private final String sequenceColName;
public RoutineLoadDesc(ColumnSeparator columnSeparator, ImportColumnsStmt
columnsInfo,
- ImportWhereStmt wherePredicate, PartitionNames
partitionNames,
- Expr deleteCondition, LoadTask.MergeType mergeType,
+ ImportWhereStmt precedingFilter, ImportWhereStmt
wherePredicate,
+ PartitionNames partitionNames, Expr
deleteCondition, LoadTask.MergeType mergeType,
String sequenceColName) {
this.columnSeparator = columnSeparator;
this.columnsInfo = columnsInfo;
+ this.precedingFilter = precedingFilter;
this.wherePredicate = wherePredicate;
this.partitionNames = partitionNames;
this.deleteCondition = deleteCondition;
@@ -60,6 +62,10 @@ public class RoutineLoadDesc {
return columnsInfo;
}
+ public ImportWhereStmt getPrecedingFilter() {
+ return precedingFilter;
+ }
+
public ImportWhereStmt getWherePredicate() {
return wherePredicate;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
index 9d4a497..61f7670 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
@@ -106,10 +106,10 @@ public class LoadingTaskPlanner {
public void plan(TUniqueId loadId, List<List<TBrokerFileStatus>>
fileStatusesList, int filesAdded)
throws UserException {
// Generate tuple descriptor
- TupleDescriptor tupleDesc = descTable.createTupleDescriptor();
+ TupleDescriptor destTupleDesc = descTable.createTupleDescriptor();
// use full schema to fill the descriptor table
for (Column col : table.getFullSchema()) {
- SlotDescriptor slotDesc = descTable.addSlotDescriptor(tupleDesc);
+ SlotDescriptor slotDesc =
descTable.addSlotDescriptor(destTupleDesc);
slotDesc.setIsMaterialized(true);
slotDesc.setColumn(col);
if (col.isAllowNull()) {
@@ -121,8 +121,8 @@ public class LoadingTaskPlanner {
// Generate plan trees
// 1. Broker scan node
- BrokerScanNode scanNode = new BrokerScanNode(new
PlanNodeId(nextNodeId++), tupleDesc, "BrokerScanNode",
- fileStatusesList,
filesAdded);
+ BrokerScanNode scanNode = new BrokerScanNode(new
PlanNodeId(nextNodeId++), destTupleDesc, "BrokerScanNode",
+ fileStatusesList, filesAdded);
scanNode.setLoadInfo(loadJobId, txnId, table, brokerDesc, fileGroups,
strictMode, loadParallelism);
scanNode.init(analyzer);
scanNode.finalize(analyzer);
@@ -131,7 +131,7 @@ public class LoadingTaskPlanner {
// 2. Olap table sink
List<Long> partitionIds = getAllPartitionIds();
- OlapTableSink olapTableSink = new OlapTableSink(table, tupleDesc,
partitionIds);
+ OlapTableSink olapTableSink = new OlapTableSink(table, destTupleDesc,
partitionIds);
olapTableSink.init(loadId, txnId, dbId, timeoutS);
olapTableSink.complete();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index ebd962a..de9ff6d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -156,6 +156,7 @@ public abstract class RoutineLoadJob extends
AbstractTxnStateChangeCallback impl
// protected RoutineLoadDesc routineLoadDesc; // optional
protected PartitionNames partitions; // optional
protected List<ImportColumnDesc> columnDescs; // optional
+ protected Expr precedingFilter; // optional
protected Expr whereExpr; // optional
protected ColumnSeparator columnSeparator; // optional
protected int desireTaskConcurrentNum; // optional
@@ -352,6 +353,9 @@ public abstract class RoutineLoadJob extends
AbstractTxnStateChangeCallback impl
}
}
}
+ if (routineLoadDesc.getPrecedingFilter() != null) {
+ precedingFilter =
routineLoadDesc.getPrecedingFilter().getExpr();
+ }
if (routineLoadDesc.getWherePredicate() != null) {
whereExpr = routineLoadDesc.getWherePredicate().getExpr();
}
@@ -471,6 +475,12 @@ public abstract class RoutineLoadJob extends
AbstractTxnStateChangeCallback impl
return fileFormatType;
}
+ @Override
+ public Expr getPrecedingFilter() {
+ return precedingFilter;
+ }
+
+ @Override
public Expr getWhereExpr() {
return whereExpr;
}
@@ -1334,6 +1344,7 @@ public abstract class RoutineLoadJob extends
AbstractTxnStateChangeCallback impl
Map<String, String> jobProperties = Maps.newHashMap();
jobProperties.put("partitions", partitions == null ? STAR_STRING :
Joiner.on(",").join(partitions.getPartitionNames()));
jobProperties.put("columnToColumnExpr", columnDescs == null ?
STAR_STRING : Joiner.on(",").join(columnDescs));
+ jobProperties.put("precedingFilter", precedingFilter == null ?
STAR_STRING : precedingFilter.toSql());
jobProperties.put("whereExpr", whereExpr == null ? STAR_STRING :
whereExpr.toSql());
if (getFormat().equalsIgnoreCase("json")) {
jobProperties.put("dataFormat", "json");
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
index bb52fbb..2a47f20 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
@@ -115,7 +115,7 @@ public class BrokerScanNode extends LoadScanNode {
private static class ParamCreateContext {
public BrokerFileGroup fileGroup;
public TBrokerScanRangeParams params;
- public TupleDescriptor tupleDescriptor;
+ public TupleDescriptor srcTupleDescriptor;
public Map<String, Expr> exprMap;
public Map<String, SlotDescriptor> slotDescByName;
public String timezone;
@@ -123,9 +123,9 @@ public class BrokerScanNode extends LoadScanNode {
private List<ParamCreateContext> paramCreateContexts;
- public BrokerScanNode(PlanNodeId id, TupleDescriptor desc, String
planNodeName,
+ public BrokerScanNode(PlanNodeId id, TupleDescriptor destTupleDesc, String
planNodeName,
List<List<TBrokerFileStatus>> fileStatusesList, int
filesAdded) {
- super(id, desc, planNodeName);
+ super(id, destTupleDesc, planNodeName);
this.fileStatusesList = fileStatusesList;
this.filesAdded = filesAdded;
}
@@ -203,7 +203,8 @@ public class BrokerScanNode extends LoadScanNode {
deleteCondition = fileGroup.getDeleteCondition();
mergeType = fileGroup.getMergeType();
initColumns(context);
- initWhereExpr(fileGroup.getWhereExpr(), analyzer);
+ initAndSetPrecedingFilter(fileGroup.getPrecedingFilterExpr(),
context.srcTupleDescriptor, analyzer);
+ initAndSetWhereExpr(fileGroup.getWhereExpr(), this.desc, analyzer);
}
/**
@@ -217,7 +218,7 @@ public class BrokerScanNode extends LoadScanNode {
* @throws UserException
*/
private void initColumns(ParamCreateContext context) throws UserException {
- context.tupleDescriptor =
analyzer.getDescTbl().createTupleDescriptor();
+ context.srcTupleDescriptor =
analyzer.getDescTbl().createTupleDescriptor();
context.slotDescByName = Maps.newHashMap();
context.exprMap = Maps.newHashMap();
@@ -240,7 +241,7 @@ public class BrokerScanNode extends LoadScanNode {
Load.initColumns(targetTable, columnExprs,
context.fileGroup.getColumnToHadoopFunction(),
context.exprMap, analyzer,
- context.tupleDescriptor, context.slotDescByName,
context.params);
+ context.srcTupleDescriptor, context.slotDescByName,
context.params);
}
private TScanRangeLocations newLocations(TBrokerScanRangeParams params,
BrokerDesc brokerDesc)
@@ -501,7 +502,7 @@ public class BrokerScanNode extends LoadScanNode {
ParamCreateContext context = paramCreateContexts.get(i);
try {
finalizeParams(context.slotDescByName, context.exprMap,
context.params,
- context.tupleDescriptor, strictMode,
context.fileGroup.isNegative(), analyzer);
+ context.srcTupleDescriptor, strictMode,
context.fileGroup.isNegative(), analyzer);
} catch (AnalysisException e) {
throw new UserException(e.getMessage());
}
@@ -538,7 +539,7 @@ public class BrokerScanNode extends LoadScanNode {
output.append(prefix).append("BROKER:
").append(brokerDesc.getName()).append("\n");
return output.toString();
}
-
}
+
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/LoadScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/LoadScanNode.java
index f786cf3..2029891 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/LoadScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/LoadScanNode.java
@@ -56,38 +56,52 @@ public abstract class LoadScanNode extends ScanNode {
super(id, desc, planNodeName);
}
- protected void initWhereExpr(Expr whereExpr, Analyzer analyzer) throws
UserException {
+ protected void initAndSetWhereExpr(Expr whereExpr, TupleDescriptor
tupleDesc, Analyzer analyzer) throws UserException {
+ Expr newWhereExpr = initWhereExpr(whereExpr, tupleDesc, analyzer);
+ if (newWhereExpr != null) {
+ addConjuncts(newWhereExpr.getConjuncts());
+ }
+ }
+
+ protected void initAndSetPrecedingFilter(Expr whereExpr, TupleDescriptor
tupleDesc, Analyzer analyzer) throws UserException {
+ Expr newWhereExpr = initWhereExpr(whereExpr, tupleDesc, analyzer);
+ if (newWhereExpr != null) {
+ addPreFilterConjuncts(newWhereExpr.getConjuncts());
+ }
+ }
+
+ private Expr initWhereExpr(Expr whereExpr, TupleDescriptor tupleDesc,
Analyzer analyzer) throws UserException {
if (whereExpr == null) {
- return;
+ return null;
}
Map<String, SlotDescriptor> dstDescMap =
Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
- for (SlotDescriptor slotDescriptor : desc.getSlots()) {
+ for (SlotDescriptor slotDescriptor : tupleDesc.getSlots()) {
dstDescMap.put(slotDescriptor.getColumn().getName(),
slotDescriptor);
}
// substitute SlotRef in filter expression
// where expr must be equal first to transfer some predicates(eg:
BetweenPredicate to BinaryPredicate)
- whereExpr = analyzer.getExprRewriter().rewrite(whereExpr, analyzer);
+ Expr newWhereExpr = analyzer.getExprRewriter().rewrite(whereExpr,
analyzer);
List<SlotRef> slots = Lists.newArrayList();
- whereExpr.collect(SlotRef.class, slots);
+ newWhereExpr.collect(SlotRef.class, slots);
ExprSubstitutionMap smap = new ExprSubstitutionMap();
for (SlotRef slot : slots) {
SlotDescriptor slotDesc = dstDescMap.get(slot.getColumnName());
if (slotDesc == null) {
throw new UserException("unknown column reference in where
statement, reference="
- + slot.getColumnName());
+ + slot.getColumnName());
}
smap.getLhs().add(slot);
smap.getRhs().add(new SlotRef(slotDesc));
}
- whereExpr = whereExpr.clone(smap);
- whereExpr.analyze(analyzer);
- if (!whereExpr.getType().equals(Type.BOOLEAN)) {
+ newWhereExpr = newWhereExpr.clone(smap);
+ newWhereExpr.analyze(analyzer);
+ if (!newWhereExpr.getType().equals(Type.BOOLEAN)) {
throw new UserException("where statement is not a valid statement
return bool");
}
- addConjuncts(whereExpr.getConjuncts());
+ return newWhereExpr;
}
protected void checkBitmapCompatibility(Analyzer analyzer, SlotDescriptor
slotDesc, Expr expr) throws AnalysisException {
@@ -179,6 +193,12 @@ public abstract class LoadScanNode extends ScanNode {
protected void toThrift(TPlanNode planNode) {
planNode.setNodeType(TPlanNodeType.BROKER_SCAN_NODE);
TBrokerScanNode brokerScanNode = new
TBrokerScanNode(desc.getId().asInt());
+ if (!preFilterConjuncts.isEmpty()) {
+ for (Expr e : preFilterConjuncts) {
+ brokerScanNode.addToPreFilterExprs(e.treeToThrift());
+ }
+ }
planNode.setBrokerScanNode(brokerScanNode);
}
}
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
index 8a84c1a..45b36ae 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
@@ -17,7 +17,6 @@
package org.apache.doris.planner;
-import com.google.common.base.Predicates;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.ExprSubstitutionMap;
@@ -31,14 +30,15 @@ import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TPlan;
import org.apache.doris.thrift.TPlanNode;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
import com.google.common.base.Preconditions;
+import com.google.common.base.Predicates;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.math.LongMath;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
@@ -82,6 +82,18 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
protected List<Expr> conjuncts = Lists.newArrayList();
+ // Conjuncts used to filter the original load file.
+ // In the load execution plan, the difference between "preFilterConjuncts"
and "conjuncts" is that
+ // conjuncts are used to filter the data after column conversion and
mapping,
+ // while "preFilterConjuncts" directly filter the content read from the
source data.
+ // That is, the data processing flow is:
+ //
+ // 1. Read data from source.
+ // 2. Filter data by using "preFilterConjuncts".
+ // 3. Do column mapping and transforming.
+ // 4. Filter data by using "conjuncts".
+ protected List<Expr> preFilterConjuncts = Lists.newArrayList();
+
// Fragment that this PlanNode is executed in. Valid only after this
PlanNode has been
// assigned to a fragment. Set and maintained by enclosing PlanFragment.
protected PlanFragment fragment_;
@@ -263,6 +275,13 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
this.conjuncts.addAll(conjuncts);
}
+ public void addPreFilterConjuncts(List<Expr> conjuncts) {
+ if (conjuncts == null) {
+ return;
+ }
+ this.preFilterConjuncts.addAll(conjuncts);
+ }
+
public void transferConjuncts(PlanNode recipient) {
recipient.conjuncts.addAll(conjuncts);
conjuncts.clear();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java
index c79dcf6..24d614b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java
@@ -39,12 +39,12 @@ import org.apache.doris.thrift.TScanRange;
import org.apache.doris.thrift.TScanRangeLocations;
import org.apache.doris.thrift.TUniqueId;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
import java.nio.charset.Charset;
import java.util.List;
import java.util.Map;
@@ -137,7 +137,8 @@ public class StreamLoadScanNode extends LoadScanNode {
exprsByName, analyzer, srcTupleDesc, slotDescByName, params);
// analyze where statement
- initWhereExpr(taskInfo.getWhereExpr(), analyzer);
+ initAndSetPrecedingFilter(taskInfo.getPrecedingFilter(),
this.srcTupleDesc, analyzer);
+ initAndSetWhereExpr(taskInfo.getWhereExpr(), this.desc, analyzer);
computeStats(analyzer);
createDefaultSmap(analyzer);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java
index 24fc3e4..cd42c98 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java
@@ -508,7 +508,7 @@ public class MultiLoadMgr {
}
}
DataDescription dataDescription = new DataDescription(tbl,
partitionNames, files, null, columnSeparator,
- fileFormat, null, isNegative, null, whereExpr, mergeType,
deleteCondition,
+ fileFormat, null, isNegative, null, null, whereExpr,
mergeType, deleteCondition,
sequenceColName);
dataDescription.setColumnDef(colString);
backend = Catalog.getCurrentSystemInfo().getBackend(backendId);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
index 7b1cf78..983a944 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
@@ -48,6 +48,8 @@ public interface LoadTaskInfo {
public String getPath();
public List<ImportColumnDesc> getColumnExprDescs();
public boolean isStrictMode();
+
+ public Expr getPrecedingFilter();
public Expr getWhereExpr();
public ColumnSeparator getColumnSeparator();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
index 93f6ee5..c79e5c7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
@@ -17,9 +17,6 @@
package org.apache.doris.task;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-
import org.apache.doris.analysis.ColumnSeparator;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.ImportColumnDesc;
@@ -38,9 +35,13 @@ import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TStreamLoadPutRequest;
import org.apache.doris.thrift.TUniqueId;
+
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+
import java.io.StringReader;
import java.util.List;
@@ -105,6 +106,10 @@ public class StreamLoadTask implements LoadTaskInfo {
return columnExprDescs;
}
+ public Expr getPrecedingFilter() {
+ return null;
+ }
+
public Expr getWhereExpr() {
return whereExpr;
}
diff --git a/fe/fe-core/src/main/jflex/sql_scanner.flex
b/fe/fe-core/src/main/jflex/sql_scanner.flex
index 8863bf1..cf67a3f 100644
--- a/fe/fe-core/src/main/jflex/sql_scanner.flex
+++ b/fe/fe-core/src/main/jflex/sql_scanner.flex
@@ -185,6 +185,7 @@ import org.apache.doris.qe.SqlModeHelper;
keywordMap.put("false", new Integer(SqlParserSymbols.KW_FALSE));
keywordMap.put("feature", new Integer(SqlParserSymbols.KW_FEATURE));
keywordMap.put("file", new Integer(SqlParserSymbols.KW_FILE));
+ keywordMap.put("filter", new Integer(SqlParserSymbols.KW_FILTER));
keywordMap.put("first", new Integer(SqlParserSymbols.KW_FIRST));
keywordMap.put("float", new Integer(SqlParserSymbols.KW_FLOAT));
keywordMap.put("follower", new Integer(SqlParserSymbols.KW_FOLLOWER));
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/analysis/DataDescriptionTest.java
b/fe/fe-core/src/test/java/org/apache/doris/analysis/DataDescriptionTest.java
index f25464d..9197c6b 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/analysis/DataDescriptionTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/analysis/DataDescriptionTest.java
@@ -27,10 +27,10 @@ import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.mysql.privilege.MockedAuth;
import org.apache.doris.mysql.privilege.PaloAuth;
import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.system.SystemInfoService;
import com.google.common.collect.Lists;
-import org.apache.doris.system.SystemInfoService;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -119,7 +119,7 @@ public class DataDescriptionTest {
Expr whereExpr = new BinaryPredicate(BinaryPredicate.Operator.EQ, new
IntLiteral(1), new IntLiteral(1));
desc = new DataDescription("testTable", null,
Lists.newArrayList("abc.txt"),
- Lists.newArrayList("col1", "col2"), new ColumnSeparator(","),
"csv", null, false, null, whereExpr, LoadTask.MergeType.MERGE, whereExpr, null);
+ Lists.newArrayList("col1", "col2"), new ColumnSeparator(","),
"csv", null, false, null, null, whereExpr, LoadTask.MergeType.MERGE, whereExpr,
null);
desc.analyze("testDb");
Assert.assertEquals("MERGE DATA INFILE ('abc.txt') INTO TABLE
testTable COLUMNS TERMINATED BY ',' (col1, col2) WHERE 1 = 1 DELETE ON 1 = 1",
desc.toString());
Assert.assertEquals("1 = 1", desc.getWhereExpr().toSql());
@@ -220,7 +220,7 @@ public class DataDescriptionTest {
Expr whereExpr = new BinaryPredicate(BinaryPredicate.Operator.EQ, new
IntLiteral(1), new IntLiteral(1));
DataDescription desc = new DataDescription("testTable", null,
Lists.newArrayList("abc.txt"),
- Lists.newArrayList("col1", "col2"), new ColumnSeparator(","),
"csv", null, true, null, whereExpr, LoadTask.MergeType.MERGE, whereExpr, null);
+ Lists.newArrayList("col1", "col2"), new ColumnSeparator(","),
"csv", null, true, null, null, whereExpr, LoadTask.MergeType.MERGE, whereExpr,
null);
desc.analyze("testDb");
}
@@ -311,8 +311,8 @@ public class DataDescriptionTest {
@Test
public void testAnalyzeSequenceColumnNormal() throws AnalysisException {
DataDescription desc = new DataDescription("testTable", null,
Lists.newArrayList("abc.txt"),
- Lists.newArrayList("k1", "k2", "source_sequence","v1"), new
ColumnSeparator("\t"),
- null, null,false, null, null, LoadTask.MergeType.APPEND, null,
"source_sequence");
+ Lists.newArrayList("k1", "k2", "source_sequence", "v1"), new
ColumnSeparator("\t"),
+ null, null, false, null, null, null,
LoadTask.MergeType.APPEND, null, "source_sequence");
new Expectations() {
{
tbl.getName();
@@ -330,8 +330,8 @@ public class DataDescriptionTest {
@Test(expected = AnalysisException.class)
public void testAnalyzeSequenceColumnWithoutSourceSequence() throws
AnalysisException {
DataDescription desc = new DataDescription("testTable", null,
Lists.newArrayList("abc.txt"),
- Lists.newArrayList("k1", "k2","v1"), new ColumnSeparator("\t"),
- null, null,false, null, null, LoadTask.MergeType.APPEND, null,
"source_sequence");
+ Lists.newArrayList("k1", "k2", "v1"), new
ColumnSeparator("\t"),
+ null, null, false, null, null, null,
LoadTask.MergeType.APPEND, null, "source_sequence");
new Expectations() {
{
tbl.getName();
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
index 70b0686..5bbab3a 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
@@ -43,10 +43,6 @@ import org.apache.doris.thrift.TResourceInfo;
import org.apache.doris.transaction.BeginTransactionException;
import org.apache.doris.transaction.GlobalTransactionMgr;
-import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
import org.apache.kafka.common.PartitionInfo;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -54,6 +50,10 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -244,7 +244,7 @@ public class KafkaRoutineLoadJobTest {
public void testFromCreateStmtWithErrorTable(@Mocked Catalog catalog,
@Injectable Database
database) throws LoadException {
CreateRoutineLoadStmt createRoutineLoadStmt =
initCreateRoutineLoadStmt();
- RoutineLoadDesc routineLoadDesc = new RoutineLoadDesc(columnSeparator,
null, null,
+ RoutineLoadDesc routineLoadDesc = new RoutineLoadDesc(columnSeparator,
null, null, null,
partitionNames, null, LoadTask.MergeType.APPEND, null);
Deencapsulation.setField(createRoutineLoadStmt, "routineLoadDesc",
routineLoadDesc);
@@ -269,7 +269,7 @@ public class KafkaRoutineLoadJobTest {
@Injectable Database database,
@Injectable OlapTable table) throws UserException {
CreateRoutineLoadStmt createRoutineLoadStmt =
initCreateRoutineLoadStmt();
- RoutineLoadDesc routineLoadDesc = new RoutineLoadDesc(columnSeparator,
null, null, partitionNames, null,
+ RoutineLoadDesc routineLoadDesc = new RoutineLoadDesc(columnSeparator,
null, null, null, partitionNames, null,
LoadTask.MergeType.APPEND, sequenceStmt.getSequenceColName());
Deencapsulation.setField(createRoutineLoadStmt, "routineLoadDesc",
routineLoadDesc);
List<Pair<Integer, Long>> partitionIdToOffset = Lists.newArrayList();
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 1f83574..58c09c8 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -224,6 +224,7 @@ struct TBrokerScanNode {
// Partition info used to process partition select in broker load
2: optional list<Exprs.TExpr> partition_exprs
3: optional list<Partitions.TRangePartition> partition_infos
+ 4: optional list<Exprs.TExpr> pre_filter_exprs
}
struct TEsScanNode {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]