This is an automated email from the ASF dual-hosted git repository.
wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new d318f1c99 IMPALA-12377: Improve count(*) performance for jdbc external
table
d318f1c99 is described below
commit d318f1c99208196abdf0d0f57490439ddb421cab
Author: wzhou-code <[email protected]>
AuthorDate: Thu Nov 2 23:02:41 2023 -0700
IMPALA-12377: Improve count(*) performance for jdbc external table
The backend function DataSourceScanNode::GetNext() handles count queries
inefficiently. Even when no column data is returned from the external
data source, it still tries to materialize rows and adds them to the
RowBatch one by one, up to the row count. It also calls
GetNextInputBatch() multiple times (count / batch_size), and each
GetNextInputBatch() call invokes a JNI function in the external data source.
This patch improves DataSourceScanNode::GetNext() and
JdbcDataSource.getNext() to avoid these unnecessary function calls.
Testing:
- Ran query_test/test_ext_data_sources.py, which includes count
queries for jdbc external tables.
- Passed core-tests.
Change-Id: I9953dca949eb773022f1d6dcf48d8877857635d6
Reviewed-on: http://gerrit.cloudera.org:8080/20653
Reviewed-by: Abhishek Rawat <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
be/src/exec/data-source-scan-node.cc | 45 ++++++++++++++--------
.../impala/extdatasource/jdbc/JdbcDataSource.java | 13 +++----
2 files changed, 33 insertions(+), 25 deletions(-)
diff --git a/be/src/exec/data-source-scan-node.cc
b/be/src/exec/data-source-scan-node.cc
index 5a88224fa..7e01c01c4 100644
--- a/be/src/exec/data-source-scan-node.cc
+++ b/be/src/exec/data-source-scan-node.cc
@@ -348,24 +348,35 @@ Status DataSourceScanNode::GetNext(RuntimeState* state,
RowBatch* row_batch, boo
while (true) {
{
SCOPED_TIMER(materialize_tuple_timer());
- // copy rows until we hit the limit/capacity or until we exhaust
input_batch_
- while (!ReachedLimit() && !row_batch->AtCapacity() &&
InputBatchHasNext()) {
- // TODO The timezone depends on flag
use_local_tz_for_unix_timestamp_conversions.
- // Check if this is the intended behaviour.
- RETURN_IF_ERROR(MaterializeNextRow(
- state->time_zone_for_unix_time_conversions(), tuple_pool, tuple));
- ++rows_read;
- int row_idx = row_batch->AddRow();
- TupleRow* tuple_row = row_batch->GetRow(row_idx);
- tuple_row->SetTuple(tuple_idx_, tuple);
-
- if (ExecNode::EvalConjuncts(evals, num_conjuncts, tuple_row)) {
- row_batch->CommitLastRow();
- tuple = reinterpret_cast<Tuple*>(
- reinterpret_cast<uint8_t*>(tuple) + tuple_desc_->byte_size());
- IncrementNumRowsReturned(1);
+ if (tuple_desc_->slots().size() > 0) {
+ // Copy rows until we hit the limit/capacity or until we exhaust
input_batch_
+ while (!ReachedLimit() && !row_batch->AtCapacity() &&
InputBatchHasNext()) {
+ // TODO Timezone depends on flag
use_local_tz_for_unix_timestamp_conversions.
+ // Check if this is the intended behaviour.
+ RETURN_IF_ERROR(MaterializeNextRow(
+ state->time_zone_for_unix_time_conversions(), tuple_pool,
tuple));
+ ++rows_read;
+ int row_idx = row_batch->AddRow();
+ TupleRow* tuple_row = row_batch->GetRow(row_idx);
+ tuple_row->SetTuple(tuple_idx_, tuple);
+
+ if (ExecNode::EvalConjuncts(evals, num_conjuncts, tuple_row)) {
+ row_batch->CommitLastRow();
+ tuple = reinterpret_cast<Tuple*>(
+ reinterpret_cast<uint8_t*>(tuple) + tuple_desc_->byte_size());
+ IncrementNumRowsReturned(1);
+ }
+ ++next_row_idx_;
+ }
+ } else {
+ // For count(*)
+ rows_read += num_rows_;
+ next_row_idx_ += num_rows_;
+ IncrementNumRowsReturned(num_rows_);
+ if (input_batch_->eos) {
+ row_batch->limit_capacity(rows_read);
+ row_batch->CommitRows(rows_read);
}
- ++next_row_idx_;
}
if (row_batch->AtCapacity() || ReachedLimit()
|| (input_batch_->eos && !InputBatchHasNext())) {
diff --git
a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/JdbcDataSource.java
b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/JdbcDataSource.java
index d69ee9f50..28ba111fb 100644
---
a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/JdbcDataSource.java
+++
b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/JdbcDataSource.java
@@ -98,7 +98,7 @@ public class JdbcDataSource implements ExternalDataSource {
// iterator_ is used when schema_.getColsSize() does not equal 0.
private JdbcRecordIterator iterator_ = null;
// currRow_ and totalNumberOfRecords_ are used when schema_.getColsSize()
equals 0.
- private int currRow_;
+ private long currRow_;
private long totalNumberOfRecords_ = 0;
// Enumerates the states of the data source, which indicates which
ExternalDataSource
@@ -193,13 +193,10 @@ public class JdbcDataSource implements ExternalDataSource
{
}
if (!hasNext) eos_ = true;
} else { // for count(*)
- if (currRow_ + batchSize_ <= totalNumberOfRecords_) {
- numRows = batchSize_;
- } else {
- numRows = totalNumberOfRecords_ - currRow_;
- }
- currRow_ += numRows;
- if (currRow_ == totalNumberOfRecords_) eos_ = true;
+ // Don't need to check batchSize_.
+ numRows = totalNumberOfRecords_ - currRow_;
+ currRow_ = totalNumberOfRecords_;
+ eos_ = true;
}
return new TGetNextResult(STATUS_OK).setEos(eos_)
.setRows(new TRowBatch().setCols(cols).setNum_rows(numRows));