Copilot commented on code in PR #2794: URL: https://github.com/apache/fluss/pull/2794#discussion_r2887267122
########## fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/CompositeBatchScannerTest.java: ########## @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.client.table.scanner.batch; + +import org.apache.fluss.row.GenericRow; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.utils.CloseableIterator; + +import org.junit.jupiter.api.Test; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link CompositeBatchScanner}. */ +class CompositeBatchScannerTest { + + private static final Duration TIMEOUT = Duration.ofMillis(10); + + // ------------------------------------------------------------------------- + // No-limit tests + // ------------------------------------------------------------------------- + + @Test + void testPollBatchWithNoLimit() throws IOException { + // Three scanners each holding rows [0], [1], [2]. 
+ // CompositeBatchScanner should round-robin and eventually return all rows. + StubBatchScanner s1 = scanner(0); + StubBatchScanner s2 = scanner(1); + StubBatchScanner s3 = scanner(2); + + CompositeBatchScanner composite = + new CompositeBatchScanner(Arrays.asList(s1, s2, s3), null); + + List<InternalRow> collected = collectAll(composite); + + assertThat(collected).hasSize(3); + assertThat(intValues(collected)).containsExactlyInAnyOrder(0, 1, 2); + assertThat(s1.closed).isTrue(); + assertThat(s2.closed).isTrue(); + assertThat(s3.closed).isTrue(); + } + + @Test + void testPollBatchDrainsMultipleBatchesPerScanner() throws IOException { + // Two scanners, each returning two single-row batches before exhausting. + StubBatchScanner s1 = scanner(10, 11); + StubBatchScanner s2 = scanner(20, 21); + + CompositeBatchScanner composite = new CompositeBatchScanner(Arrays.asList(s1, s2), null); + + List<InternalRow> collected = collectAll(composite); + + assertThat(intValues(collected)).containsExactlyInAnyOrder(10, 11, 20, 21); + } + + @Test + void testPollBatchWithEmptyScannerList() throws IOException { + CompositeBatchScanner composite = new CompositeBatchScanner(Collections.emptyList(), null); + + assertThat(composite.pollBatch(TIMEOUT)).isNull(); + } + + @Test + void testPollBatchSkipsExhaustedScanner() throws IOException { + // s1 is already exhausted (returns null immediately), s2 has data. 
+ StubBatchScanner s1 = scanner(); // no rows → immediately returns null + StubBatchScanner s2 = scanner(99); + + CompositeBatchScanner composite = new CompositeBatchScanner(Arrays.asList(s1, s2), null); + + List<InternalRow> collected = collectAll(composite); + + assertThat(intValues(collected)).containsExactly(99); + assertThat(s1.closed).isTrue(); + assertThat(s2.closed).isTrue(); + } + + // ------------------------------------------------------------------------- + // Limit tests + // ------------------------------------------------------------------------- + + @Test + void testPollBatchWithLimitReturnsAtMostLimitRows() throws IOException { + // Two scanners with 3 rows each (one row per batch), limit = 3. + // collectLimitedRows collects until rows.size() >= limit. + StubBatchScanner s1 = scanner(1, 2, 3); + StubBatchScanner s2 = scanner(4, 5, 6); + + CompositeBatchScanner composite = new CompositeBatchScanner(Arrays.asList(s1, s2), 3); + + CloseableIterator<InternalRow> batch = composite.pollBatch(TIMEOUT); + assertThat(batch).isNotNull(); + + List<Integer> values = new ArrayList<>(); + while (batch.hasNext()) { + values.add(batch.next().getInt(0)); + } + assertThat(values.size()).isGreaterThanOrEqualTo(3); + } + + @Test + void testPollBatchWithLimitExceedingTotalRows() throws IOException { + // limit > total available rows → all rows returned. + StubBatchScanner s1 = scanner(1, 2); + StubBatchScanner s2 = scanner(3); + + CompositeBatchScanner composite = new CompositeBatchScanner(Arrays.asList(s1, s2), 100); + + CloseableIterator<InternalRow> batch = composite.pollBatch(TIMEOUT); + assertThat(batch).isNotNull(); + + List<Integer> values = new ArrayList<>(); + while (batch.hasNext()) { + values.add(batch.next().getInt(0)); + } + assertThat(values).containsExactlyInAnyOrder(1, 2, 3); + } Review Comment: The CloseableIterator returned by pollBatch is not closed in this test. 
Even if the current stub iterator is in-memory, production iterators may hold resources; closing them in tests helps enforce correct usage patterns. ########## fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/CompositeBatchScanner.java: ########## @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.client.table.scanner.batch; + +import org.apache.fluss.annotation.Internal; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.utils.CloseableIterator; +import org.apache.fluss.utils.IOUtils; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.time.Duration; +import java.util.LinkedList; +import java.util.List; + +import static org.apache.fluss.client.table.scanner.batch.BatchScanUtils.collectLimitedRows; + +/** + * A {@link BatchScanner} that combines multiple {@link BatchScanner} instances into a single + * scanner. It iterates through all underlying scanners in order, delegating each {@link + * #pollBatch(Duration)} call to the next available scanner until all are exhausted. 
+ * + * <p>When a {@code limit} is specified, rows are collected eagerly across all scanners up to that + * limit and returned in a single batch. + */ +@Internal +public class CompositeBatchScanner implements BatchScanner { + + /** Queue of underlying scanners to be polled in order. */ + private final LinkedList<BatchScanner> scannerQueue; + + /** Optional row limit; when set, rows are collected eagerly up to this count. */ + private final @Nullable Integer limit; + + public CompositeBatchScanner(List<BatchScanner> scanners, @Nullable Integer limit) { + this.scannerQueue = new LinkedList<>(scanners); + this.limit = limit; + } + + @Override + public void close() throws IOException { + scannerQueue.forEach(IOUtils::closeQuietly); + } + + @Nullable + @Override + public CloseableIterator<InternalRow> pollBatch(Duration timeout) throws IOException { + + while (!scannerQueue.isEmpty()) { + // Direct return limit scan with is not so much data. + if (limit != null) { + CloseableIterator<InternalRow> iterator = + CloseableIterator.wrap(collectLimitedRows(scannerQueue, limit).iterator()); + scannerQueue.clear(); + return iterator; + } + + BatchScanner scanner = scannerQueue.poll(); + try { + CloseableIterator<InternalRow> iterator = scanner.pollBatch(timeout); + if (iterator != null) { + // If the scanner has more data, add it back to the queue + scannerQueue.add(scanner); + return iterator; + } else { + // Close the scanner if it has no more data, and not add it back to the queue + scanner.close(); + } + } catch (Exception e) { + throw new RuntimeException("Failed to collect rows", e); Review Comment: pollBatch declares throws IOException, but this catch block wraps all exceptions in a RuntimeException and doesn’t attempt to close the current scanner (or remaining scanners) on failure, which can leak resources and makes error handling inconsistent for callers. 
Prefer rethrowing as an IOException (or propagating IOException directly), and ensure all underlying scanners are closed when an exception occurs. ```suggestion // Ensure all scanners are closed on failure to avoid resource leaks IOUtils.closeQuietly(scanner); scannerQueue.forEach(IOUtils::closeQuietly); scannerQueue.clear(); if (e instanceof IOException) { throw (IOException) e; } if (e instanceof RuntimeException) { throw (RuntimeException) e; } throw new IOException("Failed to collect rows", e); ``` ########## fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java: ########## @@ -29,17 +30,22 @@ import org.apache.fluss.client.table.scanner.log.TypedLogScannerImpl; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.exception.FlussRuntimeException; +import org.apache.fluss.metadata.PartitionInfo; import org.apache.fluss.metadata.SchemaGetter; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.types.RowType; import javax.annotation.Nullable; +import java.io.IOException; import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; /** API for configuring and creating {@link LogScanner} and {@link BatchScanner}. */ public class TableScan implements Scan { + private static final int MAX_LIMIT_PUSHDOWN = 2048; Review Comment: MAX_LIMIT_PUSHDOWN is introduced but never used, which will likely fail compilation under common “unused field” checks and also misses the opportunity to enforce a bounded limit for client-side limit scans. Either remove it, or validate Scan.limit(rowNumber) against this maximum (and consider validating non-negative limits). ########## fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/CompositeBatchScanner.java: ########## @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.client.table.scanner.batch; + +import org.apache.fluss.annotation.Internal; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.utils.CloseableIterator; +import org.apache.fluss.utils.IOUtils; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.time.Duration; +import java.util.LinkedList; +import java.util.List; + +import static org.apache.fluss.client.table.scanner.batch.BatchScanUtils.collectLimitedRows; + +/** + * A {@link BatchScanner} that combines multiple {@link BatchScanner} instances into a single + * scanner. It iterates through all underlying scanners in order, delegating each {@link + * #pollBatch(Duration)} call to the next available scanner until all are exhausted. + * + * <p>When a {@code limit} is specified, rows are collected eagerly across all scanners up to that + * limit and returned in a single batch. + */ +@Internal +public class CompositeBatchScanner implements BatchScanner { + + /** Queue of underlying scanners to be polled in order. */ + private final LinkedList<BatchScanner> scannerQueue; + + /** Optional row limit; when set, rows are collected eagerly up to this count. 
*/ + private final @Nullable Integer limit; + + public CompositeBatchScanner(List<BatchScanner> scanners, @Nullable Integer limit) { + this.scannerQueue = new LinkedList<>(scanners); + this.limit = limit; + } + + @Override + public void close() throws IOException { + scannerQueue.forEach(IOUtils::closeQuietly); + } + + @Nullable + @Override + public CloseableIterator<InternalRow> pollBatch(Duration timeout) throws IOException { + + while (!scannerQueue.isEmpty()) { + // Direct return limit scan with is not so much data. + if (limit != null) { + CloseableIterator<InternalRow> iterator = + CloseableIterator.wrap(collectLimitedRows(scannerQueue, limit).iterator()); + scannerQueue.clear(); + return iterator; Review Comment: In limit mode, this returns the full result of collectLimitedRows(scannerQueue, limit) without trimming, so callers can receive more than the requested limit (collectLimitedRows stops once rows.size() >= limit, which may overshoot by the last batch). That breaks the usual contract of Scan.limit() meaning “at most N rows”. Consider trimming the collected list to limit before wrapping it, and/or updating BatchScanUtils.collectLimitedRows to never return more than limit. Also note that this branch ignores the pollBatch(timeout) argument because collectLimitedRows uses a hard-coded DEFAULT_POLL_TIMEOUT. ########## fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/CompositeBatchScannerTest.java: ########## @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.client.table.scanner.batch; + +import org.apache.fluss.row.GenericRow; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.utils.CloseableIterator; + +import org.junit.jupiter.api.Test; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link CompositeBatchScanner}. */ +class CompositeBatchScannerTest { + + private static final Duration TIMEOUT = Duration.ofMillis(10); + + // ------------------------------------------------------------------------- + // No-limit tests + // ------------------------------------------------------------------------- + + @Test + void testPollBatchWithNoLimit() throws IOException { + // Three scanners each holding rows [0], [1], [2]. + // CompositeBatchScanner should round-robin and eventually return all rows. 
+ StubBatchScanner s1 = scanner(0); + StubBatchScanner s2 = scanner(1); + StubBatchScanner s3 = scanner(2); + + CompositeBatchScanner composite = + new CompositeBatchScanner(Arrays.asList(s1, s2, s3), null); + + List<InternalRow> collected = collectAll(composite); + + assertThat(collected).hasSize(3); + assertThat(intValues(collected)).containsExactlyInAnyOrder(0, 1, 2); + assertThat(s1.closed).isTrue(); + assertThat(s2.closed).isTrue(); + assertThat(s3.closed).isTrue(); + } + + @Test + void testPollBatchDrainsMultipleBatchesPerScanner() throws IOException { + // Two scanners, each returning two single-row batches before exhausting. + StubBatchScanner s1 = scanner(10, 11); + StubBatchScanner s2 = scanner(20, 21); + + CompositeBatchScanner composite = new CompositeBatchScanner(Arrays.asList(s1, s2), null); + + List<InternalRow> collected = collectAll(composite); + + assertThat(intValues(collected)).containsExactlyInAnyOrder(10, 11, 20, 21); + } + + @Test + void testPollBatchWithEmptyScannerList() throws IOException { + CompositeBatchScanner composite = new CompositeBatchScanner(Collections.emptyList(), null); + + assertThat(composite.pollBatch(TIMEOUT)).isNull(); + } + + @Test + void testPollBatchSkipsExhaustedScanner() throws IOException { + // s1 is already exhausted (returns null immediately), s2 has data. 
+ StubBatchScanner s1 = scanner(); // no rows → immediately returns null + StubBatchScanner s2 = scanner(99); + + CompositeBatchScanner composite = new CompositeBatchScanner(Arrays.asList(s1, s2), null); + + List<InternalRow> collected = collectAll(composite); + + assertThat(intValues(collected)).containsExactly(99); + assertThat(s1.closed).isTrue(); + assertThat(s2.closed).isTrue(); + } + + // ------------------------------------------------------------------------- + // Limit tests + // ------------------------------------------------------------------------- + + @Test + void testPollBatchWithLimitReturnsAtMostLimitRows() throws IOException { + // Two scanners with 3 rows each (one row per batch), limit = 3. + // collectLimitedRows collects until rows.size() >= limit. + StubBatchScanner s1 = scanner(1, 2, 3); + StubBatchScanner s2 = scanner(4, 5, 6); + + CompositeBatchScanner composite = new CompositeBatchScanner(Arrays.asList(s1, s2), 3); + + CloseableIterator<InternalRow> batch = composite.pollBatch(TIMEOUT); + assertThat(batch).isNotNull(); + + List<Integer> values = new ArrayList<>(); + while (batch.hasNext()) { + values.add(batch.next().getInt(0)); + } + assertThat(values.size()).isGreaterThanOrEqualTo(3); + } Review Comment: The test name says “ReturnsAtMostLimitRows”, but the assertion checks values.size() >= 3, which doesn’t validate the intended behavior. If CompositeBatchScanner is supposed to respect Scan.limit(N), this should assert values.size() <= limit (and ideally exactly limit in this setup). Also, please close the returned CloseableIterator (batch.close()) to avoid leaking resources in the test. 
########## fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java: ########## @@ -171,4 +177,42 @@ public BatchScanner createBatchScanner(TableBucket tableBucket, long snapshotId) tableInfo.getTableConfig().getKvFormat(), conn.getOrCreateRemoteFileDownloader()); } + + @Override + public BatchScanner createBatchScanner() throws IOException { + int bucketCount = tableInfo.getNumBuckets(); + List<TableBucket> tableBuckets; + if (tableInfo.isPartitioned()) { + List<PartitionInfo> partitionInfos; Review Comment: This table-level createBatchScanner() implementation delegates to createBatchScanner(TableBucket) for each bucket, which currently throws UnsupportedOperationException when limit is not set. As a result, calling table.newScan().createBatchScanner() fails with a bucket-scoped message. Add an explicit guard at the start of this method (or adjust the API/Javadoc) so the failure mode is clear and intentional. ########## fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java: ########## @@ -171,4 +177,42 @@ public BatchScanner createBatchScanner(TableBucket tableBucket, long snapshotId) tableInfo.getTableConfig().getKvFormat(), conn.getOrCreateRemoteFileDownloader()); } + + @Override + public BatchScanner createBatchScanner() throws IOException { + int bucketCount = tableInfo.getNumBuckets(); + List<TableBucket> tableBuckets; + if (tableInfo.isPartitioned()) { + List<PartitionInfo> partitionInfos; + try { + partitionInfos = conn.getAdmin().listPartitionInfos(tableInfo.getTablePath()).get(); + } catch (Exception e) { + throw new IOException( + "Fail to list partition infos for table bucket " + tableInfo.getTablePath(), Review Comment: The error message says “Fail to list partition infos…”, which is grammatically incorrect and user-facing. Consider changing to “Failed to list partition infos for table …” (and optionally drop “bucket” here since the error is at table scope). 
```suggestion "Failed to list partition infos for table " + tableInfo.getTablePath(), ``` ########## fluss-client/src/main/java/org/apache/fluss/client/table/scanner/Scan.java: ########## @@ -88,4 +89,11 @@ public interface Scan { * #limit(int)} and only support for Primary Key Tables. */ BatchScanner createBatchScanner(TableBucket tableBucket, long snapshotId); + + /** + * Creates a {@link BatchScanner} to read current data in the given table for this scan. + * + * <p>Note: this API doesn't support pre-configured with {@link #project}. + */ + BatchScanner createBatchScanner() throws IOException; Review Comment: The JavaDoc says this API doesn’t support pre-configuring with project(), but the implementation is used with project(...) (e.g., PushdownUtils.limitScan) and TableScan propagates projectedColumns into LimitBatchScanner. Please update the JavaDoc to reflect actual support (or enforce the restriction in code if project truly isn’t supported). ########## fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/CompositeBatchScanner.java: ########## @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.client.table.scanner.batch; + +import org.apache.fluss.annotation.Internal; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.utils.CloseableIterator; +import org.apache.fluss.utils.IOUtils; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.time.Duration; +import java.util.LinkedList; +import java.util.List; + +import static org.apache.fluss.client.table.scanner.batch.BatchScanUtils.collectLimitedRows; + +/** + * A {@link BatchScanner} that combines multiple {@link BatchScanner} instances into a single + * scanner. It iterates through all underlying scanners in order, delegating each {@link + * #pollBatch(Duration)} call to the next available scanner until all are exhausted. + * + * <p>When a {@code limit} is specified, rows are collected eagerly across all scanners up to that + * limit and returned in a single batch. + */ +@Internal +public class CompositeBatchScanner implements BatchScanner { + + /** Queue of underlying scanners to be polled in order. */ Review Comment: The class-level JavaDoc says the scanner “iterates through all underlying scanners in order”, but the implementation does round-robin by re-enqueuing scanners that still have data. Please update the JavaDoc to match the actual iteration behavior to avoid misleading API users. ```suggestion * scanner. It polls the underlying scanners in a round-robin fashion: each {@link * #pollBatch(Duration)} call is delegated to the next scanner in the queue, and scanners that * still have data are re-enqueued while exhausted scanners are closed and removed. * * <p>When a {@code limit} is specified, rows are collected eagerly across all underlying scanners * up to that limit and returned in a single batch. */ @Internal public class CompositeBatchScanner implements BatchScanner { /** Queue of underlying scanners used to implement round-robin polling. */ ``` -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected]. For queries about this service, please contact Infrastructure at: [email protected].
