Hi Jingsong,

I'm sorry, I didn't want to block you for so long on this. I thought about it again.

I think it's fine to add a DataStream Provider if this really unblocks users from migrating to newer Flink versions. I'm guessing you will add that to the table bridge module?

Regarding the parallelism: I see your point of letting users set that explicitly. I'm still skeptical about it but I also think it wasn't such a good idea to let users specify the parallelism of individual operations in the DataStream API because it again takes freedom away from the framework. So if it's really sth that users need we should go ahead.

Best,
Aljoscha

On 09.10.20 13:57, Jingsong Li wrote:
Hi Aljoscha,

I want to separate `Customized parallelism` and `Parallelism inference`.

### Customized parallelism

First, I want to explain the current DataStream parallelism setting:
`env.fromSource(...).setParallelism(...)`.
This is how users explicitly specify parallelism, and it is the only way to
set parallelism.

The underlying Source (Eg.: SourceFunction) is completely independent of
specific parallelism. The peripheral DataStream is responsible for setting
parallelism.
The table layer also needs to provide peer-to-peer capability.

### Parallelism inference

Some sources have the ability to infer parallelism, like Kafka, parallelism
can be inferred from the partition number.

I think you are right, we should provide this to the underlying Source.
This capability must be related to the underlying Source (Eg.:
SourceFunction), so this capability must introduce a new interface for the
underlying Source.

The Table layer just tell underlying Source that user want to open
parallelism inference:

new MyRealSource(path, and, whatnot, parallelismInfer = true)

What do you think?

Best,
Jingsong

On Tue, Sep 29, 2020 at 8:48 PM Aljoscha Krettek <aljos...@apache.org>
wrote:

Hi,

I'll only respond regarding the parallelism for now because I need to
think some more about DataStream.

What I'm saying is that exposing a parallelism only for Table Connectors
is not the right thing. If we want to allow sources to tell the
system/framework what would be a good parallelism it would be at the
underlying level.

I'll explain with the SourceFunction. A Table API Source connector is
basically a factory that will give you a SourceFunction that corresponds
to whatever the user configured via properties and other means. If the
Table Connector somehow happens to know what would be a good parallelism
for the source it could "tell" the source when creating it, i.e.

    new MyRealSource(path, and, whatnot, parallelismHint)

Then the source could either work with that information it got, by
shutting down (at runtime) some of its parallel instances. Or we could
extend the Source (SourceFunction) API to expose a "parallelism hint" to
the system.

The basic thing is that Table Connectors are not the real connectors,
they just delegate to underlying real connectors. So those underlying
connectors are where we need to change things. Otherwise we would just
have special-case solutions for the Table API.

Best,
Aljoscha

On 25.09.20 14:30, admin wrote:
Hi everyone,
Thanks for the proposal.

In our company,we meet the same situation as @liu shouwei.
We developed some features base on flink.Such as parallelism of sql
source/sink  connector, and kafka delay consumer which is adding a flatmap
and a keyby transformation after the source Datastream.
What make us embarrassing is that when we migrate this features to Flink
1.11,we found that the DataSteam is missing,So we modify the blink’s code
to support parallelism.But kafka delay comsumer is unsolved until now.

  From user’s perspective,it necessary to manipulate DataStream or have
the interoperability between Table API and DataStream.

Best



2020年9月25日 下午4:18,Rui Li <lirui.fu...@gmail.com> 写道:

Hi Jingsong,

Thanks for driving this effort. I have two minor comments.


    1. IMHO, parallelism is a concept that applies to all
ScanTableSource.
    So instead of defining a new interface, is it more natural to
incorporate
    parallel inference to existing interfaces, e.g. ScanTableSource
    or ScanRuntimeProvider?
    2. `scan.infer-parallelism.enabled` doesn't seem very useful to me.
From
    a user's perspective, parallelism is either set by
`scan.parallelism`, or
    automatically decided by Flink. If a user doesn't want the connector
to
    infer parallelism, he/she can simply set `scan.parallelism`, no?


On Fri, Sep 25, 2020 at 3:33 PM Jingsong Li <jingsongl...@gmail.com>
wrote:

Hi Aljoscha,

Thank you for your feedback,

## Connector parallelism

Requirements:
Set parallelism by user specified or inferred by connector.

How to configure parallelism in DataStream:
In the DataStream world, the only way to configure parallelism is
`SingleOutputStreamOperator.setParallelism`. Actually, users need to
have
access to DataStream when using a connector, not just the
`SourceFunction`
/ `Source` interface.
Is parallelism related to connectors? I think yes, there are many
connectors that can support obtaining parallelism related information
from
them, and users do exactly that. This means parallelism inference (From
connectors).
The key is that `DataStream` is an open programming API, and users can
freely program to set parallelism.

