openinx commented on a change in pull request #2666:
URL: https://github.com/apache/iceberg/pull/2666#discussion_r660562573
##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java
##########
@@ -19,56 +19,155 @@
package org.apache.iceberg.flink;
+import java.util.Map;
import java.util.Set;
import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
public class FlinkDynamicTableFactory implements DynamicTableSinkFactory,
DynamicTableSourceFactory {
+ private static final String FACTORY_IDENTIFIER = "iceberg";
+
+ private static final ConfigOption<String> CATALOG_NAME =
+ ConfigOptions.key("catalog-name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Catalog name");
+
+ private static final ConfigOption<String> CATALOG_TYPE =
+ ConfigOptions.key(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE)
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Catalog type, the optional types are: custom, hadoop, hive.");
+
+ private static final ConfigOption<String> CATALOG_DATABASE =
+ ConfigOptions.key("catalog-database")
+ .stringType()
+ .defaultValue(FlinkCatalogFactory.DEFAULT_DATABASE_NAME)
+ .withDescription("Database name managed in the iceberg catalog.");
+
private final FlinkCatalog catalog;
+ public FlinkDynamicTableFactory() {
+ this.catalog = null;
+ }
+
public FlinkDynamicTableFactory(FlinkCatalog catalog) {
this.catalog = catalog;
}
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
- ObjectPath objectPath = context.getObjectIdentifier().toObjectPath();
- TableLoader tableLoader = createTableLoader(objectPath);
+ ObjectIdentifier objectIdentifier = context.getObjectIdentifier();
+ Map<String, String> tableProps = context.getCatalogTable().getOptions();
+ CatalogTable catalogTable = context.getCatalogTable();
TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
- return new IcebergTableSource(tableLoader, tableSchema, context.getCatalogTable().getOptions(),
-     context.getConfiguration());
+
+ TableLoader tableLoader;
+ if (catalog != null) {
+ tableLoader = createTableLoader(catalog, objectIdentifier.toObjectPath());
+ } else {
+ tableLoader = createTableLoader(catalogTable, tableProps, objectIdentifier.getObjectName());
Review comment:
> How do you create an in-memory catalog table pointing to an Iceberg table if the Iceberg table already exists? What if the DDL, like the schema, doesn't match?

If the underlying iceberg table already exists, then we still need to create the in-memory catalog table pointing to it. If the in-memory catalog table schema does not match the underlying iceberg table, the `CREATE TABLE` statement won't throw any exception; instead, the `SELECT` or `INSERT INTO` query will throw an exception when the RowData read from the iceberg table cannot be parsed with the in-memory catalog table schema. In that case people need to re-create the in-memory table so that it maps to the underlying table correctly. That's the behavior flink users expect by default, because almost all of the flink connectors behave the same way (such as the [JDBC](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/jdbc/) connector, the [hive](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/hive/hive_read_write/) connector, and the [hbase](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/hbase/) connector).
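
To make that concrete, here is a rough sketch (not code from this PR; the table name, columns, catalog name, and metastore URI below are made up, and the DDL is just driven from a Java `TableEnvironment`) of how the mismatch surfaces with the options this factory adds:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SchemaMismatchSketch {
  public static void main(String[] args) {
    TableEnvironment tEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().inBatchMode().build());

    // Creating the in-memory catalog table succeeds even when the underlying
    // iceberg table already exists with a different schema (say `data` is
    // really a BIGINT column there).
    tEnv.executeSql(
        "CREATE TABLE flink_table (" +
        "  id BIGINT," +
        "  data STRING" +
        ") WITH (" +
        "  'connector' = 'iceberg'," +
        "  'catalog-name' = 'hive_prod'," +        // hypothetical catalog name
        "  'catalog-type' = 'hive'," +
        "  'catalog-database' = 'db'," +
        "  'uri' = 'thrift://metastore:9083'" +    // hypothetical Hive metastore
        ")");

    // The mismatch only shows up here: the query fails when the RowData read
    // from the iceberg table cannot be mapped to the declared Flink schema.
    tEnv.executeSql("SELECT * FROM flink_table").print();

    // Recovery is simply dropping and re-creating the in-memory table with the
    // correct schema; the underlying iceberg table is untouched.
    tEnv.executeSql("DROP TABLE flink_table");
  }
}
```

In other words, the `CREATE TABLE` is only a mapping declaration, and validation against the underlying iceberg table happens lazily when the source or sink is actually created, which is the same lazy-validation model the JDBC/hive/hbase connectors follow.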