This is an automated email from the ASF dual-hosted git repository.
kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 39d8236 [FLINK-12238][hive] Support database related operations in GenericHiveMetastoreCatalog and setup flink-connector-hive module
39d8236 is described below
commit 39d82368eca3891d27c85dd9bc56db2344ee73ba
Author: Bowen L <[email protected]>
AuthorDate: Tue Apr 30 01:19:39 2019 -0700
[FLINK-12238][hive] Support database related operations in GenericHiveMetastoreCatalog and setup flink-connector-hive module
This closes #8205
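
Below is a minimal usage sketch of the database operations this commit adds. It is illustrative only: the catalog name, metastore URI, database name, and properties are placeholders, not part of the commit.

    // Assumes java.util.Collections and the org.apache.flink.table.catalog classes
    // added in this commit; "thrift://localhost:9083" is a hypothetical metastore URI.
    GenericHiveMetastoreCatalog catalog =
        new GenericHiveMetastoreCatalog("myCatalog", "thrift://localhost:9083");
    catalog.open();                                 // creates the Hive metastore client

    CatalogDatabase db = new GenericCatalogDatabase(
        Collections.singletonMap("k1", "v1"), "a test database");
    catalog.createDatabase("mydb", db, false);      // ignoreIfExists = false
    catalog.setCurrentDatabase("mydb");
    catalog.listDatabases();                        // e.g. ["default", "mydb"]
    catalog.dropDatabase("mydb", false);            // ignoreIfNotExists = false
    catalog.close();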
---
flink-connectors/flink-connector-hive/pom.xml | 429 +++++++++++++++++++++
.../catalog/hive/GenericHiveMetastoreCatalog.java | 352 +++++++++++++++++
.../hive/GenericHiveMetastoreCatalogUtil.java | 49 +++
.../src/main/resources/META-INF/NOTICE | 26 ++
.../main/resources/META-INF/licenses/LICENSE.antlr | 38 ++
.../hive/GenericHiveMetastoreCatalogTest.java | 83 ++++
.../flink/table/catalog/hive/HiveTestUtils.java | 54 +++
.../src/test/resources/hive-site.xml | 42 ++
.../src/test/resources/log4j-test.properties | 24 ++
flink-connectors/pom.xml | 1 +
flink-table/flink-table-api-java/pom.xml | 10 +
.../table/catalog/GenericCatalogDatabase.java | 2 +-
.../table/catalog/GenericInMemoryCatalogTest.java | 248 +++---------
.../flink/table/catalog/CatalogTestBase.java | 241 ++++++++++++
14 files changed, 1396 insertions(+), 203 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/pom.xml b/flink-connectors/flink-connector-hive/pom.xml
new file mode 100644
index 0000000..cb09934
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/pom.xml
@@ -0,0 +1,429 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connectors</artifactId>
+ <version>1.9-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
+ <name>flink-connector-hive</name>
+
+ <packaging>jar</packaging>
+
+ <properties>
+ <hive.version>2.3.4</hive.version>
+ <hivemetastore.hadoop.version>2.7.2</hivemetastore.hadoop.version>
+ </properties>
+
+ <dependencies>
+
+ <!-- core dependencies -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- Hadoop dependency -->
+ <!-- Hadoop dependencies are provided, so we can depend on them without pulling Hadoop in -->
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hivemetastore.hadoop.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <version>${hivemetastore.hadoop.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- Hive dependencies -->
+ <!-- Note: Hive published jars do not have proper dependencies declared.
+ We need to push for HIVE-16391 (https://issues.apache.org/jira/browse/HIVE-16391) to resolve this problem. -->
+
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-metastore</artifactId>
+ <version>${hive.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-shims</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javolution</groupId>
+ <artifactId>javolution</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.derby</groupId>
+ <artifactId>derby</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-client</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.zaxxer</groupId>
+ <artifactId>HikariCP</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.jdo</groupId>
+ <artifactId>jdo-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>co.cask.tephra</groupId>
+ <artifactId>tephra-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>co.cask.tephra</groupId>
+ <artifactId>tephra-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>co.cask.tephra</groupId>
+ <artifactId>tephra-hbase-compat-1.0</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-cli</groupId>
+ <artifactId>commons-cli</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libfb303</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.transaction</groupId>
+ <artifactId>transaction-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.orc</groupId>
+ <artifactId>orc-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-1.2-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.ant</groupId>
+ <artifactId>ant</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.tdunning</groupId>
+ <artifactId>json</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>jline</groupId>
+ <artifactId>jline</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.eclipse.jetty.aggregate</groupId>
+ <artifactId>jetty-all</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.eclipse.jetty.orbit</groupId>
+ <artifactId>javax.servlet</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-web</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.dropwizard.metrics</groupId>
+ <artifactId>metrics-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.dropwizard.metrics</groupId>
+ <artifactId>metrics-jvm</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.dropwizard.metrics</groupId>
+ <artifactId>metrics-json</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.github.joshelser</groupId>
+ <artifactId>dropwizard-metrics-hadoop-metrics2-reporter</artifactId>
+ </exclusion>
+
+ <!-- org.apache.hive:hive-service-rpc -->
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-compiler</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-runtime</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpcore</artifactId>
+ </exclusion>
+
+ <!-- org.apache.hive:hive-serde -->
+ <exclusion>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>net.sf.opencsv</groupId>
+ <artifactId>opencsv</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-hadoop-bundle</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <!-- test dependencies -->
+
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-exec</artifactId>
+ <version>${hive.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-vector-code-gen</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-llap-tez</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-shims</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-httpclient</groupId>
+ <artifactId>commons-httpclient</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-1.2-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.antlr</groupId>
+ <artifactId>antlr-runtime</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.antlr</groupId>
+ <artifactId>ST4</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.ant</groupId>
+ <artifactId>ant</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-compress</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.ivy</groupId>
+ <artifactId>ivy</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>apache-curator</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-framework</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.codehaus.groovy</groupId>
+ <artifactId>groovy-all</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.calcite</groupId>
+ <artifactId>calcite-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.calcite</groupId>
+ <artifactId>calcite-druid</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.calcite.avatica</groupId>
+ <artifactId>avatica</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>stax</groupId>
+ <artifactId>stax-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.derby</groupId>
+ <artifactId>derby</artifactId>
+ <version>10.10.2.0</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>shade-flink</id>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <shadeTestJar>false</shadeTestJar>
+ <artifactSet>
+ <!-- Needs end-to-end tests to ensure the built flink-connector-hive jar contains all required dependencies and can run -->
+ <includes>
+ <include>commons-dbcp:commons-dbcp</include>
+ <include>commons-pool:commons-pool</include>
+ <include>commons-beanutils:commons-beanutils</include>
+ <include>com.jolbox:bonecp</include>
+ <include>org.apache.hive:*</include>
+ <include>org.apache.thrift:libthrift</include>
+ <include>org.datanucleus:*</include>
+ <include>org.antlr:antlr-runtime</include>
+ </includes>
+ </artifactSet>
+ <promoteTransitiveDependencies>true</promoteTransitiveDependencies>
+ <!-- DO NOT RELOCATE GUAVA IN THIS PACKAGE -->
+ <filters>
+ <filter>
+ <!-- some dependencies bring their own LICENSE.txt which we don't need -->
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/LICENSE.txt</exclude>
+ <exclude>META-INF/ASM_LICENSE.txt</exclude>
+ <exclude>META-INF/ASL2.0</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <!-- Configure derby.log of embedded Hive metastore for unit tests -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <forkCount>1</forkCount>
+ <reuseForks>false</reuseForks>
+ <systemPropertyVariables>
+ <derby.stream.error.file>${project.build.directory}/derby.log</derby.stream.error.file>
+ </systemPropertyVariables>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
new file mode 100644
index 0000000..4c07938
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.GenericCatalogDatabase;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ReadableWritableCatalog;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A catalog that persists all Flink streaming and batch metadata by using the Hive metastore as persistent storage.
+ */
+public class GenericHiveMetastoreCatalog implements ReadableWritableCatalog {
+ private static final Logger LOG = LoggerFactory.getLogger(GenericHiveMetastoreCatalog.class);
+
+ public static final String DEFAULT_DB = "default";
+
+ private final String catalogName;
+ private final HiveConf hiveConf;
+
+ private String currentDatabase = DEFAULT_DB;
+ private IMetaStoreClient client;
+
+ public GenericHiveMetastoreCatalog(String catalogName, String hivemetastoreURI) {
+ this(catalogName, getHiveConf(hivemetastoreURI));
+ }
+
+ public GenericHiveMetastoreCatalog(String catalogName, HiveConf hiveConf) {
+ checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), "catalogName cannot be null or empty");
+ this.catalogName = catalogName;
+
+ this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be null");
+ LOG.info("Created GenericHiveMetastoreCatalog '{}'", catalogName);
+ }
+
+ private static HiveConf getHiveConf(String hiveMetastoreURI) {
+ checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveMetastoreURI), "hiveMetastoreURI cannot be null or empty");
+
+ HiveConf hiveConf = new HiveConf();
+ hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, hiveMetastoreURI);
+ return hiveConf;
+ }
+
+ private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) {
+ try {
+ return RetryingMetaStoreClient.getProxy(
+ hiveConf,
+ null,
+ null,
+ HiveMetaStoreClient.class.getName(),
+ true);
+ } catch (MetaException e) {
+ throw new CatalogException("Failed to create Hive
metastore client", e);
+ }
+ }
+
+ @Override
+ public void open() throws CatalogException {
+ if (client == null) {
+ client = getMetastoreClient(hiveConf);
+ LOG.info("Connected to Hive metastore");
+ }
+ }
+
+ @Override
+ public void close() throws CatalogException {
+ if (client != null) {
+ client.close();
+ client = null;
+ LOG.info("Close connection to Hive metastore");
+ }
+ }
+
+ // ------ databases ------
+
+ @Override
+ public String getCurrentDatabase() throws CatalogException {
+ return currentDatabase;
+ }
+
+ @Override
+ public void setCurrentDatabase(String databaseName) throws DatabaseNotExistException, CatalogException {
+ checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
+
+ if (!databaseExists(databaseName)) {
+ throw new DatabaseNotExistException(catalogName, databaseName);
+ }
+
+ currentDatabase = databaseName;
+ }
+
+ @Override
+ public List<String> listDatabases() throws CatalogException {
+ try {
+ return client.getAllDatabases();
+ } catch (TException e) {
+ throw new CatalogException(
+ String.format("Failed to list all databases in
%s", catalogName), e);
+ }
+ }
+
+ @Override
+ public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException {
+ Database hiveDb;
+
+ try {
+ hiveDb = client.getDatabase(databaseName);
+ } catch (NoSuchObjectException e) {
+ throw new DatabaseNotExistException(catalogName, databaseName);
+ } catch (TException e) {
+ throw new CatalogException(
+ String.format("Failed to get database %s from
%s", databaseName, catalogName), e);
+ }
+
+ return new GenericCatalogDatabase(hiveDb.getParameters(), hiveDb.getDescription());
+ }
+
+ @Override
+ public boolean databaseExists(String databaseName) throws CatalogException {
+ try {
+ return client.getDatabase(databaseName) != null;
+ } catch (NoSuchObjectException e) {
+ return false;
+ } catch (TException e) {
+ throw new CatalogException(
+ String.format("Failed to determine whether
database %s exists or not", databaseName), e);
+ }
+ }
+
+ @Override
+ public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists) throws DatabaseAlreadyExistException, CatalogException {
+
+ try {
+ client.createDatabase(GenericHiveMetastoreCatalogUtil.createHiveDatabase(name, database));
+ } catch (AlreadyExistsException e) {
+ if (!ignoreIfExists) {
+ throw new DatabaseAlreadyExistException(catalogName, name);
+ }
+ } catch (TException e) {
+ throw new CatalogException(String.format("Failed to
create database %s", name), e);
+ }
+ }
+
+ @Override
+ public void dropDatabase(String name, boolean ignoreIfNotExists) throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {
+ try {
+ client.dropDatabase(name, true, ignoreIfNotExists);
+ } catch (NoSuchObjectException e) {
+ if (!ignoreIfNotExists) {
+ throw new DatabaseNotExistException(catalogName, name);
+ }
+ } catch (InvalidOperationException e) {
+ if (e.getMessage().startsWith(String.format("Database
%s is not empty", name))) {
+ throw new
DatabaseNotEmptyException(catalogName, name);
+ } else {
+ throw new CatalogException(String.format("Failed to drop database %s", name), e);
+ }
+ } catch (TException e) {
+ throw new CatalogException(String.format("Failed to
drop database %s", name), e);
+ }
+ }
+
+ @Override
+ public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists) throws DatabaseNotExistException, CatalogException {
+ try {
+ if (databaseExists(name)) {
+ client.alterDatabase(name, GenericHiveMetastoreCatalogUtil.createHiveDatabase(name, newDatabase));
+ } else if (!ignoreIfNotExists) {
+ throw new DatabaseNotExistException(catalogName, name);
+ }
+ } catch (TException e) {
+ throw new CatalogException(String.format("Failed to
alter database %s", name), e);
+ }
+ }
+
+ // ------ tables and views------
+
+ @Override
+ public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
+ throws TableNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
+ throws TableNotExistException, TableAlreadyExistException, DatabaseNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
+ throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterTable(ObjectPath tableName, CatalogBaseTable newTable, boolean ignoreIfNotExists)
+ throws TableNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<String> listTables(String databaseName)
+ throws DatabaseNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<String> listViews(String databaseName) throws DatabaseNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CatalogBaseTable getTable(ObjectPath objectPath) throws TableNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean tableExists(ObjectPath objectPath) throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ // ------ partitions ------
+
+ @Override
+ public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition partition, boolean ignoreIfExists)
+ throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists)
+ throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition newPartition, boolean ignoreIfNotExists)
+ throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
+ throws TableNotExistException, TableNotPartitionedException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+ throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+ throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+ throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ // ------ functions ------
+
+ @Override
+ public void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists)
+ throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterFunction(ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists)
+ throws FunctionNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists)
+ throws FunctionNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<String> listFunctions(String dbName) throws DatabaseNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean functionExists(ObjectPath functionPath) throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java
new file mode 100644
index 0000000..779905a
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.table.catalog.CatalogDatabase;
+
+import org.apache.hadoop.hive.metastore.api.Database;
+
+import java.util.Map;
+
+
+/**
+ * Utils to convert meta objects between Flink and Hive for GenericHiveMetastoreCatalog.
+ */
+public class GenericHiveMetastoreCatalogUtil {
+
+ private GenericHiveMetastoreCatalogUtil() {
+ }
+
+ // ------ Utils ------
+
+ /**
+ * Creates a Hive database from CatalogDatabase.
+ */
+ public static Database createHiveDatabase(String dbName, CatalogDatabase db) {
+ Map<String, String> props = db.getProperties();
+ return new Database(
+ dbName,
+ db.getDescription().orElse(null),
+ null,
+ props);
+ }
+}
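
For reference, a sketch of the conversion this util performs (the database name and properties below are illustrative):

    // Maps a Flink CatalogDatabase to a Hive metastore Database.
    CatalogDatabase flinkDb = new GenericCatalogDatabase(
        Collections.singletonMap("owner", "flink"), "example comment");
    Database hiveDb = GenericHiveMetastoreCatalogUtil.createHiveDatabase("mydb", flinkDb);
    // hiveDb carries the name, the comment as its description, a null locationUri,
    // and the Flink properties as Hive parameters.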
diff --git a/flink-connectors/flink-connector-hive/src/main/resources/META-INF/NOTICE b/flink-connectors/flink-connector-hive/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000..ea1c160
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,26 @@
+flink-connector-hive
+Copyright 2014-2019 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt)
+
+- commons-dbcp:commons-dbcp:1.4
+- commons-pool:commons-pool:1.5.4
+- com.jolbox:bonecp:0.8.0.RELEASE
+- org.apache.hive:hive-common:2.3.4
+- org.apache.hive:hive-metastore:2.3.4
+- org.apache.hive:hive-serde:2.3.4
+- org.apache.hive:hive-service-rpc:2.3.4
+- org.apache.hive:hive-storage-api:2.4.0
+- org.apache.thrift:libthrift:0.9.3
+- org.datanucleus:datanucleus-api-jdo:4.2.4
+- org.datanucleus:datanucleus-core:4.1.17
+- org.datanucleus:datanucleus-rdbms:4.1.19
+- org.datanucleus:javax.jdo:3.2.0-m3
+
+This project bundles the following dependencies under the BSD license.
+See bundled license files for details.
+
+- org.antlr:antlr-runtime:3.5.2
diff --git a/flink-connectors/flink-connector-hive/src/main/resources/META-INF/licenses/LICENSE.antlr b/flink-connectors/flink-connector-hive/src/main/resources/META-INF/licenses/LICENSE.antlr
new file mode 100644
index 0000000..0af2cce
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/resources/META-INF/licenses/LICENSE.antlr
@@ -0,0 +1,38 @@
+(BSD License: http://www.opensource.org/licenses/bsd-license)
+
+Copyright (c) 2012 Terence Parr and Sam Harwell
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or
+without modification, are permitted provided that the
+following conditions are met:
+
+* Redistributions of source code must retain the above
+ copyright notice, this list of conditions and the
+ following disclaimer.
+
+* Redistributions in binary form must reproduce the above
+ copyright notice, this list of conditions and the
+ following disclaimer in the documentation and/or other
+ materials provided with the distribution.
+
+* Neither the name of the Webbit nor the names of
+ its contributors may be used to endorse or promote products
+ derived from this software without specific prior written
+ permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
+CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
+INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
+MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
+CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE
+GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
+OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+POSSIBILITY OF SUCH DAMAGE.
+
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java
new file mode 100644
index 0000000..642c1c2
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTestBase;
+import org.apache.flink.table.catalog.GenericCatalogDatabase;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+/**
+ * Test for GenericHiveMetastoreCatalog.
+ */
+public class GenericHiveMetastoreCatalogTest extends CatalogTestBase {
+
+ @BeforeClass
+ public static void init() throws IOException {
+ catalog = HiveTestUtils.createGenericHiveMetastoreCatalog();
+ catalog.open();
+ }
+
+ // =====================
+ // GenericHiveMetastoreCatalog doesn't support table operations yet.
+ // Thus, we override the following CatalogTestBase tests that involve table operations so they won't run against GenericHiveMetastoreCatalog.
+ // =====================
+
+ // TODO: re-enable this test once GenericHiveMetastoreCatalog supports table operations
+ @Test
+ public void testDropDb_DatabaseNotEmptyException() throws Exception {
+ }
+
+ // ------ utils ------
+
+ @Override
+ public String getBuiltInDefaultDatabase() {
+ return GenericHiveMetastoreCatalog.DEFAULT_DB;
+ }
+
+ @Override
+ public CatalogDatabase createDb() {
+ return new GenericCatalogDatabase(
+ new HashMap<String, String>() {{
+ put("k1", "v1");
+ }},
+ TEST_COMMENT);
+ }
+
+ @Override
+ public CatalogDatabase createAnotherDb() {
+ return new GenericCatalogDatabase(
+ new HashMap<String, String>() {{
+ put("k2", "v2");
+ }},
+ TEST_COMMENT);
+ }
+
+ @Override
+ public CatalogTable createTable() {
+ // TODO: implement this once GenericHiveMetastoreCatalog supports table operations
+ return null;
+ }
+}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
new file mode 100644
index 0000000..83f5bed
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+
+/**
+ * Test utils for Hive connector.
+ */
+public class HiveTestUtils {
+ private static final String HIVE_SITE_XML = "hive-site.xml";
+ private static final String HIVE_WAREHOUSE_URI_FORMAT = "jdbc:derby:;databaseName=%s;create=true";
+ private static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+ /**
+ * Creates a GenericHiveMetastoreCatalog with an embedded Hive metastore.
+ */
+ public static GenericHiveMetastoreCatalog createGenericHiveMetastoreCatalog() throws IOException {
+ return new GenericHiveMetastoreCatalog("test", getHiveConf());
+ }
+
+ private static HiveConf getHiveConf() throws IOException {
+ ClassLoader classLoader = HiveTestUtils.class.getClassLoader();
+ HiveConf.setHiveSiteLocation(classLoader.getResource(HIVE_SITE_XML));
+
+ TEMPORARY_FOLDER.create();
+ String warehouseDir = TEMPORARY_FOLDER.newFolder().getAbsolutePath() + "/metastore_db";
+ String warehouseUri = String.format(HIVE_WAREHOUSE_URI_FORMAT, warehouseDir);
+ HiveConf hiveConf = new HiveConf();
+ hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, TEMPORARY_FOLDER.newFolder("hive_warehouse").getAbsolutePath());
+ hiveConf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, warehouseUri);
+
+ return hiveConf;
+ }
+}
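
A sketch of how a test can use this helper, mirroring GenericHiveMetastoreCatalogTest above (the assertion is illustrative):

    // Spins up the catalog against the embedded Derby-backed metastore.
    GenericHiveMetastoreCatalog catalog = HiveTestUtils.createGenericHiveMetastoreCatalog();
    catalog.open();
    assert catalog.databaseExists("default");       // Hive ships a "default" database
    catalog.close();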
diff --git a/flink-connectors/flink-connector-hive/src/test/resources/hive-site.xml b/flink-connectors/flink-connector-hive/src/test/resources/hive-site.xml
new file mode 100644
index 0000000..c83bab8
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/resources/hive-site.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<configuration>
+
+ <property>
+ <name>hive.metastore.schema.verification</name>
+ <value>false</value>
+ </property>
+
+ <property>
+ <name>hive.metastore.client.capability.check</name>
+ <value>false</value>
+ </property>
+
+ <property>
+ <name>datanucleus.schema.autoCreateTables</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>datanucleus.schema.autoCreateAll</name>
+ <value>true</value>
+ </property>
+
+</configuration>
diff --git a/flink-connectors/flink-connector-hive/src/test/resources/log4j-test.properties b/flink-connectors/flink-connector-hive/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..fcd8654
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/resources/log4j-test.properties
@@ -0,0 +1,24 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target=System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml
index 92058fa..e6d601d 100644
--- a/flink-connectors/pom.xml
+++ b/flink-connectors/pom.xml
@@ -49,6 +49,7 @@ under the License.
<module>flink-connector-elasticsearch2</module>
<module>flink-connector-elasticsearch5</module>
<module>flink-connector-elasticsearch6</module>
+ <module>flink-connector-hive</module>
<module>flink-connector-rabbitmq</module>
<module>flink-connector-twitter</module>
<module>flink-connector-nifi</module>
diff --git a/flink-table/flink-table-api-java/pom.xml b/flink-table/flink-table-api-java/pom.xml
index a6bf656..f8a8fd7 100644
--- a/flink-table/flink-table-api-java/pom.xml
+++ b/flink-table/flink-table-api-java/pom.xml
@@ -42,5 +42,15 @@ under the License.
<artifactId>flink-table-common</artifactId>
<version>${project.version}</version>
</dependency>
+
+ <!-- test dependencies -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java
index 5f2c732..959247a 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java
@@ -38,7 +38,7 @@ public class GenericCatalogDatabase implements CatalogDatabase {
public GenericCatalogDatabase(Map<String, String> properties, String comment) {
this(properties);
- this.comment = comment;
+ this.comment = checkNotNull(comment, "comment cannot be null");
}
public Map<String, String> getProperties() {
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
index d96d8b5..50d203a 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
@@ -21,8 +21,6 @@ package org.apache.flink.table.catalog;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
-import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
@@ -35,10 +33,8 @@ import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
import org.apache.flink.table.functions.ScalarFunction;
import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
+import org.junit.BeforeClass;
import org.junit.Test;
-import org.junit.rules.ExpectedException;
import java.util.Arrays;
import java.util.HashMap;
@@ -55,37 +51,14 @@ import static org.junit.Assert.assertTrue;
/**
* Test for GenericInMemoryCatalog.
*/
-public class GenericInMemoryCatalogTest {
- private static final String IS_STREAMING = "is_streaming";
-
- private final String testCatalogName = "test-catalog";
- private final String db1 = "db1";
- private final String db2 = "db2";
- private final String nonExistantDatabase = "non-existant-db";
-
- private final String t1 = "t1";
- private final String t2 = "t2";
- private final ObjectPath path1 = new ObjectPath(db1, t1);
- private final ObjectPath path2 = new ObjectPath(db2, t2);
- private final ObjectPath path3 = new ObjectPath(db1, t2);
- private final ObjectPath path4 = new ObjectPath(db1, "t3");
- private final ObjectPath nonExistDbPath = ObjectPath.fromString("non.exist");
- private final ObjectPath nonExistObjectPath = ObjectPath.fromString("db1.nonexist");
-
- private static final String TEST_COMMENT = "test comment";
- private static final String TABLE_COMMENT = "This is my batch table";
-
- private static ReadableWritableCatalog catalog;
-
- @Before
- public void setUp() {
- catalog = new GenericInMemoryCatalog(testCatalogName);
+public class GenericInMemoryCatalogTest extends CatalogTestBase {
+
+ @BeforeClass
+ public static void init() {
+ catalog = new GenericInMemoryCatalog(TEST_CATALOG_NAME);
catalog.open();
}
- @Rule
- public ExpectedException exception = ExpectedException.none();
-
@After
public void close() throws Exception {
if (catalog.tableExists(path1)) {
@@ -100,13 +73,6 @@ public class GenericInMemoryCatalogTest {
if (catalog.functionExists(path1)) {
catalog.dropFunction(path1, true);
}
- if (catalog.databaseExists(db1)) {
- catalog.dropDatabase(db1, true);
- }
- if (catalog.databaseExists(db2)) {
- catalog.dropDatabase(db2, true);
- }
- catalog.close();
}
// ------ tables ------
@@ -486,136 +452,6 @@ public class GenericInMemoryCatalogTest {
assertEquals(Arrays.asList(path1.getObjectName()), catalog.listViews(db1));
}
- // ------ databases ------
-
- @Test
- public void testCreateDb() throws Exception {
- catalog.createDatabase(db2, createDb(), false);
-
- assertEquals(2, catalog.listDatabases().size());
- }
-
- @Test
- public void testSetCurrentDatabase() throws Exception {
- assertEquals(GenericInMemoryCatalog.DEFAULT_DB, catalog.getCurrentDatabase());
- catalog.createDatabase(db2, createDb(), true);
- catalog.setCurrentDatabase(db2);
- assertEquals(db2, catalog.getCurrentDatabase());
- catalog.setCurrentDatabase(GenericInMemoryCatalog.DEFAULT_DB);
- assertEquals(GenericInMemoryCatalog.DEFAULT_DB, catalog.getCurrentDatabase());
- catalog.dropDatabase(db2, false);
- }
-
- @Test
- public void testSetCurrentDatabaseNegative() throws Exception {
- exception.expect(DatabaseNotExistException.class);
- exception.expectMessage("Database " + this.nonExistantDatabase
+ " does not exist in Catalog");
- catalog.setCurrentDatabase(this.nonExistantDatabase);
- }
-
- @Test
- public void testCreateDb_DatabaseAlreadyExistException() throws Exception {
- catalog.createDatabase(db1, createDb(), false);
-
- exception.expect(DatabaseAlreadyExistException.class);
- exception.expectMessage("Database db1 already exists in
Catalog");
- catalog.createDatabase(db1, createDb(), false);
- }
-
- @Test
- public void testCreateDb_DatabaseAlreadyExist_ignored() throws Exception {
- CatalogDatabase cd1 = createDb();
- catalog.createDatabase(db1, cd1, false);
- List<String> dbs = catalog.listDatabases();
-
- assertTrue(catalog.getDatabase(db1).getProperties().entrySet().containsAll(cd1.getProperties().entrySet()));
- assertEquals(2, dbs.size());
- assertEquals(new HashSet<>(Arrays.asList(db1, catalog.getCurrentDatabase())), new HashSet<>(dbs));
-
- catalog.createDatabase(db1, createAnotherDb(), true);
-
- assertTrue(catalog.getDatabase(db1).getProperties().entrySet().containsAll(cd1.getProperties().entrySet()));
- assertEquals(2, dbs.size());
- assertEquals(new HashSet<>(Arrays.asList(db1, catalog.getCurrentDatabase())), new HashSet<>(dbs));
- }
-
- @Test
- public void testGetDb_DatabaseNotExistException() throws Exception {
- exception.expect(DatabaseNotExistException.class);
- exception.expectMessage("Database nonexistent does not exist in
Catalog");
- catalog.getDatabase("nonexistent");
- }
-
- @Test
- public void testDropDb() throws Exception {
- catalog.createDatabase(db1, createDb(), false);
-
- assertTrue(catalog.listDatabases().contains(db1));
-
- catalog.dropDatabase(db1, false);
-
- assertFalse(catalog.listDatabases().contains(db1));
- }
-
- @Test
- public void testDropDb_DatabaseNotExistException() throws Exception {
- exception.expect(DatabaseNotExistException.class);
- exception.expectMessage("Database db1 does not exist in
Catalog");
- catalog.dropDatabase(db1, false);
- }
-
- @Test
- public void testDropDb_DatabaseNotExist_Ignore() throws Exception {
- catalog.dropDatabase(db1, true);
- }
-
- @Test
- public void testDropDb_databaseIsNotEmpty() throws Exception {
- catalog.createDatabase(db1, createDb(), false);
- catalog.createTable(path1, createTable(), false);
-
- exception.expect(DatabaseNotEmptyException.class);
- exception.expectMessage("Database db1 in Catalog test-catalog
is not empty");
- catalog.dropDatabase(db1, true);
- }
-
- @Test
- public void testAlterDb() throws Exception {
- CatalogDatabase db = createDb();
- catalog.createDatabase(db1, db, false);
-
- assertTrue(catalog.getDatabase(db1).getProperties().entrySet().containsAll(db.getProperties().entrySet()));
-
- CatalogDatabase newDb = createAnotherDb();
- catalog.alterDatabase(db1, newDb, false);
-
- assertFalse(catalog.getDatabase(db1).getProperties().entrySet().containsAll(db.getProperties().entrySet()));
- assertTrue(catalog.getDatabase(db1).getProperties().entrySet().containsAll(newDb.getProperties().entrySet()));
- }
-
- @Test
- public void testAlterDb_DatabaseNotExistException() throws Exception {
- exception.expect(DatabaseNotExistException.class);
- exception.expectMessage("Database nonexistent does not exist in
Catalog");
- catalog.alterDatabase("nonexistent", createDb(), false);
- }
-
- @Test
- public void testAlterDb_DatabaseNotExist_ignored() throws Exception {
- catalog.alterDatabase("nonexistent", createDb(), true);
-
- assertFalse(catalog.databaseExists("nonexistent"));
- }
-
- @Test
- public void testDbExists() throws Exception {
- assertFalse(catalog.databaseExists("nonexistent"));
-
- catalog.createDatabase(db1, createDb(), false);
-
- assertTrue(catalog.databaseExists(db1));
- }
-
@Test
public void testRenameView() throws Exception {
catalog.createDatabase("db1", new GenericCatalogDatabase(new
HashMap<>()), false);
@@ -660,7 +496,7 @@ public class GenericInMemoryCatalogTest {
exception.expect(PartitionSpecInvalidException.class);
exception.expectMessage(
String.format("PartitionSpec %s does not match
partition keys %s of table %s in catalog %s",
- invalid, table.getPartitionKeys(),
path1.getFullName(), testCatalogName));
+ invalid, table.getPartitionKeys(),
path1.getFullName(), TEST_CATALOG_NAME));
catalog.listPartitions(path1, invalid);
}
@@ -670,7 +506,7 @@ public class GenericInMemoryCatalogTest {
exception.expect(TableNotExistException.class);
exception.expectMessage(
- String.format("Table (or view) %s does not exist in
Catalog %s.", path1.getFullName(), testCatalogName));
+ String.format("Table (or view) %s does not exist in
Catalog %s.", path1.getFullName(), TEST_CATALOG_NAME));
catalog.createPartition(path1, createPartitionSpec(),
createPartition(), false);
}
@@ -681,7 +517,7 @@ public class GenericInMemoryCatalogTest {
exception.expect(TableNotPartitionedException.class);
exception.expectMessage(
- String.format("Table %s in catalog %s is not
partitioned.", path1.getFullName(), testCatalogName));
+ String.format("Table %s in catalog %s is not
partitioned.", path1.getFullName(), TEST_CATALOG_NAME));
catalog.createPartition(path1, createPartitionSpec(),
createPartition(), false);
}
@@ -695,7 +531,7 @@ public class GenericInMemoryCatalogTest {
exception.expect(PartitionSpecInvalidException.class);
exception.expectMessage(
String.format("PartitionSpec %s does not match
partition keys %s of table %s in catalog %s.",
- partitionSpec, table.getPartitionKeys(),
path1.getFullName(), testCatalogName));
+ partitionSpec, table.getPartitionKeys(),
path1.getFullName(), TEST_CATALOG_NAME));
catalog.createPartition(path1, partitionSpec,
createPartition(), false);
}
@@ -711,7 +547,7 @@ public class GenericInMemoryCatalogTest {
exception.expect(PartitionAlreadyExistsException.class);
exception.expectMessage(
String.format("Partition %s of table %s in catalog %s
already exists.",
- partitionSpec, path1.getFullName(),
testCatalogName));
+ partitionSpec, path1.getFullName(),
TEST_CATALOG_NAME));
catalog.createPartition(path1, partitionSpec,
createPartition(), false);
}
@@ -744,7 +580,7 @@ public class GenericInMemoryCatalogTest {
exception.expect(TableNotExistException.class);
exception.expectMessage(
- String.format("Table (or view) %s does not exist in
Catalog %s.", path1.getFullName(), testCatalogName));
+ String.format("Table (or view) %s does not exist in
Catalog %s.", path1.getFullName(), TEST_CATALOG_NAME));
catalog.dropPartition(path1, createPartitionSpec(), false);
}
@@ -755,7 +591,7 @@ public class GenericInMemoryCatalogTest {
exception.expect(TableNotPartitionedException.class);
exception.expectMessage(
- String.format("Table %s in catalog %s is not
partitioned.", path1.getFullName(), testCatalogName));
+ String.format("Table %s in catalog %s is not
partitioned.", path1.getFullName(), TEST_CATALOG_NAME));
catalog.dropPartition(path1, createPartitionSpec(), false);
}
@@ -769,7 +605,7 @@ public class GenericInMemoryCatalogTest {
exception.expect(PartitionSpecInvalidException.class);
exception.expectMessage(
String.format("PartitionSpec %s does not match
partition keys %s of table %s in catalog %s.",
- partitionSpec, table.getPartitionKeys(),
path1.getFullName(), testCatalogName));
+ partitionSpec, table.getPartitionKeys(),
path1.getFullName(), TEST_CATALOG_NAME));
catalog.dropPartition(path1, partitionSpec, false);
}
@@ -781,7 +617,7 @@ public class GenericInMemoryCatalogTest {
CatalogPartitionSpec partitionSpec = createPartitionSpec();
exception.expect(PartitionNotExistException.class);
exception.expectMessage(
- String.format("Partition %s of table %s in catalog %s
does not exist.", partitionSpec, path1.getFullName(), testCatalogName));
+ String.format("Partition %s of table %s in catalog %s
does not exist.", partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
catalog.dropPartition(path1, partitionSpec, false);
}
@@ -828,7 +664,7 @@ public class GenericInMemoryCatalogTest {
CatalogPartitionSpec partitionSpec = createPartitionSpec();
exception.expect(TableNotExistException.class);
exception.expectMessage(
- String.format("Table (or view) %s does not exist in
Catalog %s.", path1.getFullName(), testCatalogName));
+ String.format("Table (or view) %s does not exist in
Catalog %s.", path1.getFullName(), TEST_CATALOG_NAME));
catalog.alterPartition(path1, partitionSpec, createPartition(),
false);
}
@@ -840,7 +676,7 @@ public class GenericInMemoryCatalogTest {
CatalogPartitionSpec partitionSpec = createPartitionSpec();
exception.expect(TableNotPartitionedException.class);
exception.expectMessage(
- String.format("Table %s in catalog %s is not
partitioned.", path1.getFullName(), testCatalogName));
+ String.format("Table %s in catalog %s is not
partitioned.", path1.getFullName(), TEST_CATALOG_NAME));
catalog.alterPartition(path1, partitionSpec, createPartition(),
false);
}
@@ -854,7 +690,7 @@ public class GenericInMemoryCatalogTest {
exception.expect(PartitionSpecInvalidException.class);
exception.expectMessage(
String.format("PartitionSpec %s does not match
partition keys %s of table %s in catalog %s.",
- partitionSpec, table.getPartitionKeys(),
path1.getFullName(), testCatalogName));
+ partitionSpec, table.getPartitionKeys(),
path1.getFullName(), TEST_CATALOG_NAME));
catalog.alterPartition(path1, partitionSpec, createPartition(),
false);
}
@@ -868,7 +704,7 @@ public class GenericInMemoryCatalogTest {
exception.expect(PartitionNotExistException.class);
exception.expectMessage(
String.format("Partition %s of table %s in catalog %s
does not exist.",
- partitionSpec, path1.getFullName(),
testCatalogName));
+ partitionSpec, path1.getFullName(),
TEST_CATALOG_NAME));
catalog.alterPartition(path1, partitionSpec, catalogPartition,
false);
}
@@ -892,7 +728,7 @@ public class GenericInMemoryCatalogTest {
exception.expect(TableNotPartitionedException.class);
exception.expectMessage(
- String.format("Table %s in catalog %s is not partitioned.", path1.getFullName(), testCatalogName));
+ String.format("Table %s in catalog %s is not partitioned.", path1.getFullName(), TEST_CATALOG_NAME));
catalog.getPartition(path1, createPartitionSpec());
}
@@ -906,7 +742,7 @@ public class GenericInMemoryCatalogTest {
exception.expect(PartitionSpecInvalidException.class);
exception.expectMessage(
String.format("PartitionSpec %s does not match
partition keys %s of table %s in catalog %s.",
- partitionSpec, table.getPartitionKeys(),
path1.getFullName(), testCatalogName));
+ partitionSpec, table.getPartitionKeys(),
path1.getFullName(), TEST_CATALOG_NAME));
catalog.getPartition(path1, partitionSpec);
}
@@ -924,7 +760,7 @@ public class GenericInMemoryCatalogTest {
exception.expect(PartitionSpecInvalidException.class);
exception.expectMessage(
String.format("PartitionSpec %s does not match
partition keys %s of table %s in catalog %s.",
- partitionSpec, table.getPartitionKeys(),
path1.getFullName(), testCatalogName));
+ partitionSpec, table.getPartitionKeys(),
path1.getFullName(), TEST_CATALOG_NAME));
catalog.getPartition(path1, partitionSpec);
}
@@ -937,7 +773,7 @@ public class GenericInMemoryCatalogTest {
exception.expect(PartitionNotExistException.class);
exception.expectMessage(
String.format("Partition %s of table %s in catalog %s
does not exist.",
- partitionSpec, path1.getFullName(),
testCatalogName));
+ partitionSpec, path1.getFullName(),
TEST_CATALOG_NAME));
catalog.getPartition(path1, partitionSpec);
}
@@ -1106,13 +942,33 @@ public class GenericInMemoryCatalogTest {
// ------ utilities ------
+ @Override
+ public String getBuiltInDefaultDatabase() {
+ return GenericInMemoryCatalog.DEFAULT_DB;
+ }
+
+ @Override
+ public CatalogDatabase createDb() {
+ return new GenericCatalogDatabase(new HashMap<String, String>() {{
+ put("k1", "v1");
+ }}, TEST_COMMENT);
+ }
+
+ @Override
+ public CatalogDatabase createAnotherDb() {
+ return new GenericCatalogDatabase(new HashMap<String, String>() {{
+ put("k2", "v2");
+ }}, "this is another database.");
+ }
+
private GenericCatalogTable createStreamingTable() {
return CatalogTestUtil.createTable(
createTableSchema(),
getStreamingTableProperties(), TABLE_COMMENT);
}
- private GenericCatalogTable createTable() {
+ @Override
+ public GenericCatalogTable createTable() {
return CatalogTestUtil.createTable(
createTableSchema(),
getBatchTableProperties(), TABLE_COMMENT);
@@ -1186,12 +1042,6 @@ public class GenericInMemoryCatalogTest {
return new GenericCatalogPartition(props);
}
- private CatalogDatabase createDb() {
- return new GenericCatalogDatabase(new HashMap<String, String>() {{
- put("k1", "v1");
- }}, TEST_COMMENT);
- }
-
private Map<String, String> getBatchTableProperties() {
return new HashMap<String, String>() {{
put(IS_STREAMING, "false");
@@ -1204,12 +1054,6 @@ public class GenericInMemoryCatalogTest {
}};
}
- private CatalogDatabase createAnotherDb() {
- return new GenericCatalogDatabase(new HashMap<String, String>() {{
- put("k2", "v2");
- }}, "this is another database.");
- }
-
private TableSchema createTableSchema() {
return new TableSchema(
new String[] {"first", "second", "third"},
@@ -1235,7 +1079,7 @@ public class GenericInMemoryCatalogTest {
private CatalogView createView() {
return new GenericCatalogView(
String.format("select * from %s", t1),
- String.format("select * from %s.%s", testCatalogName, path1.getFullName()),
+ String.format("select * from %s.%s", TEST_CATALOG_NAME, path1.getFullName()),
createTableSchema(),
new HashMap<>(),
"This is a view");
@@ -1244,7 +1088,7 @@ public class GenericInMemoryCatalogTest {
private CatalogView createAnotherView() {
return new GenericCatalogView(
String.format("select * from %s", t2),
- String.format("select * from %s.%s", testCatalogName, path2.getFullName()),
+ String.format("select * from %s.%s", TEST_CATALOG_NAME, path2.getFullName()),
createTableSchema(),
new HashMap<>(),
"This is another view");
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
new file mode 100644
index 0000000..83a8f2b
--- /dev/null
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Base testing class for unit tests of a specific catalog, like GenericInMemoryCatalog and HiveCatalog.
+ */
+public abstract class CatalogTestBase {
+ protected static final String IS_STREAMING = "is_streaming";
+
+ protected final String db1 = "db1";
+ protected final String db2 = "db2";
+ protected final String nonExistentDatabase = "non-existent-db";
+
+ protected final String t1 = "t1";
+ protected final String t2 = "t2";
+ protected final ObjectPath path1 = new ObjectPath(db1, t1);
+ protected final ObjectPath path2 = new ObjectPath(db2, t2);
+ protected final ObjectPath path3 = new ObjectPath(db1, t2);
+ protected final ObjectPath path4 = new ObjectPath(db1, "t3");
+ protected final ObjectPath nonExistDbPath = ObjectPath.fromString("non.exist");
+ protected final ObjectPath nonExistObjectPath = ObjectPath.fromString("db1.nonexist");
+
+ protected static final String TEST_CATALOG_NAME = "test-catalog";
+ protected static final String TEST_COMMENT = "test comment";
+ protected static final String TABLE_COMMENT = "This is my batch table";
+
+ protected static ReadableWritableCatalog catalog;
+
+ @Rule
+ public ExpectedException exception = ExpectedException.none();
+
+ @After
+ public void cleanup() throws Exception {
+ if (catalog.databaseExists(db1)) {
+ catalog.dropDatabase(db1, true);
+ }
+ if (catalog.databaseExists(db2)) {
+ catalog.dropDatabase(db2, true);
+ }
+ }
+
+ @AfterClass
+ public static void closeup() {
+ catalog.close();
+ }
+
+ // ------ databases ------
+
+ @Test
+ public void testCreateDb() throws Exception {
+ catalog.createDatabase(db2, createDb(), false);
+
+ assertEquals(2, catalog.listDatabases().size());
+ }
+
+ @Test
+ public void testSetCurrentDatabase() throws Exception {
+ assertEquals(getBuiltInDefaultDatabase(), catalog.getCurrentDatabase());
+ catalog.createDatabase(db2, createDb(), true);
+ catalog.setCurrentDatabase(db2);
+ assertEquals(db2, catalog.getCurrentDatabase());
+ catalog.setCurrentDatabase(getBuiltInDefaultDatabase());
+ assertEquals(getBuiltInDefaultDatabase(), catalog.getCurrentDatabase());
+ catalog.dropDatabase(db2, false);
+ }
+
+ @Test
+ public void testSetCurrentDatabaseNegative() throws Exception {
+ exception.expect(DatabaseNotExistException.class);
+ exception.expectMessage("Database " + this.nonExistentDatabase + " does not exist in Catalog");
+ catalog.setCurrentDatabase(this.nonExistentDatabase);
+ }
+
+ @Test
+ public void testCreateDb_DatabaseAlreadyExistException() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+
+ exception.expect(DatabaseAlreadyExistException.class);
+ exception.expectMessage("Database db1 already exists in Catalog");
+ catalog.createDatabase(db1, createDb(), false);
+ }
+
+ @Test
+ public void testCreateDb_DatabaseAlreadyExist_ignored() throws Exception {
+ CatalogDatabase cd1 = createDb();
+ catalog.createDatabase(db1, cd1, false);
+ List<String> dbs = catalog.listDatabases();
+
+ assertTrue(catalog.getDatabase(db1).getProperties().entrySet().containsAll(cd1.getProperties().entrySet()));
+ assertEquals(2, dbs.size());
+ assertEquals(new HashSet<>(Arrays.asList(db1, catalog.getCurrentDatabase())), new HashSet<>(dbs));
+
+ catalog.createDatabase(db1, createAnotherDb(), true);
+
+ assertTrue(catalog.getDatabase(db1).getProperties().entrySet().containsAll(cd1.getProperties().entrySet()));
+ assertEquals(2, dbs.size());
+ assertEquals(new HashSet<>(Arrays.asList(db1, catalog.getCurrentDatabase())), new HashSet<>(dbs));
+ }
+
+ @Test
+ public void testGetDb_DatabaseNotExistException() throws Exception {
+ exception.expect(DatabaseNotExistException.class);
+ exception.expectMessage("Database nonexistent does not exist in Catalog");
+ catalog.getDatabase("nonexistent");
+ }
+
+ @Test
+ public void testDropDb() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+
+ assertTrue(catalog.listDatabases().contains(db1));
+
+ catalog.dropDatabase(db1, false);
+
+ assertFalse(catalog.listDatabases().contains(db1));
+ }
+
+ @Test
+ public void testDropDb_DatabaseNotExistException() throws Exception {
+ exception.expect(DatabaseNotExistException.class);
+ exception.expectMessage("Database db1 does not exist in Catalog");
+ catalog.dropDatabase(db1, false);
+ }
+
+ @Test
+ public void testDropDb_DatabaseNotExist_Ignore() throws Exception {
+ catalog.dropDatabase(db1, true);
+ }
+
+ @Test
+ public void testDropDb_DatabaseNotEmptyException() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ catalog.createTable(path1, createTable(), false);
+
+ exception.expect(DatabaseNotEmptyException.class);
+ exception.expectMessage("Database db1 in Catalog test-catalog is not empty");
+ catalog.dropDatabase(db1, true);
+ }
+
+ @Test
+ public void testAlterDb() throws Exception {
+ CatalogDatabase db = createDb();
+ catalog.createDatabase(db1, db, false);
+
+ assertTrue(catalog.getDatabase(db1).getProperties().entrySet().containsAll(db.getProperties().entrySet()));
+
+ CatalogDatabase newDb = createAnotherDb();
+ catalog.alterDatabase(db1, newDb, false);
+
+ assertFalse(catalog.getDatabase(db1).getProperties().entrySet().containsAll(db.getProperties().entrySet()));
+ assertTrue(catalog.getDatabase(db1).getProperties().entrySet().containsAll(newDb.getProperties().entrySet()));
+ }
+
+ @Test
+ public void testAlterDb_DatabaseNotExistException() throws Exception {
+ exception.expect(DatabaseNotExistException.class);
+ exception.expectMessage("Database nonexistent does not exist in Catalog");
+ catalog.alterDatabase("nonexistent", createDb(), false);
+ }
+
+ @Test
+ public void testAlterDb_DatabaseNotExist_ignored() throws Exception {
+ catalog.alterDatabase("nonexistent", createDb(), true);
+
+ assertFalse(catalog.databaseExists("nonexistent"));
+ }
+
+ @Test
+ public void testDbExists() throws Exception {
+ assertFalse(catalog.databaseExists("nonexistent"));
+
+ catalog.createDatabase(db1, createDb(), false);
+
+ assertTrue(catalog.databaseExists(db1));
+ }
+
+ // ------ utilities ------
+
+ /**
+ * Get the built-in default database of the specific catalog implementation.
+ *
+ * @return The built-in default database name
+ */
+ public abstract String getBuiltInDefaultDatabase();
+
+ /**
+ * Create a CatalogDatabase instance for the specific catalog implementation.
+ *
+ * @return a CatalogDatabase instance
+ */
+ public abstract CatalogDatabase createDb();
+
+ /**
+ * Create another CatalogDatabase instance for the specific catalog implementation.
+ *
+ * @return another CatalogDatabase instance
+ */
+ public abstract CatalogDatabase createAnotherDb();
+
+ /**
+ * Create a CatalogTable instance for the specific catalog implementation.
+ *
+ * @return a CatalogTable instance
+ */
+ public abstract CatalogTable createTable();
+}
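
A concrete catalog test is expected to extend CatalogTestBase, implement the four factory methods above, and assign the shared static catalog field before the tests run, exactly as GenericInMemoryCatalogTest does earlier in this commit. Below is a minimal sketch for some other catalog implementation; the MyCatalog class, its constructor, the "default" database name, and the buildTestTable() helper are illustrative placeholders, not part of this commit:

package org.apache.flink.table.catalog;

import org.junit.BeforeClass;

import java.util.Collections;

public class MyCatalogTest extends CatalogTestBase {

	@BeforeClass
	public static void init() {
		// CatalogTestBase drives every test through this shared static field
		// and closes it once in closeup().
		catalog = new MyCatalog(TEST_CATALOG_NAME); // hypothetical catalog under test
		catalog.open();
	}

	@Override
	public String getBuiltInDefaultDatabase() {
		// Assumption: the catalog bootstraps a database named "default".
		return "default";
	}

	@Override
	public CatalogDatabase createDb() {
		return new GenericCatalogDatabase(Collections.singletonMap("k1", "v1"), TEST_COMMENT);
	}

	@Override
	public CatalogDatabase createAnotherDb() {
		return new GenericCatalogDatabase(Collections.singletonMap("k2", "v2"), "this is another database.");
	}

	@Override
	public CatalogTable createTable() {
		// Placeholder: build a CatalogTable with whatever schema and
		// properties the catalog under test requires (compare
		// CatalogTestUtil.createTable in GenericInMemoryCatalogTest above).
		return buildTestTable();
	}

	// Hypothetical helper; its body depends on the catalog-specific table type.
	private static CatalogTable buildTestTable() {
		throw new UnsupportedOperationException("provide a catalog-specific table");
	}
}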