Copilot commented on code in PR #1799:
URL: https://github.com/apache/fluss/pull/1799#discussion_r2427698073
##########
fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkCatalogLakeTest.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.flink;
+
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.flink.catalog.FlinkCatalog;
+import org.apache.fluss.lake.iceberg.testutils.FlinkIcebergTieringTestBase;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_ENABLED;
+import static org.apache.fluss.lake.iceberg.IcebergLakeCatalog.SYSTEM_COLUMNS;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test class for {@link FlinkCatalog}. */
+class FlinkCatalogLakeTest extends FlinkIcebergTieringTestBase {
+
+    protected static final String DEFAULT_DB = "fluss";
+
+    protected static final String CATALOG_NAME = "test_iceberg_lake";
+
+    FlinkCatalog catalog;
+
+    @BeforeEach
+    public void beforeEach() {
+        super.beforeEach();
+        buildCatalog();
+    }
+
+    @Test
+    // TODO: remove this test in #1803

Review Comment:
   The TODO comment references issue #1803 but provides no context about why this test should be removed or what #1803 addresses. Consider adding more context or a brief explanation.
   ```suggestion
       // TODO: remove this test in #1803. Issue #1803 tracks the refactoring of lake table handling; this test will become obsolete once lake table logic is moved to the new implementation.
   ```

##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeCatalog.java:
##########
@@ -17,36 +17,128 @@

 package org.apache.fluss.flink.lake;

-import org.apache.flink.table.catalog.CatalogBaseTable;
-import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.flink.utils.DataLakeUtils;
+import org.apache.fluss.metadata.DataLakeFormat;
+import org.apache.fluss.utils.MapUtils;
+
+import org.apache.flink.table.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.flink.FlinkCatalog;
 import org.apache.paimon.flink.FlinkCatalogFactory;
 import org.apache.paimon.flink.FlinkFileIOLoader;
 import org.apache.paimon.options.Options;

+import java.lang.reflect.Method;
 import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.fluss.metadata.DataLakeFormat.ICEBERG;
+import static org.apache.fluss.metadata.DataLakeFormat.PAIMON;
+import static org.apache.fluss.utils.concurrent.LockUtils.inLock;

 /** A lake catalog to delegate the operations on lake table. */
 public class LakeCatalog {
+    private static final Map<DataLakeFormat, Catalog> LAKE_CATALOG_CACHE =
+            MapUtils.newConcurrentHashMap();
+
+    private static final ReentrantLock LOCK = new ReentrantLock();
+
+    private final String catalogName;
+    private final ClassLoader classLoader;
+
+    public LakeCatalog(String catalogName, ClassLoader classLoader) {
+        this.catalogName = catalogName;
+        this.classLoader = classLoader;
+    }
+
+    public Catalog getLakeCatalog(Configuration tableOptions) {
+        return inLock(
+                LOCK,
+                () -> {
+                    DataLakeFormat lakeFormat =
+                            tableOptions.get(ConfigOptions.TABLE_DATALAKE_FORMAT);
+                    if (lakeFormat == PAIMON) {
+                        // TODO: Currently, a Fluss cluster only supports a single DataLake storage.
+                        // However, in the
+                        // future, it may support multiple DataLakes. The following code assumes
+                        // that a single
+                        // lakeCatalog is shared across multiple tables, which will no longer be
+                        // valid in such
+                        // cases and should be updated accordingly.
+                        LAKE_CATALOG_CACHE.computeIfAbsent(
+                                PAIMON,
+                                k ->
+                                        PaimonCatalogFactory.create(
+                                                catalogName, tableOptions, classLoader));
-
-    // currently, only support paimon
-    // todo make it pluggable
-    private final FlinkCatalog paimonFlinkCatalog;
-
-    public LakeCatalog(
-            String catalogName, Map<String, String> catalogProperties, ClassLoader classLoader) {
-        CatalogContext catalogContext =
-                CatalogContext.create(
-                        Options.fromMap(catalogProperties), null, new FlinkFileIOLoader());
-        paimonFlinkCatalog =
-                FlinkCatalogFactory.createCatalog(catalogName, catalogContext, classLoader);
+                    } else if (lakeFormat == ICEBERG) {
+                        LAKE_CATALOG_CACHE.computeIfAbsent(
+                                ICEBERG,
+                                k -> IcebergCatalogFactory.create(catalogName, tableOptions));
+                    } else {
+                        throw new UnsupportedOperationException(
+                                "Unsupported datalake format: " + lakeFormat);
+                    }
+                    return LAKE_CATALOG_CACHE.get(lakeFormat);
+                });
     }

-    public CatalogBaseTable getTable(ObjectPath objectPath)
-            throws TableNotExistException, CatalogException {
-        return paimonFlinkCatalog.getTable(objectPath);
+    /**
+     * Factory for creating Paimon Catalog instances.
+     *
+     * <p>Purpose: Encapsulates Paimon-related dependencies (e.g. FlinkFileIOLoader) to avoid direct
+     * dependency in the main LakeCatalog class.
+     */
+    public static class PaimonCatalogFactory {
+
+        private PaimonCatalogFactory() {}
+
+        public static Catalog create(
+                String catalogName, Configuration tableOptions, ClassLoader classLoader) {
+            Map<String, String> catalogProperties =
+                    DataLakeUtils.extractLakeCatalogProperties(tableOptions);
+            return FlinkCatalogFactory.createCatalog(
+                    catalogName,
+                    CatalogContext.create(
+                            Options.fromMap(catalogProperties), null, new FlinkFileIOLoader()),
+                    classLoader);
+        }
+    }
+
+    /** Factory use reflection to create Iceberg Catalog instances. */
+    public static class IcebergCatalogFactory {
+
+        private IcebergCatalogFactory() {}
+
+        // Iceberg 1.4.3 is the last Java 8 compatible version, while Flink Iceberg 1.18+ requires
+        // 1.5.0+.
+        // Using reflection to maintain Java 8 compatibility.
+        public static Catalog create(String catalogName, Configuration tableOptions) {
+            Map<String, String> catalogProperties =
+                    DataLakeUtils.extractLakeCatalogProperties(tableOptions);
+            // Map "type" to "catalog-type" (equivalent)
+            // Required: either "catalog-type" (standard type) or "catalog-impl"
+            // (fully-qualified custom class, mandatory if "catalog-type" is missing)
+            if (catalogProperties.containsKey("type")) {
+                catalogProperties.put("catalog-type", catalogProperties.get("type"));
+            }
+            try {
+                Class<?> flinkCatalogFactoryClass =
+                        Class.forName("org.apache.iceberg.flink.FlinkCatalogFactory");
+                Object factoryInstance =
+                        flinkCatalogFactoryClass.getDeclaredConstructor().newInstance();
+
+                Method createCatalogMethod =
+                        flinkCatalogFactoryClass.getMethod(
+                                "createCatalog", String.class, Map.class);
+                return (Catalog)
+                        createCatalogMethod.invoke(factoryInstance, catalogName, catalogProperties);
+            } catch (Exception e) {
+                throw new RuntimeException(
+                        "Failed to create Iceberg catalog using reflection. Please make sure iceberg-flink-runtime is on the classpath.",

Review Comment:
   The error message mentions 'iceberg-flink-runtime' but should be more specific about the exact artifact name. Based on the pom.xml, it should reference 'iceberg-flink-${flink.major.version}' or provide the full artifact coordinates.
   ```suggestion
                           "Failed to create Iceberg catalog using reflection. Please make sure the correct Iceberg Flink connector is on the classpath (e.g., 'org.apache.iceberg:iceberg-flink-${flink.major.version}:<version>'). Refer to your Flink version and the Iceberg documentation for the exact artifact coordinates.",
   ```
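The suggested message is easiest to act on if the classpath problem surfaces eagerly rather than deep inside catalog creation. As a minimal sketch (the `IcebergClasspathCheck` helper and its wording are hypothetical, not part of this PR), a preflight check could resolve the same entry point the reflective factory uses and fail with the artifact hint up front:

```java
// Hypothetical helper, not part of this PR: fail fast with an actionable
// message when the Iceberg Flink connector is missing from the classpath.
public final class IcebergClasspathCheck {

    private IcebergClasspathCheck() {}

    public static void ensureIcebergFlinkOnClasspath() {
        try {
            // Same class the reflective IcebergCatalogFactory resolves.
            Class.forName("org.apache.iceberg.flink.FlinkCatalogFactory");
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(
                    "Iceberg Flink connector not found on the classpath. Add the artifact "
                            + "matching your Flink version, e.g. "
                            + "'org.apache.iceberg:iceberg-flink-<flink.major.version>:<version>'.",
                    e);
        }
    }
}
```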
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeCatalog.java:
##########
@@ -17,36 +17,128 @@
+    /** Factory use reflection to create Iceberg Catalog instances. */
+    public static class IcebergCatalogFactory {
+
+        private IcebergCatalogFactory() {}
+
+        // Iceberg 1.4.3 is the last Java 8 compatible version, while Flink Iceberg 1.18+ requires
+        // 1.5.0+.

Review Comment:
   The comment mentions 'Flink Iceberg 1.18+' but it's unclear what this refers to. Consider clarifying whether this means Flink version 1.18+ or Iceberg connector version 1.18+.
   ```suggestion
        // Iceberg 1.4.3 is the last Java 8 compatible version, while the Iceberg Flink connector
        // for Flink 1.18+ requires Iceberg 1.5.0+.
   ```
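For context on what the reflection avoids: with a compile-time dependency on the Iceberg Flink connector (which, per the comment under review, would pull in Iceberg 1.5.0+ and therefore drop Java 8 support), the factory would reduce to a direct call. A sketch of that direct equivalent, assuming the same public `createCatalog(String, Map)` entry point the diff resolves reflectively (the `DirectIcebergCatalogFactory` name is illustrative only):

```java
import java.util.Map;

import org.apache.flink.table.catalog.Catalog;
// Compile-time import the PR deliberately avoids to keep Java 8 support.
import org.apache.iceberg.flink.FlinkCatalogFactory;

public final class DirectIcebergCatalogFactory {

    private DirectIcebergCatalogFactory() {}

    // Direct equivalent of the reflective call in the diff: instantiate the
    // factory and invoke createCatalog(String, Map) without reflection.
    public static Catalog create(String catalogName, Map<String, String> catalogProperties) {
        return new FlinkCatalogFactory().createCatalog(catalogName, catalogProperties);
    }
}
```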
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
