Hi Jingsong,

Thanks for bringing up this discussion. I like this idea generally.
I'd like to add some cases we met in our scenarios.

## Source Partition By
There is a use case where users want to do lookups inside a UDF, much like
a dimension table join. It's common for them to cache some lookup results
in the UDF. Without a 'partition by' on the source, each subtask may need
to cache more data, because rows with the same key can land on any subtask.
Actually, what they need is `keyBy` from the DataStream world.
We have already supported this feature in our production environment.
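
To make it concrete, a minimal sketch of the idea (the HASH syntax is
hypothetical, borrowed from the proposal below; `user_source` and
`my_lookup_udf` are made-up names):

CREATE TABLE user_source (
  user_id STRING,
  payload STRING
) PARTITIONED BY (HASH(user_id))
WITH (...);

-- rows with the same user_id go to the same subtask, so the UDF's
-- internal cache only needs to hold that subtask's keys
SELECT user_id, my_lookup_udf(user_id) FROM user_source;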

## Sink Partition By
Recently we also received some requirements about this feature.
Users want to add a custom sink, and need the data shuffled by key before
it reaches the sink; they will do something with all the data that shares
the same partition key.
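
For example (just a sketch; the 'my_custom' connector name and the DDL
below are made up for illustration):

CREATE TABLE custom_sink (
  user_id STRING,
  cnt BIGINT
) PARTITIONED BY (HASH(user_id))
WITH ('connector' = 'my_custom');

-- all records with the same user_id arrive at the same sink subtask
INSERT INTO custom_sink
SELECT user_id, COUNT(*) FROM user_source GROUP BY user_id;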

In addition to the 'Source Partition By' semantics: partitioning only at
the source is actually not enough for our current use cases. A more
general way to do this is to add the partition by semantics to a 'view',
so that users can do the 'keyBy' multiple times in one query, as sketched
below.
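
Something like the following, where PARTITIONED BY on a view is purely
hypothetical syntax to illustrate the idea:

CREATE VIEW keyed_view PARTITIONED BY (HASH(user_id)) AS
SELECT user_id, my_lookup_udf(user_id) AS enriched
FROM user_source;

-- a second keyBy-like shuffle can then happen on another key
SELECT enriched, COUNT(*) FROM keyed_view GROUP BY enriched;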

I have no strong opinions about these features; I just want to add some
use cases and would like to hear more opinions on this.


On Mon, Aug 31, 2020 at 7:09 PM Konstantin Knauf <[email protected]> wrote:

> Hi Jingsong,
>
> I would like to understand this FLIP (?) a bit better, but I am missing
> some background, I believe. So, some basic questions:
>
> 1) Does the PARTITION BY clause only have an effect for sink tables,
> defining how data should be partitioned in the sink system, or does it
> also make a difference for source tables? My understanding is that it
> also makes a difference for source tables (e.g. if the source system
> supports partition pruning). I suppose, for source tables, Flink does not
> check/enforce this, but trusts that the partitioning information is
> correct?!
>
> 2) I suppose it is up to the connector implementation whether/how to
> interpret the partition information. How will this work?
>
> 3) For Kafka, I suppose, the most common partitioning strategy is by key.
> FLIP-107 contains a proposal on how to define the key (which fields of the
> schema should become part of the key) when writing to Kafka via Flink SQL.
> How does this relate to the PARTITION BY clause?
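>
> For context, the key definition proposed in FLIP-107 looks roughly like
> this (a sketch only; the option names may still change):
>
> CREATE TABLE kafka_table (
>   id STRING,
>   name STRING
> ) WITH (
>   'connector' = 'kafka',
>   'key.fields' = 'id',      -- columns written into the Kafka record key
>   'key.format' = 'json',
>   'value.format' = 'json'
> );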
>
> Thanks,
>
> Konstantin
>
>
>
> On Mon, Aug 24, 2020 at 10:54 AM Jingsong Li <[email protected]>
> wrote:
>
> > Hi all,
> >
> > ## Motivation
> >
> > FLIP-63 [1] introduced initial support for the PARTITIONED BY clause,
> > to the extent that it lets us support Hive's partitioning.
> > But this partition definition is completely specific to Hive/file
> > systems; with the continuous development of the system, there are new
> > requirements:
> >
> > - FLIP-107 [2] requirements: A common requirement is to create a custom
> > partitioning of the data. We should have a way to specify/compute target
> > partition/shard for Kinesis/Pravega/Pulsar. In those cases it would be
> > the only way to control partitioning.
> >
> > - Apache Iceberg partitioning [3] requirements: Iceberg produces
> > partition values by taking a column value and optionally transforming
> > it. Iceberg is responsible for converting event_time into event_date,
> > and keeps track of the relationship.
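> >
> > For reference, Iceberg expresses this with a transform in the partition
> > spec, e.g. in Spark SQL (a sketch based on the Iceberg docs [3]):
> >
> > CREATE TABLE db.events (
> >   id BIGINT,
> >   event_time TIMESTAMP
> > ) USING iceberg
> > PARTITIONED BY (days(event_time));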
> >
> > So I think it is better to introduce partitioning strategies to Flink;
> > these strategies are similar to partitioning in traditional databases
> > like Oracle [4].
> >
> > ## Proposed Partitioning DDL
> >
> > Hash Partitioning Tables:
> >
> > CREATE TABLE kafka_table (
> >   id STRING,
> >   name STRING,
> >   `date` DATE ... )
> > PARTITIONED BY (HASH(id, name))
> >
> > Explicit Partitioning Tables (Introduced in FLIP-63):
> >
> > CREATE TABLE fs_table (
> >   name STRING,
> >   `date` DATE ... )
> > PARTITIONED BY (`date`)
> >
> > (Can we remove the brackets when there is only a single-level partition?
> > => "PARTITIONED BY HASH(id, name)" and "PARTITIONED BY date")
> >
> > Composite Partitioning Tables:
> >
> > CREATE TABLE fs_table (
> >   name STRING,
> >   `date` DATE
> >    ... )
> > PARTITIONED BY (year(`date`), month(`date`), day(`date`))
> >
> > Composite Explicit Partitioning Tables (Introduced in FLIP-63):
> >
> > CREATE TABLE fs_table (
> >   name STRING,
> >   `date` DATE,
> >   y STRING,
> >   m STRING,
> >   d STRING,
> >    ... )
> > PARTITIONED BY (y, m, d)
> >
> > ## Rejected Alternatives
> >
> > Composite Partitioning Tables DDL like Oracle:
> >
> > CREATE TABLE invoices (
> >   invoice_no    NUMBER NOT NULL,
> >   invoice_date  DATE   NOT NULL,
> >   comments      VARCHAR2(500))
> > PARTITION BY RANGE (invoice_date)
> > SUBPARTITION BY HASH (invoice_no)
> > SUBPARTITIONS 8 (
> >   PARTITION invoices_q1 VALUES LESS THAN (TO_DATE('01/04/2001', 'DD/MM/YYYY')),
> >   PARTITION invoices_q2 VALUES LESS THAN (TO_DATE('01/07/2001', 'DD/MM/YYYY')),
> >   PARTITION invoices_q3 VALUES LESS THAN (TO_DATE('01/09/2001', 'DD/MM/YYYY')),
> >   PARTITION invoices_q4 VALUES LESS THAN (TO_DATE('01/01/2002', 'DD/MM/YYYY')));
> >
> > - First, multi-level partitioning is a common thing in big data systems.
> > - Second, the "SUBPARTITIONS" syntax is not only more complex, but also
> > completely different from big data systems such as Hive. Big data
> > systems need to specify less partition information than traditional
> > databases, so it is more natural to write all partition expressions in
> > one bracket.
> >
> > ## Other Interface changes
> >
> > It can be imagined that this change will involve many Catalog/Table
> > related interfaces, and it will be necessary to replace the previous
> > `List<String> partitionKeys` with partitioning strategies.
> >
> > What do you think?
> >
> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-63%3A+Rework+table+partition+support
> > [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records
> > [3] http://iceberg.apache.org/partitioning/
> > [4] https://oracle-base.com/articles/8i/partitioned-tables-and-indexes
> >
> > Best,
> > Jingsong
> >
>
>
> --
>
> Konstantin Knauf
>
> https://twitter.com/snntrable
>
> https://github.com/knaufk
>


-- 

Best,
Benchao Li
