JingsongLi commented on code in PR #8117:
URL: https://github.com/apache/paimon/pull/8117#discussion_r3384831983
##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java:
##########
@@ -200,6 +201,7 @@ private ReadBuilder createReadBuilder(@Nullable org.apache.paimon.types.RowType
if (limit != null) {
readBuilder.withLimit(limit.intValue());
}
+ ScanBucketUtils.applyScanBucket(table, readBuilder, conf);
Review Comment:
This applies scan.bucket only to the normal source read builder. Aggregate
pushdown plans splits through AggregatePushDownUtils.planSplits(...) with a
separate ReadBuilder, so queries such as SELECT COUNT(*) FROM T /*+
OPTIONS(scan.bucket=0) */ can still aggregate all buckets while non-aggregate
reads scan only the requested bucket. Please either apply SCAN_BUCKET in the
aggregate pushdown planning path as well, or disable aggregate pushdown when
scan.bucket is set.
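A minimal sketch of the two options, assuming a simplified planner where a split is modeled only as a (bucket, rowCount) pair. Split, planCount and allowAggregatePushDown are hypothetical names, not Paimon APIs. The sketch shows the divergence described above: counting every planned split returns rows from all buckets, while filtering by scan.bucket first matches what the non-aggregate read returns.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;

/**
 * Hypothetical sketch (not Paimon code) of the two fixes suggested for aggregate
 * pushdown when the Flink option "scan.bucket" is set. Split, planCount and
 * allowAggregatePushDown are illustrative names only.
 */
public class ScanBucketAggregateSketch {

    static final String SCAN_BUCKET = "scan.bucket";

    /** Stand-in for a planned split: which bucket it reads and how many rows it holds. */
    static final class Split {
        final int bucket;
        final long rowCount;

        Split(int bucket, long rowCount) {
            this.bucket = bucket;
            this.rowCount = rowCount;
        }
    }

    /** Parses scan.bucket from the table options, if present. */
    static OptionalInt scanBucket(Map<String, String> options) {
        String value = options.get(SCAN_BUCKET);
        return value == null ? OptionalInt.empty() : OptionalInt.of(Integer.parseInt(value.trim()));
    }

    /** Fix (a): apply the same bucket filter in the aggregate planning path before counting. */
    static long planCount(List<Split> splits, Map<String, String> options) {
        OptionalInt bucket = scanBucket(options);
        long total = 0;
        for (Split split : splits) {
            // Skip splits outside the requested bucket, mirroring the normal read path.
            if (bucket.isPresent() && split.bucket != bucket.getAsInt()) {
                continue;
            }
            total += split.rowCount;
        }
        return total;
    }

    /** Fix (b): disable aggregate pushdown entirely whenever scan.bucket is set. */
    static boolean allowAggregatePushDown(Map<String, String> options) {
        return !scanBucket(options).isPresent();
    }

    public static void main(String[] args) {
        List<Split> splits = Arrays.asList(new Split(0, 10), new Split(1, 5), new Split(0, 3));
        Map<String, String> hinted = new HashMap<>();
        hinted.put(SCAN_BUCKET, "0");

        System.out.println(planCount(splits, new HashMap<>())); // 18: all buckets
        System.out.println(planCount(splits, hinted));          // 13: bucket 0 only
        System.out.println(allowAggregatePushDown(hinted));     // false
    }
}
```

Option (a) keeps the pushdown speedup but requires every planning path to honor the option; option (b) is simpler and cannot drift, at the cost of falling back to a full read for aggregates over a single bucket.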
##########
paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java:
##########
@@ -161,10 +164,42 @@ public ReadBuilder withRowRangeIndex(RowRangeIndex rowRangeIndex) {
@Override
public ReadBuilder withBucket(int bucket) {
+ validateSpecifiedBucket(table, bucket);
this.specifiedBucket = bucket;
return this;
}
+ /**
+ * Validates bucket id before manifest pruning ({@link InnerTableScan#withBucket(int)}). Callers
+ * such as Flink {@code scan.bucket} should route through {@link #withBucket(int)}.
+ */
+ static void validateSpecifiedBucket(InnerTable table, int bucket) {
+ checkArgument(bucket >= 0, "Bucket id must be non-negative, but is %s.", bucket);
+ if (!(table instanceof FileStoreTable)) {
+ throw new IllegalArgumentException(
+ "Bucket scan is only supported for FileStoreTable, but got "
+ + table.getClass().getName());
+ }
+ FileStoreTable fileStoreTable = (FileStoreTable) table;
+ checkArgument(
+ fileStoreTable.bucketMode() == BucketMode.HASH_FIXED,
+ "Bucket scan is only supported for fixed-bucket tables, but got bucket mode %s.",
+ fileStoreTable.bucketMode());
+ checkArgument(
+ !fileStoreTable.schema().primaryKeys().isEmpty(),
Review Comment:
This makes the public core ReadBuilder.withBucket(...) API reject
fixed-bucket append tables, even though bucket-level manifest pruning is useful
and valid there too. If the primary-key restriction is only meant for the Flink
scan.bucket option, could we keep ReadBuilder.withBucket(...) generic and
enforce the Flink option restriction in the Flink scan.bucket path instead?
--
This is an automated message from the Apache Git Service.