Quanlong Huang created IMPALA-7751:
--------------------------------------

             Summary: Kudu insert statement should push down range partition 
predicates
                 Key: IMPALA-7751
                 URL: https://issues.apache.org/jira/browse/IMPALA-7751
             Project: IMPALA
          Issue Type: Improvement
            Reporter: Quanlong Huang


We have a job dumping newly added data in HDFS into Kudu table for good 
performance of point queries. Each day we create a new range partition in Kudu 
for the new data on this day. When we add more and more Kudu range partitions, 
we found performance degradation of this job.

The root cause is, the insert statement for kudu does not leverage the 
partition predicates for kudu range partition keys, which causes skew on the 
insert nodes.

How to reveal this:
Step 1: Launch impala cluster with 3 nodes.
Step 2: Create an HDFS table with more than 3 underlying files, thus will have 
more than 3 scan ranges
{code:sql}
create table default.metrics_tbl (
  source_id string,
  event_timestamp bigint,
  value double
) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TEXTFILE;
{code}
Upload the three attached tsv files into its directory and refresh this table 
in Impala.

Step 3: Create a Kudu table with mix partitions containing 3 hash partitions 
and 3 range partitions.
{code:sql}
create table default.metrics_kudu_tbl (
  source_id string,
  event_timestamp bigint,
  value double,
  primary key(source_id, event_timestamp)
) partition by
  hash (source_id) PARTITIONS 3,
  range (event_timestamp) (
    partition 0 <= values < 10000,
    partition 10000 <= values < 20000,
    partition 20000 <= values < 30000
) stored as kudu;
{code}
Step 4: Dump rows in HDFS table into Kudu giving partition predicates.
{code:sql}
insert into table metrics_kudu_tbl
  select source_id, event_timestamp, value from metrics_tbl
  where event_timestamp >= 10000 and event_timestamp < 20000;
{code}
Step 5: Looking into the profile, there're three fragment instances containing 
KuduTableSink but only one of them received and generated data.
{code:java}
    Averaged Fragment F01:
      KuduTableSink:
         - TotalNumRows: 1.00K (1000)
    Fragment F01:
      Instance 6347506799a2966d:6e82f49200000004
        KuduTableSink:
           - TotalNumRows: 3.00K (3000)
      Instance 6347506799a2966d:6e82f49200000005
        KuduTableSink:
           - TotalNumRows: 0 (0)
      Instance 6347506799a2966d:6e82f49200000003
        KuduTableSink:
           - TotalNumRows: 0 (0)
{code}
Thus, only one fragment instance of F01 is sorting and ingesting data into 
Impala.

Generally, if there're N range partitions and all the inserted rows are belong 
to one range (supplied by the partition predicates in WHERE clause), only 1/N 
of the insert fragments are producing data.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to