Copilot commented on code in PR #1797:
URL: https://github.com/apache/fluss/pull/1797#discussion_r2425611092
##########
fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java:
##########
@@ -250,6 +250,49 @@ void testUnionReadLogTableFailover(boolean isPartitioned)
throws Exception {
jobClient.cancel().get();
}
+ // This test verifies that the $lake suffix works and data can be retrieved directly from
+ // Iceberg
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ void testReadIcebergLakeTableDirectly(boolean isPartitioned) throws Exception {
+ // first of all, start tiering
+ JobClient jobClient = buildTieringJob(execEnv);
+
+ String tableName =
+ "lake_direct_logTable_" + (isPartitioned ? "partitioned" :
"non_partitioned");
+
+ TablePath t1 = TablePath.of(DEFAULT_DB, tableName);
+ List<Row> writtenRows = new ArrayList<>();
+ long tableId = prepareLogTable(t1, DEFAULT_BUCKET_NUM, isPartitioned, writtenRows);
+ // wait until records has been synced to Iceberg
+ waitUntilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned);
+
+ // Read Iceberg snapshot directly using $lake suffix
+ TableResult lakeTableResult =
+ batchTEnv.executeSql(String.format("select * from %s$lake", tableName));
+ List<Row> icebergRows = CollectionUtil.iteratorToList(lakeTableResult.collect());
+
+ // Verify that we can read data from Iceberg via $lake suffix
+ assertThat(icebergRows).isNotEmpty();
+ assertThat(icebergRows).hasSize(writtenRows.size());
+
+ // Note: Iceberg adds metadata columns (_spec_id, _partition, _file) at the end.
+ // This test verifies that the $lake suffix works and data can be retrieved,
Review Comment:
This comment is incomplete and ends abruptly with a comma. It should be
completed or the trailing comma should be removed.
```suggestion
// This test verifies that the $lake suffix works and data can be retrieved.
```
##########
fluss-lake/fluss-lake-iceberg/pom.xml:
##########
@@ -31,6 +31,25 @@
<name>Fluss : Lake : Iceberg</name>
<packaging>jar</packaging>
+
+ <!-- Override Jackson version for Iceberg compatibility in tests -->
+ <properties>
+ <jackson.version.test>2.18.3</jackson.version.test>
+ </properties>
Review Comment:
The property `jackson.version.test` is defined but never used. The actual
Jackson version override is done in the dependencyManagement section below.
Either use this property in the dependencyManagement section or remove it.
```suggestion
```
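Alternatively, if the property is meant to stay, a sketch of how it could be wired into the dependencyManagement section mentioned above (the jackson-databind entry below is illustrative only; the real override may cover other Jackson artifacts as well):
```xml
<dependencyManagement>
  <dependencies>
    <!-- Illustrative only: reference the property instead of a hard-coded version. -->
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>${jackson.version.test}</version>
      <scope>test</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
```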
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java:
##########
@@ -17,40 +17,86 @@
package org.apache.fluss.flink.lake;
+import org.apache.fluss.metadata.DataLakeFormat;
+
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.paimon.flink.FlinkTableFactory;
/** A factory to create {@link DynamicTableSource} for lake table. */
public class LakeTableFactory {
- // now, always assume is paimon, todo need to describe lake storage from
- // to know which lake storage used
- private final org.apache.paimon.flink.FlinkTableFactory paimonFlinkTableFactory;
+ private final Object delegateFactory;
+ private final DataLakeFormat lakeFormat;
+
+ public LakeTableFactory(DataLakeFormat lakeFormat, ClassLoader classLoader) {
+ this.lakeFormat = lakeFormat;
+
+ if (lakeFormat == DataLakeFormat.PAIMON) {
+ this.delegateFactory = createPaimonFlinkTableFactory(classLoader);
+ } else if (lakeFormat == DataLakeFormat.ICEBERG) {
+ this.delegateFactory = createIcebergFlinkTableFactory(classLoader);
+ } else {
+ throw new UnsupportedOperationException(
+ "Unsupported lake format: "
+ + lakeFormat
+ + ". Only PAIMON and ICEBERG are supported.");
+ }
+ }
+
+ private Object createPaimonFlinkTableFactory(ClassLoader classLoader) {
+ try {
+ Class<?> paimonFactoryClass =
+ classLoader.loadClass("org.apache.paimon.flink.FlinkTableFactory");
+ return paimonFactoryClass.getDeclaredConstructor().newInstance();
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Failed to create Paimon FlinkTableFactory. Make sure
paimon-flink is on the classpath.",
+ e);
+ }
+ }
- public LakeTableFactory() {
- paimonFlinkTableFactory = new FlinkTableFactory();
+ private Object createIcebergFlinkTableFactory(ClassLoader classLoader) {
+ try {
+ Class<?> icebergFactoryClass =
+ classLoader.loadClass("org.apache.iceberg.flink.FlinkDynamicTableFactory");
+ return icebergFactoryClass.getDeclaredConstructor().newInstance();
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Failed to create Iceberg FlinkDynamicTableFactory. Make
sure iceberg-flink is on the classpath.",
+ e);
+ }
}
public DynamicTableSource createDynamicTableSource(
DynamicTableFactory.Context context, String tableName) {
ObjectIdentifier originIdentifier = context.getObjectIdentifier();
- ObjectIdentifier paimonIdentifier =
+ ObjectIdentifier lakeIdentifier =
ObjectIdentifier.of(
originIdentifier.getCatalogName(),
originIdentifier.getDatabaseName(),
tableName);
DynamicTableFactory.Context newContext =
new FactoryUtil.DefaultDynamicTableContext(
- paimonIdentifier,
+ lakeIdentifier,
context.getCatalogTable(),
context.getEnrichmentOptions(),
context.getConfiguration(),
context.getClassLoader(),
context.isTemporary());
- return paimonFlinkTableFactory.createDynamicTableSource(newContext);
+ try {
+ // Use reflection to call createDynamicTableSource on the delegate factory
+ java.lang.reflect.Method createMethod =
+ delegateFactory
+ .getClass()
+ .getMethod(
+ "createDynamicTableSource",
DynamicTableFactory.Context.class);
+ return (DynamicTableSource) createMethod.invoke(delegateFactory, newContext);
+ } catch (Exception e) {
Review Comment:
The reflection call should handle more specific exceptions. Consider
catching ReflectiveOperationException instead of the generic Exception to
better distinguish reflection-related errors from other potential issues.
```suggestion
} catch (ReflectiveOperationException e) {
```
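For context, a minimal sketch of the resulting try/catch, assuming the fields and variables (`delegateFactory`, `newContext`) from the hunk above; the message in the rethrow is illustrative, since the original catch body is not shown in the diff:
```java
try {
    java.lang.reflect.Method createMethod =
            delegateFactory
                    .getClass()
                    .getMethod("createDynamicTableSource", DynamicTableFactory.Context.class);
    return (DynamicTableSource) createMethod.invoke(delegateFactory, newContext);
} catch (ReflectiveOperationException e) {
    // NoSuchMethodException, IllegalAccessException and InvocationTargetException are all
    // subtypes of ReflectiveOperationException, so the failure modes of getMethod/invoke
    // are still covered without swallowing unrelated runtime errors.
    throw new RuntimeException(
            "Failed to invoke createDynamicTableSource on " + delegateFactory.getClass().getName(),
            e);
}
```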
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeCatalog.java:
##########
@@ -17,36 +17,124 @@
package org.apache.fluss.flink.lake;
+import org.apache.fluss.metadata.DataLakeFormat;
+
+import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
-import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.flink.FlinkCatalog;
-import org.apache.paimon.flink.FlinkCatalogFactory;
-import org.apache.paimon.flink.FlinkFileIOLoader;
-import org.apache.paimon.options.Options;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.util.Map;
/** A lake catalog to delegate the operations on lake table. */
public class LakeCatalog {
- // currently, only support paimon
- // todo make it pluggable
- private final FlinkCatalog paimonFlinkCatalog;
+ private final Catalog delegateCatalog;
+ private final DataLakeFormat lakeFormat;
public LakeCatalog(
+ String catalogName,
+ Map<String, String> catalogProperties,
+ ClassLoader classLoader,
+ DataLakeFormat lakeFormat) {
+ this.lakeFormat = lakeFormat;
+
+ if (lakeFormat == DataLakeFormat.PAIMON) {
+ this.delegateCatalog =
+ createPaimonFlinkCatalog(catalogName, catalogProperties, classLoader);
+ } else if (lakeFormat == DataLakeFormat.ICEBERG) {
+ this.delegateCatalog =
+ createIcebergFlinkCatalog(catalogName, catalogProperties, classLoader);
+ } else {
+ throw new UnsupportedOperationException(
+ "Unsupported lake format: "
+ + lakeFormat
+ + ". Only PAIMON and ICEBERG are supported.");
+ }
+ }
+
+ private Catalog createPaimonFlinkCatalog(
String catalogName, Map<String, String> catalogProperties, ClassLoader classLoader) {
- CatalogContext catalogContext =
- CatalogContext.create(
- Options.fromMap(catalogProperties), null, new FlinkFileIOLoader());
- paimonFlinkCatalog =
- FlinkCatalogFactory.createCatalog(catalogName, catalogContext, classLoader);
+ try {
+ // Use Paimon's Flink catalog factory via reflection to avoid hard dependency
+ Class<?> catalogContextClass =
+ classLoader.loadClass("org.apache.paimon.catalog.CatalogContext");
+ Class<?> optionsClass = classLoader.loadClass("org.apache.paimon.options.Options");
+ Class<?> fileIOLoaderClass =
+ classLoader.loadClass("org.apache.paimon.flink.FlinkFileIOLoader");
+ Class<?> flinkCatalogFactoryClass =
+ classLoader.loadClass("org.apache.paimon.flink.FlinkCatalogFactory");
+
+ // Create Options.fromMap(catalogProperties)
+ Method fromMapMethod = optionsClass.getMethod("fromMap", Map.class);
+ Object options = fromMapMethod.invoke(null, catalogProperties);
+
+ // Create new FlinkFileIOLoader()
+ Object fileIOLoader = fileIOLoaderClass.getDeclaredConstructor().newInstance();
+
+ // Create CatalogContext.create(options, null, fileIOLoader)
+ // CatalogContext.create signature: create(Options, Configuration, FileIOLoader)
+ Class<?> hadoopConfigClass =
+ classLoader.loadClass("org.apache.hadoop.conf.Configuration");
+ Method createMethod =
+ catalogContextClass.getMethod(
+ "create", optionsClass, hadoopConfigClass,
fileIOLoaderClass);
+ Object catalogContext = createMethod.invoke(null, options, null, fileIOLoader);
+
+ // Call FlinkCatalogFactory.createCatalog(catalogName, catalogContext, classLoader)
+ Method createCatalogMethod =
+ flinkCatalogFactoryClass.getMethod(
+ "createCatalog", String.class,
catalogContextClass, ClassLoader.class);
+ return (Catalog)
+ createCatalogMethod.invoke(null, catalogName, catalogContext, classLoader);
+ } catch (Exception e) {
+ throw new CatalogException(
+ "Failed to create Paimon Flink catalog. Make sure
paimon-flink is on the classpath.",
+ e);
+ }
}
public CatalogBaseTable getTable(ObjectPath objectPath)
throws TableNotExistException, CatalogException {
- return paimonFlinkCatalog.getTable(objectPath);
+ return delegateCatalog.getTable(objectPath);
+ }
+
+ private Catalog createIcebergFlinkCatalog(
+ String catalogName, Map<String, String> catalogProperties, ClassLoader classLoader) {
+ try {
+ // Use Iceberg's Flink catalog factory via reflection to avoid hard dependency
+ Class<?> icebergCatalogFactoryClass =
+ classLoader.loadClass("org.apache.iceberg.flink.FlinkCatalogFactory");
+ Object factoryInstance =
+ icebergCatalogFactoryClass.getDeclaredConstructor().newInstance();
+ Method createCatalogMethod =
+ icebergCatalogFactoryClass.getMethod("createCatalog", String.class, Map.class);
+
+ // Prepare Iceberg catalog properties
+ // Iceberg expects 'catalog-type' instead of 'type' for the catalog implementation
+ Map<String, String> icebergProps = new java.util.HashMap<>(catalogProperties);
Review Comment:
Use `HashMap` instead of the fully qualified `java.util.HashMap` for
consistency with the rest of the codebase. The import statement should be added
at the top of the file.
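A sketch of what the commented line would look like after the change, assuming `import java.util.HashMap;` is added alongside the existing imports at the top of LakeCatalog.java:
```java
// Simple-name usage once the import is in place (hypothetical rewrite of the line above).
Map<String, String> icebergProps = new HashMap<>(catalogProperties);
```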