Thanks a lot for the comments, Fabian. I agree with you on the plan mostly, 
just add some more thoughts about  Non-Range-Equally-Splittable case here.
1. Let's assume a case which 10% data is skewed on certain key, in this case, 
as long as the parallelism is larger than 10, it would fit into 
Non-Range-Equally-Splittable case. So it should not be very corner case of skew 
issue.
2. In proposal, the solution of Non-Range-Equally-Splittable case is based on 2 
new RangePartitioner and little optimizer logic, which has been touched already 
in the plan #1, #2. It does not require to change anything about the operator 
semantics, so if we have a good partitioner abstraction, I think it does not 
add much complexity for Flink to handle this kind of issue. 
It should not block anything, after finished the simple case, we would have 
more knowledge about the implementation details, then we can look back at this 
issue, and decide whether it's deserved to be resolved at the cost.

Thanks
Chengxiang 
-----Original Message-----
From: Fabian Hueske [mailto:fhue...@gmail.com] 
Sent: Monday, October 19, 2015 7:15 PM
To: dev@flink.apache.org
Subject: Re: A proposal about skew data handling in Flink

Hi,

First of all, thanks a lot for this extensive proposal! It contains a lot of 
good observations and techniques how to address data skew.

I have a few remarks:

1) The terms Input and Output Contract were introduced in the first scientific 
publications and are not used anymore. Input Contract are what we call 
operators or transformations today, the concept of output contract is 
completely gone.
In the current code, we have operators like Map, Reduce, and Join that describe 
how data needs to be organized (by key, etc.) and UDFs that process the data.

2) I would categorize skew as follows:

- UDF Call Complexity Skew: The input cardinalities of UDF calls differ (only 
applicable to group-based operators such as GroupReduce and CoGroup) or the 
computational complexity of UDF calls depends on the data and varies a lot. UDF 
calls are the smallest parallelizable unit. It is not possible to change that 
without changing the semantics. Combiners can help to reduce the effect of skew 
for group-based operators.

- Input Partition Skew: The cardinality of parallel partitions varies. This is 
handled by Flink as follows:
    - Lazy split assignment for data sources
    - Operators that do not require special partitioning (Map, Filter, Cross, 
etc.) just consume the output partitions of the preceding operator.
Rebalance() can be used to enforce round-robin partitioning to equalize size of 
all partitions.
    - Operators that require key-based partitioning use hash partitioning.
Range partitioning can help address significant data skew.

- UDF Call Skew: The number of UDF calls per parallel partition varies.
This can be an issue for n-m joins which essentially result in Cartesian 
products.
    - UDF Call Skew is most relevant for Joins
    - UDF Call Skew for Map, Reduce, CoGroup, Cross can be controlled by 
controlling Input Partition Skew

3) I agree that we should not try to detect and automatically fix data skew (at 
the moment) but give users tools to manually manage skew.

4) I would focus on addressing the Input Partition Skew problem. UDF Call 
Complexity Skew cannot be addressed because it would change the semantics of 
operators. UDF Call Skew is only affecting joins and much harder to solve.

5) I wonder how much the practical gain is to address the 
Non-Range-Equally-Splittable case compared to the added code complexity. In 
general, tackling skew is a very good idea, but solving corner cases with quite 
complex methods might make future features more complicated to add.
Hence, I would propose to focus on the common and "easy" cases first.

I would address Input Partition Skew first and ignore the 
Non-Range-Equally-Splittable case for now. We can do this in two steps:

1) Add the "simple" range partitioner as in your pull request for unary 
operators (explicit range partitioning, total order, groupBy). Once the 
sampling happens online, this is a very good addition to Flink.
2) Add the "simple" range partitioner also for binary operators (join, 
coGroup). This will be a bit more tricky, because we need to do a coordinated 
decision for both inputs.
3) Expose range partitioning for GroupBy, Join, CoGroup to the API, maybe 
through optimizer hints.

Since we want to have this transparently handled by the API and engine, we need 
to add a lot of these features into the optimizer, or JobGraphGenerator to be 
more precisely.

Does that make sense to you?

Cheers, Fabian

2015-10-16 17:13 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:

> Hi,
>
> thanks for starting a discussion about data skew! I agree, it's a 
> important issue that can cause a lot of problems.
> I'll have a look at your proposal and add comments soon.
>
> Thanks, Fabian
>
> 2015-10-15 12:24 GMT+02:00 Li, Chengxiang <chengxiang...@intel.com>:
>
>> Dear all,
>> In many real world use case, data are nature to be skewed. For 
>> example, in social network, famous people get much more "follow" than 
>> others, a hot tweet would be transferred millions of times. and the 
>> purchased records of normal product can never compared to hot 
>> products. While at the same time, Flink runtime assume that all tasks 
>> consume same size resources, this's not always true. Skew data 
>> handling try to make skewed data fit into Flink's runtime.
>> I write a proposal about skew data handling in Flink, you can read it 
>> at 
>> https://docs.google.com/document/d/1ma060BUlhXDqeFmviEO7Io4CXLKgrAXIf
>> eDYldvZsKI/edit?usp=sharing
>> .
>> Any comments and feedback are welcome, you can comment on the google 
>> doc, or reply this email thread directly.
>>
>> Thanks
>> Chengxiang
>>
>
>

Reply via email to