[
https://issues.apache.org/jira/browse/FLINK-10697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16675731#comment-16675731
]
ASF GitHub Bot commented on FLINK-10697:
----------------------------------------
bowenli86 closed pull request #6997: [FLINK-10697][Table & SQL] Create an
in-memory catalog that stores Flink's meta objects
URL: https://github.com/apache/flink/pull/6997
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/catalog/FlinkInMemoryCatalog.java
b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/catalog/FlinkInMemoryCatalog.java
new file mode 100644
index 00000000000..82c08d2691b
--- /dev/null
+++
b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/catalog/FlinkInMemoryCatalog.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.api.CatalogAlreadyExistException;
+import org.apache.flink.table.api.CatalogNotExistException;
+import org.apache.flink.table.api.FunctionAlreadyExistException;
+import org.apache.flink.table.api.FunctionNotExistException;
+import org.apache.flink.table.api.TableAlreadyExistException;
+import org.apache.flink.table.api.TableNotExistException;
+import org.apache.flink.table.api.ViewAlreadyExistException;
+import org.apache.flink.table.api.ViewNotExistException;
+import org.apache.flink.table.functions.UserDefinedFunction;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * An in-memory production implementation of {@link ExternalCatalog}.
+ */
+public class FlinkInMemoryCatalog implements CrudExternalCatalog {
+
+ final String catalogName;
+
+ protected final Map<String, ExternalCatalogTable> tables = new
ConcurrentHashMap<>();
+ protected final Map<String, String> views = new ConcurrentHashMap<>();
+ protected final Map<String, UserDefinedFunction> functions = new
ConcurrentHashMap<>();
+
+ public FlinkInMemoryCatalog(String catalogName) {
+ this.catalogName = catalogName;
+ }
+
+ @Override
+ public void createTable(String tableName, ExternalCatalogTable table,
boolean ignoreIfExists)
+ throws TableAlreadyExistException {
+ if (tables.containsKey(tableName)) {
+ if (!ignoreIfExists) {
+ throw new
TableAlreadyExistException(catalogName, tableName);
+ }
+ } else {
+ tables.put(tableName, table);
+ }
+ }
+
+ @Override
+ public void dropTable(String tableName, boolean ignoreIfNotExists)
+ throws TableNotExistException {
+ if (tables.remove(tableName) == null && !ignoreIfNotExists) {
+ throw new TableNotExistException(catalogName,
tableName);
+ }
+ }
+
+ @Override
+ public void alterTable(String tableName, ExternalCatalogTable table,
boolean ignoreIfNotExists)
+ throws TableNotExistException {
+ if (tables.containsKey(tableName)) {
+ tables.put(tableName, table);
+ } else if (!ignoreIfNotExists) {
+ throw new TableNotExistException(catalogName,
tableName);
+ }
+ }
+
+ @Override
+ public void createView(String viewName, String view, boolean
ignoreIfExists)
+ throws ViewAlreadyExistException {
+ if (views.containsKey(viewName)) {
+ if (!ignoreIfExists) {
+ throw new
ViewAlreadyExistException(catalogName, viewName);
+ }
+ } else {
+ views.put(viewName, view);
+ }
+ }
+
+ @Override
+ public void dropView(String viewName, boolean ignoreIfNotExists)
+ throws ViewNotExistException {
+ if (views.remove(viewName) == null && !ignoreIfNotExists) {
+ throw new ViewNotExistException(catalogName, viewName);
+ }
+ }
+
+ @Override
+ public void alterView(String viewName, String view, boolean
ignoreIfNotExists)
+ throws ViewNotExistException {
+ if (views.containsKey(viewName)) {
+ views.put(viewName, view);
+ } else if (!ignoreIfNotExists) {
+ throw new ViewNotExistException(catalogName, viewName);
+ }
+ }
+
+ @Override
+ public void createFunction(String functionName, UserDefinedFunction
function, boolean ignoreIfExists)
+ throws FunctionAlreadyExistException {
+ if (functions.containsKey(functionName)) {
+ if (!ignoreIfExists) {
+ throw new
FunctionAlreadyExistException(catalogName, functionName);
+ }
+ } else {
+ functions.put(functionName, function);
+ }
+ }
+
+ @Override
+ public void dropFunction(String functionName, boolean ignoreIfNotExists)
+ throws FunctionNotExistException {
+ if (functions.remove(functionName) == null &&
!ignoreIfNotExists) {
+ throw new FunctionNotExistException(catalogName,
functionName);
+ }
+ }
+
+ @Override
+ public void alterFunction(String functionName, UserDefinedFunction
function, boolean ignoreIfNotExists)
+ throws FunctionNotExistException {
+ if (functions.containsKey(functionName)) {
+ functions.put(functionName, function);
+ } else if (!ignoreIfNotExists) {
+ throw new FunctionNotExistException(catalogName,
functionName);
+ }
+ }
+
+ @Override
+ public ExternalCatalogTable getTable(String tableName) throws
TableNotExistException {
+ ExternalCatalogTable result = tables.get(tableName);
+
+ if (result == null) {
+ throw new TableNotExistException(catalogName,
tableName, null);
+ } else {
+ return result;
+ }
+ }
+
+ @Override
+ public List<String> listTables() {
+ return new ArrayList<>(tables.keySet());
+ }
+
+ @Override
+ public String getView(String viewName) throws ViewNotExistException {
+ String result = views.get(viewName);
+
+ if (result == null) {
+ throw new ViewNotExistException(catalogName, viewName,
null);
+ } else {
+ return result;
+ }
+ }
+
+ @Override
+ public List<String> listViews() {
+ return new ArrayList<>(views.keySet());
+ }
+
+ @Override
+ public UserDefinedFunction getFunction(String functionName) throws
FunctionNotExistException {
+ UserDefinedFunction result = functions.get(functionName);
+
+ if (result == null) {
+ throw new FunctionNotExistException(catalogName,
functionName, null);
+ } else {
+ return result;
+ }
+ }
+
+ @Override
+ public List<String> listFunctions() {
+ return new ArrayList<>(functions.keySet());
+ }
+
+ // ------ FlinkInMemoryCatalog doesn't support sub-catalog -------
+
+ @Override
+ public void createSubCatalog(String name, ExternalCatalog catalog,
boolean ignoreIfExists)
+ throws CatalogAlreadyExistException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void dropSubCatalog(String name, boolean ignoreIfNotExists)
+ throws CatalogNotExistException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterSubCatalog(String name, ExternalCatalog catalog,
boolean ignoreIfNotExists)
+ throws CatalogNotExistException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ExternalCatalog getSubCatalog(String dbName) throws
CatalogNotExistException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<String> listSubCatalogs() {
+ throw new UnsupportedOperationException();
+ }
+}
diff --git
a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/catalog/TestingInMemoryCatalog.java
b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/catalog/TestingInMemoryCatalog.java
new file mode 100644
index 00000000000..0ab7d666c7f
--- /dev/null
+++
b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/catalog/TestingInMemoryCatalog.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.api.CatalogAlreadyExistException;
+import org.apache.flink.table.api.CatalogNotExistException;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * An in-memory testing implementation of {@link ExternalCatalog}.
+ * It could be used for testing or developing instead of used in production
environment.
+ */
+public class TestingInMemoryCatalog extends FlinkInMemoryCatalog {
+
+ private final Map<String, ExternalCatalog> databases = new
ConcurrentHashMap<>();
+
+ public TestingInMemoryCatalog(String catalogName) {
+ super(catalogName);
+ }
+
+ @Override
+ public void createSubCatalog(String name, ExternalCatalog catalog,
boolean ignoreIfExists)
+ throws CatalogAlreadyExistException {
+ if (databases.containsKey(name)) {
+ if (!ignoreIfExists) {
+ throw new CatalogAlreadyExistException(name);
+ }
+ } else {
+ databases.put(name, catalog);
+ }
+ }
+
+ @Override
+ public void dropSubCatalog(String name, boolean ignoreIfNotExists)
+ throws CatalogNotExistException {
+ if (databases.remove(name) == null && !ignoreIfNotExists) {
+ throw new CatalogNotExistException(name);
+ }
+ }
+
+ @Override
+ public void alterSubCatalog(String name, ExternalCatalog catalog,
boolean ignoreIfNotExists)
+ throws CatalogNotExistException {
+ if (databases.containsKey(name)) {
+ databases.put(name, catalog);
+ } else if (!ignoreIfNotExists) {
+ throw new CatalogNotExistException(name);
+ }
+ }
+
+ @Override
+ public ExternalCatalog getSubCatalog(String dbName) throws
CatalogNotExistException {
+ ExternalCatalog result = databases.get(dbName);
+
+ if (result == null) {
+ throw new CatalogNotExistException(dbName, null);
+ } else {
+ return result;
+ }
+ }
+
+ @Override
+ public List<String> listSubCatalogs() {
+ return new ArrayList<>(databases.keySet());
+ }
+}
diff --git
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
index 7fc7de50e07..0f1c80df544 100644
---
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
+++
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
@@ -107,6 +107,70 @@ case class CatalogAlreadyExistException(
def this(catalog: String) = this(catalog, null)
}
+/**
+ * Exception for operation on a nonexistent view
+ *
+ * @param catalog catalog name
+ * @param view view name
+ * @param cause the cause
+ */
+case class ViewNotExistException(
+ catalog: String,
+ view: String,
+ cause: Throwable)
+ extends RuntimeException(s"View $view does not exist.", cause) {
+
+ def this(catalog: String, view: String) = this(catalog, view, null)
+}
+
+/**
+ * Exception for adding an already existent view
+ *
+ * @param catalog catalog name
+ * @param view view name
+ * @param cause the cause
+ */
+case class ViewAlreadyExistException(
+ catalog: String,
+ view: String,
+ cause: Throwable)
+ extends RuntimeException(s"View $view already exists.", cause) {
+
+ def this(catalog: String, view: String) = this(catalog, view, null)
+}
+
+/**
+ * Exception for operation on a nonexistent function
+ *
+ * @param catalog catalog name
+ * @param function function name
+ * @param cause the cause
+ */
+case class FunctionNotExistException(
+ catalog: String,
+ function: String,
+ cause: Throwable)
+ extends RuntimeException(s"Function $function does not exist.", cause) {
+
+ def this(catalog: String, function: String) = this(catalog, function, null)
+}
+
+/**
+ * Exception for adding an already existent function
+ *
+ * @param catalog catalog name
+ * @param function function name
+ * @param cause the cause
+ */
+case class FunctionAlreadyExistException(
+ catalog: String,
+ function: String,
+ cause: Throwable)
+ extends RuntimeException(s"Function $function already exists.", cause) {
+
+ def this(catalog: String, function: String) = this(catalog, function, null)
+}
+
/**
* Exception for not finding a [[TableFactory]] for the given properties.
*
diff --git
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala
index 4db9497a712..d47cc9386e1 100644
---
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala
+++
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala
@@ -19,9 +19,11 @@
package org.apache.flink.table.catalog
import org.apache.flink.table.api._
+import org.apache.flink.table.functions.UserDefinedFunction
/**
- * The CrudExternalCatalog provides methods to create, drop, and alter
(sub-)catalogs or tables.
+ * The CrudExternalCatalog provides methods to create, drop, and alter
(sub-)catalogs, tables,
+ * views and UDFs.
*/
trait CrudExternalCatalog extends ExternalCatalog {
@@ -103,4 +105,86 @@ trait CrudExternalCatalog extends ExternalCatalog {
@throws[CatalogNotExistException]
def alterSubCatalog(name: String, catalog: ExternalCatalog,
ignoreIfNotExists: Boolean): Unit
+ /**
+ * Adds a view to this catalog.
+ *
+ * @param viewName The name of the view to add.
+ * @param view The view to add.
+ * @param ignoreIfExists Flag to specify behavior if a view with the given
name already exists:
+ * if set to false, throw an exception,
+ * if set to true, nothing happens.
+ * @throws ViewAlreadyExistException thrown if view already exists and
ignoreIfExists is false
+ */
+ @throws[ViewAlreadyExistException]
+ def createView(viewName: String, view: String, ignoreIfExists: Boolean): Unit
+
+ /**
+ * Deletes a view from this catalog.
+ *
+ * @param viewName Name of the view to delete.
+ * @param ignoreIfNotExists Flag to specify behavior if the view does not
exist:
+ * if set to false, throw an exception,
+ * if set to true, nothing happens.
+ * @throws ViewNotExistException thrown if the view does not exist in
the catalog
+ */
+ @throws[ViewNotExistException]
+ def dropView(viewName: String, ignoreIfNotExists: Boolean): Unit
+
+ /**
+ * Modifies an existing view of this catalog.
+ *
+ * @param viewName The name of the view to modify.
+ * @param view The new view which replaces the existing view.
+ * @param ignoreIfNotExists Flag to specify behavior if the view does not
exist:
+ * if set to false, throw an exception,
+ * if set to true, nothing happens.
+ * @throws ViewNotExistException thrown if the view does not exist in the
catalog
+ */
+ @throws[ViewNotExistException]
+ def alterView(viewName: String, view: String, ignoreIfNotExists: Boolean):
Unit
+
+ /**
+ * Adds a UDF to this catalog.
+ *
+ * @param functionName The name of the function to add.
+ * @param function The function to add.
+ * @param ignoreIfExists Flag to specify behavior if function with the
given name already exists:
+ * if set to false, throw an exception,
+ * if set to true, nothing happens.
+ * @throws FunctionAlreadyExistException thrown if function already exists
and ignoreIfExists
+ * is false
+ */
+ @throws[FunctionAlreadyExistException]
+ def createFunction(
+ functionName: String,
+ function: UserDefinedFunction,
+ ignoreIfExists: Boolean): Unit
+
+ /**
+ * Deletes a UDF from this catalog.
+ *
+ * @param functionName Name of the function to delete.
+ * @param ignoreIfNotExists Flag to specify behavior if the function does
not exist:
+ * if set to false, throw an exception,
+ * if set to true, nothing happens.
+ * @throws FunctionNotExistException thrown if the function does not
exist in the catalog
+ */
+ @throws[FunctionNotExistException]
+ def dropFunction(functionName: String, ignoreIfNotExists: Boolean): Unit
+
+ /**
+ * Modifies an existing UDF of this catalog.
+ *
+ * @param functionName The name of the function to modify.
+ * @param function The new function which replaces the existing
function.
+ * @param ignoreIfNotExists Flag to specify behavior if the function does
not exist:
+ * if set to false, throw an exception,
+ * if set to true, nothing happens.
+ * @throws FunctionNotExistException thrown if the function does not
exist in the catalog
+ */
+ @throws[FunctionNotExistException]
+ def alterFunction(
+ functionName: String,
+ function: UserDefinedFunction,
+ ignoreIfNotExists: Boolean): Unit
}
diff --git
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala
index 5f4511b1efd..00567b0778a 100644
---
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala
+++
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala
@@ -21,13 +21,14 @@ package org.apache.flink.table.catalog
import java.util.{List => JList}
import org.apache.flink.table.api._
+import org.apache.flink.table.functions.UserDefinedFunction
/**
* An [[ExternalCatalog]] is the connector between an external database
catalog and Flink's
* Table API.
*
- * It provides information about catalogs, databases and tables such as
names, schema, statistics,
- * and access information.
+ * It provides information about catalogs, databases, tables, views, UDFs,
such as names, schema,
+ * statistics, and access information.
*/
trait ExternalCatalog {
@@ -63,4 +64,37 @@ trait ExternalCatalog {
*/
def listSubCatalogs(): JList[String]
+ /**
+ * Gets a view's definition, which is a string such as "select xxx from
yyy", from this catalog.
+ * This might be an oversimplification, but we should be okay for now. In
the future,
+ * we may have a class representation such as ExternalCatalogView.
+ *
+ * @param viewName The view's name
+ * @return The view's definition, which is a string such as "select xxx
from yyy".
+ */
+ @throws[ViewNotExistException]
+ def getView(viewName: String): String
+
+ /**
+ * Gets the names of all views registered in this catalog.
+ *
+ * @return The list of names of all registered views.
+ */
+ def listViews(): JList[String]
+
+ /**
+ * Gets a UDF from this catalog.
+ *
+ * @param functionName The function's name
+ * @return The requested UDF
+ */
+ @throws[FunctionNotExistException]
+ def getFunction(functionName: String): UserDefinedFunction
+
+ /**
+ * Gets the names of all UDFs registered in this catalog.
+ *
+ * @return The list of names of all registered UDFs.
+ */
+ def listFunctions(): JList[String]
}
diff --git
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/InMemoryExternalCatalog.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/InMemoryExternalCatalog.scala
deleted file mode 100644
index ee30a8ed42a..00000000000
---
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/InMemoryExternalCatalog.scala
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.catalog
-
-import java.util.{List => JList}
-
-import org.apache.flink.table.api.{CatalogAlreadyExistException,
CatalogNotExistException, TableAlreadyExistException, TableNotExistException}
-
-import scala.collection.mutable
-import scala.collection.JavaConverters._
-
-/**
- * This class is an in-memory implementation of [[ExternalCatalog]].
- *
- * @param name The name of the catalog
- *
- * It could be used for testing or developing instead of used in production
environment.
- */
-class InMemoryExternalCatalog(name: String) extends CrudExternalCatalog {
-
- private val databases = new mutable.HashMap[String, ExternalCatalog]
- private val tables = new mutable.HashMap[String, ExternalCatalogTable]
-
- @throws[TableAlreadyExistException]
- override def createTable(
- tableName: String,
- table: ExternalCatalogTable,
- ignoreIfExists: Boolean): Unit = synchronized {
- tables.get(tableName) match {
- case Some(_) if !ignoreIfExists => throw new
TableAlreadyExistException(name, tableName)
- case _ => tables.put(tableName, table)
- }
- }
-
- @throws[TableNotExistException]
- override def dropTable(tableName: String, ignoreIfNotExists: Boolean): Unit
= synchronized {
- if (tables.remove(tableName).isEmpty && !ignoreIfNotExists) {
- throw new TableNotExistException(name, tableName)
- }
- }
-
- @throws[TableNotExistException]
- override def alterTable(
- tableName: String,
- table: ExternalCatalogTable,
- ignoreIfNotExists: Boolean): Unit = synchronized {
- if (tables.contains(tableName)) {
- tables.put(tableName, table)
- } else if (!ignoreIfNotExists) {
- throw new TableNotExistException(name, tableName)
- }
- }
-
- @throws[CatalogAlreadyExistException]
- override def createSubCatalog(
- catalogName: String,
- catalog: ExternalCatalog,
- ignoreIfExists: Boolean): Unit = synchronized {
- databases.get(catalogName) match {
- case Some(_) if !ignoreIfExists => throw
CatalogAlreadyExistException(catalogName, null)
- case _ => databases.put(catalogName, catalog)
- }
- }
-
- @throws[CatalogNotExistException]
- override def dropSubCatalog(
- catalogName: String,
- ignoreIfNotExists: Boolean): Unit = synchronized {
- if (databases.remove(catalogName).isEmpty && !ignoreIfNotExists) {
- throw CatalogNotExistException(catalogName, null)
- }
- }
-
- override def alterSubCatalog(
- catalogName: String,
- catalog: ExternalCatalog,
- ignoreIfNotExists: Boolean): Unit = synchronized {
- if (databases.contains(catalogName)) {
- databases.put(catalogName, catalog)
- } else if (!ignoreIfNotExists) {
- throw new CatalogNotExistException(catalogName)
- }
- }
-
- override def getTable(tableName: String): ExternalCatalogTable =
synchronized {
- tables.get(tableName) match {
- case Some(t) => t
- case _ => throw TableNotExistException(name, tableName, null)
- }
- }
-
- override def listTables(): JList[String] = synchronized {
- tables.keys.toList.asJava
- }
-
- @throws[CatalogNotExistException]
- override def getSubCatalog(catalogName: String): ExternalCatalog =
synchronized {
- databases.get(catalogName) match {
- case Some(d) => d
- case _ => throw CatalogNotExistException(catalogName, null)
- }
- }
-
- override def listSubCatalogs(): JList[String] = synchronized {
- databases.keys.toList.asJava
- }
-}
diff --git
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/InMemoryExternalCatalogTest.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/InMemoryExternalCatalogTest.scala
deleted file mode 100644
index 5238bfe0cb9..00000000000
---
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/InMemoryExternalCatalogTest.scala
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.catalog
-
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.table.api._
-import org.apache.flink.table.descriptors.{ConnectorDescriptor, Schema}
-import org.junit.Assert._
-import org.junit.{Before, Test}
-
-class InMemoryExternalCatalogTest {
-
- private val databaseName = "db1"
-
- private var catalog: InMemoryExternalCatalog = _
-
- @Before
- def setUp(): Unit = {
- catalog = new InMemoryExternalCatalog(databaseName)
- }
-
- @Test
- def testCreateTable(): Unit = {
- assertTrue(catalog.listTables().isEmpty)
- catalog.createTable("t1", createTableInstance(), ignoreIfExists = false)
- val tables = catalog.listTables()
- assertEquals(1, tables.size())
- assertEquals("t1", tables.get(0))
- }
-
- @Test(expected = classOf[TableAlreadyExistException])
- def testCreateExistedTable(): Unit = {
- val tableName = "t1"
- catalog.createTable(tableName, createTableInstance(), ignoreIfExists =
false)
- catalog.createTable(tableName, createTableInstance(), ignoreIfExists =
false)
- }
-
- @Test
- def testGetTable(): Unit = {
- val originTable = createTableInstance()
- catalog.createTable("t1", originTable, ignoreIfExists = false)
- assertEquals(catalog.getTable("t1"), originTable)
- }
-
- @Test(expected = classOf[TableNotExistException])
- def testGetNotExistTable(): Unit = {
- catalog.getTable("nonexisted")
- }
-
- @Test
- def testAlterTable(): Unit = {
- val tableName = "t1"
- val table = createTableInstance()
- catalog.createTable(tableName, table, ignoreIfExists = false)
- assertEquals(catalog.getTable(tableName), table)
- val newTable = createTableInstance(Array("number"), Array(Types.INT))
- catalog.alterTable(tableName, newTable, ignoreIfNotExists = false)
- val currentTable = catalog.getTable(tableName)
- // validate the table is really replaced after alter table
- assertNotEquals(table, currentTable)
- assertEquals(newTable, currentTable)
- }
-
- @Test(expected = classOf[TableNotExistException])
- def testAlterNotExistTable(): Unit = {
- catalog.alterTable("nonexisted", createTableInstance(), ignoreIfNotExists
= false)
- }
-
- @Test
- def testDropTable(): Unit = {
- val tableName = "t1"
- catalog.createTable(tableName, createTableInstance(), ignoreIfExists =
false)
- assertTrue(catalog.listTables().contains(tableName))
- catalog.dropTable(tableName, ignoreIfNotExists = false)
- assertFalse(catalog.listTables().contains(tableName))
- }
-
- @Test(expected = classOf[TableNotExistException])
- def testDropNotExistTable(): Unit = {
- catalog.dropTable("nonexisted", ignoreIfNotExists = false)
- }
-
- @Test(expected = classOf[CatalogNotExistException])
- def testGetNotExistDatabase(): Unit = {
- catalog.getSubCatalog("notexistedDb")
- }
-
- @Test
- def testCreateDatabase(): Unit = {
- catalog.createSubCatalog("db2", new InMemoryExternalCatalog("db2"),
ignoreIfExists = false)
- assertEquals(1, catalog.listSubCatalogs().size)
- }
-
- @Test(expected = classOf[CatalogAlreadyExistException])
- def testCreateExistedDatabase(): Unit = {
- catalog.createSubCatalog("existed", new InMemoryExternalCatalog("existed"),
- ignoreIfExists = false)
-
- assertNotNull(catalog.getSubCatalog("existed"))
- val databases = catalog.listSubCatalogs()
- assertEquals(1, databases.size())
- assertEquals("existed", databases.get(0))
-
- catalog.createSubCatalog("existed", new InMemoryExternalCatalog("existed"),
- ignoreIfExists = false)
- }
-
- @Test
- def testNestedCatalog(): Unit = {
- val sub = new InMemoryExternalCatalog("sub")
- val sub1 = new InMemoryExternalCatalog("sub1")
- catalog.createSubCatalog("sub", sub, ignoreIfExists = false)
- sub.createSubCatalog("sub1", sub1, ignoreIfExists = false)
- sub1.createTable("table", createTableInstance(), ignoreIfExists = false)
- val tables =
catalog.getSubCatalog("sub").getSubCatalog("sub1").listTables()
- assertEquals(1, tables.size())
- assertEquals("table", tables.get(0))
- }
-
- private def createTableInstance(): ExternalCatalogTable = {
- val connDesc = new TestConnectorDesc
- val schemaDesc = new Schema()
- .field("first", BasicTypeInfo.STRING_TYPE_INFO)
- .field("second", BasicTypeInfo.INT_TYPE_INFO)
- ExternalCatalogTable.builder(connDesc)
- .withSchema(schemaDesc)
- .asTableSource()
- }
-
- private def createTableInstance(
- fieldNames: Array[String],
- fieldTypes: Array[TypeInformation[_]]): ExternalCatalogTable = {
- val connDesc = new TestConnectorDesc
- val schemaDesc = new Schema()
- fieldNames.zipWithIndex.foreach { case (fieldName, index) =>
- schemaDesc.field(fieldName, fieldTypes(index))
- }
- ExternalCatalogTable.builder(connDesc)
- .withSchema(schemaDesc)
- .asTableSource()
- }
-
- class TestConnectorDesc extends ConnectorDescriptor("test", 1, false) {
- override protected def toConnectorProperties: _root_.java.util.Map[String,
String] = {
- _root_.java.util.Collections.emptyMap()
- }
- }
-}
diff --git
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/TestingInMemoryCatalogTest.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/TestingInMemoryCatalogTest.scala
new file mode 100644
index 00000000000..b40d995e861
--- /dev/null
+++
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/TestingInMemoryCatalogTest.scala
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.table.api._
+import org.apache.flink.table.descriptors.{ConnectorDescriptor, Schema}
+import org.apache.flink.table.functions.UserDefinedFunction
+import org.junit.Assert._
+import org.junit.{Before, Test}
+
+class TestingInMemoryCatalogTest {
+
+ private val databaseName = "db1"
+
+ private var catalog: TestingInMemoryCatalog = _
+
+ @Before
+ def setUp(): Unit = {
+ catalog = new TestingInMemoryCatalog(databaseName)
+ }
+
+ // ------ Table ------
+
+ @Test
+ def testCreateTable(): Unit = {
+ assertTrue(catalog.listTables().isEmpty)
+ catalog.createTable("t1", createTableInstance(), false)
+ val tables = catalog.listTables()
+ assertEquals(1, tables.size())
+ assertEquals("t1", tables.get(0))
+ }
+
+ @Test(expected = classOf[TableAlreadyExistException])
+ def testCreateExistedTable(): Unit = {
+ val tableName = "t1"
+ catalog.createTable(tableName, createTableInstance(), false)
+ catalog.createTable(tableName, createTableInstance(), false)
+ }
+
+ @Test
+ def testGetTable(): Unit = {
+ val originTable = createTableInstance()
+ catalog.createTable("t1", originTable, false)
+ assertEquals(catalog.getTable("t1"), originTable)
+ }
+
+ @Test(expected = classOf[TableNotExistException])
+ def testGetNotExistTable(): Unit = {
+ catalog.getTable("nonexisted")
+ }
+
+ @Test
+ def testAlterTable(): Unit = {
+ val tableName = "t1"
+ val table = createTableInstance()
+ catalog.createTable(tableName, table, false)
+ assertEquals(catalog.getTable(tableName), table)
+ val newTable = createTableInstance(Array("number"), Array(Types.INT))
+ catalog.alterTable(tableName, newTable, false)
+ val currentTable = catalog.getTable(tableName)
+ // validate the table is really replaced after alter table
+ assertNotEquals(table, currentTable)
+ assertEquals(newTable, currentTable)
+ }
+
+ @Test(expected = classOf[TableNotExistException])
+ def testAlterNotExistTable(): Unit = {
+ catalog.alterTable("nonexisted", createTableInstance(), false)
+ }
+
+ @Test
+ def testDropTable(): Unit = {
+ val tableName = "t1"
+ catalog.createTable(tableName, createTableInstance(), false)
+ assertTrue(catalog.listTables().contains(tableName))
+ catalog.dropTable(tableName, false)
+ assertFalse(catalog.listTables().contains(tableName))
+ }
+
+ @Test(expected = classOf[TableNotExistException])
+ def testDropNotExistTable(): Unit = {
+ catalog.dropTable("nonexisted", false)
+ }
+
+ // ------ SubCatalog ------
+
+ @Test(expected = classOf[CatalogNotExistException])
+ def testGetNotExistDatabase(): Unit = {
+ catalog.getSubCatalog("notexistedDb")
+ }
+
+ @Test
+ def testCreateDatabase(): Unit = {
+ catalog.createSubCatalog("db2", new TestingInMemoryCatalog("db2"), false)
+ assertEquals(1, catalog.listSubCatalogs().size)
+ }
+
+ @Test(expected = classOf[CatalogAlreadyExistException])
+ def testCreateExistedDatabase(): Unit = {
+ catalog.createSubCatalog("existed", new TestingInMemoryCatalog("existed"),
false)
+
+ assertNotNull(catalog.getSubCatalog("existed"))
+ val databases = catalog.listSubCatalogs()
+ assertEquals(1, databases.size())
+ assertEquals("existed", databases.get(0))
+
+ catalog.createSubCatalog("existed", new TestingInMemoryCatalog("existed"),
false)
+ }
+
+ @Test
+ def testNestedCatalog(): Unit = {
+ val sub = new TestingInMemoryCatalog("sub")
+ val sub1 = new TestingInMemoryCatalog("sub1")
+ catalog.createSubCatalog("sub", sub, false)
+ sub.createSubCatalog("sub1", sub1, false)
+ sub1.createTable("table", createTableInstance(), false)
+ val tables =
catalog.getSubCatalog("sub").getSubCatalog("sub1").listTables()
+ assertEquals(1, tables.size())
+ assertEquals("table", tables.get(0))
+ }
+
+ // ------ View ------
+
+ @Test
+ def testCreateView(): Unit = {
+ assertTrue(catalog.listViews().isEmpty)
+ catalog.createView("v1", createViewInstance(), false)
+ val views = catalog.listViews()
+ assertEquals(1, views.size())
+ assertEquals("v1", views.get(0))
+ }
+
+ @Test(expected = classOf[ViewAlreadyExistException])
+ def testCreateExistedView(): Unit = {
+ val viewName = "v1"
+ catalog.createView(viewName, createViewInstance(), false)
+ catalog.createView(viewName, createViewInstance(), false)
+ }
+
+ @Test
+ def testGetView(): Unit = {
+ val originView = createViewInstance()
+ catalog.createView("v1", originView, false)
+ assertEquals(catalog.getView("v1"), originView)
+ }
+
+ @Test(expected = classOf[ViewNotExistException])
+ def testGetNotExistView(): Unit = {
+ catalog.getView("nonexisted")
+ }
+
+ @Test
+ def testAlterView(): Unit = {
+ val viewName = "v1"
+ val view = createViewInstance()
+ catalog.createView(viewName, view, false)
+ assertEquals(catalog.getView(viewName), view)
+
+ val newView = createNewViewInstance()
+ catalog.alterView(viewName, newView, false)
+ val currentView = catalog.getView(viewName)
+ // validate the view is really replaced after alter view
+ assertNotEquals(view, currentView)
+ assertEquals(newView, currentView)
+ }
+
+ @Test(expected = classOf[ViewNotExistException])
+ def testAlterNotExistView(): Unit = {
+ catalog.alterView("nonexisted", createViewInstance(), false)
+ }
+
+ @Test
+ def testDropView(): Unit = {
+ val viewName = "v1"
+ catalog.createView(viewName, createViewInstance(), false)
+ assertTrue(catalog.listViews().contains(viewName))
+ catalog.dropView(viewName, false)
+ assertFalse(catalog.listViews().contains(viewName))
+ }
+
+ @Test(expected = classOf[ViewNotExistException])
+ def testDropNotExistView(): Unit = {
+ catalog.dropView("nonexisted", false)
+ }
+
+ // ------ UDF ------
+
+ @Test
+ def testCreateFunction(): Unit = {
+ assertTrue(catalog.listFunctions().isEmpty)
+ catalog.createFunction("f1", createFunctionInstance(), false)
+ val functions = catalog.listFunctions()
+ assertEquals(1, functions.size())
+ assertEquals("f1", functions.get(0))
+ }
+
+ @Test(expected = classOf[FunctionAlreadyExistException])
+ def testCreateExistedFunction(): Unit = {
+ val functionName = "f1"
+ catalog.createFunction(functionName, createFunctionInstance(), false)
+ catalog.createFunction(functionName, createFunctionInstance(), false)
+ }
+
+ @Test
+ def testGetFunction(): Unit = {
+ val originFunction = createFunctionInstance()
+ catalog.createFunction("f1", originFunction, false)
+ assertEquals(catalog.getFunction("f1"), originFunction)
+ }
+
+ @Test(expected = classOf[FunctionNotExistException])
+ def testGetNotExistFunction(): Unit = {
+ catalog.getFunction("nonexisted")
+ }
+
+ @Test
+ def testAlterFunction(): Unit = {
+ val functionName = "f1"
+ val function = createFunctionInstance()
+ catalog.createFunction(functionName, function, false)
+ assertEquals(catalog.getFunction(functionName), function)
+
+ val newFunction = createNewFunctionInstance()
+ catalog.alterFunction(functionName, newFunction, false)
+ val currentFunction = catalog.getFunction(functionName)
+ // validate the function is really replaced after alter view
+ assertNotEquals(function, currentFunction)
+ assertEquals(newFunction, currentFunction)
+ }
+
+ @Test(expected = classOf[FunctionNotExistException])
+ def testAlterNotExistFunction(): Unit = {
+ catalog.alterFunction("nonexisted", createFunctionInstance(), false)
+ }
+
+ @Test
+ def testDropFunction(): Unit = {
+ val functionName = "f1"
+ catalog.createFunction(functionName, createFunctionInstance(), false)
+ assertTrue(catalog.listFunctions().contains(functionName))
+ catalog.dropFunction(functionName, false)
+ assertFalse(catalog.listFunctions().contains(functionName))
+ }
+
+ @Test(expected = classOf[FunctionNotExistException])
+ def testDropNotExistFunction(): Unit = {
+ catalog.dropFunction("nonexisted", false)
+ }
+
+ // ------ Utils ------
+
+ private def createTableInstance(): ExternalCatalogTable = {
+ val connDesc = new TestConnectorDesc
+ val schemaDesc = new Schema()
+ .field("first", BasicTypeInfo.STRING_TYPE_INFO)
+ .field("second", BasicTypeInfo.INT_TYPE_INFO)
+ ExternalCatalogTable.builder(connDesc)
+ .withSchema(schemaDesc)
+ .asTableSource()
+ }
+
+ private def createTableInstance(
+ fieldNames: Array[String],
+ fieldTypes: Array[TypeInformation[_]]): ExternalCatalogTable = {
+ val connDesc = new TestConnectorDesc
+ val schemaDesc = new Schema()
+ fieldNames.zipWithIndex.foreach { case (fieldName, index) =>
+ schemaDesc.field(fieldName, fieldTypes(index))
+ }
+ ExternalCatalogTable.builder(connDesc)
+ .withSchema(schemaDesc)
+ .asTableSource()
+ }
+
+ private def createViewInstance(): String = {
+ "select a from b"
+ }
+
+ private def createNewViewInstance(): String = {
+ "select c from d"
+ }
+
+ private def createFunctionInstance(): UserDefinedFunction = {
+ new TestFunction1
+ }
+
+ private def createNewFunctionInstance(): UserDefinedFunction = {
+ new TestFunction2
+ }
+
+ class TestConnectorDesc extends ConnectorDescriptor("test", 1, false) {
+ override protected def toConnectorProperties: _root_.java.util.Map[String,
String] = {
+ _root_.java.util.Collections.emptyMap()
+ }
+ }
+
+ class TestFunction1 extends UserDefinedFunction {
+
+ }
+
+ class TestFunction2 extends UserDefinedFunction {
+
+ }
+}
diff --git
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
index 64fcc8ac7c4..e55da1fa606 100644
---
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
+++
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
@@ -148,20 +148,20 @@ object CommonTestData {
externalTableBuilder3.inAppendMode()
}
- val catalog = new InMemoryExternalCatalog("test")
- val db1 = new InMemoryExternalCatalog("db1")
- val db2 = new InMemoryExternalCatalog("db2")
- val db3 = new InMemoryExternalCatalog("db3")
- catalog.createSubCatalog("db1", db1, ignoreIfExists = false)
- catalog.createSubCatalog("db2", db2, ignoreIfExists = false)
- catalog.createSubCatalog("db3", db3, ignoreIfExists = false)
+ val catalog = new TestingInMemoryCatalog("test")
+ val db1 = new TestingInMemoryCatalog("db1")
+ val db2 = new TestingInMemoryCatalog("db2")
+ val db3 = new TestingInMemoryCatalog("db3")
+ catalog.createSubCatalog("db1", db1, false)
+ catalog.createSubCatalog("db2", db2, false)
+ catalog.createSubCatalog("db3", db3, false)
// Register the table with both catalogs
- catalog.createTable("tb1", externalTableBuilder1.asTableSource(),
ignoreIfExists = false)
- catalog.createTable("tb3", externalTableBuilder3.asTableSink(),
ignoreIfExists = false)
- db1.createTable("tb1", externalTableBuilder1.asTableSource(),
ignoreIfExists = false)
- db2.createTable("tb2", externalTableBuilder2.asTableSource(),
ignoreIfExists = false)
- db3.createTable("tb3", externalTableBuilder3.asTableSink(), ignoreIfExists
= false)
+ catalog.createTable("tb1", externalTableBuilder1.asTableSource(), false)
+ catalog.createTable("tb3", externalTableBuilder3.asTableSink(), false)
+ db1.createTable("tb1", externalTableBuilder1.asTableSource(), false)
+ db2.createTable("tb2", externalTableBuilder2.asTableSource(), false)
+ db3.createTable("tb3", externalTableBuilder3.asTableSink(), false)
catalog
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Create an in-memory catalog that stores Flink's meta objects
> ------------------------------------------------------------
>
> Key: FLINK-10697
> URL: https://issues.apache.org/jira/browse/FLINK-10697
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Affects Versions: 1.6.1
> Reporter: Xuefu Zhang
> Assignee: Bowen Li
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.8.0
>
>
> Currently all Flink meta objects (currently tables only) are stored in memory
> as part of Calcite catalog. Those objects are temporary (such as inline
> tables), others are meant to live beyond user session. As we introduce
> catalog for those objects (tables, views, and UDFs), it makes sense to
> organize them neatly. Further, having a catalog implementation that store
> those objects in memory is to retain the currently behavior, which can be
> configured by user.
> Please note that this implementation is different from the current
> {{InMemoryExternalCatalog}, which is used mainly for testing and doesn't
> reflect what's actually needed for Flink meta objects.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)