This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 44f7061e2e [python] Fix wrong result when using where and limit in CLI table read (#7391)
44f7061e2e is described below
commit 44f7061e2edea34c3f07617965eb3d5e509464dc
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Mar 10 16:43:37 2026 +0800
[python] Fix wrong result when using where and limit in CLI table read (#7391)
---
paimon-python/pypaimon/cli/cli_table.py | 26 ++++++++++---
paimon-python/pypaimon/tests/cli_table_test.py | 54 ++++++++++++++++++++++++++
2 files changed, 74 insertions(+), 6 deletions(-)
diff --git a/paimon-python/pypaimon/cli/cli_table.py b/paimon-python/pypaimon/cli/cli_table.py
index 38c8db1731..9c68ff653e 100644
--- a/paimon-python/pypaimon/cli/cli_table.py
+++ b/paimon-python/pypaimon/cli/cli_table.py
@@ -104,9 +104,11 @@ def cmd_table_read(args):
print(f"Error: Invalid WHERE clause: {e}", file=sys.stderr)
sys.exit(1)
- # Apply limit if specified
+ # Apply limit: only push down when there is no where clause,
+ # because limit push-down may stop reading before enough rows
+ # pass the filter, leading to fewer results than expected.
limit = args.limit
- if limit:
+ if limit and not where_clause:
read_builder = read_builder.with_limit(limit)
# Scan and read
@@ -116,10 +118,22 @@ def cmd_table_read(args):
read = read_builder.new_read()
- # Use pandas to display as a nice table
- df = read.to_pandas(splits)
- if limit and len(df) > limit:
- df = df.head(limit)
+ # Read splits incrementally, stopping early when limit is reached
+ if limit:
+ import pandas as pd
+ collected_rows = 0
+ table_list = []
+ for split in splits:
+ if collected_rows >= limit:
+ break
+ partial_df = read.to_pandas([split])
+ collected_rows += len(partial_df)
+ table_list.append(partial_df)
+ df = pd.concat(table_list, ignore_index=True) if table_list else read.to_pandas([])
+ if len(df) > limit:
+ df = df.head(limit)
+ else:
+ df = read.to_pandas(splits)
# Drop extra columns that were added only for where-clause filtering
if extra_where_columns:
diff --git a/paimon-python/pypaimon/tests/cli_table_test.py b/paimon-python/pypaimon/tests/cli_table_test.py
index cbde5d6a2c..dfe6f7e431 100644
--- a/paimon-python/pypaimon/tests/cli_table_test.py
+++ b/paimon-python/pypaimon/tests/cli_table_test.py
@@ -1049,6 +1049,60 @@ class CliTableTest(unittest.TestCase):
self.assertNotIn(' 32', output)
self.assertNotIn(' 25', output)
+ def test_cli_table_read_with_where_and_limit(self):
+ """Test that where + limit returns correct filtered results without limit push-down.
+
+ Writes data in two batches to produce multiple splits, so that limit
+ push-down would actually take effect and potentially miss matching rows
+ in later splits.
+ """
+
+ # Create a dedicated table for this test with two batches of data
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('val', pa.string()),
+ ('score', pa.int32()),
+ ])
+ # Important: multiple splits are required for the limit to take effect
+ schema = Schema.from_pyarrow_schema(pa_schema, options={'source.split.target-size': '1b'})
+ self.catalog.create_table('test_db.limit_test', schema, True)
+ table = self.catalog.get_table('test_db.limit_test')
+
+ def write_batch():
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ batch = pa.Table.from_pydict({
+ 'id': [1, 2, 3],
+ 'val': ['a', 'b', 'c'],
+ 'score': [10, 20, 30],
+ }, schema=pa_schema)
+ table_write.write_arrow(batch)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ write_batch()
+ write_batch()
+ write_batch()
+
+ with patch('sys.argv',
+ ['paimon', '-c', self.config_file,
+ 'table', 'read', 'test_db.limit_test',
+ '--where', 'score = 20',
+ '--limit', '2']):
+ with patch('sys.stdout', new_callable=StringIO) as mock_stdout:
+ try:
+ main()
+ except SystemExit:
+ pass
+
+ output = mock_stdout.getvalue()
+ lines = [line for line in output.strip().split('\n') if line.strip()]
+ self.assertEqual(len(lines), 3)
+ self.assertNotIn(' a ', output)
+ self.assertNotIn(' c ', output)
+
def test_cli_table_read_with_invalid_where(self):
"""Test table read with invalid --where clause via CLI."""
with patch('sys.argv',