kbendick commented on a change in pull request #3476:
URL: https://github.com/apache/iceberg/pull/3476#discussion_r743872545



##########
File path: 
flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.factories.CatalogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Strings;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+/**
+ * A Flink Catalog factory implementation that creates {@link FlinkCatalog}.
+ * <p>
+ * This supports the following catalog configuration options:
+ * <ul>
+ *   <li><code>type</code> - Flink catalog factory key, should be 
"iceberg"</li>
+ *   <li><code>catalog-type</code> - iceberg catalog type, "hive" or 
"hadoop"</li>
+ *   <li><code>uri</code> - the Hive Metastore URI (Hive catalog only)</li>
+ *   <li><code>clients</code> - the Hive Client Pool Size (Hive catalog 
only)</li>
+ *   <li><code>warehouse</code> - the warehouse path (Hadoop catalog only)</li>
+ *   <li><code>default-database</code> - a database name to use as the 
default</li>
+ *   <li><code>base-namespace</code> - a base namespace as the prefix for all 
databases (Hadoop catalog only)</li>
+ *   <li><code>cache-enabled</code> - whether to enable catalog cache</li>
+ * </ul>
+ * <p>
+ * To use a custom catalog that is not a Hive or Hadoop catalog, extend this 
class and override
+ * {@link #createCatalogLoader(String, Map, Configuration)}.
+ */
+public class FlinkCatalogFactory implements CatalogFactory {

Review comment:
       > This is trying to separate flink 1.13 code from flink 1.12, about the 
background, pls see: #3434 (comment)
   
   In addition to what's mentioned about the new sink interface, this also 
allows us to use the new `CatalogFactory` API.
   
   When we were upgrading from 1.12 to 1.13, one of the concerns was that 
`CatalogFactory` in 1.13 allows users to implement both the older deprecated 
interface, `TableFactory`, and the newer interface, `Factory`.
   
   If I remember correctly, the flink catalog loader in 1.13 will use the 
deprecated methods from `TableFactory` if they're implemented.
   
   If we separate, will we choose to implement 1.13's `CatalogFactory` using 
the `Factory` methods instead of continuing to implement the `TableFactory` 
interface methods? Perhaps that's what is meant in the comment about issues 
with the new sink interface etc?
   
   EDIT: It seems like the `Factory` methods _are_ implemented in 
`FlinkDynamicTableFactory`. I seem to recall though that it was this class that 
needed to also implement the new methods so that internal Flink code would 
instantiate via the new pathway.
   
   I can pull up the old PR with more details if we'd like.

##########
File path: 
flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+public class FlinkConfigOptions {
+
+  private FlinkConfigOptions() {
+  }
+
+  public static final ConfigOption<Boolean> 
TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM =
+      ConfigOptions.key("table.exec.iceberg.infer-source-parallelism")
+          .booleanType()
+          .defaultValue(true)
+          .withDescription("If is false, parallelism of source are set by 
config.\n" +
+              "If is true, source parallelism is inferred according to splits 
number.\n");
+
+  public static final ConfigOption<Integer> 
TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX =
+      ConfigOptions.key("table.exec.iceberg.infer-source-parallelism.max")
+          .intType()
+          .defaultValue(100)
+          .withDescription("Sets max infer parallelism for source operator.");

Review comment:
       Nit: Consider adding `Ignored if 
$TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM is true`. Or if we throw when 
`TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM` is set to true, I'd mention that.
   
   But if this is pre-existing, there's no need to change in this PR to keep 
the changes minimal. 

##########
File path: 
flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+public class FlinkConfigOptions {
+
+  private FlinkConfigOptions() {
+  }
+
+  public static final ConfigOption<Boolean> 
TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM =
+      ConfigOptions.key("table.exec.iceberg.infer-source-parallelism")
+          .booleanType()
+          .defaultValue(true)
+          .withDescription("If is false, parallelism of source are set by 
config.\n" +
+              "If is true, source parallelism is inferred according to splits 
number.\n");

Review comment:
       Non-blocking Nit: Consider: `When false, the parallelism of Iceberg 
sources comes from the config. Otherwise, source parallelism is inferred based 
on the number of splits`.
   
   For me, just changing `If is false` to `When false` would be more concise. 
But up to you whether you want to change (or change in a later PR).

##########
File path: 
flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.common.DynMethods;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+
+public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, 
DynamicTableSourceFactory {
+  static final String FACTORY_IDENTIFIER = "iceberg";
+
+  private static final ConfigOption<String> CATALOG_NAME =
+      ConfigOptions.key("catalog-name")
+          .stringType()
+          .noDefaultValue()
+          .withDescription("Catalog name");
+
+  private static final ConfigOption<String> CATALOG_TYPE =
+      ConfigOptions.key(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE)
+          .stringType()
+          .noDefaultValue()
+          .withDescription("Catalog type, the optional types are: custom, 
hadoop, hive.");
+
+  private static final ConfigOption<String> CATALOG_DATABASE =
+      ConfigOptions.key("catalog-database")
+          .stringType()
+          .defaultValue(FlinkCatalogFactory.DEFAULT_DATABASE_NAME)
+          .withDescription("Database name managed in the iceberg catalog.");
+
+  private static final ConfigOption<String> CATALOG_TABLE =
+      ConfigOptions.key("catalog-table")
+          .stringType()
+          .noDefaultValue()
+          .withDescription("Table name managed in the underlying iceberg 
catalog and database.");
+
+  // Flink 1.13.x change the return type from CatalogTable interface to 
ResolvedCatalogTable which extends the
+  // CatalogTable. Here we use the dynamic method loading approach to avoid 
adding explicit CatalogTable or
+  // ResolvedCatalogTable class into the iceberg-flink-runtime jar for 
compatibility purpose.
+  private static final DynMethods.UnboundMethod GET_CATALOG_TABLE = 
DynMethods.builder("getCatalogTable")
+      .impl(Context.class, "getCatalogTable")
+      .orNoop()
+      .build();

Review comment:
       Will we remove this in a follow-up PR?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to