[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334931#comment-16334931 ]
ASF GitHub Bot commented on FLINK-8240:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/5240#discussion_r162990874
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala ---
@@ -23,22 +23,74 @@ import java.net.URL
import org.apache.commons.configuration.{ConfigurationException, ConversionException, PropertiesConfiguration}
import org.apache.flink.annotation.VisibleForTesting
import org.apache.flink.table.annotation.TableType
-import org.apache.flink.table.api.{AmbiguousTableSourceConverterException, NoMatchedTableSourceConverterException, TableException}
+import org.apache.flink.table.api._
import org.apache.flink.table.plan.schema.{BatchTableSourceTable, StreamTableSourceTable, TableSourceTable}
import org.apache.flink.table.plan.stats.FlinkStatistic
-import org.apache.flink.table.sources.{BatchTableSource, StreamTableSource, TableSource}
+import org.apache.flink.table.sources.{BatchTableSource, StreamTableSource, TableSource, TableSourceFactoryService}
import org.apache.flink.table.util.Logging
import org.apache.flink.util.InstantiationUtil
import org.reflections.Reflections
-import scala.collection.JavaConverters._
-import scala.collection.mutable
+import _root_.scala.collection.JavaConverters._
+import _root_.scala.collection.mutable
/**
* The utility class is used to convert ExternalCatalogTable to TableSourceTable.
*/
object ExternalTableSourceUtil extends Logging {
+ /**
+ * Converts an [[ExternalCatalogTable]] instance to a [[TableSourceTable]] instance
+ *
+ * @param externalCatalogTable the [[ExternalCatalogTable]] instance which to convert
+ * @return converted [[TableSourceTable]] instance from the input catalog table
+ */
+ def fromExternalCatalogTable(
+ tableEnv: TableEnvironment,
+ externalCatalogTable: ExternalCatalogTable)
+ : TableSourceTable[_] = {
+
+ // check for the legacy external catalog path
+ if (externalCatalogTable.isLegacyTableType) {
+ LOG.warn("External catalog tables based on TableType annotations are
deprecated. " +
+ "Please consider updating them to TableSourceFactories.")
+ fromExternalCatalogTableType(externalCatalogTable)
+ }
+ // use the factory approach
+ else {
+ val source = TableSourceFactoryService.findTableSourceFactory(externalCatalogTable)
+ tableEnv match {
+ // check for a batch table source in this batch environment
+ case _: BatchTableEnvironment =>
+ source match {
+ case bts: BatchTableSource[_] =>
+ new BatchTableSourceTable(
+ bts,
+ new FlinkStatistic(externalCatalogTable.getTableStats))
+ case _ => throw new TableException(
+ s"Found table source '${source.getClass.getCanonicalName}'
is not applicable " +
+ s"in a batch environment.")
+ }
+ // check for a stream table source in this streaming environment
+ case _: StreamTableEnvironment =>
+ source match {
+ case sts: StreamTableSource[_] =>
+ new StreamTableSourceTable(
+ sts,
+ new FlinkStatistic(externalCatalogTable.getTableStats))
+ case _ => throw new TableException(
+ s"Found table source '${source.getClass.getCanonicalName}'
is not applicable " +
+ s"in a streaming environment.")
+ }
+ case _ => throw new TableException("Unsupported table
environment.")
+ }
+ }
+ }
+
+ // ----------------------------------------------------------------------------------------------
+ // NOTE: the following line can be removed once we drop support for TableType
--- End diff ---
I think we can also remove the `org.reflections:reflections` dependency once we remove this.
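
For readers following along: the `findTableSourceFactory` lookup in the diff replaces the annotation scan with a factory discovery step. Below is a minimal sketch of what such a factory could look like, assuming a trait with `requiredContext`/`supportedProperties`/`create` methods; the trait shape, method names, and property keys are assumptions for illustration, not the merged API.

{code}
// Assumed trait shape, modeled on the unified factory interface this PR
// proposes; the real signatures in the merged API may differ.
import java.util.{List => JList, Map => JMap}

import org.apache.flink.table.api.Types
import org.apache.flink.table.sources.{CsvTableSource, TableSource}
import org.apache.flink.types.Row

import scala.collection.JavaConverters._

trait TableSourceFactory[T] {
  // Fixed key-value pairs that identify the factory, e.g. the source type.
  def requiredContext(): JMap[String, String]
  // All property keys the factory is able to interpret.
  def supportedProperties(): JList[String]
  // Builds the table source from the flattened property map.
  def create(properties: JMap[String, String]): TableSource[T]
}

// Hypothetical factory for a CSV-backed source; the property keys are
// invented for this example.
class CsvTableSourceFactory extends TableSourceFactory[Row] {

  override def requiredContext(): JMap[String, String] =
    Map("source.type" -> "csv").asJava

  override def supportedProperties(): JList[String] =
    List("source.path").asJava

  override def create(properties: JMap[String, String]): TableSource[Row] =
    CsvTableSource.builder()
      .path(properties.get("source.path"))
      .field("word", Types.STRING)
      .build()
}
{code}

A `TableSourceFactoryService` would then presumably locate such factories via `java.util.ServiceLoader` (one `META-INF/services` entry per factory), which is also why the `org.reflections` classpath scanning becomes obsolete once the deprecated `TableType` path is dropped.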
> Create unified interfaces to configure and instantiate TableSources
> -------------------------------------------------------------------
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: Timo Walther
> Assignee: Timo Walther
> Priority: Major
>
> At the moment, every table source has its own way of configuration and
> instantiation. Some table sources are tailored to a specific encoding (e.g.,
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one
> encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement
> a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining
> common properties, and instantiation. The {{TableSourceConverters}} provide
> similar functionality but use an external catalog. We might generalize this
> interface.
> In general, a table source declaration depends on the following parts:
> {code}
> - Source
> - Type (e.g. Kafka, Custom)
> - Properties (e.g. topic, connection info)
> - Encoding
> - Type (e.g. Avro, JSON, CSV)
> - Schema (e.g. Avro class, JSON field names/types)
> - Rowtime descriptor/Proctime
> - Watermark strategy and Watermark properties
> - Time attribute info
> - Bucketization
> {code}
> This issue needs a design document before implementation. Any discussion is
> very welcome.
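
As a concrete reading of the declaration parts quoted above, such a definition could be flattened into normalized string properties for a factory to consume. A minimal sketch; every key name and value below is purely illustrative and not a format proposed by the issue:

{code}
// Hypothetical flattening of the source/encoding/rowtime declaration into
// string properties; all key names are invented for illustration only.
val declaration: Map[String, String] = Map(
  "source.type"                -> "kafka",
  "source.property.topic"      -> "sensor-readings",
  "source.property.connect"    -> "localhost:9092",
  "encoding.type"              -> "json",
  "encoding.field.0.name"      -> "temperature",
  "encoding.field.0.type"      -> "DOUBLE",
  "rowtime.watermark.strategy" -> "bounded-out-of-order",
  "rowtime.watermark.delay"    -> "5000"
)
{code}

A discovery service could then match keys such as "source.type" against each factory's required context and validate the remaining keys against its supported properties.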
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)