This is an automated email from the ASF dual-hosted git repository.

wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new d318f1c99 IMPALA-12377: Improve count(*) performance for jdbc external 
table
d318f1c99 is described below

commit d318f1c99208196abdf0d0f57490439ddb421cab
Author: wzhou-code <[email protected]>
AuthorDate: Thu Nov 2 23:02:41 2023 -0700

    IMPALA-12377: Improve count(*) performance for jdbc external table
    
    Backend function DataSourceScanNode::GetNext() handles count queries
    inefficiently. Even when there is no column data returned from the
    external data source, it still tries to materialize rows and add
    rows to the RowBatch one by one up to the row count. It also
    calls GetNextInputBatch() multiple times (count / batch_size), while
    GetNextInputBatch() invokes a JNI function in the external data source.
    
    This patch improves the DataSourceScanNode::GetNext() and
    JdbcDataSource.getNext() to avoid unnecessary function calls.
    
    Testing:
     - Ran query_test/test_ext_data_sources.py, which consists of count
       queries for the jdbc external table.
     - Passed core-tests.
    
    Change-Id: I9953dca949eb773022f1d6dcf48d8877857635d6
    Reviewed-on: http://gerrit.cloudera.org:8080/20653
    Reviewed-by: Abhishek Rawat <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/exec/data-source-scan-node.cc               | 45 ++++++++++++++--------
 .../impala/extdatasource/jdbc/JdbcDataSource.java  | 13 +++----
 2 files changed, 33 insertions(+), 25 deletions(-)

diff --git a/be/src/exec/data-source-scan-node.cc 
b/be/src/exec/data-source-scan-node.cc
index 5a88224fa..7e01c01c4 100644
--- a/be/src/exec/data-source-scan-node.cc
+++ b/be/src/exec/data-source-scan-node.cc
@@ -348,24 +348,35 @@ Status DataSourceScanNode::GetNext(RuntimeState* state, 
RowBatch* row_batch, boo
   while (true) {
     {
       SCOPED_TIMER(materialize_tuple_timer());
-      // copy rows until we hit the limit/capacity or until we exhaust 
input_batch_
-      while (!ReachedLimit() && !row_batch->AtCapacity() && 
InputBatchHasNext()) {
-        // TODO The timezone depends on flag 
use_local_tz_for_unix_timestamp_conversions.
-        //      Check if this is the intended behaviour.
-        RETURN_IF_ERROR(MaterializeNextRow(
-            state->time_zone_for_unix_time_conversions(), tuple_pool, tuple));
-        ++rows_read;
-        int row_idx = row_batch->AddRow();
-        TupleRow* tuple_row = row_batch->GetRow(row_idx);
-        tuple_row->SetTuple(tuple_idx_, tuple);
-
-        if (ExecNode::EvalConjuncts(evals, num_conjuncts, tuple_row)) {
-          row_batch->CommitLastRow();
-          tuple = reinterpret_cast<Tuple*>(
-              reinterpret_cast<uint8_t*>(tuple) + tuple_desc_->byte_size());
-          IncrementNumRowsReturned(1);
+      if (tuple_desc_->slots().size() > 0) {
+        // Copy rows until we hit the limit/capacity or until we exhaust 
input_batch_
+        while (!ReachedLimit() && !row_batch->AtCapacity() && 
InputBatchHasNext()) {
+          // TODO Timezone depends on flag 
use_local_tz_for_unix_timestamp_conversions.
+          //      Check if this is the intended behaviour.
+          RETURN_IF_ERROR(MaterializeNextRow(
+              state->time_zone_for_unix_time_conversions(), tuple_pool, 
tuple));
+          ++rows_read;
+          int row_idx = row_batch->AddRow();
+          TupleRow* tuple_row = row_batch->GetRow(row_idx);
+          tuple_row->SetTuple(tuple_idx_, tuple);
+
+          if (ExecNode::EvalConjuncts(evals, num_conjuncts, tuple_row)) {
+            row_batch->CommitLastRow();
+            tuple = reinterpret_cast<Tuple*>(
+                reinterpret_cast<uint8_t*>(tuple) + tuple_desc_->byte_size());
+            IncrementNumRowsReturned(1);
+          }
+          ++next_row_idx_;
+        }
+      } else {
+        // For count(*)
+        rows_read += num_rows_;
+        next_row_idx_ += num_rows_;
+        IncrementNumRowsReturned(num_rows_);
+        if (input_batch_->eos) {
+          row_batch->limit_capacity(rows_read);
+          row_batch->CommitRows(rows_read);
         }
-        ++next_row_idx_;
       }
       if (row_batch->AtCapacity() || ReachedLimit()
           || (input_batch_->eos && !InputBatchHasNext())) {
diff --git 
a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/JdbcDataSource.java
 
b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/JdbcDataSource.java
index d69ee9f50..28ba111fb 100644
--- 
a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/JdbcDataSource.java
+++ 
b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/JdbcDataSource.java
@@ -98,7 +98,7 @@ public class JdbcDataSource implements ExternalDataSource {
   // iterator_ is used when schema_.getColsSize() does not equal 0.
   private JdbcRecordIterator iterator_ = null;
   // currRow_ and totalNumberOfRecords_ are used when schema_.getColsSize() 
equals 0.
-  private int currRow_;
+  private long currRow_;
   private long totalNumberOfRecords_ = 0;
 
   // Enumerates the states of the data source, which indicates which 
ExternalDataSource
@@ -193,13 +193,10 @@ public class JdbcDataSource implements ExternalDataSource 
{
       }
       if (!hasNext) eos_ = true;
     } else { // for count(*)
-      if (currRow_ + batchSize_ <= totalNumberOfRecords_) {
-        numRows = batchSize_;
-      } else {
-        numRows = totalNumberOfRecords_ - currRow_;
-      }
-      currRow_ += numRows;
-      if (currRow_ == totalNumberOfRecords_) eos_ = true;
+      // Don't need to check batchSize_.
+      numRows = totalNumberOfRecords_ - currRow_;
+      currRow_ = totalNumberOfRecords_;
+      eos_ = true;
     }
     return new TGetNextResult(STATUS_OK).setEos(eos_)
         .setRows(new TRowBatch().setCols(cols).setNum_rows(numRows));

Reply via email to