This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
     new aac417c1a1  [flink] Fix Flink Lookup Join for Postpone bucket table (#5537)
aac417c1a1 is described below

commit aac417c1a149bafdc182114fba5ab20770d36c17
Author: Jingsong Lee <jingsongl...@gmail.com>
AuthorDate: Fri Apr 25 21:28:13 2025 +0800

    [flink] Fix Flink Lookup Join for Postpone bucket table (#5537)
---
 .../apache/paimon/flink/action/CompactAction.java  |   2 +-
 .../flink/lookup/FixedBucketFromPkExtractor.java   | 110 ------------------
 .../flink/lookup/PrimaryKeyPartialLookupTable.java | 100 ++++++++++++++---
 .../paimon/flink/source/BaseDataTableSource.java   |   6 +-
 .../paimon/flink/PostponeBucketTableITCase.java    | 125 +++++++++++++++++++++
 .../flink/lookup/FileStoreLookupFunctionTest.java  |  23 ++++
 6 files changed, 241 insertions(+), 125 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index 306216a57d..0244a20547 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -259,7 +259,7 @@ public class CompactAction extends TableActionBase {
         InternalRowPartitionComputer partitionComputer =
                 new InternalRowPartitionComputer(
                         fileStoreTable.coreOptions().partitionDefaultName(),
-                        fileStoreTable.rowType(),
+                        fileStoreTable.store().partitionType(),
                         fileStoreTable.partitionKeys().toArray(new String[0]),
                         fileStoreTable.coreOptions().legacyPartitionName());
         String commitUser = CoreOptions.createCommitUser(options);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FixedBucketFromPkExtractor.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FixedBucketFromPkExtractor.java
deleted file mode 100644
index 61617988a6..0000000000
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FixedBucketFromPkExtractor.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.lookup;
-
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.codegen.CodeGenUtils;
-import org.apache.paimon.codegen.Projection;
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.sink.KeyAndBucketExtractor;
-
-import static org.apache.paimon.utils.Preconditions.checkArgument;
-
-/** Extractor to extract bucket from the primary key.
*/ -public class FixedBucketFromPkExtractor implements KeyAndBucketExtractor<InternalRow> { - - private transient InternalRow primaryKey; - - private final boolean sameBucketKeyAndTrimmedPrimaryKey; - - private final int numBuckets; - - private final Projection bucketKeyProjection; - - private final Projection trimmedPrimaryKeyProjection; - - private final Projection partitionProjection; - - private final Projection logPrimaryKeyProjection; - - public FixedBucketFromPkExtractor(TableSchema schema) { - this.numBuckets = new CoreOptions(schema.options()).bucket(); - checkArgument(numBuckets > 0, "Num bucket is illegal: " + numBuckets); - this.sameBucketKeyAndTrimmedPrimaryKey = - schema.bucketKeys().equals(schema.trimmedPrimaryKeys()); - this.bucketKeyProjection = - CodeGenUtils.newProjection( - schema.logicalPrimaryKeysType(), - schema.bucketKeys().stream() - .mapToInt(schema.primaryKeys()::indexOf) - .toArray()); - this.trimmedPrimaryKeyProjection = - CodeGenUtils.newProjection( - schema.logicalPrimaryKeysType(), - schema.trimmedPrimaryKeys().stream() - .mapToInt(schema.primaryKeys()::indexOf) - .toArray()); - this.partitionProjection = - CodeGenUtils.newProjection( - schema.logicalPrimaryKeysType(), - schema.partitionKeys().stream() - .mapToInt(schema.primaryKeys()::indexOf) - .toArray()); - this.logPrimaryKeyProjection = - CodeGenUtils.newProjection( - schema.logicalRowType(), schema.projection(schema.primaryKeys())); - } - - @Override - public void setRecord(InternalRow record) { - this.primaryKey = record; - } - - @Override - public BinaryRow partition() { - return partitionProjection.apply(primaryKey); - } - - private BinaryRow bucketKey() { - if (sameBucketKeyAndTrimmedPrimaryKey) { - return trimmedPrimaryKey(); - } - - return bucketKeyProjection.apply(primaryKey); - } - - @Override - public int bucket() { - BinaryRow bucketKey = bucketKey(); - return KeyAndBucketExtractor.bucket( - KeyAndBucketExtractor.bucketKeyHashCode(bucketKey), numBuckets); - } - - @Override - public BinaryRow trimmedPrimaryKey() { - return trimmedPrimaryKeyProjection.apply(primaryKey); - } - - @Override - public BinaryRow logPrimaryKey() { - return logPrimaryKeyProjection.apply(primaryKey); - } -} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java index 255351767c..24fd87a213 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java @@ -19,15 +19,19 @@ package org.apache.paimon.flink.lookup; import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.codegen.CodeGenUtils; +import org.apache.paimon.codegen.Projection; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.disk.IOManagerImpl; import org.apache.paimon.flink.query.RemoteTableQuery; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.BucketMode; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.query.LocalTableQuery; +import org.apache.paimon.table.sink.KeyAndBucketExtractor; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.Split; 
import org.apache.paimon.table.source.StreamTableScan; @@ -43,14 +47,18 @@ import java.io.Closeable; import java.io.File; import java.io.IOException; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; +import static org.apache.paimon.table.BucketMode.POSTPONE_BUCKET; +import static org.apache.paimon.utils.Preconditions.checkArgument; + /** Lookup table for primary key which supports to read the LSM tree directly. */ public class PrimaryKeyPartialLookupTable implements LookupTable { private final QueryExecutorFactory executorFactory; - private final FixedBucketFromPkExtractor extractor; @Nullable private final ProjectedRow keyRearrange; @Nullable private final ProjectedRow trimmedKeyRearrange; @@ -58,16 +66,30 @@ public class PrimaryKeyPartialLookupTable implements LookupTable { @Nullable private Filter<InternalRow> cacheRowFilter; private QueryExecutor queryExecutor; + private final Projection partitionFromPk; + private final Projection bucketKeyFromPk; + private PrimaryKeyPartialLookupTable( QueryExecutorFactory executorFactory, FileStoreTable table, List<String> joinKey) { this.executorFactory = executorFactory; - if (table.bucketMode() != BucketMode.HASH_FIXED) { throw new UnsupportedOperationException( "Unsupported mode for partial lookup: " + table.bucketMode()); } - this.extractor = new FixedBucketFromPkExtractor(table.schema()); + TableSchema schema = table.schema(); + this.partitionFromPk = + CodeGenUtils.newProjection( + schema.logicalPrimaryKeysType(), + schema.partitionKeys().stream() + .mapToInt(schema.primaryKeys()::indexOf) + .toArray()); + this.bucketKeyFromPk = + CodeGenUtils.newProjection( + schema.logicalPrimaryKeysType(), + schema.bucketKeys().stream() + .mapToInt(schema.primaryKeys()::indexOf) + .toArray()); ProjectedRow keyRearrange = null; if (!table.primaryKeys().equals(joinKey)) { @@ -80,7 +102,7 @@ public class PrimaryKeyPartialLookupTable implements LookupTable { } this.keyRearrange = keyRearrange; - List<String> trimmedPrimaryKeys = table.schema().trimmedPrimaryKeys(); + List<String> trimmedPrimaryKeys = schema.trimmedPrimaryKeys(); ProjectedRow trimmedKeyRearrange = null; if (!trimmedPrimaryKeys.equals(joinKey)) { trimmedKeyRearrange = @@ -115,9 +137,14 @@ public class PrimaryKeyPartialLookupTable implements LookupTable { if (keyRearrange != null) { adjustedKey = keyRearrange.replaceRow(adjustedKey); } - extractor.setRecord(adjustedKey); - int bucket = extractor.bucket(); - BinaryRow partition = extractor.partition(); + + BinaryRow partition = partitionFromPk.apply(adjustedKey); + Integer numBuckets = queryExecutor.numBuckets(partition); + if (numBuckets == null) { + // no data, just return none + return Collections.emptyList(); + } + int bucket = bucket(numBuckets, adjustedKey); InternalRow trimmedKey = key; if (trimmedKeyRearrange != null) { @@ -132,6 +159,12 @@ public class PrimaryKeyPartialLookupTable implements LookupTable { } } + private int bucket(int numBuckets, InternalRow primaryKey) { + BinaryRow bucketKey = bucketKeyFromPk.apply(primaryKey); + return KeyAndBucketExtractor.bucket( + KeyAndBucketExtractor.bucketKeyHashCode(bucketKey), numBuckets); + } + @Override public void refresh() { queryExecutor.refresh(); @@ -182,6 +215,9 @@ public class PrimaryKeyPartialLookupTable implements LookupTable { interface QueryExecutor extends Closeable { + @Nullable + Integer numBuckets(BinaryRow partition); + InternalRow lookup(BinaryRow partition, int bucket, InternalRow key) throws 
IOException; void refresh(); @@ -195,6 +231,9 @@ public class PrimaryKeyPartialLookupTable implements LookupTable { private final StreamTableScan scan; private final String tableName; + private final Integer defaultNumBuckets; + private final Map<BinaryRow, Integer> numBuckets; + private LocalQueryExecutor( FileStoreTable table, int[] projection, @@ -222,6 +261,14 @@ public class PrimaryKeyPartialLookupTable implements LookupTable { .newStreamScan(); this.tableName = table.name(); + this.defaultNumBuckets = table.bucketSpec().getNumBuckets(); + this.numBuckets = new HashMap<>(); + } + + @Override + @Nullable + public Integer numBuckets(BinaryRow partition) { + return numBuckets.get(partition); } @Override @@ -241,16 +288,30 @@ public class PrimaryKeyPartialLookupTable implements LookupTable { } for (Split split : splits) { - BinaryRow partition = ((DataSplit) split).partition(); - int bucket = ((DataSplit) split).bucket(); - List<DataFileMeta> before = ((DataSplit) split).beforeFiles(); - List<DataFileMeta> after = ((DataSplit) split).dataFiles(); - - tableQuery.refreshFiles(partition, bucket, before, after); + refreshSplit((DataSplit) split); } } } + @VisibleForTesting + void refreshSplit(DataSplit split) { + BinaryRow partition = split.partition(); + int bucket = split.bucket(); + List<DataFileMeta> before = split.beforeFiles(); + List<DataFileMeta> after = split.dataFiles(); + + tableQuery.refreshFiles(partition, bucket, before, after); + Integer totalBuckets = split.totalBuckets(); + if (totalBuckets == null) { + // Just for compatibility with older versions + checkArgument( + defaultNumBuckets > 0, + "This is a bug, old version table numBuckets should be greater than 0."); + totalBuckets = defaultNumBuckets; + } + numBuckets.put(partition, totalBuckets); + } + @Override public void close() throws IOException { tableQuery.close(); @@ -273,9 +334,22 @@ public class PrimaryKeyPartialLookupTable implements LookupTable { static class RemoteQueryExecutor implements QueryExecutor { private final RemoteTableQuery tableQuery; + private final Integer numBuckets; private RemoteQueryExecutor(FileStoreTable table, int[] projection) { this.tableQuery = new RemoteTableQuery(table).withValueProjection(projection); + int numBuckets = table.bucketSpec().getNumBuckets(); + if (numBuckets == POSTPONE_BUCKET) { + throw new UnsupportedOperationException( + "Remote query does not support POSTPONE_BUCKET."); + } + this.numBuckets = numBuckets; + } + + @Override + @Nullable + public Integer numBuckets(BinaryRow partition) { + return numBuckets; } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java index 81cced9a74..ca80129a51 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java @@ -38,6 +38,7 @@ import org.apache.paimon.options.Options; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.BucketMode; +import org.apache.paimon.table.BucketSpec; import org.apache.paimon.table.DataTable; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; @@ -86,6 +87,7 @@ import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGN import static 
org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL; import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_EMIT_STRATEGY; import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_IDLE_TIMEOUT; +import static org.apache.paimon.table.BucketMode.POSTPONE_BUCKET; import static org.apache.paimon.utils.Preconditions.checkNotNull; /** @@ -409,7 +411,9 @@ public abstract class BaseDataTableSource extends FlinkTableSource private boolean supportBucketShufflePartitioner( List<String> joinKeyFieldNames, List<String> bucketKeyFieldNames) { - return BucketMode.HASH_FIXED.equals(((FileStoreTable) table).bucketMode()) + BucketSpec bucketSpec = ((FileStoreTable) table).bucketSpec(); + return bucketSpec.getBucketMode() == BucketMode.HASH_FIXED + && bucketSpec.getNumBuckets() != POSTPONE_BUCKET && new HashSet<>(joinKeyFieldNames).containsAll(bucketKeyFieldNames); } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java index 3519f162d0..db87abc7ad 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java @@ -425,6 +425,131 @@ public class PostponeBucketTableITCase extends AbstractTestBase { .containsExactlyInAnyOrder("+I[5]"); } + @Timeout(TIMEOUT) + @Test + public void testLookupPostponeBucketTable() throws Exception { + String warehouse = getTempDirPath(); + TableEnvironment bEnv = + tableEnvironmentBuilder() + .batchMode() + .setConf(TableConfigOptions.TABLE_DML_SYNC, true) + .build(); + + String createCatalogSql = + "CREATE CATALOG mycat WITH (\n" + + " 'type' = 'paimon',\n" + + " 'warehouse' = '" + + warehouse + + "'\n" + + ")"; + bEnv.executeSql(createCatalogSql); + bEnv.executeSql("USE CATALOG mycat"); + bEnv.executeSql( + "CREATE TABLE T (\n" + + " k INT,\n" + + " v INT,\n" + + " PRIMARY KEY (k) NOT ENFORCED\n" + + ") WITH (\n" + + " 'bucket' = '-2'\n" + + ")"); + bEnv.executeSql("CREATE TABLE SRC (i INT, `proctime` AS PROCTIME())"); + + TableEnvironment sEnv = + tableEnvironmentBuilder() + .streamingMode() + .parallelism(1) + .checkpointIntervalMs(200) + .build(); + sEnv.executeSql(createCatalogSql); + sEnv.executeSql("USE CATALOG mycat"); + TableResult streamingSelect = + sEnv.executeSql( + "SELECT i, v FROM SRC LEFT JOIN T " + + "FOR SYSTEM_TIME AS OF SRC.proctime AS D ON SRC.i = D.k"); + + JobClient client = streamingSelect.getJobClient().get(); + CloseableIterator<Row> it = streamingSelect.collect(); + + bEnv.executeSql("INSERT INTO T VALUES (1, 10), (2, 20), (3, 30)").await(); + bEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await(); + + // lookup join + bEnv.executeSql("INSERT INTO SRC VALUES (1), (2), (3)").await(); + assertThat(collect(client, it, 3)) + .containsExactlyInAnyOrder("+I[1, 10]", "+I[2, 20]", "+I[3, 30]"); + + // rescale and re-join + bEnv.executeSql("CALL sys.rescale(`table` => 'default.T', `bucket_num` => 5)").await(); + bEnv.executeSql("INSERT INTO SRC VALUES (1), (2), (3)").await(); + assertThat(collect(client, it, 3)) + .containsExactlyInAnyOrder("+I[1, 10]", "+I[2, 20]", "+I[3, 30]"); + + it.close(); + } + + @Timeout(TIMEOUT) + @Test + public void testLookupPostponeBucketPartitionedTable() throws Exception { + String warehouse = getTempDirPath(); + 
TableEnvironment bEnv = + tableEnvironmentBuilder() + .batchMode() + .setConf(TableConfigOptions.TABLE_DML_SYNC, true) + .build(); + + String createCatalogSql = + "CREATE CATALOG mycat WITH (\n" + + " 'type' = 'paimon',\n" + + " 'warehouse' = '" + + warehouse + + "'\n" + + ")"; + bEnv.executeSql(createCatalogSql); + bEnv.executeSql("USE CATALOG mycat"); + bEnv.executeSql( + "CREATE TABLE T (\n" + + " k INT,\n" + + " pt INT,\n" + + " v INT,\n" + + " PRIMARY KEY (k, pt) NOT ENFORCED\n" + + ") PARTITIONED BY (pt) WITH (\n" + + " 'bucket' = '-2'\n" + + ")"); + bEnv.executeSql("CREATE TABLE SRC (i INT, pt INT, `proctime` AS PROCTIME())"); + + TableEnvironment sEnv = + tableEnvironmentBuilder() + .streamingMode() + .parallelism(1) + .checkpointIntervalMs(200) + .build(); + sEnv.executeSql(createCatalogSql); + sEnv.executeSql("USE CATALOG mycat"); + TableResult streamingSelect = + sEnv.executeSql( + "SELECT i, D.pt, v FROM SRC LEFT JOIN T " + + "FOR SYSTEM_TIME AS OF SRC.proctime AS D ON SRC.i = D.k AND SRC.pt = D.pt"); + + JobClient client = streamingSelect.getJobClient().get(); + CloseableIterator<Row> it = streamingSelect.collect(); + + bEnv.executeSql("INSERT INTO T VALUES (1, 1, 10), (2, 2, 20), (3, 2, 30)").await(); + bEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await(); + + // rescale for partitions to different num buckets and lookup join + bEnv.executeSql( + "CALL sys.rescale(`table` => 'default.T', `bucket_num` => 5, `partition` => 'pt=1')") + .await(); + bEnv.executeSql( + "CALL sys.rescale(`table` => 'default.T', `bucket_num` => 8, `partition` => 'pt=2')") + .await(); + bEnv.executeSql("INSERT INTO SRC VALUES (1, 1), (2, 2), (3, 2)").await(); + assertThat(collect(client, it, 3)) + .containsExactlyInAnyOrder("+I[1, 1, 10]", "+I[2, 2, 20]", "+I[3, 2, 30]"); + + it.close(); + } + private List<String> collect(TableResult result) throws Exception { List<String> ret = new ArrayList<>(); try (CloseableIterator<Row> it = result.collect()) { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java index dbf9f99a32..dcbc405d31 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java @@ -37,6 +37,7 @@ import org.apache.paimon.table.FileStoreTableFactory; import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.table.sink.StreamTableWrite; import org.apache.paimon.table.sink.TableCommitImpl; +import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; @@ -49,6 +50,7 @@ import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import java.lang.reflect.Field; import java.net.InetSocketAddress; import java.nio.file.Path; import java.time.Duration; @@ -62,6 +64,7 @@ import java.util.List; import java.util.Random; import java.util.UUID; +import static org.apache.paimon.data.BinaryRow.EMPTY_ROW; import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_TIME_PERIODS_BLACKLIST; import static org.apache.paimon.service.ServiceManager.PRIMARY_KEY_LOOKUP; import static 
org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches; @@ -148,6 +151,26 @@ public class FileStoreLookupFunctionTest { } } + @Test + public void testCompatibilityForOldVersion() throws Exception { + createLookupFunction(false, true, false, false); + commit(writeCommit(1)); + PrimaryKeyPartialLookupTable lookupTable = + (PrimaryKeyPartialLookupTable) lookupFunction.lookupTable(); + LocalQueryExecutor queryExecutor = (LocalQueryExecutor) lookupTable.queryExecutor(); + + // set totalBuckets to null, for testing old version + DataSplit split = (DataSplit) table.newReadBuilder().newScan().plan().splits().get(0); + Field field = DataSplit.class.getDeclaredField("totalBuckets"); + field.setAccessible(true); + field.set(split, null); + assertThat(split.totalBuckets()).isNull(); + + // assert num buckets should be 2 + queryExecutor.refreshSplit(split); + assertThat(queryExecutor.numBuckets(EMPTY_ROW)).isEqualTo(2); + } + @ParameterizedTest @ValueSource(booleans = {false, true}) public void testDefaultLocalPartial(boolean refreshAsync) throws Exception {