stevenzwu commented on a change in pull request #2229:
URL: https://github.com/apache/iceberg/pull/2229#discussion_r575482464



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkTableFactory.java
##########
@@ -19,51 +19,57 @@
 
 package org.apache.iceberg.flink;
 
-import java.util.List;
-import java.util.Map;
+import java.util.Set;
+import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.factories.TableSinkFactory;
-import org.apache.flink.table.factories.TableSourceFactory;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.utils.TableSchemaUtils;
 
-public class FlinkTableFactory implements TableSinkFactory<RowData>, TableSourceFactory<RowData> {
+public class FlinkTableFactory implements DynamicTableSinkFactory, DynamicTableSourceFactory {
+  public static final String IDENTIFIER = "iceberg";
   private final FlinkCatalog catalog;
 
   public FlinkTableFactory(FlinkCatalog catalog) {
     this.catalog = catalog;
   }
 
   @Override
-  public TableSource<RowData> createTableSource(TableSourceFactory.Context context) {
+  public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context context) {
     ObjectPath objectPath = context.getObjectIdentifier().toObjectPath();
     TableLoader tableLoader = createTableLoader(objectPath);
-    TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema());
-    return new IcebergTableSource(tableLoader, tableSchema, context.getTable().getOptions(),
+    TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+    return new IcebergTableSource(tableLoader, tableSchema, context.getCatalogTable().getOptions(),
         context.getConfiguration());
   }
 
   @Override
-  public TableSink<RowData> createTableSink(TableSinkFactory.Context context) {
+  public DynamicTableSink createDynamicTableSink(Context context) {
     ObjectPath objectPath = context.getObjectIdentifier().toObjectPath();
     TableLoader tableLoader = createTableLoader(objectPath);
-    TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema());
-    return new IcebergTableSink(context.isBounded(), tableLoader, tableSchema);
+    TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+    return new IcebergTableSink(tableLoader, tableSchema);
   }
 
   @Override
-  public Map<String, String> requiredContext() {
+  public Set<ConfigOption<?>> requiredOptions() {

Review comment:
       The `TableFactory` link points to the old interface that we are moving away from in this PR, so it is not relevant here.
   
   For the new `Factory` interface, here is the javadoc. Some implementations (like `ParquetFileFormatFactory`) return an empty set; see the sketch after the quoted javadoc.
   ```
    * <p>Every factory declares a set of required and optional options. This information will not be
    * used during discovery but is helpful when generating documentation and performing validation. A
    * factory may discover further (nested) factories, the options of the nested factories must not be
    * declared in the sets of this factory.
   ```
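   
   For illustration, here is a rough sketch of the empty-set variant. Only the two option methods are shown, not a complete factory; they rely on the `Set` and `ConfigOption` imports already added in this diff, plus a `java.util.Collections` import, and `Collections.emptySet()` is just one way to build the empty set:
   ```java
   @Override
   public Set<ConfigOption<?>> requiredOptions() {
     // declare no options: per the javadoc above, these sets are not used for
     // discovery, only for documentation generation and validation
     return Collections.emptySet();
   }
   
   @Override
   public Set<ConfigOption<?>> optionalOptions() {
     return Collections.emptySet();
   }
   ```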
   
   You linked `HiveDynamicTableFactory`, which throws an exception from those options methods. The error message suggests that the catalog code path won't call these methods, and there should be no non-catalog usage for Hive. I am fine with keeping the same behavior for Iceberg.
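   
   A rough sketch of that Hive-style behavior could look like the snippet below (the exception type and message are my own placeholders, not copied from `HiveDynamicTableFactory`):
   ```java
   @Override
   public Set<ConfigOption<?>> requiredOptions() {
     // assumption: this factory is only reached through the Iceberg catalog code
     // path, which never calls the option methods, so fail loudly if someone does
     throw new UnsupportedOperationException(
         "FlinkTableFactory is only supported through FlinkCatalog");
   }
   
   @Override
   public Set<ConfigOption<?>> optionalOptions() {
     throw new UnsupportedOperationException(
         "FlinkTableFactory is only supported through FlinkCatalog");
   }
   ```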



