Repository: flink Updated Branches: refs/heads/master 6ae759ae5 -> acea4cde5
[FLINK-6574] [table] Support nested catalogs in ExternalCatalog. This closes #3913. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/acea4cde Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/acea4cde Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/acea4cde Branch: refs/heads/master Commit: acea4cde5f0225db9e00bbef4a47fdb58419022b Parents: 6ae759a Author: Haohui Mai <[email protected]> Authored: Mon May 15 17:09:18 2017 -0700 Committer: Fabian Hueske <[email protected]> Committed: Fri May 19 14:21:19 2017 +0200 ---------------------------------------------------------------------- .../org/apache/flink/table/api/exceptions.scala | 48 +++---- .../table/catalog/CrudExternalCatalog.scala | 78 +++++----- .../flink/table/catalog/ExternalCatalog.scala | 38 ++--- .../table/catalog/ExternalCatalogDatabase.scala | 31 ---- .../table/catalog/ExternalCatalogSchema.scala | 91 +++++------- .../table/catalog/ExternalCatalogTable.scala | 16 --- .../table/catalog/InMemoryExternalCatalog.scala | 142 +++++++------------ .../flink/table/ExternalCatalogTest.scala | 33 +++++ .../catalog/InMemoryExternalCatalogTest.scala | 103 +++++++------- .../flink/table/utils/CommonTestData.scala | 17 ++- 10 files changed, 259 insertions(+), 338 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/acea4cde/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala index 760cf75..7ea17fa 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala +++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala @@ -77,63 +77,63 @@ case class UnresolvedException(msg: String) extends RuntimeException(msg) /** * Exception for an operation on a nonexistent table * - * @param db database name - * @param table table name - * @param cause the cause + * @param catalog catalog name + * @param table table name + * @param cause the cause */ case class TableNotExistException( - db: String, + catalog: String, table: String, cause: Throwable) - extends RuntimeException(s"Table $db.$table does not exist.", cause) { + extends RuntimeException(s"Table $catalog.$table does not exist.", cause) { - def this(db: String, table: String) = this(db, table, null) + def this(catalog: String, table: String) = this(catalog, table, null) } /** * Exception for adding an already existent table * - * @param db database name - * @param table table name - * @param cause the cause + * @param catalog catalog name + * @param table table name + * @param cause the cause */ case class TableAlreadyExistException( - db: String, + catalog: String, table: String, cause: Throwable) - extends RuntimeException(s"Table $db.$table already exists.", cause) { + extends RuntimeException(s"Table $catalog.$table already exists.", cause) { - def this(db: String, table: String) = this(db, table, null) + def this(catalog: String, table: String) = this(catalog, table, null) } /** - * Exception for operation on a nonexistent database + * Exception for operation on a nonexistent catalog * - * @param db database name + * @param catalog catalog name * @param cause the cause */ -case class DatabaseNotExistException( - db: String, +case class CatalogNotExistException( + catalog: String, cause: Throwable) - extends RuntimeException(s"Database $db does not exist.", cause) { + extends RuntimeException(s"Catalog $catalog does not exist.", cause) { - def this(db: String) = this(db, null) + def this(catalog: String) = this(catalog, null) } /** - * 
Exception for adding an already existent database + * Exception for adding an already existent catalog * - * @param db database name + * @param catalog catalog name * @param cause the cause */ -case class DatabaseAlreadyExistException( - db: String, +case class CatalogAlreadyExistException( + catalog: String, cause: Throwable) - extends RuntimeException(s"Database $db already exists.", cause) { + extends RuntimeException(s"Catalog $catalog already exists.", cause) { - def this(db: String) = this(db, null) + def this(catalog: String) = this(catalog, null) } /** http://git-wip-us.apache.org/repos/asf/flink/blob/acea4cde/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala index fcefa45..4db9497 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala @@ -21,88 +21,86 @@ package org.apache.flink.table.catalog import org.apache.flink.table.api._ /** - * The CrudExternalCatalog provides methods to create, drop, and alter databases or tables. + * The CrudExternalCatalog provides methods to create, drop, and alter (sub-)catalogs or tables. */ trait CrudExternalCatalog extends ExternalCatalog { /** - * Adds a table to the catalog. + * Adds a table to this catalog. * - * @param table Description of the table to add + * @param tableName The name of the table to add. + * @param table The table to add. * @param ignoreIfExists Flag to specify behavior if a table with the given name already exists: * if set to false, it throws a TableAlreadyExistException, * if set to true, nothing happens. 
- * @throws DatabaseNotExistException thrown if database does not exist * @throws TableAlreadyExistException thrown if table already exists and ignoreIfExists is false */ - @throws[DatabaseNotExistException] @throws[TableAlreadyExistException] - def createTable(table: ExternalCatalogTable, ignoreIfExists: Boolean): Unit + def createTable(tableName: String, table: ExternalCatalogTable, ignoreIfExists: Boolean): Unit /** - * Deletes table from a database of the catalog. + * Deletes table from this catalog. * - * @param dbName Name of the database - * @param tableName Name of the table - * @param ignoreIfNotExists Flag to specify behavior if the table or database does not exist: + * @param tableName Name of the table to delete. + * @param ignoreIfNotExists Flag to specify behavior if the table does not exist: * if set to false, throw an exception, * if set to true, nothing happens. - * @throws DatabaseNotExistException thrown if the database does not exist in the catalog * @throws TableNotExistException thrown if the table does not exist in the catalog */ - @throws[DatabaseNotExistException] @throws[TableNotExistException] - def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean): Unit + def dropTable(tableName: String, ignoreIfNotExists: Boolean): Unit /** - * Modifies an existing table in the catalog. + * Modifies an existing table of this catalog. * - * @param table New description of the table to update - * @param ignoreIfNotExists Flag to specify behavior if the table or database does not exist: + * @param tableName The name of the table to modify. + * @param table The new table which replaces the existing table. + * @param ignoreIfNotExists Flag to specify behavior if the table does not exist: * if set to false, throw an exception, * if set to true, nothing happens. 
- * @throws DatabaseNotExistException thrown if the database does not exist in the catalog - * @throws TableNotExistException thrown if the table does not exist in the catalog + * @throws TableNotExistException thrown if the table does not exist in the catalog */ - @throws[DatabaseNotExistException] @throws[TableNotExistException] - def alterTable(table: ExternalCatalogTable, ignoreIfNotExists: Boolean): Unit + def alterTable(tableName: String, table: ExternalCatalogTable, ignoreIfNotExists: Boolean): Unit /** - * Adds a database to the catalog. + * Adds a subcatalog to this catalog. * - * @param db Description of the database to create - * @param ignoreIfExists Flag to specify behavior if a database with the given name already - * exists: if set to false, it throws a DatabaseAlreadyExistException, + * @param name The name of the sub catalog to add. + * @param catalog Description of the catalog to add. + * @param ignoreIfExists Flag to specify behavior if a sub catalog with the given name already + * exists: if set to false, it throws a CatalogAlreadyExistException, * if set to true, nothing happens. - * @throws DatabaseAlreadyExistException thrown if the database does already exist in the catalog - * and ignoreIfExists is false + * @throws CatalogAlreadyExistException + * thrown if the sub catalog does already exist in the catalog + * and ignoreIfExists is false */ - @throws[DatabaseAlreadyExistException] - def createDatabase(db: ExternalCatalogDatabase, ignoreIfExists: Boolean): Unit + @throws[CatalogAlreadyExistException] + def createSubCatalog(name: String, catalog: ExternalCatalog, ignoreIfExists: Boolean): Unit /** - * Deletes a database from the catalog. + * Deletes a sub catalog from this catalog. * - * @param dbName Name of the database. - * @param ignoreIfNotExists Flag to specify behavior if the database does not exist: + * @param name Name of the sub catalog to delete. 
+ * @param ignoreIfNotExists Flag to specify behavior if the catalog does not exist: * if set to false, throw an exception, * if set to true, nothing happens. - * @throws DatabaseNotExistException thrown if the database does not exist in the catalog + * @throws CatalogNotExistException thrown if the sub catalog does not exist in the catalog */ - @throws[DatabaseNotExistException] - def dropDatabase(dbName: String, ignoreIfNotExists: Boolean): Unit + @throws[CatalogNotExistException] + def dropSubCatalog(name: String, ignoreIfNotExists: Boolean): Unit /** - * Modifies an existing database in the catalog. + * Modifies an existing sub catalog of this catalog. * - * @param db New description of the database to update - * @param ignoreIfNotExists Flag to specify behavior if the database does not exist: + * @param name Name of the catalog to modify. + * @param catalog The new sub catalog to replace the existing sub catalog. + * @param ignoreIfNotExists Flag to specify behavior if the sub catalog does not exist: * if set to false, throw an exception, * if set to true, nothing happens. 
- * @throws DatabaseNotExistException thrown if the database does not exist in the catalog + * @throws CatalogNotExistException thrown if the sub catalog does not exist in the catalog */ - @throws[DatabaseNotExistException] - def alterDatabase(db: ExternalCatalogDatabase, ignoreIfNotExists: Boolean): Unit + @throws[CatalogNotExistException] + def alterSubCatalog(name: String, catalog: ExternalCatalog, ignoreIfNotExists: Boolean): Unit } http://git-wip-us.apache.org/repos/asf/flink/blob/acea4cde/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala index 00a35e4..5f4511b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala @@ -26,49 +26,41 @@ import org.apache.flink.table.api._ * An [[ExternalCatalog]] is the connector between an external database catalog and Flink's * Table API. * - * It provides information about databases and tables such as names, schema, statistics, and - * access information. + * It provides information about catalogs, databases and tables such as names, schema, statistics, + * and access information. */ trait ExternalCatalog { /** - * Get a table from the catalog + * Get a table from this catalog. * - * @param dbName The name of the table's database. * @param tableName The name of the table. - * @throws DatabaseNotExistException thrown if the database does not exist in the catalog. * @throws TableNotExistException thrown if the table does not exist in the catalog. - * @return the requested table + * @return The requested table. 
*/ - @throws[DatabaseNotExistException] @throws[TableNotExistException] - def getTable(dbName: String, tableName: String): ExternalCatalogTable + def getTable(tableName: String): ExternalCatalogTable /** - * Get a list of all table names of a database in the catalog. + * Gets the names of all tables registered in this catalog. * - * @param dbName The name of the database. - * @throws DatabaseNotExistException thrown if the database does not exist in the catalog - * @return The list of table names + * @return A list of the names of all registered tables. */ - @throws[DatabaseNotExistException] - def listTables(dbName: String): JList[String] + def listTables(): JList[String] /** - * Gets a database from the catalog. + * Gets a sub catalog from this catalog. * - * @param dbName The name of the database. - * @throws DatabaseNotExistException thrown if the database does not exist in the catalog - * @return The requested database + * @return The requested sub catalog. */ - @throws[DatabaseNotExistException] - def getDatabase(dbName: String): ExternalCatalogDatabase + @throws[CatalogNotExistException] + def getSubCatalog(dbName: String): ExternalCatalog /** - * Gets a list of all databases in the catalog. + * Gets the names of all sub catalogs registered in this catalog. * - * @return The list of database names + * @return The list of the names of all registered sub catalogs. 
*/ - def listDatabases(): JList[String] + def listSubCatalogs(): JList[String] } http://git-wip-us.apache.org/repos/asf/flink/blob/acea4cde/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogDatabase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogDatabase.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogDatabase.scala deleted file mode 100644 index 99ab2eb..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogDatabase.scala +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.catalog - -import java.util.{HashMap => JHashMap, Map => JMap} - -/** - * Defines a database in an [[ExternalCatalog]]. 
- * - * @param dbName The name of the database - * @param properties The properties of the database - */ -case class ExternalCatalogDatabase( - dbName: String, - properties: JMap[String, String] = new JHashMap()) http://git-wip-us.apache.org/repos/asf/flink/blob/acea4cde/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala index 8e010fa..ad96e77 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala @@ -22,18 +22,16 @@ import java.util.{Collections => JCollections, Collection => JCollection, Linked import org.apache.calcite.linq4j.tree.Expression import org.apache.calcite.schema._ -import org.apache.flink.table.api.{DatabaseNotExistException, TableNotExistException} +import org.apache.flink.table.api.{CatalogNotExistException, TableNotExistException} import org.slf4j.{Logger, LoggerFactory} import scala.collection.JavaConverters._ /** - * This class is responsible for connect external catalog to calcite catalog. - * In this way, it is possible to look-up and access tables in SQL queries - * without registering tables in advance. - * The databases in the external catalog registers as calcite sub-Schemas of current schema. - * The tables in a given database registers as calcite tables - * of the [[ExternalCatalogDatabaseSchema]]. + * This class is responsible for connecting an external catalog to Calcite's catalog. + * This makes it possible to look up and access tables in SQL queries without registering them in advance.
+ * The external catalog and all its sub-catalogs and tables are registered as + * sub-schemas and tables in Calcite. * * @param catalogIdentifier external catalog name * @param catalog external catalog @@ -45,32 +43,47 @@ class ExternalCatalogSchema( private val LOG: Logger = LoggerFactory.getLogger(this.getClass) /** - * Looks up database by the given sub-schema name in the external catalog, - * returns it Wrapped in a [[ExternalCatalogDatabaseSchema]] with the given database name. + * Looks up a sub-catalog by the given sub-schema name in the external catalog. + * Returns it wrapped in an [[ExternalCatalogSchema]] with the given sub-schema name. * - * @param name Sub-schema name - * @return Sub-schema with a given name, or null + * @param name Name of sub-schema to look up. + * @return Sub-schema with the given name, or null if no such sub-catalog exists. */ override def getSubSchema(name: String): Schema = { try { - val db = catalog.getDatabase(name) - new ExternalCatalogDatabaseSchema(db.dbName, catalog) + val db = catalog.getSubCatalog(name) + new ExternalCatalogSchema(name, db) } catch { - case e: DatabaseNotExistException => - LOG.warn(s"Database $name does not exist in externalCatalog $catalogIdentifier") + case _: CatalogNotExistException => + LOG.warn(s"Sub-catalog $name does not exist in externalCatalog $catalogIdentifier") null } } /** - * Lists the databases of the external catalog, - * returns the lists as the names of this schema's sub-schemas. + * Lists the sub-schemas of the external catalog. + * Returns the names of this schema's sub-schemas. * * @return names of this schema's child schemas */ - override def getSubSchemaNames: JSet[String] = new JLinkedHashSet(catalog.listDatabases()) + override def getSubSchemaNames: JSet[String] = new JLinkedHashSet(catalog.listSubCatalogs()) - override def getTable(name: String): Table = null + /** + * Looks up and returns a table from this schema. 
+ * Returns null if no table is found for the given name.
+ * + * @param name The name of the table to look up. + * @return The table or null if no table is found. + */ + override def getTable(name: String): Table = try { + val externalCatalogTable = catalog.getTable(name) + ExternalTableSourceUtil.fromExternalCatalogTable(externalCatalogTable) + } catch { + case TableNotExistException(table, _, _) => { + LOG.warn(s"Table $table does not exist in externalCatalog $catalogIdentifier") + null + } + } override def isMutable: Boolean = true @@ -91,46 +104,8 @@ class ExternalCatalogSchema( * @param plusOfThis */ def registerSubSchemas(plusOfThis: SchemaPlus) { - catalog.listDatabases().asScala.foreach(db => plusOfThis.add(db, getSubSchema(db))) - } - - private class ExternalCatalogDatabaseSchema( - schemaName: String, - flinkExternalCatalog: ExternalCatalog) extends Schema { - - override def getTable(name: String): Table = { - try { - val externalCatalogTable = flinkExternalCatalog.getTable(schemaName, name) - ExternalTableSourceUtil.fromExternalCatalogTable(externalCatalogTable) - } catch { - case TableNotExistException(db, table, cause) => { - LOG.warn(s"Table $db.$table does not exist in externalCatalog $catalogIdentifier") - null - } - } - } - - override def getTableNames: JSet[String] = - new JLinkedHashSet(flinkExternalCatalog.listTables(schemaName)) - - override def getSubSchema(name: String): Schema = null - - override def getSubSchemaNames: JSet[String] = JCollections.emptySet[String] - - override def isMutable: Boolean = true - - override def getFunctions(name: String): JCollection[Function] = - JCollections.emptyList[Function] - - override def getExpression(parentSchema: SchemaPlus, name: String): Expression = - Schemas.subSchemaExpression(parentSchema, name, getClass) - - override def getFunctionNames: JSet[String] = JCollections.emptySet[String] - - override def contentsHaveChangedSince(lastCheck: Long, now: Long): Boolean = true - + catalog.listSubCatalogs().asScala.foreach(db => plusOfThis.add(db, 
getSubSchema(db))) } - } object ExternalCatalogSchema { http://git-wip-us.apache.org/repos/asf/flink/blob/acea4cde/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala index 01eca6d..ae20718 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala @@ -27,7 +27,6 @@ import org.apache.flink.table.plan.stats.TableStats /** * Defines a table in an [[ExternalCatalog]]. * - * @param identifier Identifier of the table (database name and table name) * @param tableType Table type, e.g csv, hbase, kafka * @param schema Schema of the table (column names and types) * @param properties Properties of the table @@ -37,7 +36,6 @@ import org.apache.flink.table.plan.stats.TableStats * @param lastAccessTime Timestamp of last access of the table */ case class ExternalCatalogTable( - identifier: TableIdentifier, tableType: String, schema: TableSchema, properties: JMap[String, String] = new JHashMap(), @@ -45,17 +43,3 @@ case class ExternalCatalogTable( comment: String = null, createTime: JLong = System.currentTimeMillis, lastAccessTime: JLong = -1L) - -/** - * Identifier for a catalog table. 
- * - * @param database Database name - * @param table Table name - */ -case class TableIdentifier( - database: String, - table: String) { - - override def toString: String = s"$database.$table" - -} http://git-wip-us.apache.org/repos/asf/flink/blob/acea4cde/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/InMemoryExternalCatalog.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/InMemoryExternalCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/InMemoryExternalCatalog.scala index 6a61916..ee30a8e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/InMemoryExternalCatalog.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/InMemoryExternalCatalog.scala @@ -18,138 +18,106 @@ package org.apache.flink.table.catalog -import org.apache.flink.table.api.{DatabaseAlreadyExistException, DatabaseNotExistException, TableAlreadyExistException, TableNotExistException} import java.util.{List => JList} -import scala.collection.mutable.HashMap +import org.apache.flink.table.api.{CatalogAlreadyExistException, CatalogNotExistException, TableAlreadyExistException, TableNotExistException} + +import scala.collection.mutable import scala.collection.JavaConverters._ /** * This class is an in-memory implementation of [[ExternalCatalog]]. * + * @param name The name of the catalog + * * It could be used for testing or developing instead of used in production environment. 
*/ -class InMemoryExternalCatalog extends CrudExternalCatalog { +class InMemoryExternalCatalog(name: String) extends CrudExternalCatalog { - private val databases = new HashMap[String, Database] + private val databases = new mutable.HashMap[String, ExternalCatalog] + private val tables = new mutable.HashMap[String, ExternalCatalogTable] - @throws[DatabaseNotExistException] @throws[TableAlreadyExistException] override def createTable( - table: ExternalCatalogTable, - ignoreIfExists: Boolean): Unit = synchronized { - val dbName = table.identifier.database - val tables = getTables(dbName) - val tableName = table.identifier.table - if (tables.contains(tableName)) { - if (!ignoreIfExists) { - throw new TableAlreadyExistException(dbName, tableName) - } - } else { - tables.put(tableName, table) + tableName: String, + table: ExternalCatalogTable, + ignoreIfExists: Boolean): Unit = synchronized { + tables.get(tableName) match { + case Some(_) if !ignoreIfExists => throw new TableAlreadyExistException(name, tableName) + case _ => tables.put(tableName, table) } } - @throws[DatabaseNotExistException] @throws[TableNotExistException] - override def dropTable( - dbName: String, - tableName: String, - ignoreIfNotExists: Boolean): Unit = synchronized { - val tables = getTables(dbName) + override def dropTable(tableName: String, ignoreIfNotExists: Boolean): Unit = synchronized { if (tables.remove(tableName).isEmpty && !ignoreIfNotExists) { - throw new TableNotExistException(dbName, tableName) + throw new TableNotExistException(name, tableName) } } - @throws[DatabaseNotExistException] @throws[TableNotExistException] override def alterTable( - table: ExternalCatalogTable, - ignoreIfNotExists: Boolean): Unit = synchronized { - val dbName = table.identifier.database - val tables = getTables(dbName) - val tableName = table.identifier.table + tableName: String, + table: ExternalCatalogTable, + ignoreIfNotExists: Boolean): Unit = synchronized { if (tables.contains(tableName)) { 
tables.put(tableName, table) } else if (!ignoreIfNotExists) { - throw new TableNotExistException(dbName, tableName) + throw new TableNotExistException(name, tableName) } } - @throws[DatabaseNotExistException] - override def listTables(dbName: String): JList[String] = synchronized { - val tables = getTables(dbName) - tables.keys.toList.asJava - } - - @throws[DatabaseNotExistException] - @throws[TableNotExistException] - override def getTable(dbName: String, tableName: String): ExternalCatalogTable = synchronized { - val tables = getTables(dbName) - tables.get(tableName) match { - case Some(table) => table - case None => throw new TableNotExistException(dbName, tableName) + @throws[CatalogAlreadyExistException] + override def createSubCatalog( + catalogName: String, + catalog: ExternalCatalog, + ignoreIfExists: Boolean): Unit = synchronized { + databases.get(catalogName) match { + case Some(_) if !ignoreIfExists => throw CatalogAlreadyExistException(catalogName, null) + case _ => databases.put(catalogName, catalog) } } - @throws[DatabaseAlreadyExistException] - override def createDatabase( - db: ExternalCatalogDatabase, - ignoreIfExists: Boolean): Unit = synchronized { - val dbName = db.dbName - if (databases.contains(dbName)) { - if (!ignoreIfExists) { - throw new DatabaseAlreadyExistException(dbName) - } - } else { - databases.put(dbName, new Database(db)) + @throws[CatalogNotExistException] + override def dropSubCatalog( + catalogName: String, + ignoreIfNotExists: Boolean): Unit = synchronized { + if (databases.remove(catalogName).isEmpty && !ignoreIfNotExists) { + throw CatalogNotExistException(catalogName, null) } } - @throws[DatabaseNotExistException] - override def alterDatabase( - db: ExternalCatalogDatabase, - ignoreIfNotExists: Boolean): Unit = synchronized { - val dbName = db.dbName - databases.get(dbName) match { - case Some(database) => database.db = db - case None => - if (!ignoreIfNotExists) { - throw new DatabaseNotExistException(dbName) - } + 
override def alterSubCatalog( + catalogName: String, + catalog: ExternalCatalog, + ignoreIfNotExists: Boolean): Unit = synchronized { + if (databases.contains(catalogName)) { + databases.put(catalogName, catalog) + } else if (!ignoreIfNotExists) { + throw new CatalogNotExistException(catalogName) } } - @throws[DatabaseNotExistException] - override def dropDatabase( - dbName: String, - ignoreIfNotExists: Boolean): Unit = synchronized { - if (databases.remove(dbName).isEmpty && !ignoreIfNotExists) { - throw new DatabaseNotExistException(dbName) + override def getTable(tableName: String): ExternalCatalogTable = synchronized { + tables.get(tableName) match { + case Some(t) => t + case _ => throw TableNotExistException(name, tableName, null) } } - override def listDatabases(): JList[String] = synchronized { - databases.keys.toList.asJava + override def listTables(): JList[String] = synchronized { + tables.keys.toList.asJava } - @throws[DatabaseNotExistException] - override def getDatabase(dbName: String): ExternalCatalogDatabase = synchronized { - databases.get(dbName) match { - case Some(database) => database.db - case None => throw new DatabaseNotExistException(dbName) + @throws[CatalogNotExistException] + override def getSubCatalog(catalogName: String): ExternalCatalog = synchronized { + databases.get(catalogName) match { + case Some(d) => d + case _ => throw CatalogNotExistException(catalogName, null) } } - private def getTables(db: String): HashMap[String, ExternalCatalogTable] = - databases.get(db) match { - case Some(database) => database.tables - case None => throw new DatabaseNotExistException(db) - } - - private class Database(var db: ExternalCatalogDatabase) { - val tables = new HashMap[String, ExternalCatalogTable] + override def listSubCatalogs(): JList[String] = synchronized { + databases.keys.toList.asJava } - } 
http://git-wip-us.apache.org/repos/asf/flink/blob/acea4cde/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExternalCatalogTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExternalCatalogTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExternalCatalogTest.scala index d801644..27dd8d8 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExternalCatalogTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExternalCatalogTest.scala @@ -28,6 +28,7 @@ import org.junit.Test */ class ExternalCatalogTest extends TableTestBase { private val table1Path: Array[String] = Array("test", "db1", "tb1") + private val table1TopLevelPath: Array[String] = Array("test", "tb1") private val table1ProjectedFields: Array[String] = Array("a", "b", "c") private val table2Path: Array[String] = Array("test", "db2", "tb2") private val table2ProjectedFields: Array[String] = Array("d", "e", "g") @@ -148,6 +149,38 @@ class ExternalCatalogTest extends TableTestBase { util.verifySql(sqlQuery, expected) } + + @Test + def testTopLevelTable(): Unit = { + val util = batchTestUtil() + val tEnv = util.tEnv + + tEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog) + + val table1 = tEnv.scan("test", "tb1") + val table2 = tEnv.scan("test", "db2", "tb2") + val result = table2 + .select('d * 2, 'e, 'g.upperCase()) + .unionAll(table1.select('a * 2, 'b, 'c.upperCase())) + + val expected = binaryNode( + "DataSetUnion", + unaryNode( + "DataSetCalc", + sourceBatchTableNode(table2Path, table2ProjectedFields), + term("select", "*(d, 2) AS _c0", "e", "UPPER(g) AS _c2") + ), + unaryNode( + "DataSetCalc", + sourceBatchTableNode(table1TopLevelPath, table1ProjectedFields), + term("select", "*(a, 2) AS _c0", "b", "UPPER(c) AS _c2") + ), + term("union", "_c0", "e", "_c2") + ) + + 
util.verifyTable(result, expected) + } + def sourceBatchTableNode(sourceTablePath: Array[String], fields: Array[String]): String = { s"BatchTableSourceScan(table=[[${sourceTablePath.mkString(", ")}]], " + s"fields=[${fields.mkString(", ")}])" http://git-wip-us.apache.org/repos/asf/flink/blob/acea4cde/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/InMemoryExternalCatalogTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/InMemoryExternalCatalogTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/InMemoryExternalCatalogTest.scala index 5402780..6d1d66f 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/InMemoryExternalCatalogTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/InMemoryExternalCatalogTest.scala @@ -31,15 +31,14 @@ class InMemoryExternalCatalogTest { @Before def setUp(): Unit = { - catalog = new InMemoryExternalCatalog() - catalog.createDatabase(ExternalCatalogDatabase(databaseName), ignoreIfExists = false) + catalog = new InMemoryExternalCatalog(databaseName) } @Test def testCreateTable(): Unit = { - assertTrue(catalog.listTables(databaseName).isEmpty) - catalog.createTable(createTableInstance(databaseName, "t1"), ignoreIfExists = false) - val tables = catalog.listTables(databaseName) + assertTrue(catalog.listTables().isEmpty) + catalog.createTable("t1", createTableInstance(), ignoreIfExists = false) + val tables = catalog.listTables() assertEquals(1, tables.size()) assertEquals("t1", tables.get(0)) } @@ -47,36 +46,31 @@ class InMemoryExternalCatalogTest { @Test(expected = classOf[TableAlreadyExistException]) def testCreateExistedTable(): Unit = { val tableName = "t1" - catalog.createTable(createTableInstance(databaseName, tableName), false) - catalog.createTable(createTableInstance(databaseName, 
tableName), false) + catalog.createTable(tableName, createTableInstance(), ignoreIfExists = false) + catalog.createTable(tableName, createTableInstance(), ignoreIfExists = false) } @Test def testGetTable(): Unit = { - val originTable = createTableInstance(databaseName, "t1") - catalog.createTable(originTable, false) - assertEquals(catalog.getTable(databaseName, "t1"), originTable) - } - - @Test(expected = classOf[DatabaseNotExistException]) - def testGetTableUnderNotExistDatabaseName(): Unit = { - catalog.getTable("notexistedDb", "t1") + val originTable = createTableInstance() + catalog.createTable("t1", originTable, ignoreIfExists = false) + assertEquals(catalog.getTable("t1"), originTable) } @Test(expected = classOf[TableNotExistException]) def testGetNotExistTable(): Unit = { - catalog.getTable(databaseName, "t1") + catalog.getTable("nonexisted") } @Test def testAlterTable(): Unit = { val tableName = "t1" - val table = createTableInstance(databaseName, tableName) - catalog.createTable(table, false) - assertEquals(catalog.getTable(databaseName, tableName), table) - val newTable = createTableInstance(databaseName, tableName) - catalog.alterTable(newTable, false) - val currentTable = catalog.getTable(databaseName, tableName) + val table = createTableInstance() + catalog.createTable(tableName, table, ignoreIfExists = false) + assertEquals(catalog.getTable(tableName), table) + val newTable = createTableInstance() + catalog.alterTable(tableName, newTable, ignoreIfNotExists = false) + val currentTable = catalog.getTable(tableName) // validate the table is really replaced after alter table assertNotEquals(table, currentTable) assertEquals(newTable, currentTable) @@ -84,53 +78,61 @@ class InMemoryExternalCatalogTest { @Test(expected = classOf[TableNotExistException]) def testAlterNotExistTable(): Unit = { - catalog.alterTable(createTableInstance(databaseName, "t1"), false) + catalog.alterTable("nonexisted", createTableInstance(), ignoreIfNotExists = false) } @Test def 
testDropTable(): Unit = { val tableName = "t1" - catalog.createTable(createTableInstance(databaseName, tableName), false) - assertTrue(catalog.listTables(databaseName).contains(tableName)) - catalog.dropTable(databaseName, tableName, false) - assertFalse(catalog.listTables(databaseName).contains(tableName)) + catalog.createTable(tableName, createTableInstance(), ignoreIfExists = false) + assertTrue(catalog.listTables().contains(tableName)) + catalog.dropTable(tableName, ignoreIfNotExists = false) + assertFalse(catalog.listTables().contains(tableName)) } @Test(expected = classOf[TableNotExistException]) def testDropNotExistTable(): Unit = { - catalog.dropTable(databaseName, "t1", false) - } - - @Test - def testListDatabases(): Unit = { - val databases = catalog.listDatabases() - assertEquals(1, databases.size()) - assertEquals(databaseName, databases.get(0)) - } - - @Test - def testGetDatabase(): Unit = { - assertNotNull(catalog.getDatabase(databaseName)) + catalog.dropTable("nonexisted", ignoreIfNotExists = false) } - @Test(expected = classOf[DatabaseNotExistException]) + @Test(expected = classOf[CatalogNotExistException]) def testGetNotExistDatabase(): Unit = { - catalog.getDatabase("notexistedDb") + catalog.getSubCatalog("notexistedDb") } @Test def testCreateDatabase(): Unit = { - val originDatabasesNum = catalog.listDatabases().size - catalog.createDatabase(ExternalCatalogDatabase("db2"), false) - assertEquals(catalog.listDatabases().size, originDatabasesNum + 1) + catalog.createSubCatalog("db2", new InMemoryExternalCatalog("db2"), ignoreIfExists = false) + assertEquals(1, catalog.listSubCatalogs().size) } - @Test(expected = classOf[DatabaseAlreadyExistException]) + @Test(expected = classOf[CatalogAlreadyExistException]) def testCreateExistedDatabase(): Unit = { - catalog.createDatabase(ExternalCatalogDatabase(databaseName), false) + catalog.createSubCatalog("existed", new InMemoryExternalCatalog("existed"), + ignoreIfExists = false) + + 
assertNotNull(catalog.getSubCatalog("existed")) + val databases = catalog.listSubCatalogs() + assertEquals(1, databases.size()) + assertEquals("existed", databases.get(0)) + + catalog.createSubCatalog("existed", new InMemoryExternalCatalog("existed"), + ignoreIfExists = false) + } + + @Test + def testNestedCatalog(): Unit = { + val sub = new InMemoryExternalCatalog("sub") + val sub1 = new InMemoryExternalCatalog("sub1") + catalog.createSubCatalog("sub", sub, ignoreIfExists = false) + sub.createSubCatalog("sub1", sub1, ignoreIfExists = false) + sub1.createTable("table", createTableInstance(), ignoreIfExists = false) + val tables = catalog.getSubCatalog("sub").getSubCatalog("sub1").listTables() + assertEquals(1, tables.size()) + assertEquals("table", tables.get(0)) } - private def createTableInstance(dbName: String, tableName: String): ExternalCatalogTable = { + private def createTableInstance(): ExternalCatalogTable = { val schema = new TableSchema( Array("first", "second"), Array( @@ -138,9 +140,6 @@ class InMemoryExternalCatalogTest { BasicTypeInfo.INT_TYPE_INFO ) ) - ExternalCatalogTable( - TableIdentifier(dbName, tableName), - "csv", - schema) + ExternalCatalogTable("csv", schema) } } http://git-wip-us.apache.org/repos/asf/flink/blob/acea4cde/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala index a1bfd56..6a5c52f 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala @@ -73,7 +73,6 @@ object CommonTestData { properties1.put("fieldDelim", "#") properties1.put("rowDelim", "$") val externalCatalogTable1 = 
ExternalCatalogTable( - TableIdentifier("db1", "tb1"), "csv", new TableSchema( Array("a", "b", "c"), @@ -107,7 +106,6 @@ object CommonTestData { properties2.put("fieldDelim", "#") properties2.put("rowDelim", "$") val externalCatalogTable2 = ExternalCatalogTable( - TableIdentifier("db2", "tb2"), "csv", new TableSchema( Array("d", "e", "f", "g", "h"), @@ -120,11 +118,16 @@ object CommonTestData { ), properties2 ) - val catalog = new InMemoryExternalCatalog - catalog.createDatabase(ExternalCatalogDatabase("db1"), false) - catalog.createDatabase(ExternalCatalogDatabase("db2"), false) - catalog.createTable(externalCatalogTable1, false) - catalog.createTable(externalCatalogTable2, false) + val catalog = new InMemoryExternalCatalog("test") + val db1 = new InMemoryExternalCatalog("db1") + val db2 = new InMemoryExternalCatalog("db2") + catalog.createSubCatalog("db1", db1, ignoreIfExists = false) + catalog.createSubCatalog("db2", db2, ignoreIfExists = false) + + // Register the table with both catalogs + catalog.createTable("tb1", externalCatalogTable1, ignoreIfExists = false) + db1.createTable("tb1", externalCatalogTable1, ignoreIfExists = false) + db2.createTable("tb2", externalCatalogTable2, ignoreIfExists = false) catalog }
