Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/5240#discussion_r162990874
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
---
@@ -23,22 +23,74 @@ import java.net.URL
import org.apache.commons.configuration.{ConfigurationException,
ConversionException, PropertiesConfiguration}
import org.apache.flink.annotation.VisibleForTesting
import org.apache.flink.table.annotation.TableType
-import org.apache.flink.table.api.{AmbiguousTableSourceConverterException,
NoMatchedTableSourceConverterException, TableException}
+import org.apache.flink.table.api._
import org.apache.flink.table.plan.schema.{BatchTableSourceTable,
StreamTableSourceTable, TableSourceTable}
import org.apache.flink.table.plan.stats.FlinkStatistic
-import org.apache.flink.table.sources.{BatchTableSource,
StreamTableSource, TableSource}
+import org.apache.flink.table.sources.{BatchTableSource,
StreamTableSource, TableSource, TableSourceFactoryService}
import org.apache.flink.table.util.Logging
import org.apache.flink.util.InstantiationUtil
import org.reflections.Reflections
-import scala.collection.JavaConverters._
-import scala.collection.mutable
+import _root_.scala.collection.JavaConverters._
+import _root_.scala.collection.mutable
/**
* The utility class is used to convert ExternalCatalogTable to
TableSourceTable.
*/
object ExternalTableSourceUtil extends Logging {
+ /**
+ * Converts an [[ExternalCatalogTable]] instance to a
[[TableSourceTable]] instance
+ *
+ * @param externalCatalogTable the [[ExternalCatalogTable]] instance
which to convert
+ * @return converted [[TableSourceTable]] instance from the input
catalog table
+ */
+ def fromExternalCatalogTable(
+ tableEnv: TableEnvironment,
+ externalCatalogTable: ExternalCatalogTable)
+ : TableSourceTable[_] = {
+
+ // check for the legacy external catalog path
+ if (externalCatalogTable.isLegacyTableType) {
+ LOG.warn("External catalog tables based on TableType annotations are
deprecated. " +
+ "Please consider updating them to TableSourceFactories.")
+ fromExternalCatalogTableType(externalCatalogTable)
+ }
+ // use the factory approach
+ else {
+ val source =
TableSourceFactoryService.findTableSourceFactory(externalCatalogTable)
+ tableEnv match {
+ // check for a batch table source in this batch environment
+ case _: BatchTableEnvironment =>
+ source match {
+ case bts: BatchTableSource[_] =>
+ new BatchTableSourceTable(
+ bts,
+ new FlinkStatistic(externalCatalogTable.getTableStats))
+ case _ => throw new TableException(
+ s"Found table source '${source.getClass.getCanonicalName}'
is not applicable " +
+ s"in a batch environment.")
+ }
+ // check for a stream table source in this streaming environment
+ case _: StreamTableEnvironment =>
+ source match {
+ case sts: StreamTableSource[_] =>
+ new StreamTableSourceTable(
+ sts,
+ new FlinkStatistic(externalCatalogTable.getTableStats))
+ case _ => throw new TableException(
+ s"Found table source '${source.getClass.getCanonicalName}'
is not applicable " +
+ s"in a streaming environment.")
+ }
+ case _ => throw new TableException("Unsupported table
environment.")
+ }
+ }
+ }
+
+ //
----------------------------------------------------------------------------------------------
+ // NOTE: the following line can be removed once we drop support for
TableType
--- End diff --
I think we can also remove the `org.reflections:reflections` dependency
once we have removed this.
---