This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ff4e220945 [FLINK-31544][jdbc-driver] Introduce DatabaseMetaData for 
jdbc driver
2ff4e220945 is described below

commit 2ff4e220945680eba21065480385148ca8d034da
Author: Shammon FY <[email protected]>
AuthorDate: Thu Apr 6 13:09:19 2023 +0800

    [FLINK-31544][jdbc-driver] Introduce DatabaseMetaData for jdbc driver
    
    Close apache/flink#22362
---
 .../flink/table/jdbc/BaseDatabaseMetaData.java     | 792 +++++++++++++++++++++
 .../org/apache/flink/table/jdbc/DriverUri.java     |   4 +
 .../apache/flink/table/jdbc/FlinkConnection.java   |  15 +-
 .../flink/table/jdbc/FlinkDatabaseMetaData.java    | 384 ++++++++++
 .../apache/flink/table/jdbc/FlinkResultSet.java    |  29 +-
 .../flink/table/jdbc/FlinkResultSetMetaData.java   |  10 +-
 .../table/jdbc/utils/CloseableResultIterator.java  |  24 +
 .../table/jdbc/utils/CollectionResultIterator.java |  45 ++
 .../table/jdbc/utils/DatabaseMetaDataUtils.java    | 110 +++
 .../table/jdbc/utils/StatementResultIterator.java  |  46 ++
 .../flink/table/jdbc/FlinkConnectionTest.java      |   5 +-
 .../table/jdbc/FlinkDatabaseMetaDataTest.java      | 121 ++++
 12 files changed, 1567 insertions(+), 18 deletions(-)

