openinx commented on a change in pull request #2666:
URL: https://github.com/apache/iceberg/pull/2666#discussion_r660562573



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java
##########
@@ -19,56 +19,155 @@
 
 package org.apache.iceberg.flink;
 
+import java.util.Map;
 import java.util.Set;
 import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.DynamicTableSinkFactory;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 
 public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, DynamicTableSourceFactory {
+  private static final String FACTORY_IDENTIFIER = "iceberg";
+
+  private static final ConfigOption<String> CATALOG_NAME =
+      ConfigOptions.key("catalog-name")
+          .stringType()
+          .noDefaultValue()
+          .withDescription("Catalog name");
+
+  private static final ConfigOption<String> CATALOG_TYPE =
+      ConfigOptions.key(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE)
+          .stringType()
+          .noDefaultValue()
+          .withDescription("Catalog type, the optional types are: custom, hadoop, hive.");
+
+  private static final ConfigOption<String> CATALOG_DATABASE =
+      ConfigOptions.key("catalog-database")
+          .stringType()
+          .defaultValue(FlinkCatalogFactory.DEFAULT_DATABASE_NAME)
+          .withDescription("Database name managed in the iceberg catalog.");
+
   private final FlinkCatalog catalog;
 
+  public FlinkDynamicTableFactory() {
+    this.catalog = null;
+  }
+
   public FlinkDynamicTableFactory(FlinkCatalog catalog) {
     this.catalog = catalog;
   }
 
   @Override
   public DynamicTableSource createDynamicTableSource(Context context) {
-    ObjectPath objectPath = context.getObjectIdentifier().toObjectPath();
-    TableLoader tableLoader = createTableLoader(objectPath);
+    ObjectIdentifier objectIdentifier = context.getObjectIdentifier();
+    Map<String, String> tableProps = context.getCatalogTable().getOptions();
+    CatalogTable catalogTable = context.getCatalogTable();
     TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
-    return new IcebergTableSource(tableLoader, tableSchema, context.getCatalogTable().getOptions(),
-        context.getConfiguration());
+
+    TableLoader tableLoader;
+    if (catalog != null) {
+      tableLoader = createTableLoader(catalog, objectIdentifier.toObjectPath());
+    } else {
+      tableLoader = createTableLoader(catalogTable, tableProps, objectIdentifier.getObjectName());

Review comment:
       > How do you create an in-memory catalog table pointing to an Iceberg table if the Iceberg table already exists? What if the DDL, like the schema, doesn't match?
   
   If the underlying iceberg table already exists, we still need to create the in-memory catalog table pointing to it. If the in-memory catalog table schema does not match the underlying iceberg table, the `CREATE TABLE` statement won't throw any exception; instead, a `SELECT` or `INSERT INTO` query will throw an exception when the `RowData` read from the iceberg table cannot be parsed with the in-memory catalog table schema. In that case people need to re-create the in-memory table and map it to the underlying table again. That's the default behavior flink users expect, because most flink connectors behave similarly (such as the [JDBC](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/jdbc/) connector, the [hive](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/hive/hive_read_write/) connector, and the [hbase](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/hbase/) connector).
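
   For illustration (not part of this PR), here is a minimal sketch of that flow, assuming a hadoop catalog with a hypothetical warehouse path and table name. The connector options mirror the `ConfigOptions` declared in the diff above:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class IcebergConnectorSketch {
  public static void main(String[] args) {
    TableEnvironment tEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().inBatchMode().build());

    // Register an in-memory catalog table pointing at an iceberg table.
    // 'connector' matches FACTORY_IDENTIFIER; 'catalog-name', 'catalog-type'
    // and 'catalog-database' mirror the options added in this diff. The
    // warehouse path and table name here are placeholders.
    tEnv.executeSql(
        "CREATE TABLE flink_table (" +
        "  id BIGINT," +
        "  data STRING" +
        ") WITH (" +
        "  'connector' = 'iceberg'," +
        "  'catalog-name' = 'hadoop_catalog'," +
        "  'catalog-type' = 'hadoop'," +
        "  'catalog-database' = 'default'," +
        "  'warehouse' = 'file:///tmp/iceberg_warehouse'" +
        ")");

    // CREATE TABLE succeeds even if the declared schema disagrees with the
    // underlying iceberg table; the mismatch only surfaces here, when the
    // RowData is actually read and parsed against the declared schema.
    tEnv.executeSql("SELECT * FROM flink_table").print();
  }
}
```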




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


