JingsongLi commented on code in PR #8136: URL: https://github.com/apache/paimon/pull/8136#discussion_r3380850757
########## paimon-python/pypaimon/catalog/table_query_auth.py: ########## @@ -0,0 +1,87 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +from typing import Callable, Dict, List, Optional + +import pyarrow as pa +import pyarrow.compute as pc + +from pypaimon.common.predicate_json_parser import ( + extract_referenced_fields, + parse_predicate_to_batch_filter, +) +from pypaimon.schema.data_types import DataField + + +class TableNoPermissionException(Exception): + MSG = "Table %s has no permission. Cause by %s." + + def __init__(self, identifier, cause=None): + cause_msg = str(cause) if cause else "" + super().__init__(self.MSG % (identifier, cause_msg)) + self.identifier = identifier + self.__cause__ = cause + + +class TableQueryAuthResult: + + def __init__(self, filter: Optional[List[str]], column_masking: Optional[Dict[str, str]]): + self.filter = filter + self.column_masking = column_masking + + def convert_plan(self, plan): + from pypaimon.read.query_auth_split import QueryAuthSplit + from pypaimon.read.plan import Plan + + if not self.filter and not self.column_masking: + return plan + auth_splits = [QueryAuthSplit(split, self) for split in plan.splits()] + return Plan(auth_splits) Review Comment: Java `TableScan.Plan` does not carry a snapshot id, but Python `Plan` does and the update / row-id update paths use it as `check_from_snapshot`. Wrapping the plan here drops `plan.snapshot_id`, so a query-auth table planned from a non-empty snapshot becomes `snapshot_id=None`; `table_update` then emits commit messages with `-1`, which disables the row-id conflict checks (and related global-index update checks). Please preserve the original plan metadata, e.g. `Plan(auth_splits, snapshot_id=plan.snapshot_id)`. ########## paimon-python/pypaimon/common/predicate_json_parser.py: ########## @@ -0,0 +1,290 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +import json +import re +from typing import Callable + +import pyarrow as pa +import pyarrow.compute as pc + + +def parse_predicate_to_batch_filter(json_str: str) -> Callable[[pa.RecordBatch], pa.Array]: + data = json.loads(json_str) + return _build_filter(data) + + +def _build_filter(data: dict) -> Callable[[pa.RecordBatch], pa.Array]: + kind = data["kind"] + if kind == "LEAF": + return _build_leaf_filter(data) + elif kind == "COMPOUND": + return _build_compound_filter(data) + raise ValueError(f"Unknown predicate kind: {kind}") + + +def _build_leaf_filter(data: dict) -> Callable: + transform = data["transform"] + function = data["function"] + literals = data.get("literals", []) + + def filter_fn(batch: pa.RecordBatch) -> pa.Array: + value_array = _apply_predicate_transform(transform, batch) + return _apply_leaf_function(function, value_array, literals, len(batch)) + + return filter_fn + + +def _build_compound_filter(data: dict) -> Callable: + function = data["function"] + child_filters = [_build_filter(child) for child in data["children"]] + + def filter_fn(batch: pa.RecordBatch) -> pa.Array: + if function == "AND": + result = child_filters[0](batch) + for cf in child_filters[1:]: + result = pc.and_(result, cf(batch)) + return result + elif function == "OR": + result = child_filters[0](batch) + for cf in child_filters[1:]: + result = pc.or_(result, cf(batch)) + return result + raise ValueError(f"Unknown compound function: {function}") + + return filter_fn + + +def _apply_predicate_transform(transform: dict, batch: pa.RecordBatch) -> pa.Array: + name = transform["name"] + + if name == "FIELD_REF": + return batch.column(transform["fieldRef"]["name"]) + + elif name == "CAST": + col = batch.column(transform["fieldRef"]["name"]) + target_type = _paimon_type_to_arrow(transform["type"]) + return pc.cast(col, target_type) + + elif name == "UPPER": + input_col = _resolve_transform_input(transform["inputs"][0], batch) + return pc.utf8_upper(input_col) + + elif name == "LOWER": + input_col = _resolve_transform_input(transform["inputs"][0], batch) + return pc.utf8_lower(input_col) + + elif name == "CONCAT": + resolved = [_resolve_transform_input(inp, batch) for inp in transform["inputs"]] + if not resolved: + return pa.nulls(len(batch), type=pa.string()) + return pc.binary_join_element_wise(*resolved, "") + + elif name == "CONCAT_WS": + sep = _resolve_transform_input(transform["inputs"][0], batch) + values = [_resolve_transform_input(inp, batch) for inp in transform["inputs"][1:]] + if not values: + return pa.nulls(len(batch), type=pa.string()) + return pc.binary_join_element_wise(*values, sep, null_handling='skip') + + elif name == "NULL": + return pa.nulls(len(batch), type=pa.bool_()) + + raise ValueError(f"Unknown transform type in predicate: {name}") + + +def _resolve_transform_input(inp, batch: pa.RecordBatch) -> pa.Array: + if isinstance(inp, dict): + return batch.column(inp["name"]) + elif isinstance(inp, str): + return pa.array([inp] * len(batch), type=pa.string()) + elif inp is None: + return pa.nulls(len(batch), type=pa.string()) + return pa.array([str(inp)] * len(batch), type=pa.string()) + + +def _apply_leaf_function(function: str, value_array: pa.Array, literals: list, batch_len: int) -> pa.Array: + converted = [_convert_literal(lit, value_array.type) for lit in literals] + + if function == "EQUAL": + return pc.equal(value_array, converted[0]) + elif function == "NOT_EQUAL": + return pc.not_equal(value_array, converted[0]) + elif function == "LESS_THAN": + return pc.less(value_array, converted[0]) + elif function == "LESS_OR_EQUAL": + return pc.less_equal(value_array, converted[0]) + elif function == "GREATER_THAN": + return pc.greater(value_array, converted[0]) + elif function == "GREATER_OR_EQUAL": + return pc.greater_equal(value_array, converted[0]) + elif function == "IS_NULL": + return pc.is_null(value_array) + elif function == "IS_NOT_NULL": + return pc.is_valid(value_array) + elif function == "IN": + return pc.is_in(value_array, pa.array(converted, type=value_array.type)) + elif function == "NOT_IN": + return pc.invert(pc.is_in(value_array, pa.array(converted, type=value_array.type))) + elif function == "BETWEEN": + return pc.and_(pc.greater_equal(value_array, converted[0]), + pc.less_equal(value_array, converted[1])) + elif function == "NOT_BETWEEN": + return pc.or_(pc.less(value_array, converted[0]), + pc.greater(value_array, converted[1])) + elif function == "STARTS_WITH": + return pc.starts_with(value_array, converted[0]) + elif function == "ENDS_WITH": + return pc.ends_with(value_array, converted[0]) + elif function == "CONTAINS": + return pc.match_substring(value_array, converted[0]) + elif function == "LIKE": + raw = literals[0] + escaped = re.escape(raw) + pattern = escaped.replace("%", ".*").replace("_", ".") Review Comment: This does not match the JVM `LIKE` semantics. Java treats backslash as the default escape character before expanding `%` / `_`, so a policy predicate like `LIKE admin\\_%` matches `admin_foo` and not `adminXfoo`. Escaping the whole string first and then replacing every `%` / `_` makes escaped wildcards behave as wildcards (or requires a literal backslash), so Python can allow/deny different rows from the Java client for the same auth filter. Please port the Java `Like.sqlToRegexLike` behavior, including invalid escape handling. ########## paimon-python/pypaimon/read/table_read.py: ########## @@ -611,6 +611,66 @@ def _widen_to_top_level_for_merge(self) -> List[DataField]: widened.append(field) return widened + def _create_reader_for_split(self, split): + from pypaimon.read.query_auth_split import QueryAuthSplit + + auth_result = None + if isinstance(split, QueryAuthSplit): + auth_result = split.auth_result + split = split.split + + if auth_result is not None: + return self._authed_reader(split, auth_result) + else: + return self._create_split_read(split).create_reader() + + def _authed_reader(self, split, auth_result): + from pypaimon.read.reader.auth_masking_reader import ( + AuthFilterReader, AuthMaskingReader, ColumnProjectReader) + + table_fields = self.table.fields + read_fields = self.read_type + + extra_fields = auth_result.get_extra_fields_for_filter(read_fields, table_fields) + effective_read_type = read_fields + if extra_fields: + effective_read_type = read_fields + extra_fields + + reader = self._create_split_read_with_read_type(split, effective_read_type).create_reader() + + if not isinstance(reader, RecordBatchReader): + from pypaimon.read.reader.auth_masking_reader import RecordReaderToBatchAdapter + schema = PyarrowFieldParser.from_paimon_schema(effective_read_type) + reader = RecordReaderToBatchAdapter(reader, schema) + + filter_fn = auth_result.extract_row_filter() + if filter_fn: + reader = AuthFilterReader(reader, filter_fn) + + if auth_result.column_masking: + reader = AuthMaskingReader(reader, auth_result.column_masking, effective_read_type) + + if extra_fields: + original_columns = [f.name for f in read_fields] + reader = ColumnProjectReader(reader, original_columns) + + return reader + + def _create_split_read_with_read_type(self, split, read_type): Review Comment: This auth-specific construction bypasses the normal PK read path above. In `_create_split_read`, PK tables inject missing `sequence.field` columns into the inner read type and then project them back out, matching the Java `withReadType` + outer projection behavior. Here, if query auth is enabled and the user projects `id,val` from a PK table with `sequence.field=ts`, `MergeFileSplitRead` is built without `ts`; that can either fail with `sequence.field ... not found` or merge by file sequence instead of the configured user sequence. Please reuse the existing `_create_split_read` widening/project-back logic for `effective_read_type`, or factor it so the auth path cannot drift from the normal PK path. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
