ms1111 commented on issue #10180:
URL: https://github.com/apache/iceberg/issues/10180#issuecomment-3615053652

   @ysn2233 We were using Flink 1.19 or 1.20 at the time, and Iceberg 1.9.x. 
This may help:
   
   ```java
   /**
    * Overrides Iceberg's FlinkCatalogFactory to avoid a dependency on 
HdfsConfiguration,
    * a subclass of Configuration.
    *
    * For Flink to discover this class, an entry is needed in
    * META-INF/services/org.apache.flink.table.factories.Factory.
    */
   public class NoHDFSFlinkCatalogFactory extends FlinkCatalogFactory {
       // This is an old, deprecated entrypoint, but it's the one available in 
Iceberg's FlinkCatalogFactory at the moment
       @Override
       @Deprecated
       public Catalog createCatalog(String name, Map<String, String> 
properties) {
           return createCatalog(name, properties, new 
org.apache.hadoop.conf.Configuration());
       }
   
       // This is the new entrypoint
       @Override
       public Catalog createCatalog(Context context) {
           return createCatalog(context.getName(), context.getOptions());
       }
   
       @Override
       public String factoryIdentifier() {
           return "iceberg-nohdfs";
       }
   }
   ```
   
   and src/main/resources/META-INF/org.apache.flink.table.factories.Factory:
   ```
   # This is needed so StreamTableEnvironment.createCatalog() can find our 
catalog factory
   your.package.NoHDFSFlinkCatalogFactory
   ```
   
   To use it:
   ```java
   Catalog catalogFactory = new 
NoHDFSFlinkCatalogFactory().createCatalog(catalogSettings.getCatalogName(), 
options);
   
   Map<String, String> options = catalogFactory.requiredContext();
   options.put("catalog-impl", "org.apache.iceberg.rest.RESTCatalog");
   options.put(...);
   Configuration catalogConfig = Configuration.fromMap(options);
   
   // We already have the catalog factory, but Flink wants to find it by type.
   // There was a streamTableEnvironment.registerCatalog() method but it was 
deprecated.
   // Have to do it indirectly now.
   catalogConfig.set(CommonCatalogOptions.CATALOG_TYPE, 
catalogFactory.factoryIdentifier());
   
   CatalogDescriptor catalogDescriptor = CatalogDescriptor.of("<catalog name>", 
catalogConfig);
   streamTableEnvironment.createCatalog("<catalog name>", catalogDescriptor);
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to