diff --git 
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/BaseDatabaseMetaData.java
 
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/BaseDatabaseMetaData.java
new file mode 100644
index 00000000000..75ec33aab67
--- /dev/null
+++ 
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/BaseDatabaseMetaData.java
@@ -0,0 +1,792 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.jdbc;
+
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.RowIdLifetime;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+
+/** Base {@link DatabaseMetaData} for the Flink driver covering unsupported 
features. */
+public abstract class BaseDatabaseMetaData implements DatabaseMetaData {
+    @Override
+    public String getUserName() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#getUserName is not supported");
+    }
+
+    @Override
+    public boolean nullsAreSortedHigh() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#nullsAreSortedHigh is not supported");
+    }
+
+    @Override
+    public boolean nullsAreSortedLow() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#nullsAreSortedLow is not supported");
+    }
+
+    @Override
+    public boolean nullsAreSortedAtStart() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#nullsAreSortedAtStart is not 
supported");
+    }
+
+    @Override
+    public boolean nullsAreSortedAtEnd() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#nullsAreSortedAtEnd is not supported");
+    }
+
+    @Override
+    public boolean supportsMixedCaseIdentifiers() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#supportsMixedCaseIdentifiers is not 
supported");
+    }
+
+    @Override
+    public boolean storesUpperCaseIdentifiers() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#storesUpperCaseIdentifiers is not 
supported");
+    }
+
+    @Override
+    public boolean storesLowerCaseIdentifiers() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#storesLowerCaseIdentifiers is not 
supported");
+    }
+
+    @Override
+    public boolean storesMixedCaseIdentifiers() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#storesMixedCaseIdentifiers is not 
supported");
+    }
+
+    @Override
+    public boolean supportsMixedCaseQuotedIdentifiers() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#supportsMixedCaseQuotedIdentifiers is 
not supported");
+    }
+
+    @Override
+    public boolean storesUpperCaseQuotedIdentifiers() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#storesUpperCaseQuotedIdentifiers is not 
supported");
+    }
+
+    @Override
+    public boolean storesLowerCaseQuotedIdentifiers() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#storesLowerCaseQuotedIdentifiers is not 
supported");
+    }
+
+    @Override
+    public boolean storesMixedCaseQuotedIdentifiers() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#storesMixedCaseQuotedIdentifiers is not 
supported");
+    }
+
+    @Override
+    public String getSQLKeywords() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#getSQLKeywords is not supported");
+    }
+
+    @Override
+    public String getNumericFunctions() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#getNumericFunctions is not supported");
+    }
+
+    @Override
+    public String getStringFunctions() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#getStringFunctions is not supported");
+    }
+
+    @Override
+    public String getSystemFunctions() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#getSystemFunctions is not supported");
+    }
+
+    @Override
+    public String getTimeDateFunctions() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#getTimeDateFunctions is not supported");
+    }
+
+    @Override
+    public String getSearchStringEscape() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#getSearchStringEscape is not 
supported");
+    }
+
+    @Override
+    public boolean nullPlusNonNullIsNull() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#nullPlusNonNullIsNull is not 
supported");
+    }
+
+    @Override
+    public boolean supportsConvert() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#supportsConvert is not supported");
+    }
+
+    @Override
+    public boolean supportsConvert(int fromType, int toType) throws 
SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#supportsConvert is not supported");
+    }
+
+    @Override
+    public boolean supportsMultipleResultSets() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#supportsMultipleResultSets is not 
supported");
+    }
+
+    @Override
+    public boolean supportsMultipleTransactions() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#supportsMultipleTransactions is not 
supported");
+    }
+
+    @Override
+    public boolean supportsMinimumSQLGrammar() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#supportsMinimumSQLGrammar is not 
supported");
+    }
+
+    @Override
+    public boolean supportsCoreSQLGrammar() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#supportsCoreSQLGrammar is not 
supported");
+    }
+
+    @Override
+    public boolean supportsExtendedSQLGrammar() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#supportsExtendedSQLGrammar is not 
supported");
+    }
+
+    @Override
+    public boolean supportsANSI92EntryLevelSQL() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#supportsANSI92EntryLevelSQL is not 
supported");
+    }
+
+    @Override
+    public boolean supportsANSI92IntermediateSQL() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#supportsANSI92IntermediateSQL is not 
supported");
+    }
+
+    @Override
+    public boolean supportsANSI92FullSQL() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#supportsANSI92FullSQL is not 
supported");
+    }
+
+    @Override
+    public boolean supportsIntegrityEnhancementFacility() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#supportsIntegrityEnhancementFacility is 
not supported");
+    }
+
+    @Override
+    public String getProcedureTerm() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#getProcedureTerm is not supported");
+    }
+
+    @Override
+    public boolean isCatalogAtStart() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#isCatalogAtStart is not supported");
+    }
+
+    @Override
+    public String getCatalogSeparator() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#getCatalogSeparator is not supported");
+    }
+
+    @Override
+    public boolean supportsSchemasInProcedureCalls() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#supportsSchemasInProcedureCalls is not 
supported");
+    }
+
+    @Override
+    public boolean supportsSchemasInIndexDefinitions() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#supportsSchemasInIndexDefinitions is 
not supported");
+    }
+
+    @Override
+    public boolean supportsSchemasInPrivilegeDefinitions() throws SQLException 
{
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#supportsSchemasInPrivilegeDefinitions 
is not supported");
+    }
+
+    @Override
+    public boolean supportsCatalogsInProcedureCalls() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#supportsCatalogsInProcedureCalls is not 
supported");
+    }
+
+    @Override
+    public boolean supportsCatalogsInIndexDefinitions() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#supportsCatalogsInIndexDefinitions is 
not supported");
+    }
+
+    @Override
+    public boolean supportsCatalogsInPrivilegeDefinitions() throws 
SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#supportsCatalogsInPrivilegeDefinitions 
is not supported");
+    }
+
+    @Override
+    public boolean supportsPositionedDelete() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#supportsPositionedDelete is not 
supported");
+    }
+
+    @Override
+    public boolean supportsPositionedUpdate() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#supportsPositionedUpdate is not 
supported");
+    }
+
+    @Override
+    public boolean supportsSelectForUpdate() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#supportsSelectForUpdate is not 
supported");
+    }
+
+    @Override
+    public boolean supportsStoredProcedures() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#supportsStoredProcedures is not 
supported");
+    }
+
+    @Override
+    public boolean supportsOpenCursorsAcrossCommit() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#supportsOpenCursorsAcrossCommit is not 
supported");
+    }
+
+    @Override
+    public boolean supportsOpenCursorsAcrossRollback() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#supportsOpenCursorsAcrossRollback is 
not supported");
+    }
+
+    @Override
+    public boolean supportsOpenStatementsAcrossCommit() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#supportsOpenStatementsAcrossCommit is 
not supported");
+    }
+
+    @Override
+    public boolean supportsOpenStatementsAcrossRollback() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#supportsOpenStatementsAcrossRollback is 
not supported");
+    }
+
+    @Override
+    public int getMaxBinaryLiteralLength() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#getMaxBinaryLiteralLength is not 
supported");
+    }
+
+    @Override
+    public int getMaxCharLiteralLength() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#getMaxCharLiteralLength is not 
supported");
+    }
+
+    @Override
+    public int getMaxColumnNameLength() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#getMaxColumnNameLength is not 
supported");
+    }
+
+    @Override
+    public int getMaxColumnsInGroupBy() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#getMaxColumnsInGroupBy is not 
supported");
+    }
+
+    @Override
+    public int getMaxColumnsInIndex() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#getMaxColumnsInIndex is not supported");
+    }
+
+    @Override
+    public int getMaxColumnsInOrderBy() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#getMaxColumnsInOrderBy is not 
supported");
+    }
+
+    @Override
+    public int getMaxColumnsInSelect() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#getMaxColumnsInSelect is not 
supported");
+    }
+
+    @Override
+    public int getMaxColumnsInTable() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#getMaxColumnsInTable is not supported");
+    }
+
+    @Override
+    public int getMaxConnections() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#getMaxConnections is not supported");
+    }
+
+    @Override
+    public int getMaxCursorNameLength() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#getMaxCursorNameLength is not 
supported");
+    }
+
+    @Override
+    public int getMaxIndexLength() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#getMaxIndexLength is not supported");
+    }
+
+    @Override
+    public int getMaxSchemaNameLength() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#getMaxSchemaNameLength is not 
supported");
+    }
+
+    @Override
+    public int getMaxProcedureNameLength() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#getMaxProcedureNameLength is not 
supported");
+    }
+
+    @Override
+    public int getMaxCatalogNameLength() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#getMaxCatalogNameLength is not 
supported");
+    }
+
+    @Override
+    public int getMaxRowSize() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#getMaxRowSize is not supported");
+    }
+
+    @Override
+    public boolean doesMaxRowSizeIncludeBlobs() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#doesMaxRowSizeIncludeBlobs is not 
supported");
+    }
+
+    @Override
+    public int getMaxStatementLength() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#getMaxStatementLength is not 
supported");
+    }
+
+    @Override
+    public int getMaxStatements() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#getMaxStatements is not supported");
+    }
+
+    @Override
+    public int getMaxTableNameLength() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#getMaxTableNameLength is not 
supported");
+    }
+
+    @Override
+    public int getMaxTablesInSelect() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#getMaxTablesInSelect is not supported");
+    }
+
+    @Override
+    public int getMaxUserNameLength() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#getMaxUserNameLength is not supported");
+    }
+
+    @Override
+    public int getDefaultTransactionIsolation() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#getDefaultTransactionIsolation is not 
supported");
+    }
+
+    @Override
+    public boolean supportsTransactions() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#supportsTransactions is not supported");
+    }
+
+    @Override
+    public boolean supportsTransactionIsolationLevel(int level) throws 
SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#supportsTransactionIsolationLevel is 
not supported");
+    }
+
+    @Override
+    public boolean supportsDataDefinitionAndDataManipulationTransactions() 
throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                
"FlinkDatabaseMetaData#supportsDataDefinitionAndDataManipulationTransactions is 
not supported");
+    }
+
+    @Override
+    public boolean supportsDataManipulationTransactionsOnly() throws 
SQLException {
+        throw new SQLFeatureNotSupportedException(
+                
"FlinkDatabaseMetaData#supportsDataManipulationTransactionsOnly is not 
supported");
+    }
+
+    @Override
+    public boolean dataDefinitionCausesTransactionCommit() throws SQLException 
{
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#dataDefinitionCausesTransactionCommit 
is not supported");
+    }
+
+    @Override
+    public boolean dataDefinitionIgnoredInTransactions() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#dataDefinitionIgnoredInTransactions is 
not supported");
+    }
+
+    @Override
+    public ResultSet getProcedures(
+            String catalog, String schemaPattern, String procedureNamePattern) 
throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#getProcedures is not supported");
+    }
+
+    @Override
+    public ResultSet getProcedureColumns(
+            String catalog,
+            String schemaPattern,
+            String procedureNamePattern,
+            String columnNamePattern)
+            throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#getProcedureColumns is not supported");
+    }
+
+    @Override
+    public ResultSet getColumnPrivileges(
+            String catalog, String schema, String table, String 
columnNamePattern)
+            throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#getColumnPrivileges is not supported");
+    }
+
+    @Override
+    public ResultSet getTablePrivileges(
+            String catalog, String schemaPattern, String tableNamePattern) 
throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#getTablePrivileges is not supported");
+    }
+
+    @Override
+    public ResultSet getBestRowIdentifier(
+            String catalog, String schema, String table, int scope, boolean 
nullable)
+            throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#getBestRowIdentifier is not supported");
+    }
+
+    @Override
+    public ResultSet getVersionColumns(String catalog, String schema, String 
table)
+            throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#getVersionColumns is not supported");
+    }
+
+    @Override
+    public ResultSet getImportedKeys(String catalog, String schema, String 
table)
+            throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#getImportedKeys is not supported");
+    }
+
+    @Override
+    public ResultSet getExportedKeys(String catalog, String schema, String 
table)
+            throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#getExportedKeys is not supported");
+    }
+
+    @Override
+    public ResultSet getCrossReference(
+            String parentCatalog,
+            String parentSchema,
+            String parentTable,
+            String foreignCatalog,
+            String foreignSchema,
+            String foreignTable)
+            throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#getCrossReference is not supported");
+    }
+
+    @Override
+    public ResultSet getTypeInfo() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#getTypeInfo is not supported");
+    }
+
+    @Override
+    public ResultSet getIndexInfo(
+            String catalog, String schema, String table, boolean unique, 
boolean approximate)
+            throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#getIndexInfo is not supported");
+    }
+
+    @Override
+    public boolean supportsResultSetConcurrency(int type, int concurrency) 
throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#supportsResultSetConcurrency is not 
supported");
+    }
+
+    @Override
+    public boolean ownUpdatesAreVisible(int type) throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#ownUpdatesAreVisible is not supported");
+    }
+
+    @Override
+    public boolean ownDeletesAreVisible(int type) throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#ownDeletesAreVisible is not supported");
+    }
+
+    @Override
+    public boolean ownInsertsAreVisible(int type) throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#ownInsertsAreVisible is not supported");
+    }
+
+    @Override
+    public boolean othersUpdatesAreVisible(int type) throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#othersUpdatesAreVisible is not 
supported");
+    }
+
+    @Override
+    public boolean othersDeletesAreVisible(int type) throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#othersDeletesAreVisible is not 
supported");
+    }
+
+    @Override
+    public boolean othersInsertsAreVisible(int type) throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#othersInsertsAreVisible is not 
supported");
+    }
+
+    @Override
+    public boolean updatesAreDetected(int type) throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#updatesAreDetected is not supported");
+    }
+
+    @Override
+    public boolean deletesAreDetected(int type) throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#deletesAreDetected is not supported");
+    }
+
+    @Override
+    public boolean insertsAreDetected(int type) throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#insertsAreDetected is not supported");
+    }
+
+    @Override
+    public boolean supportsBatchUpdates() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#supportsBatchUpdates is not supported");
+    }
+
+    @Override
+    public ResultSet getUDTs(
+            String catalog, String schemaPattern, String typeNamePattern, 
int[] types)
+            throws SQLException {
+        throw new 
SQLFeatureNotSupportedException("FlinkDatabaseMetaData#getUDTs is not 
supported");
+    }
+
+    @Override
+    public boolean supportsSavepoints() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#supportsSavepoints is not supported");
+    }
+
+    @Override
+    public boolean supportsNamedParameters() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#supportsNamedParameters is not 
supported");
+    }
+
+    @Override
+    public boolean supportsMultipleOpenResults() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#supportsMultipleOpenResults is not 
supported");
+    }
+
+    @Override
+    public boolean supportsGetGeneratedKeys() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#supportsGetGeneratedKeys is not 
supported");
+    }
+
+    @Override
+    public ResultSet getSuperTypes(String catalog, String schemaPattern, 
String typeNamePattern)
+            throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#getSuperTypes is not supported");
+    }
+
+    @Override
+    public ResultSet getSuperTables(String catalog, String schemaPattern, 
String tableNamePattern)
+            throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#getSuperTables is not supported");
+    }
+
+    @Override
+    public ResultSet getAttributes(
+            String catalog,
+            String schemaPattern,
+            String typeNamePattern,
+            String attributeNamePattern)
+            throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#getAttributes is not supported");
+    }
+
+    @Override
+    public boolean supportsResultSetHoldability(int holdability) throws 
SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#supportsResultSetHoldability is not 
supported");
+    }
+
+    @Override
+    public int getResultSetHoldability() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#getResultSetHoldability is not 
supported");
+    }
+
+    @Override
+    public int getSQLStateType() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#getSQLStateType is not supported");
+    }
+
+    @Override
+    public boolean locatorsUpdateCopy() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#locatorsUpdateCopy is not supported");
+    }
+
+    @Override
+    public boolean supportsStatementPooling() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#supportsStatementPooling is not 
supported");
+    }
+
+    @Override
+    public RowIdLifetime getRowIdLifetime() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#getRowIdLifetime is not supported");
+    }
+
+    @Override
+    public boolean supportsStoredFunctionsUsingCallSyntax() throws 
SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#supportsStoredFunctionsUsingCallSyntax 
is not supported");
+    }
+
+    @Override
+    public boolean autoCommitFailureClosesAllResultSets() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#autoCommitFailureClosesAllResultSets is 
not supported");
+    }
+
+    @Override
+    public ResultSet getClientInfoProperties() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#getClientInfoProperties is not 
supported");
+    }
+
+    @Override
+    public ResultSet getFunctions(String catalog, String schemaPattern, String 
functionNamePattern)
+            throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#getFunctions is not supported");
+    }
+
+    @Override
+    public ResultSet getFunctionColumns(
+            String catalog,
+            String schemaPattern,
+            String functionNamePattern,
+            String columnNamePattern)
+            throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#getFunctionColumns is not supported");
+    }
+
+    @Override
+    public ResultSet getPseudoColumns(
+            String catalog, String schemaPattern, String tableNamePattern, 
String columnNamePattern)
+            throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#getPseudoColumns is not supported");
+    }
+
+    @Override
+    public boolean generatedKeyAlwaysReturned() throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#generatedKeyAlwaysReturned is not 
supported");
+    }
+
+    @Override
+    public <T> T unwrap(Class<T> iface) throws SQLException {
+        throw new 
SQLFeatureNotSupportedException("FlinkDatabaseMetaData#unwrap is not 
supported");
+    }
+
+    @Override
+    public boolean isWrapperFor(Class<?> iface) throws SQLException {
+        throw new SQLFeatureNotSupportedException(
+                "FlinkDatabaseMetaData#isWrapperFor is not supported");
+    }
+}
diff --git 
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/DriverUri.java
 
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/DriverUri.java
index a13650b45e7..cd89b4fc3fb 100644
--- 
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/DriverUri.java
+++ 
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/DriverUri.java
@@ -75,6 +75,10 @@ public class DriverUri {
         return database;
     }
 
