This is an automated email from the ASF dual-hosted git repository.
libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 2ff4e220945 [FLINK-31544][jdbc-driver] Introduce DatabaseMetaData for jdbc driver
2ff4e220945 is described below
commit 2ff4e220945680eba21065480385148ca8d034da
Author: Shammon FY <[email protected]>
AuthorDate: Thu Apr 6 13:09:19 2023 +0800
[FLINK-31544][jdbc-driver] Introduce DatabaseMetaData for jdbc driver
Close apache/flink#22362
---
.../flink/table/jdbc/BaseDatabaseMetaData.java | 792 +++++++++++++++++++++
.../org/apache/flink/table/jdbc/DriverUri.java | 4 +
.../apache/flink/table/jdbc/FlinkConnection.java | 15 +-
.../flink/table/jdbc/FlinkDatabaseMetaData.java | 384 ++++++++++
.../apache/flink/table/jdbc/FlinkResultSet.java | 29 +-
.../flink/table/jdbc/FlinkResultSetMetaData.java | 10 +-
.../table/jdbc/utils/CloseableResultIterator.java | 24 +
.../table/jdbc/utils/CollectionResultIterator.java | 45 ++
.../table/jdbc/utils/DatabaseMetaDataUtils.java | 110 +++
.../table/jdbc/utils/StatementResultIterator.java | 46 ++
.../flink/table/jdbc/FlinkConnectionTest.java | 5 +-
.../table/jdbc/FlinkDatabaseMetaDataTest.java | 121 ++++
12 files changed, 1567 insertions(+), 18 deletions(-)
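Before the diff itself, a minimal sketch of how the new metadata surface can be exercised through the driver; the gateway address, the jdbc:flink URL form, and registration of FlinkDriver with DriverManager are illustrative assumptions, not part of this commit:

    import java.sql.Connection;
    import java.sql.DatabaseMetaData;
    import java.sql.DriverManager;
    import java.sql.ResultSet;

    public class FlinkMetaDataExample {
        public static void main(String[] args) throws Exception {
            // Assumed SQL gateway endpoint; adjust host and port for your setup.
            try (Connection connection = DriverManager.getConnection("jdbc:flink://localhost:8083")) {
                DatabaseMetaData metaData = connection.getMetaData();

                // getCatalogs() returns a single TABLE_CAT column, ordered by catalog name.
                try (ResultSet catalogs = metaData.getCatalogs()) {
                    while (catalogs.next()) {
                        System.out.println(catalogs.getString(1));
                    }
                }

                // getSchemas() returns TABLE_SCHEM and TABLE_CATALOG, ordered by both.
                try (ResultSet schemas = metaData.getSchemas()) {
                    while (schemas.next()) {
                        System.out.println(schemas.getString(2) + "." + schemas.getString(1));
                    }
                }
            }
        }
    }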
diff --git
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/BaseDatabaseMetaData.java
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/BaseDatabaseMetaData.java
new file mode 100644
index 00000000000..75ec33aab67
--- /dev/null
+++
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/BaseDatabaseMetaData.java
@@ -0,0 +1,792 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.jdbc;
+
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.RowIdLifetime;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+
+/** Base {@link DatabaseMetaData} for flink driver with unsupported features. */
+public abstract class BaseDatabaseMetaData implements DatabaseMetaData {
+ @Override
+ public String getUserName() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#getUserName is not supported");
+ }
+
+ @Override
+ public boolean nullsAreSortedHigh() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#nullsAreSortedHigh is not supported");
+ }
+
+ @Override
+ public boolean nullsAreSortedLow() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#nullsAreSortedLow is not supported");
+ }
+
+ @Override
+ public boolean nullsAreSortedAtStart() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#nullsAreSortedAtStart is not
supported");
+ }
+
+ @Override
+ public boolean nullsAreSortedAtEnd() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#nullsAreSortedAtEnd is not supported");
+ }
+
+ @Override
+ public boolean supportsMixedCaseIdentifiers() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#supportsMixedCaseIdentifiers is not
supported");
+ }
+
+ @Override
+ public boolean storesUpperCaseIdentifiers() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#storesUpperCaseIdentifiers is not
supported");
+ }
+
+ @Override
+ public boolean storesLowerCaseIdentifiers() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#storesLowerCaseIdentifiers is not
supported");
+ }
+
+ @Override
+ public boolean storesMixedCaseIdentifiers() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#storesMixedCaseIdentifiers is not
supported");
+ }
+
+ @Override
+ public boolean supportsMixedCaseQuotedIdentifiers() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#supportsMixedCaseQuotedIdentifiers is
not supported");
+ }
+
+ @Override
+ public boolean storesUpperCaseQuotedIdentifiers() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#storesUpperCaseQuotedIdentifiers is not
supported");
+ }
+
+ @Override
+ public boolean storesLowerCaseQuotedIdentifiers() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#storesLowerCaseQuotedIdentifiers is not
supported");
+ }
+
+ @Override
+ public boolean storesMixedCaseQuotedIdentifiers() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#storesMixedCaseQuotedIdentifiers is not
supported");
+ }
+
+ @Override
+ public String getSQLKeywords() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#getSQLKeywords is not supported");
+ }
+
+ @Override
+ public String getNumericFunctions() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#getNumericFunctions is not supported");
+ }
+
+ @Override
+ public String getStringFunctions() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#getStringFunctions is not supported");
+ }
+
+ @Override
+ public String getSystemFunctions() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#getSystemFunctions is not supported");
+ }
+
+ @Override
+ public String getTimeDateFunctions() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#getTimeDateFunctions is not supported");
+ }
+
+ @Override
+ public String getSearchStringEscape() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#getSearchStringEscape is not
supported");
+ }
+
+ @Override
+ public boolean nullPlusNonNullIsNull() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#nullPlusNonNullIsNull is not
supported");
+ }
+
+ @Override
+ public boolean supportsConvert() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#supportsConvert is not supported");
+ }
+
+ @Override
+ public boolean supportsConvert(int fromType, int toType) throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#supportsConvert is not supported");
+ }
+
+ @Override
+ public boolean supportsMultipleResultSets() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#supportsMultipleResultSets is not
supported");
+ }
+
+ @Override
+ public boolean supportsMultipleTransactions() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#supportsMultipleTransactions is not
supported");
+ }
+
+ @Override
+ public boolean supportsMinimumSQLGrammar() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#supportsMinimumSQLGrammar is not
supported");
+ }
+
+ @Override
+ public boolean supportsCoreSQLGrammar() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#supportsCoreSQLGrammar is not
supported");
+ }
+
+ @Override
+ public boolean supportsExtendedSQLGrammar() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#supportsExtendedSQLGrammar is not
supported");
+ }
+
+ @Override
+ public boolean supportsANSI92EntryLevelSQL() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#supportsANSI92EntryLevelSQL is not
supported");
+ }
+
+ @Override
+ public boolean supportsANSI92IntermediateSQL() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#supportsANSI92IntermediateSQL is not
supported");
+ }
+
+ @Override
+ public boolean supportsANSI92FullSQL() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#supportsANSI92FullSQL is not
supported");
+ }
+
+ @Override
+ public boolean supportsIntegrityEnhancementFacility() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#supportsIntegrityEnhancementFacility is
not supported");
+ }
+
+ @Override
+ public String getProcedureTerm() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#getProcedureTerm is not supported");
+ }
+
+ @Override
+ public boolean isCatalogAtStart() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#isCatalogAtStart is not supported");
+ }
+
+ @Override
+ public String getCatalogSeparator() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#getCatalogSeparator is not supported");
+ }
+
+ @Override
+ public boolean supportsSchemasInProcedureCalls() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#supportsSchemasInProcedureCalls is not
supported");
+ }
+
+ @Override
+ public boolean supportsSchemasInIndexDefinitions() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#supportsSchemasInIndexDefinitions is
not supported");
+ }
+
+ @Override
+ public boolean supportsSchemasInPrivilegeDefinitions() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#supportsSchemasInPrivilegeDefinitions is not supported");
+ }
+
+ @Override
+ public boolean supportsCatalogsInProcedureCalls() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#supportsCatalogsInProcedureCalls is not
supported");
+ }
+
+ @Override
+ public boolean supportsCatalogsInIndexDefinitions() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#supportsCatalogsInIndexDefinitions is
not supported");
+ }
+
+ @Override
+ public boolean supportsCatalogsInPrivilegeDefinitions() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#supportsCatalogsInPrivilegeDefinitions is not supported");
+ }
+
+ @Override
+ public boolean supportsPositionedDelete() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#supportsPositionedDelete is not
supported");
+ }
+
+ @Override
+ public boolean supportsPositionedUpdate() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#supportsPositionedUpdate is not
supported");
+ }
+
+ @Override
+ public boolean supportsSelectForUpdate() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#supportsSelectForUpdate is not
supported");
+ }
+
+ @Override
+ public boolean supportsStoredProcedures() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#supportsStoredProcedures is not
supported");
+ }
+
+ @Override
+ public boolean supportsOpenCursorsAcrossCommit() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#supportsOpenCursorsAcrossCommit is not
supported");
+ }
+
+ @Override
+ public boolean supportsOpenCursorsAcrossRollback() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#supportsOpenCursorsAcrossRollback is
not supported");
+ }
+
+ @Override
+ public boolean supportsOpenStatementsAcrossCommit() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#supportsOpenStatementsAcrossCommit is
not supported");
+ }
+
+ @Override
+ public boolean supportsOpenStatementsAcrossRollback() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#supportsOpenStatementsAcrossRollback is
not supported");
+ }
+
+ @Override
+ public int getMaxBinaryLiteralLength() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#getMaxBinaryLiteralLength is not
supported");
+ }
+
+ @Override
+ public int getMaxCharLiteralLength() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#getMaxCharLiteralLength is not
supported");
+ }
+
+ @Override
+ public int getMaxColumnNameLength() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#getMaxColumnNameLength is not
supported");
+ }
+
+ @Override
+ public int getMaxColumnsInGroupBy() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#getMaxColumnsInGroupBy is not
supported");
+ }
+
+ @Override
+ public int getMaxColumnsInIndex() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#getMaxColumnsInIndex is not supported");
+ }
+
+ @Override
+ public int getMaxColumnsInOrderBy() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#getMaxColumnsInOrderBy is not
supported");
+ }
+
+ @Override
+ public int getMaxColumnsInSelect() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#getMaxColumnsInSelect is not
supported");
+ }
+
+ @Override
+ public int getMaxColumnsInTable() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#getMaxColumnsInTable is not supported");
+ }
+
+ @Override
+ public int getMaxConnections() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#getMaxConnections is not supported");
+ }
+
+ @Override
+ public int getMaxCursorNameLength() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#getMaxCursorNameLength is not
supported");
+ }
+
+ @Override
+ public int getMaxIndexLength() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#getMaxIndexLength is not supported");
+ }
+
+ @Override
+ public int getMaxSchemaNameLength() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#getMaxSchemaNameLength is not
supported");
+ }
+
+ @Override
+ public int getMaxProcedureNameLength() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#getMaxProcedureNameLength is not
supported");
+ }
+
+ @Override
+ public int getMaxCatalogNameLength() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#getMaxCatalogNameLength is not
supported");
+ }
+
+ @Override
+ public int getMaxRowSize() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#getMaxRowSize is not supported");
+ }
+
+ @Override
+ public boolean doesMaxRowSizeIncludeBlobs() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#doesMaxRowSizeIncludeBlobs is not
supported");
+ }
+
+ @Override
+ public int getMaxStatementLength() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#getMaxStatementLength is not
supported");
+ }
+
+ @Override
+ public int getMaxStatements() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#getMaxStatements is not supported");
+ }
+
+ @Override
+ public int getMaxTableNameLength() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#getMaxTableNameLength is not
supported");
+ }
+
+ @Override
+ public int getMaxTablesInSelect() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#getMaxTablesInSelect is not supported");
+ }
+
+ @Override
+ public int getMaxUserNameLength() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#getMaxUserNameLength is not supported");
+ }
+
+ @Override
+ public int getDefaultTransactionIsolation() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#getDefaultTransactionIsolation is not
supported");
+ }
+
+ @Override
+ public boolean supportsTransactions() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#supportsTransactions is not supported");
+ }
+
+ @Override
+ public boolean supportsTransactionIsolationLevel(int level) throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#supportsTransactionIsolationLevel is not supported");
+ }
+
+ @Override
+ public boolean supportsDataDefinitionAndDataManipulationTransactions() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#supportsDataDefinitionAndDataManipulationTransactions is not supported");
+ }
+
+ @Override
+ public boolean supportsDataManipulationTransactionsOnly() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#supportsDataManipulationTransactionsOnly is not supported");
+ }
+
+ @Override
+ public boolean dataDefinitionCausesTransactionCommit() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#dataDefinitionCausesTransactionCommit is not supported");
+ }
+
+ @Override
+ public boolean dataDefinitionIgnoredInTransactions() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#dataDefinitionIgnoredInTransactions is
not supported");
+ }
+
+ @Override
+ public ResultSet getProcedures(
+ String catalog, String schemaPattern, String procedureNamePattern) throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#getProcedures is not supported");
+ }
+
+ @Override
+ public ResultSet getProcedureColumns(
+ String catalog,
+ String schemaPattern,
+ String procedureNamePattern,
+ String columnNamePattern)
+ throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#getProcedureColumns is not supported");
+ }
+
+ @Override
+ public ResultSet getColumnPrivileges(
+ String catalog, String schema, String table, String columnNamePattern)
+ throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#getColumnPrivileges is not supported");
+ }
+
+ @Override
+ public ResultSet getTablePrivileges(
+ String catalog, String schemaPattern, String tableNamePattern) throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#getTablePrivileges is not supported");
+ }
+
+ @Override
+ public ResultSet getBestRowIdentifier(
+ String catalog, String schema, String table, int scope, boolean nullable)
+ throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#getBestRowIdentifier is not supported");
+ }
+
+ @Override
+ public ResultSet getVersionColumns(String catalog, String schema, String table)
+ throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#getVersionColumns is not supported");
+ }
+
+ @Override
+ public ResultSet getImportedKeys(String catalog, String schema, String table)
+ throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#getImportedKeys is not supported");
+ }
+
+ @Override
+ public ResultSet getExportedKeys(String catalog, String schema, String table)
+ throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#getExportedKeys is not supported");
+ }
+
+ @Override
+ public ResultSet getCrossReference(
+ String parentCatalog,
+ String parentSchema,
+ String parentTable,
+ String foreignCatalog,
+ String foreignSchema,
+ String foreignTable)
+ throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#getCrossReference is not supported");
+ }
+
+ @Override
+ public ResultSet getTypeInfo() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#getTypeInfo is not supported");
+ }
+
+ @Override
+ public ResultSet getIndexInfo(
+ String catalog, String schema, String table, boolean unique, boolean approximate)
+ throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#getIndexInfo is not supported");
+ }
+
+ @Override
+ public boolean supportsResultSetConcurrency(int type, int concurrency) throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#supportsResultSetConcurrency is not supported");
+ }
+
+ @Override
+ public boolean ownUpdatesAreVisible(int type) throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#ownUpdatesAreVisible is not supported");
+ }
+
+ @Override
+ public boolean ownDeletesAreVisible(int type) throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#ownDeletesAreVisible is not supported");
+ }
+
+ @Override
+ public boolean ownInsertsAreVisible(int type) throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#ownInsertsAreVisible is not supported");
+ }
+
+ @Override
+ public boolean othersUpdatesAreVisible(int type) throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#othersUpdatesAreVisible is not
supported");
+ }
+
+ @Override
+ public boolean othersDeletesAreVisible(int type) throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#othersDeletesAreVisible is not
supported");
+ }
+
+ @Override
+ public boolean othersInsertsAreVisible(int type) throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#othersInsertsAreVisible is not
supported");
+ }
+
+ @Override
+ public boolean updatesAreDetected(int type) throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#updatesAreDetected is not supported");
+ }
+
+ @Override
+ public boolean deletesAreDetected(int type) throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#deletesAreDetected is not supported");
+ }
+
+ @Override
+ public boolean insertsAreDetected(int type) throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#insertsAreDetected is not supported");
+ }
+
+ @Override
+ public boolean supportsBatchUpdates() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#supportsBatchUpdates is not supported");
+ }
+
+ @Override
+ public ResultSet getUDTs(
+ String catalog, String schemaPattern, String typeNamePattern, int[] types)
+ throws SQLException {
+ throw new SQLFeatureNotSupportedException("FlinkDatabaseMetaData#getUDTs is not supported");
+ }
+
+ @Override
+ public boolean supportsSavepoints() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#supportsSavepoints is not supported");
+ }
+
+ @Override
+ public boolean supportsNamedParameters() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#supportsNamedParameters is not
supported");
+ }
+
+ @Override
+ public boolean supportsMultipleOpenResults() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#supportsMultipleOpenResults is not
supported");
+ }
+
+ @Override
+ public boolean supportsGetGeneratedKeys() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#supportsGetGeneratedKeys is not
supported");
+ }
+
+ @Override
+ public ResultSet getSuperTypes(String catalog, String schemaPattern, String typeNamePattern)
+ throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#getSuperTypes is not supported");
+ }
+
+ @Override
+ public ResultSet getSuperTables(String catalog, String schemaPattern, String tableNamePattern)
+ throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#getSuperTables is not supported");
+ }
+
+ @Override
+ public ResultSet getAttributes(
+ String catalog,
+ String schemaPattern,
+ String typeNamePattern,
+ String attributeNamePattern)
+ throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#getAttributes is not supported");
+ }
+
+ @Override
+ public boolean supportsResultSetHoldability(int holdability) throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#supportsResultSetHoldability is not supported");
+ }
+
+ @Override
+ public int getResultSetHoldability() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#getResultSetHoldability is not
supported");
+ }
+
+ @Override
+ public int getSQLStateType() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#getSQLStateType is not supported");
+ }
+
+ @Override
+ public boolean locatorsUpdateCopy() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#locatorsUpdateCopy is not supported");
+ }
+
+ @Override
+ public boolean supportsStatementPooling() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#supportsStatementPooling is not
supported");
+ }
+
+ @Override
+ public RowIdLifetime getRowIdLifetime() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#getRowIdLifetime is not supported");
+ }
+
+ @Override
+ public boolean supportsStoredFunctionsUsingCallSyntax() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#supportsStoredFunctionsUsingCallSyntax is not supported");
+ }
+
+ @Override
+ public boolean autoCommitFailureClosesAllResultSets() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#autoCommitFailureClosesAllResultSets is
not supported");
+ }
+
+ @Override
+ public ResultSet getClientInfoProperties() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#getClientInfoProperties is not
supported");
+ }
+
+ @Override
+ public ResultSet getFunctions(String catalog, String schemaPattern, String functionNamePattern)
+ throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#getFunctions is not supported");
+ }
+
+ @Override
+ public ResultSet getFunctionColumns(
+ String catalog,
+ String schemaPattern,
+ String functionNamePattern,
+ String columnNamePattern)
+ throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#getFunctionColumns is not supported");
+ }
+
+ @Override
+ public ResultSet getPseudoColumns(
+ String catalog, String schemaPattern, String tableNamePattern, String columnNamePattern)
+ throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#getPseudoColumns is not supported");
+ }
+
+ @Override
+ public boolean generatedKeyAlwaysReturned() throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#generatedKeyAlwaysReturned is not
supported");
+ }
+
+ @Override
+ public <T> T unwrap(Class<T> iface) throws SQLException {
+ throw new SQLFeatureNotSupportedException("FlinkDatabaseMetaData#unwrap is not supported");
+ }
+
+ @Override
+ public boolean isWrapperFor(Class<?> iface) throws SQLException {
+ throw new SQLFeatureNotSupportedException(
+ "FlinkDatabaseMetaData#isWrapperFor is not supported");
+ }
+}
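The base class above deliberately answers every call with SQLFeatureNotSupportedException so that concrete implementations only override what they can actually serve; FlinkDatabaseMetaData in a later file follows that pattern. A hypothetical sketch of such a subclass (kept abstract here because the base class leaves the catalog/schema listing methods unimplemented):

    /** Hypothetical example, not taken from this commit. */
    public abstract class ExampleDatabaseMetaData extends BaseDatabaseMetaData {
        @Override
        public boolean supportsTransactions() {
            return false; // answer the calls this driver can serve...
        }
        // ...and keep the inherited SQLFeatureNotSupportedException behaviour for the rest.
    }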
diff --git
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/DriverUri.java
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/DriverUri.java
index a13650b45e7..cd89b4fc3fb 100644
---
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/DriverUri.java
+++
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/DriverUri.java
@@ -75,6 +75,10 @@ public class DriverUri {
return database;
}
+ public String getURL() {
+ return String.format("%s%s", URL_PREFIX, uri);
+ }
+
private void initCatalogAndSchema() throws SQLException {
String path = uri.getPath();
if (isNullOrWhitespaceOnly(uri.getPath()) || path.equals("/")) {
diff --git
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkConnection.java
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkConnection.java
index 7efe7de419f..076172659ff 100644
---
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkConnection.java
+++
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkConnection.java
@@ -28,18 +28,21 @@ import org.apache.flink.table.jdbc.utils.DriverUtils;
import java.sql.DatabaseMetaData;
import java.sql.SQLClientInfoException;
import java.sql.SQLException;
-import java.sql.SQLFeatureNotSupportedException;
import java.sql.Statement;
import java.util.Collections;
import java.util.Properties;
import java.util.UUID;
+import static org.apache.flink.table.jdbc.utils.DriverUtils.checkArgument;
+
/** Connection to flink sql gateway for jdbc driver. */
public class FlinkConnection extends BaseConnection {
+ private final String url;
private final Executor executor;
private volatile boolean closed = false;
public FlinkConnection(DriverUri driverUri) {
+ this.url = driverUri.getURL();
// TODO Support default context from map to get gid of flink core for
jdbc driver in
// https://issues.apache.org/jira/browse/FLINK-31687.
this.executor =
@@ -84,7 +87,7 @@ public class FlinkConnection extends BaseConnection {
@Override
public DatabaseMetaData getMetaData() throws SQLException {
- throw new SQLFeatureNotSupportedException();
+ return new FlinkDatabaseMetaData(url, this, createStatement());
}
@Override
@@ -104,7 +107,9 @@ public class FlinkConnection extends BaseConnection {
public String getCatalog() throws SQLException {
try (StatementResult result = executor.executeStatement("SHOW CURRENT CATALOG;")) {
if (result.hasNext()) {
- return result.next().getString(0).toString();
+ String catalog = result.next().getString(0).toString();
+ checkArgument(!result.hasNext(), "There are more than one current catalog.");
+ return catalog;
} else {
throw new SQLException("No catalog");
}
@@ -158,7 +163,9 @@ public class FlinkConnection extends BaseConnection {
public String getSchema() throws SQLException {
try (StatementResult result = executor.executeStatement("SHOW CURRENT DATABASE;")) {
if (result.hasNext()) {
- return result.next().getString(0).toString();
+ String schema = result.next().getString(0).toString();
+ checkArgument(!result.hasNext(), "There are more than one current database.");
+ return schema;
} else {
throw new SQLException("No database");
}
diff --git
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkDatabaseMetaData.java
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkDatabaseMetaData.java
new file mode 100644
index 00000000000..b729bef5933
--- /dev/null
+++
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkDatabaseMetaData.java
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.jdbc;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.client.gateway.Executor;
+import org.apache.flink.table.client.gateway.StatementResult;
+
+import javax.annotation.Nullable;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.jdbc.utils.DatabaseMetaDataUtils.createCatalogsResultSet;
+import static org.apache.flink.table.jdbc.utils.DatabaseMetaDataUtils.createSchemasResultSet;
+
+/** Implementation of {@link java.sql.DatabaseMetaData} for flink jdbc driver. */
+public class FlinkDatabaseMetaData extends BaseDatabaseMetaData {
+ private final String url;
+ private final FlinkConnection connection;
+ private final Statement statement;
+ private final Executor executor;
+
+ @VisibleForTesting
+ protected FlinkDatabaseMetaData(String url, FlinkConnection connection, Statement statement) {
+ this.url = url;
+ this.connection = connection;
+ this.statement = statement;
+ this.executor = connection.getExecutor();
+ }
+
+ @Override
+ public ResultSet getCatalogs() throws SQLException {
+ try (StatementResult result = catalogs()) {
+ return createCatalogsResultSet(statement, result);
+ } catch (Exception e) {
+ throw new SQLException("Get catalogs fail", e);
+ }
+ }
+
+ private StatementResult catalogs() {
+ return executor.executeStatement("SHOW CATALOGS");
+ }
+
+ @Override
+ public ResultSet getSchemas() throws SQLException {
+ try {
+ String currentCatalog = connection.getCatalog();
+ String currentDatabase = connection.getSchema();
+ List<String> catalogList = new ArrayList<>();
+ Map<String, List<String>> catalogSchemaList = new HashMap<>();
+ try (StatementResult result = catalogs()) {
+ while (result.hasNext()) {
+ String catalog = result.next().getString(0).toString();
+ connection.setCatalog(catalog);
+ getSchemasForCatalog(catalogList, catalogSchemaList, catalog, null);
+ }
+ }
+ connection.setCatalog(currentCatalog);
+ connection.setSchema(currentDatabase);
+
+ return createSchemasResultSet(statement, catalogList, catalogSchemaList);
+ } catch (Exception e) {
+ throw new SQLException("Get schemas fail", e);
+ }
+ }
+
+ private void getSchemasForCatalog(
+ List<String> catalogList,
+ Map<String, List<String>> catalogSchemaList,
+ String catalog,
+ @Nullable String schemaPattern)
+ throws SQLException {
+ catalogList.add(catalog);
+ List<String> schemas = new ArrayList<>();
+ try (StatementResult schemaResult = schemas()) {
+ while (schemaResult.hasNext()) {
+ String schema = schemaResult.next().getString(0).toString();
+ if (schemaPattern == null || schema.contains(schemaPattern)) {
+ schemas.add(schema);
+ }
+ }
+ }
+ catalogSchemaList.put(catalog, schemas);
+ }
+
+ private StatementResult schemas() {
+ return executor.executeStatement("SHOW DATABASES;");
+ }
+
+ // TODO Flink will support SHOW DATABASES LIKE statement in FLIP-297, this method will be
+ // supported after that issue.
+ @Override
+ public ResultSet getSchemas(String catalog, String schemaPattern) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ResultSet getTables(
+ String catalog, String schemaPattern, String tableNamePattern, String[] types)
+ throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ResultSet getColumns(
+ String catalog, String schemaPattern, String tableNamePattern, String columnNamePattern)
+ throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ResultSet getPrimaryKeys(String catalog, String schema, String table)
+ throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ResultSet getTableTypes() throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Connection getConnection() throws SQLException {
+ return connection;
+ }
+
+ @Override
+ public boolean allProceduresAreCallable() throws SQLException {
+ return false;
+ }
+
+ @Override
+ public boolean allTablesAreSelectable() throws SQLException {
+ return true;
+ }
+
+ @Override
+ public String getURL() throws SQLException {
+ return url;
+ }
+
+ @Override
+ public boolean isReadOnly() throws SQLException {
+ return true;
+ }
+
+ @Override
+ public String getDatabaseProductName() throws SQLException {
+ return DriverInfo.DRIVER_NAME;
+ }
+
+ @Override
+ public String getDatabaseProductVersion() throws SQLException {
+ return DriverInfo.DRIVER_VERSION;
+ }
+
+ @Override
+ public String getDriverName() throws SQLException {
+ return FlinkDriver.class.getName();
+ }
+
+ @Override
+ public String getDriverVersion() throws SQLException {
+ return DriverInfo.DRIVER_VERSION;
+ }
+
+ @Override
+ public int getDriverMajorVersion() {
+ return DriverInfo.DRIVER_VERSION_MAJOR;
+ }
+
+ @Override
+ public int getDriverMinorVersion() {
+ return DriverInfo.DRIVER_VERSION_MINOR;
+ }
+
+ @Override
+ public boolean supportsResultSetType(int type) throws SQLException {
+ return false;
+ }
+
+ @Override
+ public int getDatabaseMajorVersion() throws SQLException {
+ return DriverInfo.DRIVER_VERSION_MAJOR;
+ }
+
+ @Override
+ public int getDatabaseMinorVersion() throws SQLException {
+ return DriverInfo.DRIVER_VERSION_MINOR;
+ }
+
+ @Override
+ public int getJDBCMajorVersion() throws SQLException {
+ return DriverInfo.DRIVER_VERSION_MAJOR;
+ }
+
+ @Override
+ public int getJDBCMinorVersion() throws SQLException {
+ return DriverInfo.DRIVER_VERSION_MINOR;
+ }
+
+ @Override
+ public boolean usesLocalFiles() throws SQLException {
+ return false;
+ }
+
+ @Override
+ public boolean usesLocalFilePerTable() throws SQLException {
+ return false;
+ }
+
+ @Override
+ public String getIdentifierQuoteString() throws SQLException {
+ return "`";
+ }
+
+ @Override
+ public String getExtraNameCharacters() throws SQLException {
+ return "";
+ }
+
+ @Override
+ public boolean supportsAlterTableWithAddColumn() throws SQLException {
+ return true;
+ }
+
+ @Override
+ public boolean supportsAlterTableWithDropColumn() throws SQLException {
+ return true;
+ }
+
+ @Override
+ public boolean supportsColumnAliasing() throws SQLException {
+ return true;
+ }
+
+ @Override
+ public boolean supportsTableCorrelationNames() throws SQLException {
+ return true;
+ }
+
+ @Override
+ public boolean supportsDifferentTableCorrelationNames() throws SQLException {
+ return false;
+ }
+
+ @Override
+ public boolean supportsExpressionsInOrderBy() throws SQLException {
+ return true;
+ }
+
+ @Override
+ public boolean supportsOrderByUnrelated() throws SQLException {
+ return true;
+ }
+
+ @Override
+ public boolean supportsGroupBy() throws SQLException {
+ return true;
+ }
+
+ @Override
+ public boolean supportsGroupByUnrelated() throws SQLException {
+ return true;
+ }
+
+ @Override
+ public boolean supportsGroupByBeyondSelect() throws SQLException {
+ return true;
+ }
+
+ @Override
+ public boolean supportsLikeEscapeClause() throws SQLException {
+ return true;
+ }
+
+ @Override
+ public boolean supportsNonNullableColumns() throws SQLException {
+ return true;
+ }
+
+ @Override
+ public boolean supportsOuterJoins() throws SQLException {
+ return true;
+ }
+
+ @Override
+ public boolean supportsFullOuterJoins() throws SQLException {
+ return true;
+ }
+
+ @Override
+ public boolean supportsLimitedOuterJoins() throws SQLException {
+ return true;
+ }
+
+ @Override
+ public String getSchemaTerm() throws SQLException {
+ return "database";
+ }
+
+ @Override
+ public String getCatalogTerm() throws SQLException {
+ return "catalog";
+ }
+
+ @Override
+ public boolean supportsSchemasInDataManipulation() throws SQLException {
+ return true;
+ }
+
+ @Override
+ public boolean supportsSchemasInTableDefinitions() throws SQLException {
+ return true;
+ }
+
+ @Override
+ public boolean supportsCatalogsInDataManipulation() throws SQLException {
+ return true;
+ }
+
+ @Override
+ public boolean supportsCatalogsInTableDefinitions() throws SQLException {
+ return true;
+ }
+
+ @Override
+ public boolean supportsSubqueriesInComparisons() throws SQLException {
+ return true;
+ }
+
+ @Override
+ public boolean supportsSubqueriesInExists() throws SQLException {
+ return true;
+ }
+
+ @Override
+ public boolean supportsSubqueriesInIns() throws SQLException {
+ return true;
+ }
+
+ @Override
+ public boolean supportsSubqueriesInQuantifieds() throws SQLException {
+ return true;
+ }
+
+ @Override
+ public boolean supportsCorrelatedSubqueries() throws SQLException {
+ return true;
+ }
+
+ @Override
+ public boolean supportsUnion() throws SQLException {
+ return true;
+ }
+
+ @Override
+ public boolean supportsUnionAll() throws SQLException {
+ return true;
+ }
+}
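Client tools usually feed the supported getters above (identifier quoting, schema/catalog terms, driver and database versions) into their SQL generation; a small hedged sketch of the typical quoting helper, with the method name purely illustrative:

    // Quote an identifier with whatever quote string the driver reports ("`" for this driver),
    // doubling any embedded quote characters.
    static String quoteIdentifier(java.sql.DatabaseMetaData metaData, String identifier)
            throws java.sql.SQLException {
        String quote = metaData.getIdentifierQuoteString();
        return quote + identifier.replace(quote, quote + quote) + quote;
    }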
diff --git
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSet.java
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSet.java
index 42bdcee30a4..9bd6b6fe2f8 100644
---
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSet.java
+++
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSet.java
@@ -21,7 +21,9 @@ package org.apache.flink.table.jdbc;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.client.gateway.StatementResult;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.jdbc.utils.CloseableResultIterator;
import org.apache.flink.table.jdbc.utils.DataConverter;
+import org.apache.flink.table.jdbc.utils.StatementResultIterator;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.DecimalType;
@@ -49,7 +51,7 @@ public class FlinkResultSet extends BaseResultSet {
private final List<DataType> dataTypeList;
private final List<String> columnNameList;
private final Statement statement;
- private final StatementResult result;
+ private final CloseableResultIterator<RowData> iterator;
private final DataConverter dataConverter;
private final FlinkResultSetMetaData resultSetMetaData;
private RowData currentRow;
@@ -59,13 +61,24 @@ public class FlinkResultSet extends BaseResultSet {
public FlinkResultSet(
Statement statement, StatementResult result, DataConverter dataConverter) {
+ this(
+ statement,
+ new StatementResultIterator(result),
+ result.getResultSchema(),
+ dataConverter);
+ }
+
+ public FlinkResultSet(
+ Statement statement,
+ CloseableResultIterator<RowData> iterator,
+ ResolvedSchema schema,
+ DataConverter dataConverter) {
this.statement = checkNotNull(statement, "Statement cannot be null");
- this.result = checkNotNull(result, "Statement result cannot be null");
+ this.iterator = checkNotNull(iterator, "Statement result cannot be null");
this.dataConverter = checkNotNull(dataConverter, "Data converter cannot be null");
this.currentRow = null;
this.wasNull = false;
- final ResolvedSchema schema = result.getResultSchema();
this.dataTypeList = schema.getColumnDataTypes();
this.columnNameList = schema.getColumnNames();
this.resultSetMetaData = new FlinkResultSetMetaData(columnNameList, dataTypeList);
@@ -75,9 +88,9 @@ public class FlinkResultSet extends BaseResultSet {
public boolean next() throws SQLException {
checkClosed();
- if (result.hasNext()) {
+ if (iterator.hasNext()) {
// TODO check the kind of currentRow
- currentRow = result.next();
+ currentRow = iterator.next();
wasNull = currentRow == null;
return true;
} else {
@@ -122,7 +135,11 @@ public class FlinkResultSet extends BaseResultSet {
}
closed = true;
- result.close();
+ try {
+ iterator.close();
+ } catch (Exception e) {
+ throw new SQLException("Close result iterator fail", e);
+ }
}
@Override
diff --git
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSetMetaData.java
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSetMetaData.java
index 0398d1c6806..3ffab4e866c 100644
---
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSetMetaData.java
+++
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSetMetaData.java
@@ -39,12 +39,12 @@ import java.util.Map;
public class FlinkResultSetMetaData implements ResultSetMetaData {
private final List<ColumnInfo> columnList;
- public FlinkResultSetMetaData(List<String> columnNameList, List<DataType> columnTypeList) {
- this.columnList = new ArrayList<>(columnNameList.size());
- for (int i = 0; i < columnNameList.size(); i++) {
- this.columnList.add(
+ public FlinkResultSetMetaData(List<String> columnNames, List<DataType> columnTypes) {
+ this.columnList = new ArrayList<>(columnNames.size());
+ for (int i = 0; i < columnNames.size(); i++) {
+ columnList.add(
ColumnInfo.fromLogicalType(
- columnNameList.get(i), columnTypeList.get(i).getLogicalType()));
+ columnNames.get(i), columnTypes.get(i).getLogicalType()));
}
}
diff --git
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/CloseableResultIterator.java
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/CloseableResultIterator.java
new file mode 100644
index 00000000000..d015c2e2bdc
--- /dev/null
+++
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/CloseableResultIterator.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.jdbc.utils;
+
+import java.util.Iterator;
+
+/** Closeable result iterator for jdbc driver. */
+public interface CloseableResultIterator<T> extends Iterator<T>, AutoCloseable {}
diff --git
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/CollectionResultIterator.java
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/CollectionResultIterator.java
new file mode 100644
index 00000000000..84319e615e5
--- /dev/null
+++
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/CollectionResultIterator.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.jdbc.utils;
+
+import org.apache.flink.table.data.RowData;
+
+import java.util.Iterator;
+
+/** Result iterator from given iterator. */
+public class CollectionResultIterator implements CloseableResultIterator<RowData> {
+ private final Iterator<RowData> iterator;
+
+ public CollectionResultIterator(Iterator<RowData> iterator) {
+ this.iterator = iterator;
+ }
+
+ @Override
+ public void close() throws Exception {}
+
+ @Override
+ public boolean hasNext() {
+ return iterator.hasNext();
+ }
+
+ @Override
+ public RowData next() {
+ return iterator.next();
+ }
+}
diff --git
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/DatabaseMetaDataUtils.java
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/DatabaseMetaDataUtils.java
new file mode 100644
index 00000000000..b99bd54a30f
--- /dev/null
+++
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/DatabaseMetaDataUtils.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.jdbc.utils;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.client.gateway.StatementResult;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.jdbc.FlinkDatabaseMetaData;
+import org.apache.flink.table.jdbc.FlinkResultSet;
+
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+/** Utils to create catalog/schema results for {@link FlinkDatabaseMetaData}. */
+public class DatabaseMetaDataUtils {
+ private static final Column TABLE_CAT_COLUMN =
+ Column.physical("TABLE_CAT", DataTypes.STRING().notNull());
+ private static final Column TABLE_SCHEM_COLUMN =
+ Column.physical("TABLE_SCHEM", DataTypes.STRING().notNull());
+ private static final Column TABLE_CATALOG_COLUMN =
+ Column.physical("TABLE_CATALOG", DataTypes.STRING());
+
+ /**
+ * Create result set for catalogs. The schema columns are:
+ *
+ * <ul>
+ * <li>TABLE_CAT String => catalog name.
+ * </ul>
+ *
+ * <p>The results are ordered by catalog name.
+ *
+ * @param statement The statement for database meta data
+ * @param result The result for catalogs
+ * @return a ResultSet object in which each row has a single String column that is a catalog
+ * name
+ */
+ public static FlinkResultSet createCatalogsResultSet(
+ Statement statement, StatementResult result) {
+ List<RowData> catalogs = new ArrayList<>();
+ result.forEachRemaining(catalogs::add);
+ catalogs.sort(Comparator.comparing(v -> v.getString(0)));
+
+ return new FlinkResultSet(
+ statement,
+ new CollectionResultIterator(catalogs.iterator()),
+ ResolvedSchema.of(TABLE_CAT_COLUMN),
+ StringDataConverter.CONVERTER);
+ }
+
+ /**
+ * Create result set for schemas. The schema columns are:
+ *
+ * <ul>
+ * <li>TABLE_SCHEM String => schema name
+ * <li>TABLE_CATALOG String => catalog name (may be null)
+ * </ul>
+ *
+ * <p>The results are ordered by TABLE_CATALOG and TABLE_SCHEM.
+ *
+ * @param statement The statement for database meta data
+ * @param catalogs The catalog list
+ * @param catalogSchemas The catalog with schema list
+ * @return a ResultSet object in which each row is a schema description
+ */
+ public static FlinkResultSet createSchemasResultSet(
+ Statement statement, List<String> catalogs, Map<String, List<String>> catalogSchemas) {
+ List<RowData> schemaWithCatalogList = new ArrayList<>();
+ List<String> catalogList = new ArrayList<>(catalogs);
+ catalogList.sort(String::compareTo);
+ for (String catalog : catalogList) {
+ List<String> schemas = catalogSchemas.get(catalog);
+ schemas.sort(String::compareTo);
+ schemas.forEach(
+ s ->
+ schemaWithCatalogList.add(
+ GenericRowData.of(
+ StringData.fromString(s),
+ StringData.fromString(catalog))));
+ }
+
+ return new FlinkResultSet(
+ statement,
+ new CollectionResultIterator(schemaWithCatalogList.iterator()),
+ ResolvedSchema.of(TABLE_SCHEM_COLUMN, TABLE_CATALOG_COLUMN),
+ StringDataConverter.CONVERTER);
+ }
+}
diff --git
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/StatementResultIterator.java
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/StatementResultIterator.java
new file mode 100644
index 00000000000..bfc41f9ec38
--- /dev/null
+++
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/StatementResultIterator.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.jdbc.utils;
+
+import org.apache.flink.table.client.gateway.StatementResult;
+import org.apache.flink.table.data.RowData;
+
+/** Closeable result iterator for statement result. */
+public class StatementResultIterator implements CloseableResultIterator<RowData> {
+ private final StatementResult result;
+
+ public StatementResultIterator(StatementResult result) {
+ this.result = result;
+ }
+
+ @Override
+ public void close() throws Exception {
+ result.close();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return result.hasNext();
+ }
+
+ @Override
+ public RowData next() {
+ return result.next();
+ }
+}
diff --git
a/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkConnectionTest.java
b/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkConnectionTest.java
index 9311175d17d..08a6faa873d 100644
---
a/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkConnectionTest.java
+++
b/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkConnectionTest.java
@@ -73,9 +73,8 @@ public class FlinkConnectionTest extends FlinkJdbcDriverTestBase {
public void testClientInfo() throws Exception {
Properties properties = new Properties();
properties.setProperty("key3", "val3");
- try (FlinkConnection connection =
- new FlinkConnection(
- getDriverUri("jdbc:flink://%s:%s?key1=val1&key2=val2", properties))) {
+ DriverUri driverUri = getDriverUri("jdbc:flink://%s:%s?key1=val1&key2=val2", properties);
+ try (FlinkConnection connection = new FlinkConnection(driverUri)) {
assertEquals("val1", connection.getClientInfo("key1"));
assertEquals("val2", connection.getClientInfo("key2"));
assertEquals("val3", connection.getClientInfo("key3"));
diff --git
a/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkDatabaseMetaDataTest.java
b/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkDatabaseMetaDataTest.java
new file mode 100644
index 00000000000..1d469108814
--- /dev/null
+++
b/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkDatabaseMetaDataTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.jdbc;
+
+import org.apache.flink.table.client.gateway.Executor;
+import org.apache.flink.table.client.gateway.StatementResult;
+import org.apache.flink.table.data.RowData;
+
+import org.apache.commons.lang3.StringUtils;
+import org.junit.jupiter.api.Test;
+
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Tests for flink database metadata. */
+public class FlinkDatabaseMetaDataTest extends FlinkJdbcDriverTestBase {
+ @Test
+ public void testCatalogSchemas() throws Exception {
+
+ DriverUri driverUri = getDriverUri();
+ try (FlinkConnection connection = new FlinkConnection(driverUri)) {
+ Executor executor = connection.getExecutor();
+ // create databases in default catalog
+ executeDDL("CREATE DATABASE database11", executor);
+ executeDDL("CREATE DATABASE database12", executor);
+ executeDDL("CREATE DATABASE database13", executor);
+
+ // create catalog2 and databases
+ executeDDL("CREATE CATALOG test_catalog2 WITH ('type'='generic_in_memory');", executor);
+ executeDDL("CREATE DATABASE test_catalog2.database11", executor);
+ executeDDL("CREATE DATABASE test_catalog2.database21", executor);
+ executeDDL("CREATE DATABASE test_catalog2.database31", executor);
+
+ // create catalog1 and databases
+ executeDDL("CREATE CATALOG test_catalog1 WITH ('type'='generic_in_memory');", executor);
+ executeDDL("CREATE DATABASE test_catalog1.database11", executor);
+ executeDDL("CREATE DATABASE test_catalog1.database21", executor);
+ executeDDL("CREATE DATABASE test_catalog1.database13", executor);
+
+ connection.setCatalog("test_catalog2");
+ connection.setSchema("database21");
+ assertEquals("test_catalog2", connection.getCatalog());
+ assertEquals("database21", connection.getSchema());
+
+ DatabaseMetaData databaseMetaData =
+ new FlinkDatabaseMetaData(
+ driverUri.getURL(), connection, new TestingStatement());
+ // Show all catalogs
+ assertThat(resultSetToListAndClose(databaseMetaData.getCatalogs()))
+ .containsExactly("default_catalog", "test_catalog1", "test_catalog2");
+ // Show all databases
+ assertThat(resultSetToListAndClose(databaseMetaData.getSchemas()))
+ .containsExactly(
+ "database11,default_catalog",
+ "database12,default_catalog",
+ "database13,default_catalog",
+ "default_database,default_catalog",
+ "database11,test_catalog1",
+ "database13,test_catalog1",
+ "database21,test_catalog1",
+ "default,test_catalog1",
+ "database11,test_catalog2",
+ "database21,test_catalog2",
+ "database31,test_catalog2",
+ "default,test_catalog2");
+
+ // Validate that the default catalog and database are not changed.
+ assertEquals("test_catalog2", connection.getCatalog());
+ assertEquals("database21", connection.getSchema());
+ }
+ }
+
+ private List<String> resultSetToListAndClose(ResultSet resultSet) throws Exception {
+ List<String> resultList = new ArrayList<>();
+ int columnCount = resultSet.getMetaData().getColumnCount();
+ while (resultSet.next()) {
+ List<String> columnStringList = new ArrayList<>(columnCount);
+ for (int i = 1; i <= columnCount; i++) {
+ columnStringList.add(resultSet.getString(i));
+ }
+ resultList.add(StringUtils.join(columnStringList, ","));
+ }
+ resultSet.close();
+
+ return resultList;
+ }
+
+ private void executeDDL(String sql, Executor executor) {
+ try (StatementResult result = executor.executeStatement(sql)) {
+ assertTrue(result.hasNext());
+ RowData rowData = result.next();
+ assertEquals(1, rowData.getArity());
+ assertEquals("OK", rowData.getString(0).toString());
+
+ assertFalse(result.hasNext());
+ }
+ }
+}