[ https://issues.apache.org/jira/browse/IMPALA-7751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16664449#comment-16664449 ]

Quanlong Huang commented on IMPALA-7751:
----------------------------------------

[~lv], we've tried this before. Though it can balance the load on the Impala 
side, it puts extra pressure on Kudu. If we have 50 impalad nodes, then with 
the "NOSHUFFLE" hint all of them can insert data into the same Kudu partition. 
Sometimes we see failures like
{code:java}
Kudu error(s) reported, first error: Timed out: Failed to write batch of 611 
ops to tablet 5581db1268e04747befd33a67bf1d0df after 491 attempt(s): Failed to 
write to server: (no server available): Write(tablet: 
5581db1268e04747befd33a67bf1d0df, num_ops: 611, num_attempts: 491) passed its 
deadline: Remote error: Service unavailable: Transaction failed, tablet 
5581db1268e04747befd33a67bf1d0df transaction memory consumption (62818976) has 
exceeded its limit (67108864) or the limit of an ancestral tracker
{code}
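For reference, the hinted insert we tried looks roughly like this (a minimal 
sketch using the tables from the reproduction steps below; the hint placement 
follows Impala's plan-hint syntax):
{code:sql}
-- noshuffle keeps rows on the impalad instances that scanned them, so
-- every node may end up writing into the same (newest) Kudu range partition
insert into metrics_kudu_tbl /* +noshuffle */
  select source_id, event_timestamp, value from metrics_tbl;
{code}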
 
We still want each Kudu partition to receive sorted data from a single Impala 
insert node. The optimal way is to let Impala partition the data by the 
{color:#FF0000}*pruned*{color} Kudu partitions: the predicates in the insert 
statement can be used to prune Kudu partitions, so rows only need to be 
distributed across the partitions that can actually receive data. Is it hard 
to do this?
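To make the idea concrete (a sketch against the reproduction table below; SHOW 
RANGE PARTITIONS is standard Impala syntax for Kudu tables), one can already 
list the range partitions a predicate would prune down to:
{code:sql}
-- lists all range partitions of the table; a predicate such as
-- event_timestamp >= 10000 and event_timestamp < 20000 falls entirely
-- within a single range, so only that range's tablets need to receive data
show range partitions default.metrics_kudu_tbl;
{code}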

> Kudu insert statement should push down range partition predicates
> -----------------------------------------------------------------
>
>                 Key: IMPALA-7751
>                 URL: https://issues.apache.org/jira/browse/IMPALA-7751
>             Project: IMPALA
>          Issue Type: Improvement
>            Reporter: Quanlong Huang
>            Priority: Major
>         Attachments: metrics1.tsv, metrics2.tsv, metrics3.tsv, profile.txt
>
>
> We have a job dumping newly added data from HDFS into a Kudu table for good 
> performance of point queries. Each day we create a new range partition in 
> Kudu for the new data on that day (a sketch of such a partition add follows 
> below). As we add more and more Kudu range partitions, we see performance 
> degradation in this job.
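> A minimal sketch of the daily partition add (the boundary values are 
> placeholders; ADD RANGE PARTITION is standard Impala DDL for Kudu tables, 
> shown here against the reproduction table defined in Step 3):
> {code:sql}
> -- add a range partition covering the next day's event_timestamp values
> alter table default.metrics_kudu_tbl
>   add range partition 30000 <= values < 40000;
> {code}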
> The root cause is that the insert statement for Kudu does not leverage the 
> partition predicates on the Kudu range partition keys, which causes skew on 
> the insert nodes.
> How to reproduce this:
> Step 1: Launch an Impala cluster with 3 nodes.
> Step 2: Create an HDFS table with more than 3 underlying files, so that it 
> will have more than 3 scan ranges:
> {code:sql}
> create table default.metrics_tbl (
>   source_id string,
>   event_timestamp bigint,
>   value double
> ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TEXTFILE;
> {code}
> Upload the three attached tsv files into its directory and refresh the table 
> in Impala, as sketched below.
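> A minimal sketch of this step (the HDFS warehouse path is an assumption; 
> check {{describe formatted default.metrics_tbl}} for the actual location):
> {code:sql}
> -- from a shell, upload the attached files (path assumed):
> --   hdfs dfs -put metrics1.tsv metrics2.tsv metrics3.tsv \
> --     /user/hive/warehouse/metrics_tbl/
> -- then make Impala pick up the new files:
> refresh default.metrics_tbl;
> {code}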
> Step 3: Create a Kudu table with mixed partitioning, containing 3 hash 
> partitions and 3 range partitions:
> {code:sql}
> create table default.metrics_kudu_tbl (
>   source_id string,
>   event_timestamp bigint,
>   value double,
>   primary key(source_id, event_timestamp)
> ) partition by
>   hash (source_id) PARTITIONS 3,
>   range (event_timestamp) (
>     partition 0 <= values < 10000,
>     partition 10000 <= values < 20000,
>     partition 20000 <= values < 30000
> ) stored as kudu;
> {code}
> Step 4: Dump rows from the HDFS table into Kudu, supplying partition 
> predicates:
> {code:sql}
> insert into table metrics_kudu_tbl
>   select source_id, event_timestamp, value from metrics_tbl
>   where event_timestamp >= 10000 and event_timestamp < 20000;
> {code}
> Step 5: Look into the profile: there are three fragment instances containing 
> a KuduTableSink, but only one of them received and wrote data.
> {code:java}
>     Averaged Fragment F01:
>       KuduTableSink:
>          - TotalNumRows: 1.00K (1000)
>     Fragment F01:
>       Instance 6347506799a2966d:6e82f49200000004
>         KuduTableSink:
>            - TotalNumRows: 3.00K (3000)
>       Instance 6347506799a2966d:6e82f49200000005
>         KuduTableSink:
>            - TotalNumRows: 0 (0)
>       Instance 6347506799a2966d:6e82f49200000003
>         KuduTableSink:
>            - TotalNumRows: 0 (0)
> {code}
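> For reference, the counters above can be fetched right after the insert (a 
> sketch; {{profile}} is impala-shell's standard command for showing the most 
> recent query's runtime profile):
> {code:sql}
> -- in impala-shell, immediately after the insert statement:
> profile;
> {code}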
> Thus, only one fragment instance of F01 is sorting and ingesting data into 
> Kudu.
> Generally, if there are N range partitions and all the inserted rows belong 
> to one range (as implied by the partition predicates in the WHERE clause), 
> only 1/N of the insert fragment instances produce data. In the 3-range 
> example above, exactly one of the three KuduTableSink instances received all 
> 3,000 rows.



