luoyuxia commented on code in PR #1679:
URL: https://github.com/apache/fluss/pull/1679#discussion_r2340302653
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java:
##########
@@ -567,7 +570,8 @@ public boolean applyAggregates(
|| aggregateExpressions.size() != 1
|| hasPrimaryKey()
|| groupingSets.size() > 1
- || (groupingSets.size() == 1 && groupingSets.get(0).length > 0)) {
+ || (groupingSets.size() == 1 && groupingSets.get(0).length > 0)
+ || isDataLakeEnabled) {
Review Comment:
Please add a comment explaining why aggregate pushdown is not supported when `isDataLakeEnabled` is true.
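A minimal sketch of how the requested comment could sit next to the guard. The class and method names (`CountPushDownGuard`, `canPushDown`) are hypothetical, only the conditions visible in the hunk are reproduced, and the reason given in the comment is an assumption that the PR author would need to confirm or replace:

```java
import java.util.Collections;
import java.util.List;

// Hypothetical stand-alone model of the guard in applyAggregates; not Fluss code.
final class CountPushDownGuard {

    static boolean canPushDown(
            int aggregateCount,
            boolean hasPrimaryKey,
            List<int[]> groupingSets,
            boolean isDataLakeEnabled) {
        // Conditions visible in the diff hunk (the real method may check more).
        if (aggregateCount != 1
                || hasPrimaryKey
                || groupingSets.size() > 1
                || (groupingSets.size() == 1 && groupingSets.get(0).length > 0)) {
            return false;
        }
        // ASSUMPTION (not stated in this review thread): with the data lake enabled,
        // part of the table's data may live in the lake, so a result computed from
        // Fluss alone could be incomplete. Replace with the actual reason.
        if (isDataLakeEnabled) {
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        List<int[]> noGrouping = Collections.emptyList();
        System.out.println(canPushDown(1, false, noGrouping, false));
        System.out.println(canPushDown(1, false, noGrouping, true));
    }
}
```

Splitting the data lake check into its own `if` is only one option; it gives the comment an obvious place to live without crowding the combined condition.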
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/LakeSourceUtils.java:
##########
@@ -60,19 +60,16 @@ public static LakeSource<LakeSplit> createLakeSource(
try {
lakeStoragePlugin =
LakeStoragePluginSetUp.fromDataLakeFormat(dataLake, null);
} catch (UnsupportedOperationException e) {
- LOG.info(
- "No LakeStoragePlugin can be found for datalake format: {}, return null to disable reading from lake source.",
- dataLake);
- return null;
+ throw new UnsupportedOperationException(
+ "No LakeStoragePlugin can be found for datalake format: "
+ dataLake);
Review Comment:
```suggestion
throw new UnsupportedOperationException(
String.format(
"No LakeStoragePlugin available for data lake format: %s. "
+ "To resolve this, ensure fluss-lake-%s.jar is in the classpath.",
dataLake, dataLake.toLowerCase()));
```
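To make the suggested message concrete, here is a small stand-alone sketch of what it renders to. It assumes `dataLake` is a plain `String`; the class `MissingLakePluginMessage` and the helper `missingPluginMessage` are hypothetical and exist only for illustration:

```java
import java.util.Locale;

// Hypothetical helper, not part of Fluss: renders the message from the suggestion above.
final class MissingLakePluginMessage {

    // Assumes the data lake format is available as a plain String such as "PAIMON".
    static String missingPluginMessage(String dataLake) {
        return String.format(
                "No LakeStoragePlugin available for data lake format: %s. "
                        + "To resolve this, ensure fluss-lake-%s.jar is in the classpath.",
                dataLake,
                // Locale.ROOT keeps the jar name stable regardless of the JVM's default locale.
                dataLake.toLowerCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        System.out.println(missingPluginMessage("PAIMON"));
    }
}
```

The only deviation from the suggestion is `toLowerCase(Locale.ROOT)`: the no-argument `toLowerCase()` depends on the default locale, which can alter the lowercased name under some locales (for example Turkish, where `I` does not map to `i`).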
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/LakeSourceUtils.java:
##########
@@ -60,19 +60,16 @@ public static LakeSource<LakeSplit> createLakeSource(
try {
lakeStoragePlugin =
LakeStoragePluginSetUp.fromDataLakeFormat(dataLake, null);
} catch (UnsupportedOperationException e) {
- LOG.info(
- "No LakeStoragePlugin can be found for datalake format: {}, return null to disable reading from lake source.",
- dataLake);
- return null;
+ throw new UnsupportedOperationException(
+ "No LakeStoragePlugin can be found for datalake format: "
+ dataLake);
}
LakeStorage lakeStorage =
checkNotNull(lakeStoragePlugin).createLakeStorage(lakeConfig);
try {
return (LakeSource<LakeSplit>)
lakeStorage.createLakeSource(tablePath);
} catch (UnsupportedOperationException e) {
- LOG.info(
- "method createLakeSource throw UnsupportedOperationException for datalake format {}, return null as lakeSource to disable reading from lake source.",
- dataLake);
- return null;
+ throw new UnsupportedOperationException(
Review Comment:
```suggestion
throw new UnsupportedOperationException(
String.format("Table using '%s' data lake format cannot be used as historical data in Fluss.", dataLake));
```
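One optional refinement, sketched under assumptions: when rethrowing, the caught exception can be passed along as the cause so the original stack trace is not lost. The class `LakeSourceFailFast` and both methods below are hypothetical stand-ins; `createLakeSource` here simply simulates a lake storage that does not support the operation:

```java
// Hypothetical stand-alone model of the fail-fast rethrow; not Fluss code.
final class LakeSourceFailFast {

    // Stand-in for lakeStorage.createLakeSource(tablePath); always unsupported in this sketch.
    static Object createLakeSource(String tablePath) {
        throw new UnsupportedOperationException("createLakeSource is not implemented");
    }

    static Object createOrFail(String dataLake, String tablePath) {
        try {
            return createLakeSource(tablePath);
        } catch (UnsupportedOperationException e) {
            // Rethrow with the user-facing message and keep the original exception as the cause.
            throw new UnsupportedOperationException(
                    String.format(
                            "Table using '%s' data lake format cannot be used as historical data in Fluss.",
                            dataLake),
                    e);
        }
    }

    public static void main(String[] args) {
        try {
            createOrFail("paimon", "db.tbl");
        } catch (UnsupportedOperationException e) {
            System.out.println(e.getMessage());
            System.out.println("caused by: " + e.getCause().getMessage());
        }
    }
}
```

`UnsupportedOperationException(String, Throwable)` is a standard JDK constructor, so chaining the cause needs no extra helper.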