How to configure parallelism in Table/SQL:
But Table/SQL is not an open programming API, every feature needs a
corresponding mechanism, because the user is no longer able to
program. Our
current connector interface: SourceFunctionProvider,
SinkFunctionProvider,
through these interfaces, there is no ability to generate connector
related
parallelism.
Back to our original intention: to avoid users directly manipulating
`DataStream`. Since we want to avoid it, we need to provide
corresponding
features.

And parallelism is the runtime information of connectors, It fits the
name
of `ScanRuntimeProvider`.

If we wanted to add a "get parallelism" it would be in those
underlying
connectors but I'm also skeptical about adding such a method there
because
it is a static assignment and would preclude clever optimizations
about the
parallelism of a connector at runtime.

I think that when a job is submitted, it is in compile time. It should
only
provide static parallelism.

## DataStream in table connector

As I said before, if we want to completely cancel DataStream in the
table
connector, we need to provide corresponding functions in
`xxRuntimeProvider`.
Otherwise, we and users may not be able to migrate the old connectors.
Including Hive/FileSystem connectors and the user cases I mentioned
above.
CC: @liu shouwei

We really need to consider these cases.
If there is no alternative in a short period of time, for a long
time, users need to continue to use the old table connector API, which
has
been deprecated.

Why not use StreamTableEnvironment fromDataStream/toDataStream?
- These tables are just temporary tables. Can not be integrated/stored
into
Catalog.
- Creating table DDL can not work...
- We need to lose the kinds of useful features of Table/SQL on the
connector. For example, projection pushdown, filter pushdown,
partitions
and etc...

But I believe you are right in the long run. The source and sink APIs
should be powerful enough to cover all reasonable cases.
Maybe we can just introduce them in a minimal way. For example, we only
introduce `DataStreamSinkProvider` in planner as an internal API.

Your points are very meaningful, hope to get your reply.

Best,
Jingsong

On Fri, Sep 25, 2020 at 10:55 AM wenlong.lwl <wenlong88....@gmail.com>
wrote:

Hi,Aljoscha, I would like to share a use case to second setting
parallelism
of table sink(or limiting parallelism range of table sink): When
writing
data to databases, there is limitation for number of jdbc connections
and
query TPS. we would get errors of too many connections or high load
for
db and poor performance because of too many small requests if the
optimizer
didn't know such information, and set a large parallelism for sink
when
matching the parallelism of its input.

On Thu, 24 Sep 2020 at 22:41, Aljoscha Krettek <aljos...@apache.org>
wrote:

Thanks for the proposal! I think the use cases that we are trying to
solve are indeed valid. However, I think we might have to take a step
back to look at what we're trying to solve and how we can solve it.

The FLIP seems to have two broader topics: 1) add "get parallelism"
to
sinks/sources 2) let users write DataStream topologies for
sinks/sources. I'll treat them separately below.

I think we should not add "get parallelism" to the Table Sink API
because I think it's the wrong level of abstraction. The Table API
connectors are (or should be) more or less thin wrappers around
"physical" connectors. By "physical" I mean the underlying (mostly
DataStream API) connectors. For example, with the Kafka Connector the
Table API connector just does the configuration parsing and
determines
a
good (de)serialization format and then creates the underlying
FlinkKafkaConsumer/FlinkKafkaProducer.

If we wanted to add a "get parallelism" it would be in those
underlying
connectors but I'm also skeptical about adding such a method there
because it is a static assignment and would preclude clever
optimizations about the parallelism of a connector at runtime. But
maybe
that's thinking too much about future work so I'm open to discussion
there.

Regarding the second point of letting Table connector developers use
DataStream: I think we should not do it. One of the purposes of
FLIP-95
[1] was to decouple the Table API from the DataStream API for the
basic
interfaces. Coupling the two too closely at that basic level will
make
our live harder in the future when we want to evolve those APIs or
when
we want the system to be better at choosing how to execute sources
and
sinks. An example of this is actually the past of the Table API.
Before
FLIP-95 we had connectors that dealt directly with DataSet and
DataStream, meaning that if users wanted their Table Sink to work in
both BATCH and STREAMING mode they had to provide two
implementations.
The trend is towards unifying the sources/sinks to common interfaces
that can be used for both BATCH and STREAMING execution but, again, I
think exposing DataStream here would be a step back in the wrong
direction.

I think the solution to the existing user requirement of using
DataStream sources and sinks with the Table API should be better
interoperability between the two APIs, which is being tackled right
now
in FLIP-136 [2]. If FLIP-136 is not adequate for the use cases that
we're trying to solve here, maybe we should think about FLIP-136 some
more.

What do you think?

Best,
Aljoscha

[1]




https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
[2]




https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API





--
Best, Jingsong Lee



--
Best regards!
Rui Li





Reply via email to