rdblue commented on a change in pull request #2666:
URL: https://github.com/apache/iceberg/pull/2666#discussion_r660201187



##########
File path: 
flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java
##########
@@ -19,56 +19,155 @@
 
 package org.apache.iceberg.flink;
 
+import java.util.Map;
 import java.util.Set;
 import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.DynamicTableSinkFactory;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 
 public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, 
DynamicTableSourceFactory {
+  private static final String FACTORY_IDENTIFIER = "iceberg";
+
+  private static final ConfigOption<String> CATALOG_NAME =
+      ConfigOptions.key("catalog-name")
+          .stringType()
+          .noDefaultValue()
+          .withDescription("Catalog name");
+
+  private static final ConfigOption<String> CATALOG_TYPE =
+      ConfigOptions.key(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE)
+          .stringType()
+          .noDefaultValue()
+          .withDescription("Catalog type, the optional types are: custom, 
hadoop, hive.");
+
+  private static final ConfigOption<String> CATALOG_DATABASE =
+      ConfigOptions.key("catalog-database")
+          .stringType()
+          .defaultValue(FlinkCatalogFactory.DEFAULT_DATABASE_NAME)
+          .withDescription("Database name that managed in the iceberg 
catalog.");
+
   private final FlinkCatalog catalog;
 
+  public FlinkDynamicTableFactory() {
+    this.catalog = null;
+  }
+
   public FlinkDynamicTableFactory(FlinkCatalog catalog) {
     this.catalog = catalog;
   }
 
   @Override
   public DynamicTableSource createDynamicTableSource(Context context) {
-    ObjectPath objectPath = context.getObjectIdentifier().toObjectPath();
-    TableLoader tableLoader = createTableLoader(objectPath);
+    ObjectIdentifier objectIdentifier = context.getObjectIdentifier();
+    Map<String, String> tableProps = context.getCatalogTable().getOptions();
+    CatalogTable catalogTable = context.getCatalogTable();
     TableSchema tableSchema = 
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
-    return new IcebergTableSource(tableLoader, tableSchema, 
context.getCatalogTable().getOptions(),
-        context.getConfiguration());
+
+    TableLoader tableLoader;
+    if (catalog != null) {
+      tableLoader = createTableLoader(catalog, 
objectIdentifier.toObjectPath());
+    } else {
+      tableLoader = createTableLoader(catalogTable, tableProps, 
objectIdentifier.getObjectName());

Review comment:
       Okay, I thought that this was a way to run DDL even if you don't have an 
Iceberg catalog defined. It sort of does that, but it also creates a reference 
in the in-memory catalog. That's fine, but it does bring up a couple of other 
questions:
   * How do you create an in-memory catalog table pointing to an Iceberg table 
if the Iceberg table already exists? What if part of the DDL, such as the 
schema, doesn't match the existing table?
   * Why share the table name between the in-memory and external catalog but 
not the database? I think it makes sense to default the external database and 
table name using the ones from the DDL command but allow both to be overridden.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to