luoyuxia commented on code in PR #1812:
URL: https://github.com/apache/fluss/pull/1812#discussion_r2431744136
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java:
##########
@@ -68,17 +69,29 @@
/** Factory to create table source and table sink for Fluss. */
public class FlinkTableFactory implements DynamicTableSourceFactory,
DynamicTableSinkFactory {
+ private final LakeCatalog lakeCatalog;
private volatile LakeTableFactory lakeTableFactory;
+ public FlinkTableFactory(LakeCatalog lakeCatalog) {
+ this.lakeCatalog = lakeCatalog;
+ }
+
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
// check whether should read from datalake
ObjectIdentifier tableIdentifier = context.getObjectIdentifier();
String tableName = tableIdentifier.getObjectName();
if (tableName.contains(LAKE_TABLE_SPLITTER)) {
- tableName = tableName.substring(0, tableName.indexOf(LAKE_TABLE_SPLITTER));
+ // Extract the lake table name: for "table$lake" -> "table"
+ // for "table$lake$snapshots" -> "table$snapshots"
+ String baseTableName = tableName.substring(0, tableName.indexOf(LAKE_TABLE_SPLITTER));
Review Comment:
nit: can be replaced with
```
String lakeTableName = tableName.replaceFirst("\\$lake", "");
```
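For reference, the one-liner handles both naming cases called out in the diff comment (standalone snippet, not from the PR):
```
String t1 = "table$lake".replaceFirst("\\$lake", "");            // -> "table"
String t2 = "table$lake$snapshots".replaceFirst("\\$lake", "");  // -> "table$snapshots"
```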
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java:
##########
@@ -20,37 +20,110 @@
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.paimon.flink.FlinkTableFactory;
+
+import java.util.Map;
/** A factory to create {@link DynamicTableSource} for lake table. */
public class LakeTableFactory {
+ private final LakeCatalog lakeCatalog;
- // now, always assume is paimon, todo need to describe lake storage from
- // to know which lake storage used
- private final org.apache.paimon.flink.FlinkTableFactory paimonFlinkTableFactory;
-
- public LakeTableFactory() {
- paimonFlinkTableFactory = new FlinkTableFactory();
+ public LakeTableFactory(LakeCatalog lakeCatalog) {
+ this.lakeCatalog = lakeCatalog;
}
public DynamicTableSource createDynamicTableSource(
DynamicTableFactory.Context context, String tableName) {
ObjectIdentifier originIdentifier = context.getObjectIdentifier();
- ObjectIdentifier paimonIdentifier =
+ ObjectIdentifier lakeIdentifier =
ObjectIdentifier.of(
originIdentifier.getCatalogName(),
originIdentifier.getDatabaseName(),
tableName);
+
+ // Determine the lake format from the table options
+ Map<String, String> tableOptions = context.getCatalogTable().getOptions();
+
+ // If not present, fall back to 'fluss.table.datalake.format' (set by Fluss)
Review Comment:
This piece of logic is hacky. If an Iceberg table doesn't put `connector` =
`iceberg` into its table options, the logic will fail. I created #1814 to
track it. For now, let's keep it in this PR.
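To make the concern concrete, here is a minimal standalone sketch of the failure mode (it mirrors the fallback chain from the diff; the empty options map stands in for an Iceberg system table):
```
// With empty table options (as for an Iceberg system table), the
// fallback chain below silently resolves to "paimon".
Map<String, String> tableOptions = java.util.Collections.emptyMap();
String connector = tableOptions.get("connector");
if (connector == null) {
    connector = tableOptions.get("fluss.table.datalake.format");
}
if (connector == null) {
    connector = "paimon"; // wrong default for an Iceberg table
}
```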
##########
fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java:
##########
@@ -273,6 +276,76 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception {
}
}
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
void testReadIcebergLakeTableAndSystemTable(boolean isPartitioned) throws Exception {
Review Comment:
Is it possible to move this test case into `testUnionReadInStreamMode` to reduce
test code and runtime, just like we did in
`org.apache.fluss.lake.paimon.flink.FlinkUnionReadPrimaryKeyTableITCase.testUnionReadFullType`?
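For illustration, the consolidated test might look roughly like this (a hypothetical sketch; `batchSql` and the table name are placeholders, not from the PR):
```
@ParameterizedTest
@ValueSource(booleans = {false, true})
void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception {
    // ... existing stream-mode union-read assertions ...

    // Reuse the same table setup to also cover the paths added in this PR:
    batchSql("SELECT * FROM my_table$lake");            // Iceberg lake table
    batchSql("SELECT * FROM my_table$lake$snapshots");  // Iceberg system table
}
```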
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java:
##########
@@ -20,37 +20,110 @@
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.paimon.flink.FlinkTableFactory;
+
+import java.util.Map;
/** A factory to create {@link DynamicTableSource} for lake table. */
public class LakeTableFactory {
+ private final LakeCatalog lakeCatalog;
- // now, always assume is paimon, todo need to describe lake storage from
- // to know which lake storage used
- private final org.apache.paimon.flink.FlinkTableFactory paimonFlinkTableFactory;
-
- public LakeTableFactory() {
- paimonFlinkTableFactory = new FlinkTableFactory();
+ public LakeTableFactory(LakeCatalog lakeCatalog) {
+ this.lakeCatalog = lakeCatalog;
}
public DynamicTableSource createDynamicTableSource(
DynamicTableFactory.Context context, String tableName) {
ObjectIdentifier originIdentifier = context.getObjectIdentifier();
- ObjectIdentifier paimonIdentifier =
+ ObjectIdentifier lakeIdentifier =
ObjectIdentifier.of(
originIdentifier.getCatalogName(),
originIdentifier.getDatabaseName(),
tableName);
+
+ // Determine the lake format from the table options
+ Map<String, String> tableOptions = context.getCatalogTable().getOptions();
+
+ // If not present, fall back to 'fluss.table.datalake.format' (set by Fluss)
+ String connector = tableOptions.get("connector");
+ if (connector == null) {
+ connector = tableOptions.get("fluss.table.datalake.format");
+ }
+
+ if (connector == null) {
+ // For Paimon system tables (like table_name$options), the table options are empty
+ // Default to Paimon for backward compatibility
+ connector = "paimon";
+ }
+
+ // For Iceberg and Paimon, pass the table name as-is to their factory.
+ // Metadata tables will be handled internally by their respective factories.
DynamicTableFactory.Context newContext =
new FactoryUtil.DefaultDynamicTableContext(
- paimonIdentifier,
+ lakeIdentifier,
context.getCatalogTable(),
context.getEnrichmentOptions(),
context.getConfiguration(),
context.getClassLoader(),
context.isTemporary());
- return paimonFlinkTableFactory.createDynamicTableSource(newContext);
+ // Get the appropriate factory based on connector type
+ DynamicTableSourceFactory factory = getLakeTableFactory(connector, tableOptions);
+ return factory.createDynamicTableSource(newContext);
+ }
+
+ private DynamicTableSourceFactory getLakeTableFactory(
+ String connector, Map<String, String> tableOptions) {
+ if ("paimon".equalsIgnoreCase(connector)) {
+ return getPaimonFactory();
+ } else if ("iceberg".equalsIgnoreCase(connector)) {
+ return getIcebergFactory(tableOptions);
+ } else {
+ throw new UnsupportedOperationException(
+ "Unsupported lake connector: "
+ + connector
+ + ". Only 'paimon' and 'iceberg' are supported.");
+ }
+ }
+
+ private DynamicTableSourceFactory getPaimonFactory() {
+ try {
+ Class<?> paimonFactoryClass =
Review Comment:
For Paimon, we don't need to use reflection; the module can depend on `paimon-flink` directly, as the removed code did.
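For example, the direct dependency could be restored (a sketch, assuming `paimon-flink` remains on the compile classpath as before):
```
private DynamicTableSourceFactory getPaimonFactory() {
    // Paimon's Flink factory implements DynamicTableSourceFactory,
    // so it can be instantiated directly instead of via reflection.
    return new org.apache.paimon.flink.FlinkTableFactory();
}
```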
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]