This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d22760373d [python] Introduce where for Python CLI table read (#7389)
d22760373d is described below

commit d22760373dd531ae0a36638d867ca936e1711bda
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Mar 10 15:01:53 2026 +0800

    [python] Introduce where for Python CLI table read (#7389)
    
    Read data from a Paimon table and display it in a tabular format.
    
    ```shell
    paimon table read mydb.users
    ```
    
    **Options:**
    
    - `--select, -s`: Select specific columns to read (comma-separated)
    - `--where, -w`: Filter condition in SQL-like syntax
    - `--limit, -l`: Maximum number of results to display (default: 100)
    
    **Examples:**
    
    ```shell
    # Read with limit
    paimon table read mydb.users -l 50
    
    # Read specific columns
    paimon table read mydb.users -s id,name,age
    
    # Filter with WHERE clause
    paimon table read mydb.users --where "age > 18"
    
    # Combine select, where, and limit
    paimon table read mydb.users -s id,name -w "age >= 20 AND city = 'Beijing'" -l 50
    ```
    
    **WHERE Operators**
    
    The `--where` option supports SQL-like filter expressions:
    
    | Operator | Example |
    |---|---|
    | `=`, `!=`, `<>` | `name = 'Alice'` |
    | `<`, `<=`, `>`, `>=` | `age > 18` |
    | `IS NULL`, `IS NOT NULL` | `deleted_at IS NULL` |
    | `IN (...)`, `NOT IN (...)` | `status IN ('active', 'pending')` |
    | `BETWEEN ... AND ...` | `age BETWEEN 20 AND 30` |
    | `LIKE` | `name LIKE 'A%'` |
    
    Multiple conditions can be combined with `AND` and `OR` (AND has higher
    precedence). Parentheses are supported for grouping:
    
    ```shell
    # AND condition
    paimon table read mydb.users -w "age >= 20 AND age <= 30"
    
    # OR condition
    paimon table read mydb.users -w "city = 'Beijing' OR city = 'Shanghai'"
    
    # Parenthesized grouping
    paimon table read mydb.users -w "(age > 18 OR name = 'Bob') AND city = 'Beijing'"
    
    # IN list
    paimon table read mydb.users -w "city IN ('Beijing', 'Shanghai', 'Hangzhou')"
    
    # BETWEEN
    paimon table read mydb.users -w "age BETWEEN 25 AND 35"
    
    # LIKE pattern
    paimon table read mydb.users -w "name LIKE 'A%'"
    
    # IS NULL / IS NOT NULL
    paimon table read mydb.users -w "email IS NOT NULL"
    ```
    
    Literal values are automatically cast to the appropriate Python type
    based on the table schema (e.g., `INT` fields cast to `int`, `DOUBLE` to
    `float`).
    
    Output:
    ```
     id    name  age      city
      1   Alice   25   Beijing
      2     Bob   30  Shanghai
      3 Charlie   35 Guangzhou
      4   David   28  Shenzhen
      5     Eve   32  Hangzhou
    ```
---
 docs/content/pypaimon/cli.md                      |  48 ++-
 paimon-python/pypaimon/cli/cli_table.py           |  53 ++-
 paimon-python/pypaimon/cli/where_parser.py        | 376 ++++++++++++++++++++
 paimon-python/pypaimon/tests/cli_table_test.py    | 132 +++++++
 paimon-python/pypaimon/tests/where_parser_test.py | 404 ++++++++++++++++++++++
 5 files changed, 1003 insertions(+), 10 deletions(-)

diff --git a/docs/content/pypaimon/cli.md b/docs/content/pypaimon/cli.md
index b29c1b39be..3588c7c974 100644
--- a/docs/content/pypaimon/cli.md
+++ b/docs/content/pypaimon/cli.md
@@ -83,6 +83,7 @@ paimon table read mydb.users
 **Options:**
 
 - `--select, -s`: Select specific columns to read (comma-separated)
+- `--where, -w`: Filter condition in SQL-like syntax
 - `--limit, -l`: Maximum number of results to display (default: 100)
 
 **Examples:**
@@ -94,10 +95,53 @@ paimon table read mydb.users -l 50
 # Read specific columns
 paimon table read mydb.users -s id,name,age
 
-# Combine select and limit
-paimon table read mydb.users -s id,name -l 50
+# Filter with WHERE clause
+paimon table read mydb.users --where "age > 18"
+
+# Combine select, where, and limit
+paimon table read mydb.users -s id,name -w "age >= 20 AND city = 'Beijing'" -l 50
 ```
 
+**WHERE Operators**
+
+The `--where` option supports SQL-like filter expressions:
+
+| Operator | Example |
+|---|---|
+| `=`, `!=`, `<>` | `name = 'Alice'` |
+| `<`, `<=`, `>`, `>=` | `age > 18` |
+| `IS NULL`, `IS NOT NULL` | `deleted_at IS NULL` |
+| `IN (...)`, `NOT IN (...)` | `status IN ('active', 'pending')` |
+| `BETWEEN ... AND ...` | `age BETWEEN 20 AND 30` |
+| `LIKE` | `name LIKE 'A%'` |
+
+Multiple conditions can be combined with `AND` and `OR` (AND has higher precedence). Parentheses are supported for grouping:
+
+```shell
+# AND condition
+paimon table read mydb.users -w "age >= 20 AND age <= 30"
+
+# OR condition
+paimon table read mydb.users -w "city = 'Beijing' OR city = 'Shanghai'"
+
+# Parenthesized grouping
+paimon table read mydb.users -w "(age > 18 OR name = 'Bob') AND city = 'Beijing'"
+
+# IN list
+paimon table read mydb.users -w "city IN ('Beijing', 'Shanghai', 'Hangzhou')"
+
+# BETWEEN
+paimon table read mydb.users -w "age BETWEEN 25 AND 35"
+
+# LIKE pattern
+paimon table read mydb.users -w "name LIKE 'A%'"
+
+# IS NULL / IS NOT NULL
+paimon table read mydb.users -w "email IS NOT NULL"
+```
+
+Literal values are automatically cast to the appropriate Python type based on the table schema (e.g., `INT` fields cast to `int`, `DOUBLE` to `float`).
+
 Output:
 ```
  id    name  age      city
diff --git a/paimon-python/pypaimon/cli/cli_table.py b/paimon-python/pypaimon/cli/cli_table.py
index 43e86488e4..38c8db1731 100644
--- a/paimon-python/pypaimon/cli/cli_table.py
+++ b/paimon-python/pypaimon/cli/cli_table.py
@@ -63,21 +63,46 @@ def cmd_table_read(args):
     # Build read pipeline
     read_builder = table.new_read_builder()
     
-    # Apply projection (select columns) if specified
+    available_fields = set(field.name for field in table.table_schema.fields)
+
+    # Parse select and where options
     select_columns = args.select
+    where_clause = args.where
+    user_columns = None
+    extra_where_columns = []
+
     if select_columns:
         # Parse column names (comma-separated)
-        columns = [col.strip() for col in select_columns.split(',')]
-        
+        user_columns = [col.strip() for col in select_columns.split(',')]
+
         # Validate that all columns exist in the table schema
-        available_fields = set(field.name for field in table.table_schema.fields)
-        invalid_columns = [col for col in columns if col not in available_fields]
-        
+        invalid_columns = [col for col in user_columns if col not in available_fields]
         if invalid_columns:
             print(f"Error: Column(s) {invalid_columns} do not exist in table '{table_identifier}'.", file=sys.stderr)
             sys.exit(1)
-        
-        read_builder = read_builder.with_projection(columns)
+
+    # When both select and where are specified, ensure where-referenced fields
+    # are included in the projection so the filter can work correctly.
+    if user_columns and where_clause:
+        from pypaimon.cli.where_parser import extract_fields_from_where
+        where_fields = extract_fields_from_where(where_clause, available_fields)
+        user_column_set = set(user_columns)
+        extra_where_columns = [f for f in where_fields if f not in user_column_set]
+        projection_columns = user_columns + extra_where_columns
+        read_builder = read_builder.with_projection(projection_columns)
+    elif user_columns:
+        read_builder = read_builder.with_projection(user_columns)
+
+    # Apply where filter if specified
+    if where_clause:
+        from pypaimon.cli.where_parser import parse_where_clause
+        try:
+            predicate = parse_where_clause(where_clause, table.table_schema.fields)
+            if predicate:
+                read_builder = read_builder.with_filter(predicate)
+        except ValueError as e:
+            print(f"Error: Invalid WHERE clause: {e}", file=sys.stderr)
+            sys.exit(1)
 
     # Apply limit if specified
     limit = args.limit
@@ -95,6 +120,11 @@ def cmd_table_read(args):
     df = read.to_pandas(splits)
     if limit and len(df) > limit:
         df = df.head(limit)
+
+    # Drop extra columns that were added only for where-clause filtering
+    if extra_where_columns:
+        df = df.drop(columns=extra_where_columns, errors='ignore')
+
     print(df.to_string(index=False))
 
 
@@ -550,6 +580,13 @@ def add_table_subcommands(table_parser):
         default=None,
         help='Select specific columns to read (comma-separated, e.g., "id,name,age")'
     )
+    read_parser.add_argument(
+        '--where', '-w',
+        type=str,
+        default=None,
+        help=('Filter condition in SQL-like syntax '
+              '(e.g., "age > 18", "name = \'Alice\' AND status IN (\'active\', \'pending\')")')
+    )
     read_parser.add_argument(
         '--limit', '-l',
         type=int,
diff --git a/paimon-python/pypaimon/cli/where_parser.py b/paimon-python/pypaimon/cli/where_parser.py
new file mode 100644
index 0000000000..89a2dbc547
--- /dev/null
+++ b/paimon-python/pypaimon/cli/where_parser.py
@@ -0,0 +1,376 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+
+"""
+SQL WHERE clause parser for Paimon CLI.
+
+Parses simple SQL-like WHERE expressions into Predicate objects.
+
+Supported operators:
+  =, !=, <>, <, <=, >, >=,
+  IS NULL, IS NOT NULL,
+  IN (...), NOT IN (...),
+  BETWEEN ... AND ...,
+  LIKE '...'
+
+Supported connectors: AND, OR (AND has higher precedence than OR).
+Parenthesized grouping is supported.
+
+Examples:
+  "age > 18"
+  "name = 'Alice' AND age >= 20"
+  "status IN ('active', 'pending')"
+  "score BETWEEN 60 AND 100"
+  "name LIKE 'A%'"
+  "deleted_at IS NULL"
+  "age > 18 OR (name = 'Bob' AND status = 'active')"
+"""
+
+import re
+from typing import Any, Dict, List, Optional
+
+from pypaimon.common.predicate import Predicate
+from pypaimon.common.predicate_builder import PredicateBuilder
+from pypaimon.schema.data_types import AtomicType, DataField
+
+
+def extract_fields_from_where(where_string: str, available_fields: set) -> set:
+    """Extract all field names referenced in a WHERE clause.
+
+    Args:
+        where_string: The WHERE clause string.
+        available_fields: Set of valid field names from the table schema.
+
+    Returns:
+        A set of field names referenced in the WHERE clause.
+    """
+    if not where_string or not where_string.strip():
+        return set()
+
+    tokens = _tokenize(where_string.strip())
+    referenced_fields = set()
+    for token in tokens:
+        if token in available_fields:
+            referenced_fields.add(token)
+    return referenced_fields
+
+
+def parse_where_clause(where_string: str, fields: List[DataField]) -> Optional[Predicate]:
+    """Parse a SQL-like WHERE clause string into a Predicate.
+
+    Args:
+        where_string: The WHERE clause string (without the 'WHERE' keyword).
+        fields: The table schema fields for type resolution.
+
+    Returns:
+        A Predicate object, or None if the string is empty.
+
+    Raises:
+        ValueError: If the WHERE clause cannot be parsed.
+    """
+    where_string = where_string.strip()
+    if not where_string:
+        return None
+
+    field_type_map = _build_field_type_map(fields)
+    predicate_builder = PredicateBuilder(fields)
+    tokens = _tokenize(where_string)
+    predicate, remaining = _parse_or_expression(tokens, predicate_builder, field_type_map)
+
+    if remaining:
+        raise ValueError(
+            f"Unexpected tokens after parsing: {' '.join(remaining)}"
+        )
+
+    return predicate
+
+
+def _build_field_type_map(fields: List[DataField]) -> Dict[str, Optional[str]]:
+    """Build a mapping from field name to its base type string.
+
+    Only AtomicType fields are supported for WHERE filtering.
+    Non-atomic types (ARRAY, MAP, ROW, etc.) are mapped to None.
+    """
+    result = {}
+    for field in fields:
+        if isinstance(field.type, AtomicType):
+            result[field.name] = field.type.type.upper()
+        else:
+            result[field.name] = None
+    return result
+
+
+def _cast_literal(value_str: str, type_name: str) -> Any:
+    """Cast a literal string to the appropriate Python type based on the field type."""
+    integer_types = {'TINYINT', 'SMALLINT', 'INT', 'INTEGER', 'BIGINT'}
+    float_types = {'FLOAT', 'DOUBLE'}
+
+    base_type = type_name.split('(')[0].strip()
+
+    if base_type in integer_types:
+        return int(value_str)
+    if base_type in float_types:
+        return float(value_str)
+    if base_type.startswith('DECIMAL') or base_type in ('DECIMAL', 'NUMERIC', 'DEC'):
+        return float(value_str)
+    if base_type == 'BOOLEAN':
+        return value_str.lower() in ('true', '1', 'yes')
+    return value_str
+
+
+_TOKEN_PATTERN = re.compile(
+    r"""
+      '(?:[^'\\]|\\.)*'       # single-quoted string
+    | "(?:[^"\\]|\\.)*"        # double-quoted string
+    | <=                       # <=
+    | >=                       # >=
+    | <>                       # <>
+    | !=                       # !=
+    | [=<>]                    # single-char operators
+    | [(),]                    # punctuation
+    | [^\s,()=<>!'"]+          # unquoted word / number
+    """,
+    re.VERBOSE,
+)
+
+
+def _tokenize(expression: str) -> List[str]:
+    """Tokenize a WHERE clause string."""
+    return _TOKEN_PATTERN.findall(expression)
+
+
+def _parse_or_expression(
+    tokens: List[str],
+    builder: PredicateBuilder,
+    type_map: Dict[str, str],
+) -> (Predicate, List[str]):
+    """Parse an OR expression (lowest precedence)."""
+    left, tokens = _parse_and_expression(tokens, builder, type_map)
+    or_operands = [left]
+
+    while tokens and tokens[0].upper() == 'OR':
+        tokens = tokens[1:]  # consume 'OR'
+        right, tokens = _parse_and_expression(tokens, builder, type_map)
+        or_operands.append(right)
+
+    if len(or_operands) == 1:
+        return or_operands[0], tokens
+    return PredicateBuilder.or_predicates(or_operands), tokens
+
+
+def _parse_and_expression(
+    tokens: List[str],
+    builder: PredicateBuilder,
+    type_map: Dict[str, str],
+) -> (Predicate, List[str]):
+    """Parse an AND expression."""
+    left, tokens = _parse_primary(tokens, builder, type_map)
+    and_operands = [left]
+
+    while tokens and tokens[0].upper() == 'AND':
+        # Distinguish 'AND' as connector vs. 'AND' in 'BETWEEN ... AND ...'
+        # BETWEEN's AND is consumed inside _parse_primary, so here it's always a connector.
+        tokens = tokens[1:]  # consume 'AND'
+        right, tokens = _parse_primary(tokens, builder, type_map)
+        and_operands.append(right)
+
+    if len(and_operands) == 1:
+        return and_operands[0], tokens
+    return PredicateBuilder.and_predicates(and_operands), tokens
+
+
+def _parse_primary(
+    tokens: List[str],
+    builder: PredicateBuilder,
+    type_map: Dict[str, str],
+) -> (Predicate, List[str]):
+    """Parse a primary expression: a single condition or a parenthesized group."""
+    if not tokens:
+        raise ValueError("Unexpected end of WHERE clause")
+
+    # Parenthesized group
+    if tokens[0] == '(':
+        tokens = tokens[1:]  # consume '('
+        predicate, tokens = _parse_or_expression(tokens, builder, type_map)
+        if not tokens or tokens[0] != ')':
+            raise ValueError("Missing closing parenthesis ')'")
+        tokens = tokens[1:]  # consume ')'
+        return predicate, tokens
+
+    # Must be a condition starting with a field name
+    field_name = tokens[0]
+    tokens = tokens[1:]
+
+    if not tokens:
+        raise ValueError(f"Unexpected end after field name '{field_name}'")
+
+    if field_name not in type_map:
+        raise ValueError(
+            f"Unknown field '{field_name}'. "
+            f"Available fields: {sorted(type_map.keys())}"
+        )
+
+    field_type = type_map[field_name]
+    if field_type is None:
+        raise ValueError(
+            f"Field '{field_name}' has a non-atomic type (e.g., ARRAY, MAP, ROW) "
+            f"which is not supported in WHERE clauses. "
+            f"Only atomic type fields (INT, STRING, DOUBLE, etc.) can be used for filtering."
+        )
+
+    operator_token = tokens[0].upper()
+
+    # IS NULL / IS NOT NULL
+    if operator_token == 'IS':
+        tokens = tokens[1:]  # consume 'IS'
+        if not tokens:
+            raise ValueError(f"Unexpected end after 'IS' for field '{field_name}'")
+        next_token = tokens[0].upper()
+        if next_token == 'NULL':
+            tokens = tokens[1:]
+            return builder.is_null(field_name), tokens
+        elif next_token == 'NOT':
+            tokens = tokens[1:]  # consume 'NOT'
+            if not tokens or tokens[0].upper() != 'NULL':
+                raise ValueError(f"Expected 'NULL' after 'IS NOT' for field '{field_name}'")
+            tokens = tokens[1:]  # consume 'NULL'
+            return builder.is_not_null(field_name), tokens
+        else:
+            raise ValueError(f"Expected 'NULL' or 'NOT NULL' after 'IS' for field '{field_name}'")
+
+    # NOT IN / NOT BETWEEN
+    if operator_token == 'NOT':
+        tokens = tokens[1:]  # consume 'NOT'
+        if not tokens:
+            raise ValueError(f"Expected 'IN' or 'BETWEEN' after 'NOT' for field '{field_name}'")
+        next_keyword = tokens[0].upper()
+        if next_keyword == 'IN':
+            tokens = tokens[1:]  # consume 'IN'
+            values, tokens = _parse_in_list(tokens, field_type)
+            return builder.is_not_in(field_name, values), tokens
+        elif next_keyword == 'BETWEEN':
+            tokens = tokens[1:]  # consume 'BETWEEN'
+            lower_str, tokens = _consume_literal(tokens)
+            lower_value = _cast_literal(lower_str, field_type)
+            if not tokens or tokens[0].upper() != 'AND':
+                raise ValueError(f"Expected 'AND' in NOT BETWEEN expression for field '{field_name}'")
+            tokens = tokens[1:]  # consume 'AND'
+            upper_str, tokens = _consume_literal(tokens)
+            upper_value = _cast_literal(upper_str, field_type)
+            return builder.not_between(field_name, lower_value, upper_value), tokens
+        else:
+            raise ValueError(f"Expected 'IN' or 'BETWEEN' after 'NOT' for field '{field_name}'")
+
+    # IN (...)
+    if operator_token == 'IN':
+        tokens = tokens[1:]  # consume 'IN'
+        values, tokens = _parse_in_list(tokens, field_type)
+        return builder.is_in(field_name, values), tokens
+
+    # BETWEEN ... AND ...
+    if operator_token == 'BETWEEN':
+        tokens = tokens[1:]  # consume 'BETWEEN'
+        lower_str, tokens = _consume_literal(tokens)
+        lower_value = _cast_literal(lower_str, field_type)
+        if not tokens or tokens[0].upper() != 'AND':
+            raise ValueError(f"Expected 'AND' in BETWEEN expression for field '{field_name}'")
+        tokens = tokens[1:]  # consume 'AND'
+        upper_str, tokens = _consume_literal(tokens)
+        upper_value = _cast_literal(upper_str, field_type)
+        return builder.between(field_name, lower_value, upper_value), tokens
+
+    # LIKE 'pattern'
+    if operator_token == 'LIKE':
+        tokens = tokens[1:]  # consume 'LIKE'
+        pattern_str, tokens = _consume_literal(tokens)
+        return builder.like(field_name, pattern_str), tokens
+
+    # Comparison operators: =, !=, <>, <, <=, >, >=
+    comparison_operators = {'=', '!=', '<>', '<', '<=', '>', '>='}
+    if operator_token in comparison_operators:
+        tokens = tokens[1:]  # consume operator
+        value_str, tokens = _consume_literal(tokens)
+        value = _cast_literal(value_str, field_type)
+        predicate = _build_comparison(builder, field_name, operator_token, value)
+        return predicate, tokens
+
+    raise ValueError(
+        f"Unsupported operator '{tokens[0]}' for field '{field_name}'. "
+        f"Supported: =, !=, <>, <, <=, >, >=, IS NULL, IS NOT NULL, IN, NOT IN, BETWEEN, LIKE"
+    )
+
+
+def _build_comparison(
+    builder: PredicateBuilder,
+    field_name: str,
+    operator: str,
+    value: Any,
+) -> Predicate:
+    """Build a comparison predicate."""
+    if operator == '=':
+        return builder.equal(field_name, value)
+    elif operator in ('!=', '<>'):
+        return builder.not_equal(field_name, value)
+    elif operator == '<':
+        return builder.less_than(field_name, value)
+    elif operator == '<=':
+        return builder.less_or_equal(field_name, value)
+    elif operator == '>':
+        return builder.greater_than(field_name, value)
+    elif operator == '>=':
+        return builder.greater_or_equal(field_name, value)
+    else:
+        raise ValueError(f"Unknown comparison operator: {operator}")
+
+
+def _parse_in_list(tokens: List[str], field_type: str) -> (List[Any], List[str]):
+    """Parse an IN list: (val1, val2, ...)."""
+    if not tokens or tokens[0] != '(':
+        raise ValueError("Expected '(' after IN")
+    tokens = tokens[1:]  # consume '('
+
+    values = []
+    while tokens:
+        if tokens[0] == ')':
+            tokens = tokens[1:]  # consume ')'
+            return values, tokens
+        if tokens[0] == ',':
+            tokens = tokens[1:]  # consume ','
+            continue
+        value_str, tokens = _consume_literal(tokens)
+        values.append(_cast_literal(value_str, field_type))
+
+    raise ValueError("Missing closing ')' in IN list")
+
+
+def _consume_literal(tokens: List[str]) -> (str, List[str]):
+    """Consume a single literal value from the token stream.
+
+    Handles quoted strings (strips quotes) and unquoted values.
+    """
+    if not tokens:
+        raise ValueError("Expected a literal value but reached end of expression")
+
+    token = tokens[0]
+    tokens = tokens[1:]
+
+    # Strip surrounding quotes from string literals
+    if (token.startswith("'") and token.endswith("'")) or \
+       (token.startswith('"') and token.endswith('"')):
+        return token[1:-1], tokens
+
+    return token, tokens
diff --git a/paimon-python/pypaimon/tests/cli_table_test.py b/paimon-python/pypaimon/tests/cli_table_test.py
index 43469a6692..cbde5d6a2c 100644
--- a/paimon-python/pypaimon/tests/cli_table_test.py
+++ b/paimon-python/pypaimon/tests/cli_table_test.py
@@ -932,6 +932,138 @@ class CliTableTest(unittest.TestCase):
         after_id_idx = field_names.index('after_id_col')
         self.assertEqual(after_id_idx, id_idx + 1)
 
+    def test_cli_table_read_with_where_equal(self):
+        """Test table read with --where equal filter via CLI."""
+        with patch('sys.argv',
+                   ['paimon', '-c', self.config_file,
+                    'table', 'read', 'test_db.users', '--where', "name = 'Alice'"]):
+            with patch('sys.stdout', new_callable=StringIO) as mock_stdout:
+                try:
+                    main()
+                except SystemExit:
+                    pass
+
+                output = mock_stdout.getvalue()
+                self.assertIn('Alice', output)
+                self.assertNotIn('Bob', output)
+                self.assertNotIn('Charlie', output)
+
+    def test_cli_table_read_with_where_greater_than(self):
+        """Test table read with --where greater-than filter via CLI."""
+        with patch('sys.argv',
+                   ['paimon', '-c', self.config_file,
+                    'table', 'read', 'test_db.users', '-w', 'age > 30']):
+            with patch('sys.stdout', new_callable=StringIO) as mock_stdout:
+                try:
+                    main()
+                except SystemExit:
+                    pass
+
+                output = mock_stdout.getvalue()
+                # age > 30: Charlie(35), Eve(32)
+                self.assertIn('Charlie', output)
+                self.assertIn('Eve', output)
+                self.assertNotIn('Alice', output)
+                self.assertNotIn('Bob', output)
+
+    def test_cli_table_read_with_where_and(self):
+        """Test table read with --where AND condition via CLI."""
+        with patch('sys.argv',
+                   ['paimon', '-c', self.config_file,
+                    'table', 'read', 'test_db.users', '--where', 'age >= 28 AND age <= 32']):
+            with patch('sys.stdout', new_callable=StringIO) as mock_stdout:
+                try:
+                    main()
+                except SystemExit:
+                    pass
+
+                output = mock_stdout.getvalue()
+                # age >= 28 AND age <= 32: Bob(30), David(28), Eve(32)
+                self.assertIn('Bob', output)
+                self.assertIn('David', output)
+                self.assertIn('Eve', output)
+                self.assertNotIn('Alice', output)
+                self.assertNotIn('Charlie', output)
+
+    def test_cli_table_read_with_where_in(self):
+        """Test table read with --where IN filter via CLI."""
+        with patch('sys.argv',
+                   ['paimon', '-c', self.config_file,
+                    'table', 'read', 'test_db.users',
+                    '--where', "city IN ('Beijing', 'Shanghai')"]):
+            with patch('sys.stdout', new_callable=StringIO) as mock_stdout:
+                try:
+                    main()
+                except SystemExit:
+                    pass
+
+                output = mock_stdout.getvalue()
+                self.assertIn('Alice', output)
+                self.assertIn('Bob', output)
+                self.assertNotIn('Charlie', output)
+
+    def test_cli_table_read_with_where_and_select(self):
+        """Test table read with both --where and --select via CLI."""
+        with patch('sys.argv',
+                   ['paimon', '-c', self.config_file,
+                    'table', 'read', 'test_db.users',
+                    '--select', 'name,age',
+                    '--where', 'age > 30']):
+            with patch('sys.stdout', new_callable=StringIO) as mock_stdout:
+                try:
+                    main()
+                except SystemExit:
+                    pass
+
+                output = mock_stdout.getvalue()
+                self.assertIn('Charlie', output)
+                self.assertIn('Eve', output)
+                self.assertNotIn('Alice', output)
+
+    def test_cli_table_read_where_field_not_in_select(self):
+        """Test that where filter works even when the filtered field is not in select."""
+        with patch('sys.argv',
+                   ['paimon', '-c', self.config_file,
+                    'table', 'read', 'test_db.users',
+                    '--select', 'name,city',
+                    '--where', 'age > 30']):
+            with patch('sys.stdout', new_callable=StringIO) as mock_stdout:
+                try:
+                    main()
+                except SystemExit:
+                    pass
+
+                output = mock_stdout.getvalue()
+                # age > 30: Charlie(35,Guangzhou), Eve(32,Hangzhou)
+                # Filter should work even though 'age' is not in select
+                self.assertIn('Charlie', output)
+                self.assertIn('Eve', output)
+                self.assertIn('Guangzhou', output)
+                self.assertIn('Hangzhou', output)
+                # Excluded rows should not appear
+                self.assertNotIn('Alice', output)
+                self.assertNotIn('Bob', output)
+                self.assertNotIn('David', output)
+                # The 'age' column should NOT appear in output (it was only needed for filtering)
+                self.assertNotIn(' 35', output)
+                self.assertNotIn(' 32', output)
+                self.assertNotIn(' 25', output)
+
+    def test_cli_table_read_with_invalid_where(self):
+        """Test table read with invalid --where clause via CLI."""
+        with patch('sys.argv',
+                   ['paimon', '-c', self.config_file,
+                    'table', 'read', 'test_db.users',
+                    '--where', 'age INVALID 30']):
+            with patch('sys.stderr', new_callable=StringIO) as mock_stderr:
+                try:
+                    main()
+                except SystemExit:
+                    pass
+
+                error_output = mock_stderr.getvalue()
+                self.assertIn('Error', error_output)
+
     def test_cli_table_rename_basic(self):
         """Test basic table rename via CLI."""
         import json
diff --git a/paimon-python/pypaimon/tests/where_parser_test.py b/paimon-python/pypaimon/tests/where_parser_test.py
new file mode 100644
index 0000000000..2a8ce02930
--- /dev/null
+++ b/paimon-python/pypaimon/tests/where_parser_test.py
@@ -0,0 +1,404 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+
+import unittest
+
+from pypaimon.cli.where_parser import parse_where_clause, _tokenize, _cast_literal
+from pypaimon.schema.data_types import ArrayType, AtomicType, DataField
+
+
+class WhereParserTokenizeTest(unittest.TestCase):
+    """Tests that _tokenize splits a WHERE clause into the expected token list."""
+
+    def test_simple_comparison(self):
+        tokens = _tokenize("age > 18")
+        self.assertEqual(tokens, ['age', '>', '18'])
+
+    def test_string_literal(self):
+        tokens = _tokenize("name = 'Alice'")
+        self.assertEqual(tokens, ['name', '=', "'Alice'"])  # quotes stay in the token
+
+    def test_double_quoted_string(self):
+        tokens = _tokenize('city = "Beijing"')
+        self.assertEqual(tokens, ['city', '=', '"Beijing"'])  # double quotes are kept too
+
+    def test_multi_char_operators(self):
+        tokens = _tokenize("age >= 18 AND age <= 30")
+        self.assertEqual(tokens, ['age', '>=', '18', 'AND', 'age', '<=', '30'])
+
+    def test_not_equal_operators(self):
+        tokens = _tokenize("status != 'active'")
+        self.assertEqual(tokens, ['status', '!=', "'active'"])
+
+        tokens = _tokenize("status <> 'active'")
+        self.assertEqual(tokens, ['status', '<>', "'active'"])
+
+    def test_in_list(self):
+        tokens = _tokenize("id IN (1, 2, 3)")
+        self.assertEqual(tokens, ['id', 'IN', '(', '1', ',', '2', ',', '3', ')'])
+
+    def test_parenthesized_group(self):
+        tokens = _tokenize("(age > 18 OR name = 'Bob')")
+        self.assertEqual(tokens, ['(', 'age', '>', '18', 'OR', 'name', '=', "'Bob'", ')'])
+
+    def test_between(self):
+        tokens = _tokenize("score BETWEEN 60 AND 100")
+        self.assertEqual(tokens, ['score', 'BETWEEN', '60', 'AND', '100'])
+
+    def test_is_null(self):
+        tokens = _tokenize("deleted_at IS NULL")
+        self.assertEqual(tokens, ['deleted_at', 'IS', 'NULL'])
+
+    def test_is_not_null(self):
+        tokens = _tokenize("name IS NOT NULL")
+        self.assertEqual(tokens, ['name', 'IS', 'NOT', 'NULL'])  # one token per keyword
+
+
+class WhereParserCastLiteralTest(unittest.TestCase):
+    """Verifies _cast_literal's conversion of literal text for each type name."""
+
+    def test_cast_int(self):
+        # Every integer type name below must turn the text '42' into the int 42.
+        int_type_names = ('INT', 'INTEGER', 'BIGINT',
+                          'TINYINT', 'SMALLINT')
+        for type_name in int_type_names:
+            self.assertEqual(_cast_literal('42', type_name), 42)
+
+    def test_cast_float(self):
+        for type_name in ('FLOAT', 'DOUBLE'):
+            self.assertAlmostEqual(_cast_literal('3.14', type_name), 3.14)
+
+    def test_cast_decimal(self):
+        self.assertAlmostEqual(_cast_literal('99.99', 'DECIMAL(10,2)'), 99.99)  # parameterized type name
+
+    def test_cast_boolean(self):
+        truthy_texts = ('true', 'True', '1')
+        for text in truthy_texts:
+            self.assertTrue(_cast_literal(text, 'BOOLEAN'))
+        for text in ('false', '0'):
+            self.assertFalse(_cast_literal(text, 'BOOLEAN'))
+
+    def test_cast_string(self):
+        for type_name in ('STRING', 'VARCHAR(100)'):
+            self.assertEqual(_cast_literal('hello', type_name), 'hello')
+
+
+class WhereParserParseTest(unittest.TestCase):
+    """Tests parse_where_clause end to end against a small fixed schema."""
+
+    @classmethod
+    def setUpClass(cls):
+        cls.fields = [  # schema handed to parse_where_clause by the tests below
+            DataField(0, 'id', AtomicType('INT')),
+            DataField(1, 'name', AtomicType('STRING')),
+            DataField(2, 'age', AtomicType('INT')),
+            DataField(3, 'score', AtomicType('DOUBLE')),
+            DataField(4, 'city', AtomicType('STRING')),
+            DataField(5, 'active', AtomicType('BOOLEAN')),
+        ]
+
+    def test_empty_string(self):
+        result = parse_where_clause('', self.fields)
+        self.assertIsNone(result)  # an empty clause yields no predicate
+
+    def test_whitespace_only(self):
+        result = parse_where_clause('   ', self.fields)
+        self.assertIsNone(result)  # blank input is treated like an empty clause
+
+    def test_equal_int(self):
+        predicate = parse_where_clause("id = 1", self.fields)
+        self.assertIsNotNone(predicate)
+        self.assertEqual(predicate.method, 'equal')
+        self.assertEqual(predicate.field, 'id')
+        self.assertEqual(predicate.literals, [1])
+
+    def test_equal_string(self):
+        predicate = parse_where_clause("name = 'Alice'", self.fields)
+        self.assertIsNotNone(predicate)
+        self.assertEqual(predicate.method, 'equal')
+        self.assertEqual(predicate.field, 'name')
+        self.assertEqual(predicate.literals, ['Alice'])  # surrounding quotes are stripped
+
+    def test_not_equal(self):
+        predicate = parse_where_clause("id != 5", self.fields)
+        self.assertIsNotNone(predicate)
+        self.assertEqual(predicate.method, 'notEqual')
+        self.assertEqual(predicate.field, 'id')
+        self.assertEqual(predicate.literals, [5])
+
+    def test_not_equal_diamond(self):
+        predicate = parse_where_clause("id <> 5", self.fields)
+        self.assertIsNotNone(predicate)
+        self.assertEqual(predicate.method, 'notEqual')  # same method as '!='
+        self.assertEqual(predicate.literals, [5])
+
+    def test_less_than(self):
+        predicate = parse_where_clause("age < 30", self.fields)
+        self.assertIsNotNone(predicate)
+        self.assertEqual(predicate.method, 'lessThan')
+        self.assertEqual(predicate.field, 'age')
+        self.assertEqual(predicate.literals, [30])
+
+    def test_less_or_equal(self):
+        predicate = parse_where_clause("age <= 30", self.fields)
+        self.assertIsNotNone(predicate)
+        self.assertEqual(predicate.method, 'lessOrEqual')
+        self.assertEqual(predicate.literals, [30])
+
+    def test_greater_than(self):
+        predicate = parse_where_clause("age > 18", self.fields)
+        self.assertIsNotNone(predicate)
+        self.assertEqual(predicate.method, 'greaterThan')
+        self.assertEqual(predicate.field, 'age')
+        self.assertEqual(predicate.literals, [18])
+
+    def test_greater_or_equal(self):
+        predicate = parse_where_clause("age >= 18", self.fields)
+        self.assertIsNotNone(predicate)
+        self.assertEqual(predicate.method, 'greaterOrEqual')
+        self.assertEqual(predicate.literals, [18])
+
+    def test_is_null(self):
+        predicate = parse_where_clause("city IS NULL", self.fields)
+        self.assertIsNotNone(predicate)
+        self.assertEqual(predicate.method, 'isNull')
+        self.assertEqual(predicate.field, 'city')
+
+    def test_is_not_null(self):
+        predicate = parse_where_clause("name IS NOT NULL", self.fields)
+        self.assertIsNotNone(predicate)
+        self.assertEqual(predicate.method, 'isNotNull')
+        self.assertEqual(predicate.field, 'name')
+
+    def test_in_int_list(self):
+        predicate = parse_where_clause("id IN (1, 2, 3)", self.fields)
+        self.assertIsNotNone(predicate)
+        self.assertEqual(predicate.method, 'in')
+        self.assertEqual(predicate.field, 'id')
+        self.assertEqual(predicate.literals, [1, 2, 3])
+
+    def test_in_string_list(self):
+        predicate = parse_where_clause("city IN ('Beijing', 'Shanghai')", self.fields)
+        self.assertIsNotNone(predicate)
+        self.assertEqual(predicate.method, 'in')
+        self.assertEqual(predicate.field, 'city')
+        self.assertEqual(predicate.literals, ['Beijing', 'Shanghai'])
+
+    def test_not_in(self):
+        predicate = parse_where_clause("id NOT IN (4, 5)", self.fields)
+        self.assertIsNotNone(predicate)
+        self.assertEqual(predicate.method, 'notIn')
+        self.assertEqual(predicate.field, 'id')
+        self.assertEqual(predicate.literals, [4, 5])
+
+    def test_between(self):
+        predicate = parse_where_clause("age BETWEEN 20 AND 30", self.fields)
+        self.assertIsNotNone(predicate)
+        self.assertEqual(predicate.method, 'between')
+        self.assertEqual(predicate.field, 'age')
+        self.assertEqual(predicate.literals, [20, 30])  # lower bound first, then upper
+
+    def test_between_float(self):
+        predicate = parse_where_clause("score BETWEEN 60.0 AND 100.0", self.fields)
+        self.assertIsNotNone(predicate)
+        self.assertEqual(predicate.method, 'between')
+        self.assertEqual(predicate.field, 'score')
+        self.assertAlmostEqual(predicate.literals[0], 60.0)
+        self.assertAlmostEqual(predicate.literals[1], 100.0)
+
+    def test_not_between(self):
+        predicate = parse_where_clause("age NOT BETWEEN 20 AND 30", self.fields)
+        self.assertIsNotNone(predicate)
+        self.assertEqual(predicate.method, 'notBetween')
+        self.assertEqual(predicate.field, 'age')
+        self.assertEqual(predicate.literals, [20, 30])
+
+    def test_not_between_float(self):
+        predicate = parse_where_clause("score NOT BETWEEN 60.0 AND 100.0", self.fields)
+        self.assertIsNotNone(predicate)
+        self.assertEqual(predicate.method, 'notBetween')
+        self.assertEqual(predicate.field, 'score')
+        self.assertAlmostEqual(predicate.literals[0], 60.0)
+        self.assertAlmostEqual(predicate.literals[1], 100.0)
+
+    def test_not_between_case_insensitive(self):
+        predicate = parse_where_clause("age not between 20 and 30", self.fields)
+        self.assertIsNotNone(predicate)
+        self.assertEqual(predicate.method, 'notBetween')
+        self.assertEqual(predicate.literals, [20, 30])
+
+    def test_not_between_with_and_connector(self):
+        predicate = parse_where_clause(
+            "age NOT BETWEEN 20 AND 30 AND name = 'Alice'", self.fields
+        )
+        self.assertIsNotNone(predicate)
+        self.assertEqual(predicate.method, 'and')  # only the second AND is a connector
+        self.assertEqual(predicate.literals[0].method, 'notBetween')
+        self.assertEqual(predicate.literals[0].field, 'age')
+        self.assertEqual(predicate.literals[0].literals, [20, 30])
+        self.assertEqual(predicate.literals[1].method, 'equal')
+        self.assertEqual(predicate.literals[1].field, 'name')
+
+    def test_error_not_between_missing_and(self):
+        with self.assertRaises(ValueError):
+            parse_where_clause("age NOT BETWEEN 20 30", self.fields)
+
+    def test_like(self):
+        predicate = parse_where_clause("name LIKE 'A%'", self.fields)
+        self.assertIsNotNone(predicate)
+        self.assertEqual(predicate.method, 'like')
+        self.assertEqual(predicate.field, 'name')
+        self.assertEqual(predicate.literals, ['A%'])
+
+    def test_and_connector(self):
+        predicate = parse_where_clause("age > 18 AND name = 'Alice'", self.fields)
+        self.assertIsNotNone(predicate)
+        self.assertEqual(predicate.method, 'and')
+        self.assertEqual(len(predicate.literals), 2)  # children live in .literals
+        self.assertEqual(predicate.literals[0].method, 'greaterThan')
+        self.assertEqual(predicate.literals[0].field, 'age')
+        self.assertEqual(predicate.literals[1].method, 'equal')
+        self.assertEqual(predicate.literals[1].field, 'name')
+
+    def test_or_connector(self):
+        predicate = parse_where_clause("age < 20 OR age > 30", self.fields)
+        self.assertIsNotNone(predicate)
+        self.assertEqual(predicate.method, 'or')
+        self.assertEqual(len(predicate.literals), 2)
+        self.assertEqual(predicate.literals[0].method, 'lessThan')
+        self.assertEqual(predicate.literals[1].method, 'greaterThan')
+
+    def test_and_has_higher_precedence_than_or(self):
+        # "a = 1 OR b = 2 AND c = 3" should be parsed as "a = 1 OR (b = 2 AND c = 3)"
+        predicate = parse_where_clause("id = 1 OR age = 25 AND name = 'Alice'", self.fields)
+        self.assertIsNotNone(predicate)
+        self.assertEqual(predicate.method, 'or')  # OR ends up at the root
+        self.assertEqual(len(predicate.literals), 2)
+        self.assertEqual(predicate.literals[0].method, 'equal')
+        self.assertEqual(predicate.literals[0].field, 'id')
+        self.assertEqual(predicate.literals[1].method, 'and')
+        self.assertEqual(len(predicate.literals[1].literals), 2)
+
+    def test_parenthesized_group(self):
+        predicate = parse_where_clause("(id = 1 OR id = 2) AND age > 18", self.fields)
+        self.assertIsNotNone(predicate)
+        self.assertEqual(predicate.method, 'and')  # parentheses override the default precedence
+        self.assertEqual(len(predicate.literals), 2)
+        self.assertEqual(predicate.literals[0].method, 'or')
+        self.assertEqual(predicate.literals[1].method, 'greaterThan')
+
+    def test_nested_parentheses(self):
+        predicate = parse_where_clause(
+            "(age > 18 AND (name = 'Alice' OR name = 'Bob'))", self.fields
+        )
+        self.assertIsNotNone(predicate)
+        self.assertEqual(predicate.method, 'and')
+        self.assertEqual(predicate.literals[0].method, 'greaterThan')
+        self.assertEqual(predicate.literals[1].method, 'or')
+
+    def test_multiple_and_conditions(self):
+        predicate = parse_where_clause(
+            "id > 0 AND age >= 20 AND name IS NOT NULL", self.fields
+        )
+        self.assertIsNotNone(predicate)
+        self.assertEqual(predicate.method, 'and')
+        self.assertEqual(len(predicate.literals), 3)  # one 'and' node with three children
+
+    def test_complex_expression(self):
+        predicate = parse_where_clause(
+            "age > 18 OR (name = 'Bob' AND city IN ('Beijing', 'Shanghai'))",
+            self.fields
+        )
+        self.assertIsNotNone(predicate)
+        self.assertEqual(predicate.method, 'or')
+        self.assertEqual(predicate.literals[0].method, 'greaterThan')
+        and_part = predicate.literals[1]
+        self.assertEqual(and_part.method, 'and')
+        self.assertEqual(and_part.literals[0].method, 'equal')
+        self.assertEqual(and_part.literals[1].method, 'in')
+
+    def test_case_insensitive_keywords(self):
+        predicate = parse_where_clause("age between 20 and 30", self.fields)
+        self.assertIsNotNone(predicate)
+        self.assertEqual(predicate.method, 'between')
+
+        predicate = parse_where_clause("name is null", self.fields)
+        self.assertIsNotNone(predicate)
+        self.assertEqual(predicate.method, 'isNull')
+
+        predicate = parse_where_clause("name is not null", self.fields)
+        self.assertIsNotNone(predicate)
+        self.assertEqual(predicate.method, 'isNotNull')
+
+        predicate = parse_where_clause("id in (1, 2)", self.fields)
+        self.assertIsNotNone(predicate)
+        self.assertEqual(predicate.method, 'in')
+
+    def test_type_casting_int_field(self):
+        predicate = parse_where_clause("age = 25", self.fields)
+        self.assertIsInstance(predicate.literals[0], int)
+
+    def test_type_casting_double_field(self):
+        predicate = parse_where_clause("score > 3.14", self.fields)
+        self.assertIsInstance(predicate.literals[0], float)
+
+    def test_type_casting_string_field(self):
+        predicate = parse_where_clause("name = 'Alice'", self.fields)
+        self.assertIsInstance(predicate.literals[0], str)
+
+    def test_error_missing_closing_paren(self):
+        with self.assertRaises(ValueError):
+            parse_where_clause("(age > 18", self.fields)
+
+    def test_error_unexpected_end(self):
+        with self.assertRaises(ValueError):
+            parse_where_clause("age", self.fields)
+
+    def test_error_missing_in_paren(self):
+        with self.assertRaises(ValueError):
+            parse_where_clause("id IN 1, 2, 3", self.fields)
+
+    def test_error_between_missing_and(self):
+        with self.assertRaises(ValueError):
+            parse_where_clause("age BETWEEN 20 30", self.fields)
+
+    def test_error_unexpected_trailing_tokens(self):
+        with self.assertRaises(ValueError):
+            parse_where_clause("age > 18 extra_token", self.fields)
+
+    def test_error_is_without_null(self):
+        with self.assertRaises(ValueError):
+            parse_where_clause("age IS SOMETHING", self.fields)
+
+    def test_error_unknown_field(self):
+        with self.assertRaises(ValueError) as context:
+            parse_where_clause("nonexistent = 1", self.fields)
+        self.assertIn("Unknown field", str(context.exception))
+        self.assertIn("nonexistent", str(context.exception))  # message names the bad field
+
+    def test_error_non_atomic_type_field(self):
+        fields_with_array = self.fields + [  # new list; the shared schema is not mutated
+            DataField(6, 'tags', ArrayType(True, AtomicType('STRING'))),
+        ]
+        with self.assertRaises(ValueError) as context:
+            parse_where_clause("tags = 'foo'", fields_with_array)
+        self.assertIn("non-atomic type", str(context.exception))
+        self.assertIn("tags", str(context.exception))
+
+
+if __name__ == '__main__':
+    unittest.main()  # run this module's tests when it is executed as a script


Reply via email to