Github user fhueske commented on the pull request:
https://github.com/apache/flink/pull/1255#issuecomment-148716888
Hi @ChengXiangLi, I had a look at your PR and I think we need to change the
implementation a bit.
Right now, it executes an additional job for each range partition operator
to obtain a sample. The additional job executes the full program and samples at
the end. Imagine a complex program that includes for instance several joins and
wants to write out the result in a total order, i.e., range partitions and
sorts the result before it writes to the final sink. With the current
implementation, this would mean that the expensive job is executed twice.
It would be better to inject the sampling into the actual job. This can be
done, for example, as follows.
A program such as:
```
DataSet<X> x = ...
x.rangePartition(0).reduce(...)
```
could be translated into:
```
DataSet<X> x = ...
DataSet<Distr> dist = x
    .mapPartition("sample")
    .reduce("collect samples and build distribution");
DataSet<Tuple2<Integer, X>> xWithPIDs = x
    .map("assign PartitionIDs")
    .withBroadcastSet(dist, "distribution");
```
This would inject the sampling into the original program. The sampling is
done as before, but the data distribution is broadcast to a map operator that
uses it to assign a partition ID to each record, converting the
`DataSet<X>` into a `DataSet<Tuple2<Integer, X>>`, similar to what a `KeySelector` does.
Once the partition IDs are assigned, a RangePartitionOperator could partition
the tuples on the first field (f0) with a simple Int-DataDistribution
(0, 1, 2, 3, 4, ..., n). Finally, the DataSet needs to be unwrapped, i.e., converted
from `DataSet<Tuple2<Integer, X>>` back to `DataSet<X>`.
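To make this a bit more concrete, here is a rough, self-contained sketch of that
translation with the DataSet API. It uses plain Integers as records, an `int[]` of
split points as the "distribution", a naive sample, and a hard-coded partition count;
all class and variable names are made up for illustration and are not part of this PR.
```
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class RangePartitionSketch {

  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Integer> x = env.fromElements(7, 3, 42, 19, 5, 88, 1, 64);
    final int numPartitions = 4;

    // 1) Sample each partition and reduce all samples into split points.
    DataSet<int[]> dist = x
        .mapPartition(new MapPartitionFunction<Integer, Integer>() {
          @Override
          public void mapPartition(Iterable<Integer> values, Collector<Integer> out) {
            int i = 0;
            for (Integer v : values) {
              if (i++ % 2 == 0) out.collect(v);   // naive sample: every 2nd record
            }
          }
        })
        .reduceGroup(new GroupReduceFunction<Integer, int[]>() {
          @Override
          public void reduce(Iterable<Integer> samples, Collector<int[]> out) {
            // assumes the sample is non-empty
            List<Integer> s = new ArrayList<>();
            for (Integer v : samples) s.add(v);
            Collections.sort(s);
            int[] splits = new int[numPartitions - 1];
            for (int k = 1; k < numPartitions; k++) {
              splits[k - 1] = s.get(k * s.size() / numPartitions);
            }
            out.collect(splits);
          }
        });

    // 2) Broadcast the split points and tag every record with its partition ID.
    DataSet<Tuple2<Integer, Integer>> withIds = x
        .map(new RichMapFunction<Integer, Tuple2<Integer, Integer>>() {
          private int[] splits;

          @Override
          public void open(Configuration parameters) {
            List<int[]> bc = getRuntimeContext().getBroadcastVariable("distribution");
            splits = bc.get(0);
          }

          @Override
          public Tuple2<Integer, Integer> map(Integer value) {
            int pid = 0;
            while (pid < splits.length && value > splits[pid]) pid++;
            return new Tuple2<>(pid, value);
          }
        })
        .withBroadcastSet(dist, "distribution");

    // 3) Partition on the partition ID in f0, then unwrap back to DataSet<Integer>.
    DataSet<Integer> result = withIds
        .partitionCustom(new Partitioner<Integer>() {
          @Override
          public int partition(Integer key, int parallelism) {
            return key % parallelism;   // trivial Int "distribution": ID i -> subtask i
          }
        }, 0)
        .map(new MapFunction<Tuple2<Integer, Integer>, Integer>() {
          @Override
          public Integer map(Tuple2<Integer, Integer> t) { return t.f1; }
        });

    result.print();
  }
}
```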
I agree it is not super nice, but this implementation would cache the
intermediate result instead of recomputing it. In addition, it barely touches
the internals.
It is also possible to integrate the partitioning more tightly into the
runtime by providing the data distribution directly to the Partitioner.
However, that would mean we need to implement a partitioning operator for the
runtime (instead of using the regular operator and a NOOP driver).
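Just to illustrate, a distribution-aware partitioner for that variant might look
roughly like the sketch below (class name and constructor are made up). Since the
split points would come out of the sampling step at runtime, they could not simply
be baked into the partitioner when the plan is assembled, which is where the extra
runtime support would come in.
```
import org.apache.flink.api.common.functions.Partitioner;

// Hypothetical sketch only: a Partitioner that holds the sampled split points.
public class DistributionRangePartitioner implements Partitioner<Integer> {

  private final int[] splits;   // upper bounds of the key ranges, from the sample

  public DistributionRangePartitioner(int[] splits) {
    this.splits = splits;
  }

  @Override
  public int partition(Integer key, int numPartitions) {
    int pid = 0;
    while (pid < splits.length && key > splits[pid]) pid++;
    return Math.min(pid, numPartitions - 1);
  }
}
```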
Btw. I have some code lying around (for a not-yet-completed feature) to
extract keys from a record given the key specification. Let me know if that
would help with your implementation.
Regarding the implementation of the `Partitioner` and `OutputEmitter`, I am
very open to suggestions on how to improve the design. As you said, I would
bring this discussion to the dev mailing list or open a JIRA and start a
discussion there.
What do you think? Thanks, Fabian