bowenli86 commented on a change in pull request #9971: [FLINK-14490][table] Add 
methods for interacting with temporary objects
URL: https://github.com/apache/flink/pull/9971#discussion_r339230305
 
 

 ##########
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
 ##########
 @@ -207,28 +207,226 @@ public String getBuiltInDatabaseName() {
                return 
catalogs.get(getBuiltInCatalogName()).getDefaultDatabase();
        }
 
+       /**
+        * Result of a lookup for a table through {@link 
#getTable(ObjectIdentifier)}. It combines the
+        * {@link CatalogBaseTable} with additional information such as if the 
table is a temporary table or comes
+        * from the catalog.
+        */
+       public static class TableLookupResult {
+               private final boolean isTemporary;
+               private final CatalogBaseTable table;
+
+               private static TableLookupResult temporary(CatalogBaseTable 
table) {
+                       return new TableLookupResult(true, table);
+               }
+
+               private static TableLookupResult permanent(CatalogBaseTable 
table) {
+                       return new TableLookupResult(false, table);
+               }
+
+               private TableLookupResult(boolean isTemporary, CatalogBaseTable 
table) {
+                       this.isTemporary = isTemporary;
+                       this.table = table;
+               }
+
+               public boolean isTemporary() {
+                       return isTemporary;
+               }
+
+               public CatalogBaseTable getTable() {
+                       return table;
+               }
+       }
+
        /**
         * Retrieves a fully qualified table. If the path is not yet fully 
qualified use
         * {@link #qualifyIdentifier(UnresolvedIdentifier)} first.
         *
         * @param objectIdentifier full path of the table to retrieve
         * @return table that the path points to.
         */
-       public Optional<CatalogBaseTable> getTable(ObjectIdentifier 
objectIdentifier) {
+       public Optional<TableLookupResult> getTable(ObjectIdentifier 
objectIdentifier) {
                try {
-                       Catalog currentCatalog = 
catalogs.get(objectIdentifier.getCatalogName());
-                       ObjectPath objectPath = new ObjectPath(
-                               objectIdentifier.getDatabaseName(),
-                               objectIdentifier.getObjectName());
-
-                       if (currentCatalog != null && 
currentCatalog.tableExists(objectPath)) {
-                               return 
Optional.of(currentCatalog.getTable(objectPath));
+                       CatalogBaseTable temporaryTable = 
temporaryTables.get(objectIdentifier);
+                       if (temporaryTable != null) {
+                               return 
Optional.of(TableLookupResult.temporary(temporaryTable));
+                       } else {
+                               return getPermanentTable(objectIdentifier);
                        }
                } catch (TableNotExistException ignored) {
                }
                return Optional.empty();
        }
 
+       private Optional<TableLookupResult> getPermanentTable(ObjectIdentifier 
objectIdentifier)
+                       throws TableNotExistException {
+               Catalog currentCatalog = 
catalogs.get(objectIdentifier.getCatalogName());
+               ObjectPath objectPath = new ObjectPath(
+                       objectIdentifier.getDatabaseName(),
+                       objectIdentifier.getObjectName());
+
+               if (currentCatalog != null && 
currentCatalog.tableExists(objectPath)) {
+                       return 
Optional.of(TableLookupResult.permanent(currentCatalog.getTable(objectPath)));
+               }
+               return Optional.empty();
+       }
+
+       /**
+        * Retrieves names of all registered catalogs.
+        *
+        * @return a set of names of registered catalogs
+        */
+       public Set<String> listCatalogs() {
+               return Collections.unmodifiableSet(catalogs.keySet());
+       }
+
+       /**
+        * Returns an array of names of all tables (tables and views, both 
temporary and permanent)
+        * registered in the namespace of the current catalog and database.
+        *
+        * @return names of all registered tables
+        */
+       public Set<String> listTables() {
+               return listTables(getCurrentCatalog(), getCurrentDatabase());
+       }
+
+       /**
+        * Returns an array of names of all tables (tables and views, both 
temporary and permanent)
+        * registered in the namespace of the current catalog and database.
+        *
+        * @return names of all registered tables
+        */
+       public Set<String> listTables(String catalogName, String databaseName) {
+               Catalog currentCatalog = catalogs.get(getCurrentCatalog());
+
+               try {
+                       return Stream.concat(
+                               
currentCatalog.listTables(getCurrentDatabase()).stream(),
+                               listTemporaryTablesInternal(catalogName, 
databaseName).map(e -> e.getKey().getObjectName())
+                       ).collect(Collectors.toSet());
+               } catch (DatabaseNotExistException e) {
+                       throw new ValidationException("Current database does 
not exist", e);
+               }
+       }
+
+       /**
+        * Returns an array of names of temporary tables registered in the 
namespace of the current
+        * catalog and database.
+        *
+        * @return names of registered temporary tables
+        */
+       public Set<String> listTemporaryTables() {
+               return listTemporaryTablesInternal(getCurrentCatalog(), 
getCurrentDatabase())
+                       .map(e -> e.getKey().getObjectName())
+                       .collect(Collectors.toSet());
+       }
+
+       /**
+        * Returns an array of names of temporary views registered in the 
namespace of the current
+        * catalog and database.
+        *
+        * @return names of registered temporary views
+        */
+       public Set<String> listTemporaryViews() {
+               return listTemporaryTablesInternal(getCurrentCatalog(), 
getCurrentDatabase())
+                       .filter(e -> e.getValue() instanceof CatalogView)
+                       .map(e -> e.getKey().getObjectName())
+                       .collect(Collectors.toSet());
+       }
+
+       private Stream<Map.Entry<ObjectIdentifier, CatalogBaseTable>> 
listTemporaryTablesInternal(
+                       String catalogName,
+                       String databaseName) {
+               return temporaryTables
+                       .entrySet()
+                       .stream()
+                       .filter(e -> {
+                               ObjectIdentifier identifier = e.getKey();
+                               return 
identifier.getCatalogName().equals(catalogName) &&
+                                       
identifier.getDatabaseName().equals(databaseName);
+                       });
+       }
+
+       /**
+        * Lists all available schemas in the root of the catalog manager. It 
is not equivalent to listing all catalogs
+        * as it includes also different catalog parts of the temporary objects.
+        *
+        * <b>NOTE:</b>It is primarily used for interacting with Calcite's 
schema.
 
 Review comment:
   mark the following calcite oriented methods `@Internal`?
   
   From Internal.java, "Interface to mark methods within stable, public APIs as 
an internal developer API."

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to