Hi Rong,

It is definitely doable to use Plugins for TableFactories discovery, but it 
would require more extensive changes/discussions. A couple of challenges there:

1. Currently TableFactories come from the user job’s jar, so we would either 
need to support dynamically loaded plugins, or pluggable TableFactories would 
have to be shipped statically (which I think should be acceptable, at least 
initially, but it is a different setup compared to the status quo).
2. How to expose Plugins at the Public API level? That would have to be solved 
because they would need to be accessed in the StreamingTableSource.
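
A minimal sketch of the discovery idea discussed in this thread, assuming a 
`./plugins` layout with one subdirectory of jars per plugin: collect the jars of 
each subdirectory, create one class loader per plugin, and run a `ServiceLoader` 
lookup in each. The class `PluginScanSketch` and its methods are hypothetical 
names for illustration, not Flink's API, and the plain parent-first 
`URLClassLoader` used here does not by itself provide real isolation.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.TreeMap;

// Hypothetical illustration only; not part of Flink.
final class PluginScanSketch {

    /** Returns plugin name -> jar URLs, one entry per subdirectory of pluginsRoot. */
    static Map<String, List<URL>> scan(Path pluginsRoot) throws IOException {
        Map<String, List<URL>> plugins = new TreeMap<>();
        try (DirectoryStream<Path> dirs =
                Files.newDirectoryStream(pluginsRoot, p -> Files.isDirectory(p))) {
            for (Path dir : dirs) {
                List<URL> jars = new ArrayList<>();
                // Only files ending in .jar belong to the plugin.
                try (DirectoryStream<Path> files = Files.newDirectoryStream(dir, "*.jar")) {
                    for (Path jar : files) {
                        jars.add(jar.toUri().toURL());
                    }
                }
                // Directory iteration order is unspecified, so sort for determinism.
                jars.sort(Comparator.comparing(URL::toString));
                plugins.put(dir.getFileName().toString(), jars);
            }
        }
        return plugins;
    }

    /** Looks up implementations of the given service in one class loader per plugin. */
    static <T> List<T> discover(
            Class<T> service, Map<String, List<URL>> plugins, ClassLoader parent) {
        List<T> found = new ArrayList<>();
        for (List<URL> jars : plugins.values()) {
            // Assumption: a plain URLClassLoader delegates to the parent first, so
            // providers visible from the parent are reported once per plugin.
            ClassLoader loader = new URLClassLoader(jars.toArray(new URL[0]), parent);
            for (T impl : ServiceLoader.load(service, loader)) {
                found.add(impl);
            }
        }
        return found;
    }
}
```

For the layout quoted below, `scan` would return two entries, `pluginA` with two 
jars and `pluginB` with one; `discover` would then be called once per service 
type with that map.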

Piotrek

> On 17 Apr 2019, at 02:59, Rong Rong <[email protected]> wrote:
> 
> Hi All,
> 
> Sorry for joining the discussion late. Thanks Piotrek for initiating this
> effort.
> 
> I recall reporting a very similar bug years ago [1] that was not easily
> solvable at the time, so +1 on this feature going beyond just FileSystem :-)
> I think this would definitely be beneficial as a way for others to build
> a specialized framework/platform on top.
> 
> Here are some of my thoughts to the questions raised.
> 1. I am not exactly sure about the implementation details, but currently
> Flink's Table factory discovery system [2] also handles, in a way, the
> loading of various table systems. I was wondering if this would be an
> acceptable way to discover services. A similar question was also raised in
> the discussion about supporting the Service-Provider pattern [3] in Flink
> security module installation.
> 3. I am assuming this means the DataStream object directly, not via
> StreamExecutionEnvironment.addSource(sourceFunction) [4]. In this case, how
> would the DataStream object be instantiated? Is there any reason we would
> want to support direct DataStream sources/sinks without going through the
> source/sink factory?
> 
> I have not thought through (2) and (4), but I think they are all very
> valid questions, as we also suffer from these pain points when managing
> our prod environment.
> Looking forward to contributing to this effort!
> 
> Best,
> Rong
> 
> [1] https://issues.apache.org/jira/browse/FLINK-7373
> [2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sourceSinks.html#define-a-tablefactory
> [3] https://docs.google.com/document/d/1j96kjf-Nbk8Kii276SLSajhpCUuNHvNO5feZUnnPcPE/
> [4] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/connectors/kafka.html#kafka-consumer
> 
> 
> 
> On Wed, Apr 10, 2019 at 12:25 PM Piotr Nowojski <[email protected]> wrote:
> 
>> Hi Biao,
>> 
>> No, there is not. The current scope is so small, and it was actually so
>> easy to implement, that a design doc might have been overkill for it.
>> As for the future plans, we consciously decided not to account for them
>> or plan them at the moment, but to tackle them lazily.
>> 
>> The idea behind the current proposal is described at the moment in the
>> Javadocs [1] and in the tests, and there will of course be a documentation
>> update. The gist is to have a `./plugins` directory, with subdirectories of
>> jars:
>> 
>> ./plugins/pluginA/foo-1.0.jar
>> ./plugins/pluginA/bar-0.9.jar
>> 
>> ./plugins/pluginB/foo-2.0.jar
>> 
>> All of the jars from each plugin subdirectory will be loaded in a separate
>> class loader, and instead of looking for `FileSystem.class` implementations
>> in Flink’s main class loader, we will iterate over those plugins’ class
>> loaders and search inside them. This is a simple extension of the current
>> mechanism.
>> 
>> In the future we will need some discussion on how we want to evolve this
>> mechanism to handle more than just `FileSystem`s:
>> 1. Will we want to introduce some centralised architecture, where we would
>> discover `Plugin.class` implementations and a `Plugin` would provide
>> `FileSystem`, `MetricReporter`, `Connector` implementations? Or would we
>> discover `FileSystem`, `MetricReporter`, `Connector` separately?
>> 2. Do we want to support “dynamically” loaded plugins, provided during
>> job submission?
>> 3. Do we want to somehow support DataStream Sources/Sinks as plugins? If
>> yes, what would the API look like? Some Sinks/Sources factory interface
>> shared between connectors?
>> 4. How to expose `Plugins` inside operators, especially in Table API/SQL
>> operators, to support TableSource/SinkFactories loaded via plugins?
>> 5. …?
>> 
>> Those are some of the questions that we are intentionally trying to avoid
>> at the moment.
>> 
>> Piotrek
>> 
>> [1]
>> https://github.com/apache/flink/pull/8038/commits/06d07e846d1b3682538cea203de1371fc79b9940#diff-0599320f7dc729e412841c58534d6fe5
>> 
>>> On 10 Apr 2019, at 17:40, Biao Liu <[email protected]> wrote:
>>> 
>>> Hi Stefan & Piotr,
>>> Thank you for bringing up this discussion. As Zhijiang said, class
>>> conflicts cause a lot of trouble in our production environment.
>>> I was wondering whether there is any design document currently. It might
>>> be helpful for understanding the PR and even the whole picture since, as
>>> Piotr said, it could be extended to other modules in the future.
>>> 
>>> Stefan Richter <[email protected]> wrote on Wed, Apr 10, 2019 at 11:22 PM:
>>> 
>>>> Thank you Piotr for bringing this discussion to the mailing list! As it
>>>> was not explicitly mentioned in the first email, I wanted to add that
>>>> there is also already an open PR [1] with my implementation of the basic
>>>> plugin mechanism for FileSystem. Looking forward to some feedback from the
>>>> community.
>>>> 
>>>> 
>>>> [1] https://github.com/apache/flink/pull/8038
>>>> 
>>>> Best,
>>>> Stefan
>>>> 
>>>>> On 10. Apr 2019, at 17:08, zhijiang <[email protected]> wrote:
>>>>> 
>>>>> Thanks Piotr for proposing this new feature.
>>>>> 
>>>>> The solution for the class loader issue is really helpful in production,
>>>>> and we often encountered this pain point before.
>>>>> This pluggable mechanism might bring more possibilities.
>>>>> Hope to see the progress soon. :)
>>>>> 
>>>>> Best,
>>>>> Zhijiang
>>>>> ------------------------------------------------------------------
>>>>> From:Jeff Zhang <[email protected]>
>>>>> Send Time: Wed, Apr 10, 2019, 22:01
>>>>> To:dev <[email protected]>
>>>>> Subject:Re: Introducing Flink's Plugin mechanism
>>>>> 
>>>>> Thanks Piotr for driving this plugin mechanism. Pluggability is pretty
>>>>> important for the ecosystem of Flink.
>>>>> 
>>>>> Piotr Nowojski <[email protected]> wrote on Wed, Apr 10, 2019 at 5:48 PM:
>>>>> 
>>>>>> Hi Flink developers,
>>>>>> 
>>>>>> I would like to introduce a new plugin loading mechanism that we are
>>>>>> working on right now [1]. The idea is quite simple: isolate services in
>>>>>> separate independent class loaders, so that classes and dependencies do
>>>>>> not leak between them and/or the Flink runtime itself. Currently we have
>>>>>> quite a few problems with dependency convergence in multiple places. Some
>>>>>> of them we are solving by shading (built-in file systems, metrics), some
>>>>>> we are forcing users to deal with (custom file systems/metrics), and
>>>>>> others we do not solve (connectors: we do not support using different
>>>>>> Kafka versions in the same job/SQL). With proper plugins that are loaded
>>>>>> in independent class loaders, those issues could be solved in a generic
>>>>>> way.
>>>>>> 
>>>>>> The current scope of the implementation targets only file systems,
>>>>>> without a centralised Plugin architecture and with Plugins that are only
>>>>>> “statically” initialised at TaskManager and JobManager startup. More or
>>>>>> less, we are just replacing the way FileSystem implementations are
>>>>>> discovered and loaded.
>>>>>> 
>>>>>> In the future this idea could be extended to different modules, like
>>>>>> metric reporters, connectors, functions/data types (especially in SQL),
>>>>>> state backends, internal storage or other future efforts. Some of those
>>>>>> would be easier than others: the metric reporters would require a
>>>>>> smaller refactoring, while connectors would require bigger API design
>>>>>> discussions, which I would like to avoid at the moment. Nevertheless, I
>>>>>> wanted to reach out with this idea so that if other potential use cases
>>>>>> pop up in the future, more people will be aware.
>>>>>> 
>>>>>> Piotr Nowojski
>>>>>> 
>>>>>> 
>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-11952
>>>>> 
>>>>> 
>>>>> 
>>>>> --
>>>>> Best Regards
>>>>> 
>>>>> Jeff Zhang
