JingsongLi commented on code in PR #8117:
URL: https://github.com/apache/paimon/pull/8117#discussion_r3384831983


##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java:
##########
@@ -200,6 +201,7 @@ private ReadBuilder createReadBuilder(@Nullable org.apache.paimon.types.RowType
         if (limit != null) {
             readBuilder.withLimit(limit.intValue());
         }
+        ScanBucketUtils.applyScanBucket(table, readBuilder, conf);

Review Comment:
   This applies scan.bucket only to the normal source read builder. Aggregate 
pushdown plans splits through AggregatePushDownUtils.planSplits(...) with a 
separate ReadBuilder, so queries such as SELECT COUNT(*) FROM T /*+ 
OPTIONS(scan.bucket=0) */ can still aggregate all buckets while non-aggregate 
reads scan only the requested bucket. Please either apply SCAN_BUCKET in the 
aggregate pushdown planning path as well, or disable aggregate pushdown when 
scan.bucket is set.
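
   A minimal sketch of both options, for illustration only. It uses a
   hypothetical stand-in (SketchReadBuilder) rather than the real Paimon
   ReadBuilder, and the helper names applyScanBucket and
   aggregatePushDownAllowed are assumptions, not the signatures in this PR.
   The point is that the normal source path and the aggregate pushdown path
   must see the same bucket filter, or the pushdown must be skipped.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ScanBucketPushdownSketch {
    static final String SCAN_BUCKET = "scan.bucket";

    /** Hypothetical stand-in for a read builder; only bucket selection is modeled. */
    static final class SketchReadBuilder {
        private Integer bucket; // null means "all buckets"

        SketchReadBuilder withBucket(int bucket) {
            this.bucket = bucket;
            return this;
        }

        /** Buckets that split planning would visit for a table with numBuckets buckets. */
        List<Integer> plannedBuckets(int numBuckets) {
            List<Integer> result = new ArrayList<>();
            for (int b = 0; b < numBuckets; b++) {
                if (bucket == null || bucket == b) {
                    result.add(b);
                }
            }
            return result;
        }
    }

    /** Option 1: one helper, called from both the source path and the aggregate planning path. */
    static SketchReadBuilder applyScanBucket(
            SketchReadBuilder builder, Map<String, String> options) {
        String value = options.get(SCAN_BUCKET);
        if (value != null) {
            builder.withBucket(Integer.parseInt(value.trim()));
        }
        return builder;
    }

    /** Option 2: refuse aggregate pushdown whenever scan.bucket is set. */
    static boolean aggregatePushDownAllowed(Map<String, String> options) {
        return !options.containsKey(SCAN_BUCKET);
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put(SCAN_BUCKET, "0");

        // Both planning paths go through the same helper, so they agree on the buckets.
        SketchReadBuilder source = applyScanBucket(new SketchReadBuilder(), options);
        SketchReadBuilder aggregate = applyScanBucket(new SketchReadBuilder(), options);
        System.out.println(source.plannedBuckets(4));
        System.out.println(aggregate.plannedBuckets(4));
        System.out.println(aggregatePushDownAllowed(options));
    }
}
```

   Either option keeps COUNT(*) consistent with a plain SELECT under the same
   hint; option 1 preserves the pushdown optimization, option 2 is the smaller
   change.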



##########
paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java:
##########
@@ -161,10 +164,42 @@ public ReadBuilder withRowRangeIndex(RowRangeIndex rowRangeIndex) {
 
     @Override
     public ReadBuilder withBucket(int bucket) {
+        validateSpecifiedBucket(table, bucket);
         this.specifiedBucket = bucket;
         return this;
     }
 
+    /**
+     * Validates bucket id before manifest pruning ({@link InnerTableScan#withBucket(int)}). Callers
+     * such as Flink {@code scan.bucket} should route through {@link #withBucket(int)}.
+     */
+    static void validateSpecifiedBucket(InnerTable table, int bucket) {
+        checkArgument(bucket >= 0, "Bucket id must be non-negative, but is %s.", bucket);
+        if (!(table instanceof FileStoreTable)) {
+            throw new IllegalArgumentException(
+                    "Bucket scan is only supported for FileStoreTable, but got "
+                            + table.getClass().getName());
+        }
+        FileStoreTable fileStoreTable = (FileStoreTable) table;
+        checkArgument(
+                fileStoreTable.bucketMode() == BucketMode.HASH_FIXED,
+                "Bucket scan is only supported for fixed-bucket tables, but got bucket mode %s.",
+                fileStoreTable.bucketMode());
+        checkArgument(
+                !fileStoreTable.schema().primaryKeys().isEmpty(),

Review Comment:
   This makes the public core ReadBuilder.withBucket(...) API reject 
fixed-bucket append tables, even though bucket-level manifest pruning is useful 
and valid there too. If the primary-key restriction is only meant for the Flink 
scan.bucket option, could we keep ReadBuilder.withBucket(...) generic and 
enforce the Flink option restriction in the Flink scan.bucket path instead?
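
   A minimal sketch of that split, for illustration only. TableInfo and the
   two-value BucketMode enum are simplified stand-ins for FileStoreTable and
   the real BucketMode, and the method names validateCoreBucket and
   validateFlinkScanBucket are assumptions. The core check stays generic
   (non-negative bucket, fixed-bucket mode), while the primary-key requirement
   lives only in the Flink option path.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class BucketValidationSketch {
    /** Simplified stand-in for the real bucket mode enum. */
    enum BucketMode { HASH_FIXED, HASH_DYNAMIC }

    /** Simplified stand-in for the table metadata the checks need. */
    static final class TableInfo {
        final BucketMode bucketMode;
        final List<String> primaryKeys;

        TableInfo(BucketMode bucketMode, List<String> primaryKeys) {
            this.bucketMode = bucketMode;
            this.primaryKeys = primaryKeys;
        }
    }

    /** Generic core check: valid for primary-key and append tables alike. */
    static void validateCoreBucket(TableInfo table, int bucket) {
        if (bucket < 0) {
            throw new IllegalArgumentException(
                    "Bucket id must be non-negative, but is " + bucket + ".");
        }
        if (table.bucketMode != BucketMode.HASH_FIXED) {
            throw new IllegalArgumentException(
                    "Bucket scan is only supported for fixed-bucket tables, but got bucket mode "
                            + table.bucketMode + ".");
        }
    }

    /** Flink-only check: adds the primary-key restriction on top of the core check. */
    static void validateFlinkScanBucket(TableInfo table, int bucket) {
        validateCoreBucket(table, bucket);
        if (table.primaryKeys.isEmpty()) {
            throw new IllegalArgumentException(
                    "scan.bucket is only supported for primary-key tables.");
        }
    }

    public static void main(String[] args) {
        TableInfo appendTable =
                new TableInfo(BucketMode.HASH_FIXED, Collections.<String>emptyList());
        TableInfo pkTable = new TableInfo(BucketMode.HASH_FIXED, Arrays.asList("id"));

        validateCoreBucket(appendTable, 0); // accepted by the generic API
        validateFlinkScanBucket(pkTable, 0); // accepted by the Flink option path
        try {
            validateFlinkScanBucket(appendTable, 0);
        } catch (IllegalArgumentException e) {
            System.out.println("Flink path rejected append table: " + e.getMessage());
        }
    }
}
```

   With this layering, a fixed-bucket append table can still use bucket-level
   pruning through the core API, and only the Flink option rejects it.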



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
