stevenzwu commented on a change in pull request #2229:
URL: https://github.com/apache/iceberg/pull/2229#discussion_r573310539



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkTableFactory.java
##########
@@ -19,51 +19,57 @@
 
 package org.apache.iceberg.flink;
 
-import java.util.List;
-import java.util.Map;
+import java.util.Set;
+import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.factories.TableSinkFactory;
-import org.apache.flink.table.factories.TableSourceFactory;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.utils.TableSchemaUtils;
 
-public class FlinkTableFactory implements TableSinkFactory<RowData>, 
TableSourceFactory<RowData> {
+public class FlinkTableFactory implements DynamicTableSinkFactory, 
DynamicTableSourceFactory {
+  public static final String IDENTIFIER = "iceberg";
   private final FlinkCatalog catalog;
 
   public FlinkTableFactory(FlinkCatalog catalog) {
     this.catalog = catalog;
   }
 
   @Override
-  public TableSource<RowData> createTableSource(TableSourceFactory.Context 
context) {
+  public DynamicTableSource 
createDynamicTableSource(DynamicTableFactory.Context context) {
     ObjectPath objectPath = context.getObjectIdentifier().toObjectPath();
     TableLoader tableLoader = createTableLoader(objectPath);
-    TableSchema tableSchema = 
TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema());
-    return new IcebergTableSource(tableLoader, tableSchema, 
context.getTable().getOptions(),
+    TableSchema tableSchema = 
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+    return new IcebergTableSource(tableLoader, tableSchema, 
context.getCatalogTable().getOptions(),
         context.getConfiguration());
   }
 
   @Override
-  public TableSink<RowData> createTableSink(TableSinkFactory.Context context) {
+  public DynamicTableSink createDynamicTableSink(Context context) {
     ObjectPath objectPath = context.getObjectIdentifier().toObjectPath();
     TableLoader tableLoader = createTableLoader(objectPath);
-    TableSchema tableSchema = 
TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema());
-    return new IcebergTableSink(context.isBounded(), tableLoader, tableSchema);
+    TableSchema tableSchema = 
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+    return new IcebergTableSink(tableLoader, tableSchema);
   }
 
   @Override
-  public Map<String, String> requiredContext() {
+  public Set<ConfigOption<?>> requiredOptions() {

Review comment:
       Why throw an exception here? If there are no required or optional configs,
we should just return empty sets, right?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to