MgjLLL commented on code in PR #8136: URL: https://github.com/apache/paimon/pull/8136#discussion_r3394107057
########## paimon-python/pypaimon/common/predicate_json_parser.py: ########## @@ -0,0 +1,290 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +import json +import re +from typing import Callable + +import pyarrow as pa +import pyarrow.compute as pc + + +def parse_predicate_to_batch_filter(json_str: str) -> Callable[[pa.RecordBatch], pa.Array]: + data = json.loads(json_str) + return _build_filter(data) + + +def _build_filter(data: dict) -> Callable[[pa.RecordBatch], pa.Array]: + kind = data["kind"] + if kind == "LEAF": + return _build_leaf_filter(data) + elif kind == "COMPOUND": + return _build_compound_filter(data) + raise ValueError(f"Unknown predicate kind: {kind}") + + +def _build_leaf_filter(data: dict) -> Callable: + transform = data["transform"] + function = data["function"] + literals = data.get("literals", []) + + def filter_fn(batch: pa.RecordBatch) -> pa.Array: + value_array = _apply_predicate_transform(transform, batch) + return _apply_leaf_function(function, value_array, literals, len(batch)) + + return filter_fn + + +def _build_compound_filter(data: dict) -> 
Callable: + function = data["function"] + child_filters = [_build_filter(child) for child in data["children"]] + + def filter_fn(batch: pa.RecordBatch) -> pa.Array: + if function == "AND": + result = child_filters[0](batch) + for cf in child_filters[1:]: + result = pc.and_(result, cf(batch)) + return result + elif function == "OR": + result = child_filters[0](batch) + for cf in child_filters[1:]: + result = pc.or_(result, cf(batch)) + return result + raise ValueError(f"Unknown compound function: {function}") + + return filter_fn + + +def _apply_predicate_transform(transform: dict, batch: pa.RecordBatch) -> pa.Array: + name = transform["name"] + + if name == "FIELD_REF": + return batch.column(transform["fieldRef"]["name"]) + + elif name == "CAST": + col = batch.column(transform["fieldRef"]["name"]) + target_type = _paimon_type_to_arrow(transform["type"]) + return pc.cast(col, target_type) + + elif name == "UPPER": + input_col = _resolve_transform_input(transform["inputs"][0], batch) + return pc.utf8_upper(input_col) + + elif name == "LOWER": + input_col = _resolve_transform_input(transform["inputs"][0], batch) + return pc.utf8_lower(input_col) + + elif name == "CONCAT": + resolved = [_resolve_transform_input(inp, batch) for inp in transform["inputs"]] + if not resolved: + return pa.nulls(len(batch), type=pa.string()) + return pc.binary_join_element_wise(*resolved, "") + + elif name == "CONCAT_WS": + sep = _resolve_transform_input(transform["inputs"][0], batch) + values = [_resolve_transform_input(inp, batch) for inp in transform["inputs"][1:]] + if not values: + return pa.nulls(len(batch), type=pa.string()) + return pc.binary_join_element_wise(*values, sep, null_handling='skip') + + elif name == "NULL": + return pa.nulls(len(batch), type=pa.bool_()) + + raise ValueError(f"Unknown transform type in predicate: {name}") + + +def _resolve_transform_input(inp, batch: pa.RecordBatch) -> pa.Array: + if isinstance(inp, dict): + return batch.column(inp["name"]) + elif 
isinstance(inp, str): + return pa.array([inp] * len(batch), type=pa.string()) + elif inp is None: + return pa.nulls(len(batch), type=pa.string()) + return pa.array([str(inp)] * len(batch), type=pa.string()) + + +def _apply_leaf_function(function: str, value_array: pa.Array, literals: list, batch_len: int) -> pa.Array: + converted = [_convert_literal(lit, value_array.type) for lit in literals] + + if function == "EQUAL": + return pc.equal(value_array, converted[0]) + elif function == "NOT_EQUAL": + return pc.not_equal(value_array, converted[0]) + elif function == "LESS_THAN": + return pc.less(value_array, converted[0]) + elif function == "LESS_OR_EQUAL": + return pc.less_equal(value_array, converted[0]) + elif function == "GREATER_THAN": + return pc.greater(value_array, converted[0]) + elif function == "GREATER_OR_EQUAL": + return pc.greater_equal(value_array, converted[0]) + elif function == "IS_NULL": + return pc.is_null(value_array) + elif function == "IS_NOT_NULL": + return pc.is_valid(value_array) + elif function == "IN": + return pc.is_in(value_array, pa.array(converted, type=value_array.type)) + elif function == "NOT_IN": + return pc.invert(pc.is_in(value_array, pa.array(converted, type=value_array.type))) + elif function == "BETWEEN": + return pc.and_(pc.greater_equal(value_array, converted[0]), + pc.less_equal(value_array, converted[1])) + elif function == "NOT_BETWEEN": + return pc.or_(pc.less(value_array, converted[0]), + pc.greater(value_array, converted[1])) + elif function == "STARTS_WITH": + return pc.starts_with(value_array, converted[0]) + elif function == "ENDS_WITH": + return pc.ends_with(value_array, converted[0]) + elif function == "CONTAINS": + return pc.match_substring(value_array, converted[0]) + elif function == "LIKE": + raw = literals[0] + escaped = re.escape(raw) + pattern = escaped.replace("%", ".*").replace("_", ".") Review Comment: Ported Like.sqlToRegexLike: backslash is the default escape character, \\% / \\_ are literal, invalid 
escape sequences raise an error. `LIKE 'admin\_%'` now matches `admin_foo` but not `adminXfoo`, so policy predicates evaluate identically to the JVM client. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
