This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new aec141dc8 [flink] Optimize the getLakeTableFactory in LakeTableFactory (#1816)
aec141dc8 is described below
commit aec141dc8414217c833758956a6488ade1dad160
Author: yuxia Luo <[email protected]>
AuthorDate: Thu Oct 16 13:37:33 2025 +0800
[flink] Optimize the getLakeTableFactory in LakeTableFactory (#1816)
---
.../apache/fluss/flink/catalog/FlinkCatalog.java | 10 +--
.../fluss/flink/catalog/FlinkTableFactory.java | 10 +--
.../{LakeCatalog.java => LakeFlinkCatalog.java} | 80 ++++++++++++----------
.../apache/fluss/flink/lake/LakeTableFactory.java | 62 ++++++-----------
.../flink/FlinkUnionReadPrimaryKeyTableITCase.java | 9 +--
5 files changed, 75 insertions(+), 96 deletions(-)
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
index 3ecb01541..7e10c4a5c 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
@@ -23,7 +23,7 @@ import org.apache.fluss.client.admin.Admin;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.InvalidTableException;
-import org.apache.fluss.flink.lake.LakeCatalog;
+import org.apache.fluss.flink.lake.LakeFlinkCatalog;
import org.apache.fluss.flink.procedure.ProcedureManager;
import org.apache.fluss.flink.utils.CatalogExceptionUtils;
import org.apache.fluss.flink.utils.FlinkConversions;
@@ -114,7 +114,7 @@ public class FlinkCatalog extends AbstractCatalog {
protected final String defaultDatabase;
protected final String bootstrapServers;
protected final Map<String, String> securityConfigs;
- private final LakeCatalog lakeCatalog;
+ private final LakeFlinkCatalog lakeFlinkCatalog;
protected Connection connection;
protected Admin admin;
@@ -130,12 +130,12 @@ public class FlinkCatalog extends AbstractCatalog {
this.bootstrapServers = bootstrapServers;
this.classLoader = classLoader;
this.securityConfigs = securityConfigs;
- this.lakeCatalog = new LakeCatalog(catalogName, classLoader);
+ this.lakeFlinkCatalog = new LakeFlinkCatalog(catalogName, classLoader);
}
@Override
public Optional<Factory> getFactory() {
- return Optional.of(new FlinkTableFactory(lakeCatalog));
+ return Optional.of(new FlinkTableFactory(lakeFlinkCatalog));
}
@Override
@@ -340,7 +340,7 @@ public class FlinkCatalog extends AbstractCatalog {
// Need to reconstruct: table_name + $snapshots
tableName = String.join("", tableComponents);
}
- return lakeCatalog
+ return lakeFlinkCatalog
.getLakeCatalog(properties)
.getTable(new ObjectPath(databaseName, tableName));
}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
index 0b0a59981..57ead2c9e 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
@@ -20,7 +20,7 @@ package org.apache.fluss.flink.catalog;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.flink.FlinkConnectorOptions;
-import org.apache.fluss.flink.lake.LakeCatalog;
+import org.apache.fluss.flink.lake.LakeFlinkCatalog;
import org.apache.fluss.flink.lake.LakeTableFactory;
import org.apache.fluss.flink.sink.FlinkTableSink;
import org.apache.fluss.flink.source.FlinkTableSource;
@@ -69,11 +69,11 @@ import static
org.apache.fluss.flink.utils.FlinkConversions.toFlinkOption;
/** Factory to create table source and table sink for Fluss. */
public class FlinkTableFactory implements DynamicTableSourceFactory,
DynamicTableSinkFactory {
- private final LakeCatalog lakeCatalog;
+ private final LakeFlinkCatalog lakeFlinkCatalog;
private volatile LakeTableFactory lakeTableFactory;
- public FlinkTableFactory(LakeCatalog lakeCatalog) {
- this.lakeCatalog = lakeCatalog;
+ public FlinkTableFactory(LakeFlinkCatalog lakeFlinkCatalog) {
+ this.lakeFlinkCatalog = lakeFlinkCatalog;
}
@Override
@@ -257,7 +257,7 @@ public class FlinkTableFactory implements
DynamicTableSourceFactory, DynamicTabl
if (lakeTableFactory == null) {
synchronized (this) {
if (lakeTableFactory == null) {
- lakeTableFactory = new LakeTableFactory(lakeCatalog);
+ lakeTableFactory = new LakeTableFactory(lakeFlinkCatalog);
}
}
}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeCatalog.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java
similarity index 68%
rename from
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeCatalog.java
rename to
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java
index e1f8096e6..3eb0db5d9 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeCatalog.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java
@@ -21,7 +21,6 @@ import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.flink.utils.DataLakeUtils;
import org.apache.fluss.metadata.DataLakeFormat;
-import org.apache.fluss.utils.MapUtils;
import org.apache.flink.table.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
@@ -34,59 +33,64 @@ import java.util.Map;
import static org.apache.fluss.metadata.DataLakeFormat.ICEBERG;
import static org.apache.fluss.metadata.DataLakeFormat.PAIMON;
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
-/** A lake catalog to delegate the operations on lake table. */
-public class LakeCatalog {
- private static final Map<DataLakeFormat, Catalog> LAKE_CATALOG_CACHE =
- MapUtils.newConcurrentHashMap();
+/** A lake flink catalog to delegate the operations on lake table. */
+public class LakeFlinkCatalog {
private final String catalogName;
private final ClassLoader classLoader;
- public LakeCatalog(String catalogName, ClassLoader classLoader) {
+ private volatile Catalog catalog;
+ private volatile DataLakeFormat lakeFormat;
+
+ public LakeFlinkCatalog(String catalogName, ClassLoader classLoader) {
this.catalogName = catalogName;
this.classLoader = classLoader;
}
public Catalog getLakeCatalog(Configuration tableOptions) {
- DataLakeFormat lakeFormat =
tableOptions.get(ConfigOptions.TABLE_DATALAKE_FORMAT);
- if (lakeFormat == null) {
- throw new IllegalArgumentException(
- "DataLake format is not specified in table options. "
- + "Please ensure '"
- + ConfigOptions.TABLE_DATALAKE_FORMAT.key()
- + "' is set.");
- }
- return LAKE_CATALOG_CACHE.computeIfAbsent(
- lakeFormat,
- (dataLakeFormat) -> {
- if (dataLakeFormat == PAIMON) {
- return PaimonCatalogFactory.create(catalogName,
tableOptions, classLoader);
- } else if (dataLakeFormat == ICEBERG) {
- return IcebergCatalogFactory.create(catalogName,
tableOptions);
+ // TODO: Currently, a Fluss cluster only supports a single DataLake
storage.
+ // However, in the
+ // future, it may support multiple DataLakes. The following code
assumes
+ // that a single
+ // lakeCatalog is shared across multiple tables, which will no longer
be
+ // valid in such
+ // cases and should be updated accordingly.
+ if (catalog == null) {
+ synchronized (this) {
+ if (catalog == null) {
+ DataLakeFormat lakeFormat =
+
tableOptions.get(ConfigOptions.TABLE_DATALAKE_FORMAT);
+ if (lakeFormat == null) {
+ throw new IllegalArgumentException(
+ "DataLake format is not specified in table
options. "
+ + "Please ensure '"
+ +
ConfigOptions.TABLE_DATALAKE_FORMAT.key()
+ + "' is set.");
+ }
+ if (lakeFormat == PAIMON) {
+ catalog =
+ PaimonCatalogFactory.create(catalogName,
tableOptions, classLoader);
+ this.lakeFormat = PAIMON;
+ } else if (lakeFormat == ICEBERG) {
+ catalog = IcebergCatalogFactory.create(catalogName,
tableOptions);
+ this.lakeFormat = ICEBERG;
} else {
throw new UnsupportedOperationException(
- "Unsupported datalake format: " +
dataLakeFormat);
+ "Unsupported data lake format: " + lakeFormat);
}
- });
+ }
+ }
+ }
+ return catalog;
}
- public Catalog getLakeCatalog(Configuration tableOptions, DataLakeFormat
lakeFormat) {
- if (lakeFormat == null) {
- throw new IllegalArgumentException("DataLake format cannot be
null");
- }
- return LAKE_CATALOG_CACHE.computeIfAbsent(
+ public DataLakeFormat getLakeFormat() {
+ checkNotNull(
lakeFormat,
- (dataLakeFormat) -> {
- if (dataLakeFormat == PAIMON) {
- return PaimonCatalogFactory.create(catalogName,
tableOptions, classLoader);
- } else if (dataLakeFormat == ICEBERG) {
- return IcebergCatalogFactory.create(catalogName,
tableOptions);
- } else {
- throw new UnsupportedOperationException(
- "Unsupported datalake format: " +
dataLakeFormat);
- }
- });
+ "DataLake format is null, must call getLakeCatalog first to
initialize lake format.");
+ return lakeFormat;
}
/**
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java
index 93120e9e2..3f0ff88c6 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java
@@ -17,20 +17,20 @@
package org.apache.fluss.flink.lake;
+import org.apache.fluss.config.Configuration;
+
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
-import java.util.Map;
-
/** A factory to create {@link DynamicTableSource} for lake table. */
public class LakeTableFactory {
- private final LakeCatalog lakeCatalog;
+ private final LakeFlinkCatalog lakeFlinkCatalog;
- public LakeTableFactory(LakeCatalog lakeCatalog) {
- this.lakeCatalog = lakeCatalog;
+ public LakeTableFactory(LakeFlinkCatalog lakeFlinkCatalog) {
+ this.lakeFlinkCatalog = lakeFlinkCatalog;
}
public DynamicTableSource createDynamicTableSource(
@@ -42,21 +42,6 @@ public class LakeTableFactory {
originIdentifier.getDatabaseName(),
tableName);
- // Determine the lake format from the table options
- Map<String, String> tableOptions =
context.getCatalogTable().getOptions();
-
- // If not present, fallback to 'fluss.table.datalake.format' (set by
Fluss)
- String connector = tableOptions.get("connector");
- if (connector == null) {
- connector = tableOptions.get("fluss.table.datalake.format");
- }
-
- if (connector == null) {
- // For Paimon system tables (like table_name$options), the table
options are empty
- // Default to Paimon for backward compatibility
- connector = "paimon";
- }
-
// For Iceberg and Paimon, pass the table name as-is to their factory.
// Metadata tables will be handled internally by their respective
factories.
DynamicTableFactory.Context newContext =
@@ -69,21 +54,21 @@ public class LakeTableFactory {
context.isTemporary());
// Get the appropriate factory based on connector type
- DynamicTableSourceFactory factory = getLakeTableFactory(connector,
tableOptions);
+ DynamicTableSourceFactory factory = getLakeTableFactory();
return factory.createDynamicTableSource(newContext);
}
- private DynamicTableSourceFactory getLakeTableFactory(
- String connector, Map<String, String> tableOptions) {
- if ("paimon".equalsIgnoreCase(connector)) {
- return getPaimonFactory();
- } else if ("iceberg".equalsIgnoreCase(connector)) {
- return getIcebergFactory(tableOptions);
- } else {
- throw new UnsupportedOperationException(
- "Unsupported lake connector: "
- + connector
- + ". Only 'paimon' and 'iceberg' are supported.");
+ private DynamicTableSourceFactory getLakeTableFactory() {
+ switch (lakeFlinkCatalog.getLakeFormat()) {
+ case PAIMON:
+ return getPaimonFactory();
+ case ICEBERG:
+ return getIcebergFactory();
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported lake connector: "
+ + lakeFlinkCatalog.getLakeFormat()
+ + ". Only 'paimon' and 'iceberg' are
supported.");
}
}
@@ -91,22 +76,19 @@ public class LakeTableFactory {
return new org.apache.paimon.flink.FlinkTableFactory();
}
- private DynamicTableSourceFactory getIcebergFactory(Map<String, String>
tableOptions) {
+ private DynamicTableSourceFactory getIcebergFactory() {
try {
- // Get the Iceberg FlinkCatalog instance from LakeCatalog
- org.apache.fluss.config.Configuration flussConfig =
-
org.apache.fluss.config.Configuration.fromMap(tableOptions);
-
// Get catalog with explicit ICEBERG format
org.apache.flink.table.catalog.Catalog catalog =
- lakeCatalog.getLakeCatalog(
- flussConfig,
org.apache.fluss.metadata.DataLakeFormat.ICEBERG);
+ lakeFlinkCatalog.getLakeCatalog(
+ // we can pass empty configuration to get catalog
+ // since the catalog should already be initialized
+ new Configuration());
// Create FlinkDynamicTableFactory with the catalog
Class<?> icebergFactoryClass =
Class.forName("org.apache.iceberg.flink.FlinkDynamicTableFactory");
Class<?> flinkCatalogClass =
Class.forName("org.apache.iceberg.flink.FlinkCatalog");
-
return (DynamicTableSourceFactory)
icebergFactoryClass
.getDeclaredConstructor(flinkCatalogClass)
diff --git
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java
index 0ff38190c..0518290a5 100644
---
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java
+++
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java
@@ -278,7 +278,7 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends
FlinkUnionReadTestBase
@ParameterizedTest
@ValueSource(booleans = {false, true})
- void testReadIcebergLakeTableAndSystemTable(boolean isPartitioned) throws
Exception {
+ void testReadIcebergLakeTable(boolean isPartitioned) throws Exception {
// first of all, start tiering
JobClient jobClient = buildTieringJob(execEnv);
@@ -308,13 +308,6 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends
FlinkUnionReadTestBase
int expectedUserRowCount = isPartitioned ? 2 *
waitUntilPartitions(t1).size() : 2;
assertThat(icebergRows).hasSize(expectedUserRowCount);
- // verify rows have expected number of columns
- int userColumnCount =
lakeTableResult.getResolvedSchema().getColumnCount();
- Row firstRow = icebergRows.get(0);
- assertThat(firstRow.getArity())
- .as("Iceberg row should have at least user columns")
- .isGreaterThanOrEqualTo(userColumnCount);
-
// Test 2: Read Iceberg system table (snapshots) using $lake$snapshots
suffix
TableResult snapshotsResult =
batchTEnv.executeSql(String.format("select * from
%s$lake$snapshots", tableName));