[
https://issues.apache.org/jira/browse/FLINK-3639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15207262#comment-15207262
]
ASF GitHub Bot commented on FLINK-3639:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/1827#discussion_r57065868
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableEnvironment.scala
---
@@ -72,5 +74,68 @@ class TableEnvironment {
new ScalaBatchTranslator(config).translate[T](table.relNode)
}
-}
+ /**
+ * Registers a DataSet under a unique name, so that it can be used in SQL queries.
+ * The fields of the DataSet type are used to name the Table fields.
+ * @param name the Table name
+ * @param dataset the DataSet to register
+ */
+ def registerDataSet[T](name: String, dataset: DataSet[T]): Unit = {
+
+ val (fieldNames, fieldIndexes) = TranslationContext.getFieldInfo[T](dataset.getType)
+ val dataSetTable = new DataSetTable[T](
+ dataset.javaSet,
+ fieldIndexes,
+ fieldNames
+ )
+ TranslationContext.addAndRegisterDataSet(dataSetTable, name)
+ }
+
+ /**
+ * Registers a DataSet under a unique name, so that it can be used in SQL queries.
+ * The fields of the DataSet type are renamed to the given set of fields.
+ *
+ * @param name the Table name
+ * @param dataset the DataSet to register
+ * @param fields the field names expression
+ */
+ def registerDataSet[T](name: String, dataset: DataSet[T], fields: Expression*): Unit = {
+
+ val (fieldNames, fieldIndexes) = TranslationContext.getFieldInfo[T](
+ dataset.getType, fields.toArray)
+ val dataSetTable = new DataSetTable[T](
+ dataset.javaSet,
+ fieldIndexes.toArray,
+ fieldNames.toArray
+ )
+ TranslationContext.addAndRegisterDataSet(dataSetTable, name)
+ }
+
+ /**
+ * Registers a Table under a unique name, so that it can be used in SQL queries.
+ * @param name the Table name
+ * @param table the Table to register
+ */
+ def registerTable[T](name: String, table: Table): Unit = {
+ val tableTable = new TableTable(table.getRelNode())
+ TranslationContext.registerTable(tableTable, name)
+ }
+
+ /**
+ * Retrieve a registered Table.
+ * @param tableName the name under which the Table has been registered
+ * @return the Table object
+ */
+ @throws[TableException]
+ def scan(tableName: String): Table = {
+ if (TranslationContext.isRegistered(tableName)) {
+ val relBuilder = TranslationContext.getRelBuilder
+ relBuilder.scan(tableName)
+ new Table(relBuilder.build(), relBuilder)
+ }
+ else {
+ throw new TableException("Table \"" + tableName + "\" was not found in the registry.")
--- End diff --
Use Scala's `s""` string interpolation here as well.
> Add methods and utilities to register DataSets and Tables in the
> TableEnvironment
> ---------------------------------------------------------------------------------
>
> Key: FLINK-3639
> URL: https://issues.apache.org/jira/browse/FLINK-3639
> Project: Flink
> Issue Type: New Feature
> Components: Table API
> Affects Versions: 1.1.0
> Reporter: Vasia Kalavri
> Assignee: Vasia Kalavri
>
> In order to make tables queryable from SQL we need to register them under a
> unique name in the TableEnvironment.
> [This design
> document|https://docs.google.com/document/d/1sITIShmJMGegzAjGqFuwiN_iw1urwykKsLiacokxSw0/edit]
> describes the proposed API.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)