[
https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16543240#comment-16543240
]
ASF GitHub Bot commented on FLINK-8558:
---------------------------------------
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6323#discussion_r202289678
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
---
@@ -44,32 +43,27 @@ object ExternalTableSourceUtil extends Logging {
    val properties = new DescriptorProperties()
    externalCatalogTable.addProperties(properties)
    val javaMap = properties.asMap
-    val source = TableFactoryService.find(classOf[TableSourceFactory[_]], javaMap)
-      .asInstanceOf[TableSourceFactory[_]]
-      .createTableSource(javaMap)
    tableEnv match {
      // check for a batch table source in this batch environment
      case _: BatchTableEnvironment =>
-        source match {
-          case bts: BatchTableSource[_] =>
-            new TableSourceSinkTable(Some(new BatchTableSourceTable(
-              bts,
-              new FlinkStatistic(externalCatalogTable.getTableStats))),
-              None)
-          case _ => throw new TableException(
-            s"Found table source '${source.getClass.getCanonicalName}' is not applicable " +
-              s"in a batch environment.")
-        }
+        val source = TableFactoryService
--- End diff ---
1. This case looks kind of ugly. Shouldn't it be unified to something like:
```
TableFactoryService.find(classOf[TableSourceFactory], javaMap)
  .createTableSource(javaMap, environment.isInstanceOf[StreamTableEnvironment])
```
?
If you really want, you can define helper methods on top of that:
```
def createBatchTableSource(javaMap: java.util.Map[String, String]) =
  createTableSource(javaMap, isStream = false)

def createStreamTableSource(javaMap: java.util.Map[String, String]) =
  createTableSource(javaMap, isStream = true)
```
but I would skip them.
Factories could then choose whether or not to support streaming/batch tables. The same
applies to similar code in `ExecutionContext.java`.
2. Ditto: is there any significant value in keeping both
`BatchTableSink(Source)Factory` and `StreamTableSink(Source)Factory`?
Both 1. and 2. add quite a lot of boilerplate code, both for the definition and for the
caller.
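The unified approach suggested in point 1. could look roughly like the following self-contained sketch. All names here (`TableSource`, `CsvSourceFactory`, and so on) are illustrative placeholders for this comment, not the actual Flink API:

```scala
// Hypothetical sketch of the unified factory idea from point 1.
// The types below are placeholders, not the real Flink interfaces.
sealed trait TableSource
case class BatchSource(props: Map[String, String]) extends TableSource
case class StreamSource(props: Map[String, String]) extends TableSource

trait TableSourceFactory {
  // Single entry point: whether the environment is streaming is a plain
  // parameter instead of being baked into separate Batch/Stream factory
  // interfaces.
  def createTableSource(props: Map[String, String], isStream: Boolean): TableSource
}

object CsvSourceFactory extends TableSourceFactory {
  override def createTableSource(props: Map[String, String], isStream: Boolean): TableSource =
    if (isStream) StreamSource(props) else BatchSource(props)
}
```

A caller then needs a single call regardless of the environment, e.g. `CsvSourceFactory.createTableSource(props, isStream = tableEnv.isInstanceOf[StreamTableEnvironment])`.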
> Add unified format interfaces and format discovery
> --------------------------------------------------
>
> Key: FLINK-8558
> URL: https://issues.apache.org/jira/browse/FLINK-8558
> Project: Flink
> Issue Type: New Feature
> Components: Streaming Connectors
> Reporter: Timo Walther
> Assignee: Timo Walther
> Priority: Major
> Labels: pull-request-available
>
> In the last release, we introduced a new module {{flink-formats}}. Currently
> only {{flink-avro}} is located there but we will add more formats such as
> {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of
> concerns we want to decouple connectors from formats: e.g., remove
> {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}.
> A newly introduced {{FormatFactory}} will use Java service loaders to
> discover available formats in the classpath (similar to how file systems are
> discovered now). A {{Format}} will provide a method for converting {{byte[]}}
> to the target record type.
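The service-loader discovery described in the issue could be sketched as follows. The `Format` trait and its methods are assumptions for illustration only; the interfaces FLINK-8558 actually introduces may differ:

```scala
import java.util.ServiceLoader

// Hypothetical Format SPI for illustration; not the real FLINK-8558 API.
// Implementations would be registered on the classpath via a
// META-INF/services/<fully.qualified.Format> provider-configuration file.
trait Format {
  def supportsType(formatType: String): Boolean
  def deserialize(bytes: Array[Byte]): AnyRef
}

object FormatDiscovery {
  // Scan the classpath for registered Format implementations and return the
  // first one that claims to support the requested format type.
  def find(formatType: String): Option[Format] = {
    val it = ServiceLoader.load(classOf[Format]).iterator()
    var found: Option[Format] = None
    while (it.hasNext && found.isEmpty) {
      val candidate = it.next()
      if (candidate.supportsType(formatType)) found = Some(candidate)
    }
    found
  }
}
```

With no provider-configuration files on the classpath, `FormatDiscovery.find("json")` simply returns `None`, which is the same graceful-absence behavior the file-system discovery relies on today.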
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)