+    /** Returns the full driver url: the driver's url prefix followed by the parsed uri. */
+    public String getURL() {
+        return URL_PREFIX + uri;
+    }
+
     private void initCatalogAndSchema() throws SQLException {
         String path = uri.getPath();
         if (isNullOrWhitespaceOnly(uri.getPath()) || path.equals("/")) {
diff --git 
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkConnection.java
 
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkConnection.java
index 7efe7de419f..076172659ff 100644
--- 
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkConnection.java
+++ 
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkConnection.java
@@ -28,18 +28,21 @@ import org.apache.flink.table.jdbc.utils.DriverUtils;
 import java.sql.DatabaseMetaData;
 import java.sql.SQLClientInfoException;
 import java.sql.SQLException;
-import java.sql.SQLFeatureNotSupportedException;
 import java.sql.Statement;
 import java.util.Collections;
 import java.util.Properties;
 import java.util.UUID;
 
+import static org.apache.flink.table.jdbc.utils.DriverUtils.checkArgument;
+
 /** Connection to flink sql gateway for jdbc driver. */
 public class FlinkConnection extends BaseConnection {
+    private final String url;
     private final Executor executor;
     private volatile boolean closed = false;
 
     public FlinkConnection(DriverUri driverUri) {
+        this.url = driverUri.getURL();
         // TODO Support default context from map to get gid of flink core for 
jdbc driver in
         // https://issues.apache.org/jira/browse/FLINK-31687.
         this.executor =
@@ -84,7 +87,7 @@ public class FlinkConnection extends BaseConnection {
 
     @Override
     public DatabaseMetaData getMetaData() throws SQLException {
-        throw new SQLFeatureNotSupportedException();
+        return new FlinkDatabaseMetaData(url, this, createStatement());
     }
 
     @Override
@@ -104,7 +107,9 @@ public class FlinkConnection extends BaseConnection {
     public String getCatalog() throws SQLException {
         try (StatementResult result = executor.executeStatement("SHOW CURRENT 
CATALOG;")) {
             if (result.hasNext()) {
-                return result.next().getString(0).toString();
+                String catalog = result.next().getString(0).toString();
+                checkArgument(!result.hasNext(), "There are more than one 
current catalog.");
+                return catalog;
             } else {
                 throw new SQLException("No catalog");
             }
@@ -158,7 +163,9 @@ public class FlinkConnection extends BaseConnection {
     public String getSchema() throws SQLException {
         try (StatementResult result = executor.executeStatement("SHOW CURRENT 
DATABASE;")) {
             if (result.hasNext()) {
-                return result.next().getString(0).toString();
+                String schema = result.next().getString(0).toString();
+                checkArgument(!result.hasNext(), "There are more than one 
current database.");
+                return schema;
             } else {
                 throw new SQLException("No database");
             }
diff --git 
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkDatabaseMetaData.java
 
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkDatabaseMetaData.java
new file mode 100644
index 00000000000..b729bef5933
--- /dev/null
+++ 
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkDatabaseMetaData.java
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.jdbc;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.client.gateway.Executor;
+import org.apache.flink.table.client.gateway.StatementResult;
+
+import javax.annotation.Nullable;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.flink.table.jdbc.utils.DatabaseMetaDataUtils.createCatalogsResultSet;
+import static 
org.apache.flink.table.jdbc.utils.DatabaseMetaDataUtils.createSchemasResultSet;
+
+/**
+ * Implementation of {@link java.sql.DatabaseMetaData} for flink jdbc driver.
+ *
+ * <p>Catalog and schema listings are produced by running {@code SHOW CATALOGS} and {@code SHOW
+ * DATABASES;} through the connection's {@link Executor}. The remaining overrides return fixed
+ * capability flags and driver/version constants.
+ */
+public class FlinkDatabaseMetaData extends BaseDatabaseMetaData {
+    private final String url;
+    private final FlinkConnection connection;
+    private final Statement statement;
+    private final Executor executor;
+
+    /**
+     * Creates the metadata view for the given connection.
+     *
+     * @param url the url returned by {@link #getURL()}
+     * @param connection the connection whose executor runs the metadata statements
+     * @param statement the statement handed to every {@link ResultSet} created by this object
+     */
+    @VisibleForTesting
+    protected FlinkDatabaseMetaData(String url, FlinkConnection connection, Statement statement) {
+        this.url = url;
+        this.connection = connection;
+        this.statement = statement;
+        this.executor = connection.getExecutor();
+    }
+
+    /**
+     * Lists all catalogs.
+     *
+     * @return a result set built from the rows of {@code SHOW CATALOGS}
+     * @throws SQLException if executing the statement or building the result set fails
+     */
+    @Override
+    public ResultSet getCatalogs() throws SQLException {
+        // The rows are copied into the returned result set, so the statement result can be
+        // closed as soon as the result set has been created.
+        try (StatementResult result = catalogs()) {
+            return createCatalogsResultSet(statement, result);
+        } catch (Exception e) {
+            throw new SQLException("Get catalogs fail", e);
+        }
+    }
+
+    private StatementResult catalogs() {
+        return executor.executeStatement("SHOW CATALOGS");
+    }
+
+    /**
+     * Lists the databases of every catalog.
+     *
+     * <p>The connection is switched to each catalog in turn in order to list its databases. The
+     * catalog and database that were current when this method was called are restored afterwards,
+     * also when listing fails half-way.
+     *
+     * @return a result set with one row per (database, catalog) pair
+     * @throws SQLException if any of the underlying statements fails
+     */
+    @Override
+    public ResultSet getSchemas() throws SQLException {
+        try {
+            String currentCatalog = connection.getCatalog();
+            String currentDatabase = connection.getSchema();
+            List<String> catalogList = new ArrayList<>();
+            Map<String, List<String>> catalogSchemaList = new HashMap<>();
+            try (StatementResult result = catalogs()) {
+                while (result.hasNext()) {
+                    String catalog = result.next().getString(0).toString();
+                    connection.setCatalog(catalog);
+                    getSchemasForCatalog(catalogList, catalogSchemaList, catalog, null);
+                }
+            } finally {
+                // Restore the session even if iterating the catalogs failed, otherwise the
+                // connection would silently stay on whichever catalog was visited last.
+                connection.setCatalog(currentCatalog);
+                connection.setSchema(currentDatabase);
+            }
+
+            return createSchemasResultSet(statement, catalogList, catalogSchemaList);
+        } catch (Exception e) {
+            throw new SQLException("Get schemas fail", e);
+        }
+    }
+
+    /**
+     * Runs {@code SHOW DATABASES;} and records the returned names under the given catalog.
+     *
+     * <p>The statement carries no catalog name; the caller switches the connection to {@code
+     * catalog} before calling this method.
+     *
+     * @param catalogList receives {@code catalog}
+     * @param catalogSchemaList receives the mapping from {@code catalog} to its database names
+     * @param catalog the catalog the collected databases are recorded under
+     * @param schemaPattern if not null, only database names that contain this text are kept (a
+     *     plain substring test, not a SQL LIKE pattern)
+     */
+    private void getSchemasForCatalog(
+            List<String> catalogList,
+            Map<String, List<String>> catalogSchemaList,
+            String catalog,
+            @Nullable String schemaPattern)
+            throws SQLException {
+        catalogList.add(catalog);
+        List<String> schemas = new ArrayList<>();
+        try (StatementResult schemaResult = schemas()) {
+            while (schemaResult.hasNext()) {
+                String schema = schemaResult.next().getString(0).toString();
+                if (schemaPattern == null || schema.contains(schemaPattern)) {
+                    schemas.add(schema);
+                }
+            }
+        }
+        catalogSchemaList.put(catalog, schemas);
+    }
+
+    private StatementResult schemas() {
+        return executor.executeStatement("SHOW DATABASES;");
+    }
+
+    // TODO Flink will support SHOW DATABASES LIKE statement in FLIP-297, this method will be
+    // supported after that issue.
+    @Override
+    public ResultSet getSchemas(String catalog, String schemaPattern) throws SQLException {
+        throw new UnsupportedOperationException();
+    }
+
+    // ------------------------------------------------------------------------------------------
+    // Listings that are not implemented yet. Note that these throw the unchecked
+    // UnsupportedOperationException rather than a SQLException.
+    // ------------------------------------------------------------------------------------------
+
+    @Override
+    public ResultSet getTables(
+            String catalog, String schemaPattern, String tableNamePattern, String[] types)
+            throws SQLException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ResultSet getColumns(
+            String catalog, String schemaPattern, String tableNamePattern, String columnNamePattern)
+            throws SQLException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ResultSet getPrimaryKeys(String catalog, String schema, String table)
+            throws SQLException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ResultSet getTableTypes() throws SQLException {
+        throw new UnsupportedOperationException();
+    }
+
+    // ------------------------------------------------------------------------------------------
+    // Connection, url, product and version information
+    // ------------------------------------------------------------------------------------------
+
+    @Override
+    public Connection getConnection() throws SQLException {
+        return connection;
+    }
+
+    @Override
+    public boolean allProceduresAreCallable() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean allTablesAreSelectable() throws SQLException {
+        return true;
+    }
+
+    @Override
+    public String getURL() throws SQLException {
+        return url;
+    }
+
+    @Override
+    public boolean isReadOnly() throws SQLException {
+        return true;
+    }
+
+    @Override
+    public String getDatabaseProductName() throws SQLException {
+        return DriverInfo.DRIVER_NAME;
+    }
+
+    @Override
+    public String getDatabaseProductVersion() throws SQLException {
+        return DriverInfo.DRIVER_VERSION;
+    }
+
+    @Override
+    public String getDriverName() throws SQLException {
+        return FlinkDriver.class.getName();
+    }
+
+    @Override
+    public String getDriverVersion() throws SQLException {
+        return DriverInfo.DRIVER_VERSION;
+    }
+
+    @Override
+    public int getDriverMajorVersion() {
+        return DriverInfo.DRIVER_VERSION_MAJOR;
+    }
+
+    @Override
+    public int getDriverMinorVersion() {
+        return DriverInfo.DRIVER_VERSION_MINOR;
+    }
+
+    @Override
+    public boolean supportsResultSetType(int type) throws SQLException {
+        return false;
+    }
+
+    @Override
+    public int getDatabaseMajorVersion() throws SQLException {
+        return DriverInfo.DRIVER_VERSION_MAJOR;
+    }
+
+    @Override
+    public int getDatabaseMinorVersion() throws SQLException {
+        return DriverInfo.DRIVER_VERSION_MINOR;
+    }
+
+    // NOTE(review): java.sql.DatabaseMetaData describes the two JDBC version getters as the JDBC
+    // version number for the driver; the driver's own version is returned here - confirm that
+    // this is intended.
+    @Override
+    public int getJDBCMajorVersion() throws SQLException {
+        return DriverInfo.DRIVER_VERSION_MAJOR;
+    }
+
+    @Override
+    public int getJDBCMinorVersion() throws SQLException {
+        return DriverInfo.DRIVER_VERSION_MINOR;
+    }
+
+    // ------------------------------------------------------------------------------------------
+    // Static capability flags and naming terms
+    // ------------------------------------------------------------------------------------------
+
+    @Override
+    public boolean usesLocalFiles() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean usesLocalFilePerTable() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public String getIdentifierQuoteString() throws SQLException {
+        return "`";
+    }
+
+    @Override
+    public String getExtraNameCharacters() throws SQLException {
+        return "";
+    }
+
+    @Override
+    public boolean supportsAlterTableWithAddColumn() throws SQLException {
+        return true;
+    }
+
+    @Override
+    public boolean supportsAlterTableWithDropColumn() throws SQLException {
+        return true;
+    }
+
+    @Override
+    public boolean supportsColumnAliasing() throws SQLException {
+        return true;
+    }
+
+    @Override
+    public boolean supportsTableCorrelationNames() throws SQLException {
+        return true;
+    }
+
+    @Override
+    public boolean supportsDifferentTableCorrelationNames() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean supportsExpressionsInOrderBy() throws SQLException {
+        return true;
+    }
+
+    @Override
+    public boolean supportsOrderByUnrelated() throws SQLException {
+        return true;
+    }
+
+    @Override
+    public boolean supportsGroupBy() throws SQLException {
+        return true;
+    }
+
+    @Override
+    public boolean supportsGroupByUnrelated() throws SQLException {
+        return true;
+    }
+
+    @Override
+    public boolean supportsGroupByBeyondSelect() throws SQLException {
+        return true;
+    }
+
+    @Override
+    public boolean supportsLikeEscapeClause() throws SQLException {
+        return true;
+    }
+
+    @Override
+    public boolean supportsNonNullableColumns() throws SQLException {
+        return true;
+    }
+
+    @Override
+    public boolean supportsOuterJoins() throws SQLException {
+        return true;
+    }
+
+    @Override
+    public boolean supportsFullOuterJoins() throws SQLException {
+        return true;
+    }
+
+    @Override
+    public boolean supportsLimitedOuterJoins() throws SQLException {
+        return true;
+    }
+
+    @Override
+    public String getSchemaTerm() throws SQLException {
+        return "database";
+    }
+
+    @Override
+    public String getCatalogTerm() throws SQLException {
+        return "catalog";
+    }
+
+    @Override
+    public boolean supportsSchemasInDataManipulation() throws SQLException {
+        return true;
+    }
+
+    @Override
+    public boolean supportsSchemasInTableDefinitions() throws SQLException {
+        return true;
+    }
+
+    @Override
+    public boolean supportsCatalogsInDataManipulation() throws SQLException {
+        return true;
+    }
+
+    @Override
+    public boolean supportsCatalogsInTableDefinitions() throws SQLException {
+        return true;
+    }
+
+    @Override
+    public boolean supportsSubqueriesInComparisons() throws SQLException {
+        return true;
+    }
+
+    @Override
+    public boolean supportsSubqueriesInExists() throws SQLException {
+        return true;
+    }
+
+    @Override
+    public boolean supportsSubqueriesInIns() throws SQLException {
+        return true;
+    }
+
+    @Override
+    public boolean supportsSubqueriesInQuantifieds() throws SQLException {
+        return true;
+    }
+
+    @Override
+    public boolean supportsCorrelatedSubqueries() throws SQLException {
+        return true;
+    }
+
+    @Override
+    public boolean supportsUnion() throws SQLException {
+        return true;
+    }
+
+    @Override
+    public boolean supportsUnionAll() throws SQLException {
+        return true;
+    }
+}
diff --git 
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSet.java
 
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSet.java
index 42bdcee30a4..9bd6b6fe2f8 100644
--- 
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSet.java
+++ 
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSet.java
@@ -21,7 +21,9 @@ package org.apache.flink.table.jdbc;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.client.gateway.StatementResult;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.jdbc.utils.CloseableResultIterator;
 import org.apache.flink.table.jdbc.utils.DataConverter;
+import org.apache.flink.table.jdbc.utils.StatementResultIterator;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.DecimalType;
 
@@ -49,7 +51,7 @@ public class FlinkResultSet extends BaseResultSet {
     private final List<DataType> dataTypeList;
     private final List<String> columnNameList;
     private final Statement statement;
-    private final StatementResult result;
+    private final CloseableResultIterator<RowData> iterator;
     private final DataConverter dataConverter;
     private final FlinkResultSetMetaData resultSetMetaData;
     private RowData currentRow;
@@ -59,13 +61,24 @@ public class FlinkResultSet extends BaseResultSet {
 
     public FlinkResultSet(
             Statement statement, StatementResult result, DataConverter 
dataConverter) {
+        this(
+                statement,
+                new StatementResultIterator(result),
+                result.getResultSchema(),
+                dataConverter);
+    }
+
+    public FlinkResultSet(
+            Statement statement,
+            CloseableResultIterator<RowData> iterator,
+            ResolvedSchema schema,
+            DataConverter dataConverter) {
         this.statement = checkNotNull(statement, "Statement cannot be null");
-        this.result = checkNotNull(result, "Statement result cannot be null");
+        this.iterator = checkNotNull(iterator, "Statement result cannot be 
null");
         this.dataConverter = checkNotNull(dataConverter, "Data converter 
cannot be null");
         this.currentRow = null;
         this.wasNull = false;
 
-        final ResolvedSchema schema = result.getResultSchema();
         this.dataTypeList = schema.getColumnDataTypes();
         this.columnNameList = schema.getColumnNames();
         this.resultSetMetaData = new FlinkResultSetMetaData(columnNameList, 
dataTypeList);
@@ -75,9 +88,9 @@ public class FlinkResultSet extends BaseResultSet {
     public boolean next() throws SQLException {
         checkClosed();
 
-        if (result.hasNext()) {
+        if (iterator.hasNext()) {
             // TODO check the kind of currentRow
-            currentRow = result.next();
+            currentRow = iterator.next();
             wasNull = currentRow == null;
             return true;
         } else {
@@ -122,7 +135,11 @@ public class FlinkResultSet extends BaseResultSet {
         }
         closed = true;
 
-        result.close();
+        try {
+            iterator.close();
+        } catch (Exception e) {
+            throw new SQLException("Close result iterator fail", e);
+        }
     }
 
     @Override
diff --git 
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSetMetaData.java
 
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSetMetaData.java
index 0398d1c6806..3ffab4e866c 100644
--- 
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSetMetaData.java
+++ 
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSetMetaData.java
@@ -39,12 +39,12 @@ import java.util.Map;
 public class FlinkResultSetMetaData implements ResultSetMetaData {
     private final List<ColumnInfo> columnList;
 
-    public FlinkResultSetMetaData(List<String> columnNameList, List<DataType> 
columnTypeList) {
-        this.columnList = new ArrayList<>(columnNameList.size());
-        for (int i = 0; i < columnNameList.size(); i++) {
-            this.columnList.add(
+    public FlinkResultSetMetaData(List<String> columnNames, List<DataType> 
columnTypes) {
+        this.columnList = new ArrayList<>(columnNames.size());
+        for (int i = 0; i < columnNames.size(); i++) {
+            columnList.add(
                     ColumnInfo.fromLogicalType(
-                            columnNameList.get(i), 
columnTypeList.get(i).getLogicalType()));
+                            columnNames.get(i), 
columnTypes.get(i).getLogicalType()));
         }
     }
 
diff --git 
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/CloseableResultIterator.java
 
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/CloseableResultIterator.java
new file mode 100644
index 00000000000..d015c2e2bdc
--- /dev/null
+++ 
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/CloseableResultIterator.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.jdbc.utils;
+
+import java.util.Iterator;
+
+/**
+ * Closeable result iterator for jdbc driver.
+ *
+ * <p>Combines {@link Iterator} and {@link AutoCloseable} without adding any method of its own.
+ *
+ * @param <T> the type of the iterated elements
+ */
+public interface CloseableResultIterator<T> extends Iterator<T>, AutoCloseable {}
diff --git 
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/CollectionResultIterator.java
 
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/CollectionResultIterator.java
new file mode 100644
index 00000000000..84319e615e5
--- /dev/null
+++ 
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/CollectionResultIterator.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.jdbc.utils;
+
+import org.apache.flink.table.data.RowData;
+
+import java.util.Iterator;
+
+/**
+ * Result iterator from given iterator.
+ *
+ * <p>{@link #hasNext()} and {@link #next()} forward to the wrapped iterator; {@link #close()}
+ * does nothing.
+ */
+public class CollectionResultIterator implements CloseableResultIterator<RowData> {
+    private final Iterator<RowData> delegate;
+
+    /**
+     * Wraps the given iterator.
+     *
+     * @param iterator the iterator all calls are forwarded to
+     */
+    public CollectionResultIterator(Iterator<RowData> iterator) {
+        this.delegate = iterator;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return delegate.hasNext();
+    }
+
+    @Override
+    public RowData next() {
+        return delegate.next();
+    }
+
+    /** No-op: this class holds nothing that needs closing. */
+    @Override
+    public void close() throws Exception {}
+}
diff --git 
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/DatabaseMetaDataUtils.java
 
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/DatabaseMetaDataUtils.java
new file mode 100644
index 00000000000..b99bd54a30f
--- /dev/null
+++ 
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/DatabaseMetaDataUtils.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.jdbc.utils;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.client.gateway.StatementResult;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.jdbc.FlinkDatabaseMetaData;
+import org.apache.flink.table.jdbc.FlinkResultSet;
+
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+/** Utils to create catalog/schema results for {@link FlinkDatabaseMetaData}. */
+public class DatabaseMetaDataUtils {
+    private static final Column TABLE_CAT_COLUMN =
+            Column.physical("TABLE_CAT", DataTypes.STRING().notNull());
+    private static final Column TABLE_SCHEM_COLUMN =
+            Column.physical("TABLE_SCHEM", DataTypes.STRING().notNull());
+    private static final Column TABLE_CATALOG_COLUMN =
+            Column.physical("TABLE_CATALOG", DataTypes.STRING());
+
+    /**
+     * Create result set for catalogs. The schema columns are:
+     *
+     * <ul>
+     *   <li>TABLE_CAT String =&gt; catalog name.
+     * </ul>
+     *
+     * <p>The results are ordered by catalog name.
+     *
+     * <p>All remaining rows of {@code result} are copied into the returned result set; {@code
+     * result} itself is not closed here.
+     *
+     * @param statement The statement for database meta data
+     * @param result The result for catalogs
+     * @return a ResultSet object in which each row has a single String column that is a catalog
+     *     name
+     */
+    public static FlinkResultSet createCatalogsResultSet(
+            Statement statement, StatementResult result) {
+        List<RowData> catalogs = new ArrayList<>();
+        result.forEachRemaining(catalogs::add);
+        catalogs.sort(Comparator.comparing(v -> v.getString(0)));
+
+        return new FlinkResultSet(
+                statement,
+                new CollectionResultIterator(catalogs.iterator()),
+                ResolvedSchema.of(TABLE_CAT_COLUMN),
+                StringDataConverter.CONVERTER);
+    }
+
+    /**
+     * Create result set for schemas. The schema columns are:
+     *
+     * <ul>
+     *   <li>TABLE_SCHEM String =&gt; schema name
+     *   <li>TABLE_CATALOG String =&gt; catalog name (may be null)
+     * </ul>
+     *
+     * <p>The results are ordered by TABLE_CATALOG and TABLE_SCHEM.
+     *
+     * <p>The given collections are not modified: sorting happens on copies. A catalog that has no
+     * entry in {@code catalogSchemas} contributes no rows.
+     *
+     * @param statement The statement for database meta data
+     * @param catalogs The catalog list
+     * @param catalogSchemas The catalog with schema list
+     * @return a ResultSet object in which each row is a schema description
+     */
+    public static FlinkResultSet createSchemasResultSet(
+            Statement statement, List<String> catalogs, Map<String, List<String>> catalogSchemas) {
+        List<RowData> schemaWithCatalogList = new ArrayList<>();
+        List<String> sortedCatalogs = new ArrayList<>(catalogs);
+        sortedCatalogs.sort(Comparator.naturalOrder());
+        for (String catalog : sortedCatalogs) {
+            List<String> schemas = catalogSchemas.get(catalog);
+            if (schemas == null) {
+                // No schema list was recorded for this catalog, so there is nothing to emit.
+                continue;
+            }
+            // Sort a copy so that the caller's list is left untouched (it may be unmodifiable).
+            List<String> sortedSchemas = new ArrayList<>(schemas);
+            sortedSchemas.sort(Comparator.naturalOrder());
+            for (String schema : sortedSchemas) {
+                schemaWithCatalogList.add(
+                        GenericRowData.of(
+                                StringData.fromString(schema), StringData.fromString(catalog)));
+            }
+        }
+
+        return new FlinkResultSet(
+                statement,
+                new CollectionResultIterator(schemaWithCatalogList.iterator()),
+                ResolvedSchema.of(TABLE_SCHEM_COLUMN, TABLE_CATALOG_COLUMN),
+                StringDataConverter.CONVERTER);
+    }
+}
diff --git 
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/StatementResultIterator.java
 
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/StatementResultIterator.java
new file mode 100644
index 00000000000..bfc41f9ec38
--- /dev/null
+++ 
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/StatementResultIterator.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.jdbc.utils;
+
+import org.apache.flink.table.client.gateway.StatementResult;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * Closeable result iterator for statement result.
+ *
+ * <p>Every method forwards to the wrapped {@link StatementResult}.
+ */
+public class StatementResultIterator implements CloseableResultIterator<RowData> {
+    private final StatementResult delegate;
+
+    /**
+     * Wraps the given statement result.
+     *
+     * @param result the statement result that is iterated and, on {@link #close()}, closed
+     */
+    public StatementResultIterator(StatementResult result) {
+        this.delegate = result;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return delegate.hasNext();
+    }
+
+    @Override
+    public RowData next() {
+        return delegate.next();
+    }
+
+    /** Closes the wrapped statement result. */
+    @Override
+    public void close() throws Exception {
+        delegate.close();
+    }
+}
diff --git 
a/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkConnectionTest.java
 
b/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkConnectionTest.java
index 9311175d17d..08a6faa873d 100644
--- 
a/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkConnectionTest.java
+++ 
b/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkConnectionTest.java
@@ -73,9 +73,8 @@ public class FlinkConnectionTest extends 
FlinkJdbcDriverTestBase {
     public void testClientInfo() throws Exception {
         Properties properties = new Properties();
         properties.setProperty("key3", "val3");
-        try (FlinkConnection connection =
-                new FlinkConnection(
-                        getDriverUri("jdbc:flink://%s:%s?key1=val1&key2=val2", 
properties))) {
+        DriverUri driverUri = 
getDriverUri("jdbc:flink://%s:%s?key1=val1&key2=val2", properties);
+        try (FlinkConnection connection = new FlinkConnection(driverUri)) {
             assertEquals("val1", connection.getClientInfo("key1"));
             assertEquals("val2", connection.getClientInfo("key2"));
             assertEquals("val3", connection.getClientInfo("key3"));
diff --git 
a/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkDatabaseMetaDataTest.java
 
b/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkDatabaseMetaDataTest.java
new file mode 100644
index 00000000000..1d469108814
--- /dev/null
+++ 
b/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkDatabaseMetaDataTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.jdbc;
+
+import org.apache.flink.table.client.gateway.Executor;
+import org.apache.flink.table.client.gateway.StatementResult;
+import org.apache.flink.table.data.RowData;
+
+import org.apache.commons.lang3.StringUtils;
+import org.junit.jupiter.api.Test;
+
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Tests for flink database metadata. */
+public class FlinkDatabaseMetaDataTest extends FlinkJdbcDriverTestBase {
+    @Test
+    public void testCatalogSchemas() throws Exception {
+
+        DriverUri driverUri = getDriverUri();
+        try (FlinkConnection connection = new FlinkConnection(driverUri)) {
+            Executor executor = connection.getExecutor();
+            // create databases in default catalog
+            executeDDL("CREATE DATABASE database11", executor);
+            executeDDL("CREATE DATABASE database12", executor);
+            executeDDL("CREATE DATABASE database13", executor);
+
+            // create catalog2 and databases
+            executeDDL("CREATE CATALOG test_catalog2 WITH 
('type'='generic_in_memory');", executor);
+            executeDDL("CREATE DATABASE test_catalog2.database11", executor);
+            executeDDL("CREATE DATABASE test_catalog2.database21", executor);
+            executeDDL("CREATE DATABASE test_catalog2.database31", executor);
+
+            // create catalog1 and databases
+            executeDDL("CREATE CATALOG test_catalog1 WITH 
('type'='generic_in_memory');", executor);
+            executeDDL("CREATE DATABASE test_catalog1.database11", executor);
+            executeDDL("CREATE DATABASE test_catalog1.database21", executor);
+            executeDDL("CREATE DATABASE test_catalog1.database13", executor);
+
+            connection.setCatalog("test_catalog2");
+            connection.setSchema("database21");
+            assertEquals("test_catalog2", connection.getCatalog());
+            assertEquals("database21", connection.getSchema());
+
+            DatabaseMetaData databaseMetaData =
+                    new FlinkDatabaseMetaData(
+                            driverUri.getURL(), connection, new 
TestingStatement());
+            // Show all catalogs
+            assertThat(resultSetToListAndClose(databaseMetaData.getCatalogs()))
+                    .containsExactly("default_catalog", "test_catalog1", 
"test_catalog2");
+            // Show all databases
+            assertThat(resultSetToListAndClose(databaseMetaData.getSchemas()))
+                    .containsExactly(
+                            "database11,default_catalog",
+                            "database12,default_catalog",
+                            "database13,default_catalog",
+                            "default_database,default_catalog",
+                            "database11,test_catalog1",
+                            "database13,test_catalog1",
+                            "database21,test_catalog1",
+                            "default,test_catalog1",
+                            "database11,test_catalog2",
+                            "database21,test_catalog2",
+                            "database31,test_catalog2",
+                            "default,test_catalog2");
+
+            // Validate that the default catalog and database are not changed.
+            assertEquals("test_catalog2", connection.getCatalog());
+            assertEquals("database21", connection.getSchema());
+        }
+    }
+
+    private List<String> resultSetToListAndClose(ResultSet resultSet) throws 
Exception {
+        List<String> resultList = new ArrayList<>();
+        int columnCount = resultSet.getMetaData().getColumnCount();
+        while (resultSet.next()) {
+            List<String> columnStringList = new ArrayList<>(columnCount);
+            for (int i = 1; i <= columnCount; i++) {
+                columnStringList.add(resultSet.getString(i));
+            }
+            resultList.add(StringUtils.join(columnStringList, ","));
+        }
+        resultSet.close();
+
+        return resultList;
+    }
+
+    private void executeDDL(String sql, Executor executor) {
+        try (StatementResult result = executor.executeStatement(sql)) {
+            assertTrue(result.hasNext());
+            RowData rowData = result.next();
+            assertEquals(1, rowData.getArity());
+            assertEquals("OK", rowData.getString(0).toString());
+
+            assertFalse(result.hasNext());
+        }
+    }
+}

Reply via email to