JingsongLi commented on code in PR #7192:
URL: https://github.com/apache/paimon/pull/7192#discussion_r2758880693
##########
paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py:
##########
@@ -208,6 +208,69 @@ def test_py_write_read_pk_table(self, file_format):
actual_names = set(initial_result['name'].tolist())
self.assertEqual(actual_names, expected_names)
+ _BUCKET_TEST_KEYS = [
+ ('e2e_pk_001', 'e2e_suite_001', 1),
+ ('k', 'v', 0),
+ ('e2e_pk_002', 'e2e_suite_002', 2),
+ ]
+
+ @parameterized.expand([('parquet',), ('orc',), ('avro',)])
+ def test_py_write_read_pk_table_bucket_num_calculate(self, file_format):
Review Comment:
Can you merge this method into `test_py_write_read_pk_table`?
##########
paimon-python/pypaimon/write/row_key_extractor.py:
##########
@@ -15,18 +15,164 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-
-import hashlib
-import json
+import math
+import struct
from abc import ABC, abstractmethod
-from typing import List, Tuple
+from typing import Any, List, Tuple
import pyarrow as pa
from pypaimon.common.options.core_options import CoreOptions
from pypaimon.schema.table_schema import TableSchema
from pypaimon.table.bucket_mode import BucketMode
+_MURMUR_C1 = 0xCC9E2D51
+_MURMUR_C2 = 0x1B873593
+_DEFAULT_SEED = 42
+_HEADER_SIZE_IN_BITS = 8
+
+
+def _round_number_of_bytes_to_nearest_word(num_bytes: int) -> int:
+ remainder = num_bytes & 0x07
+ return num_bytes if remainder == 0 else num_bytes + (8 - remainder)
+
+
+def _calculate_bit_set_width_in_bytes(arity: int) -> int:
+ return ((arity + 63 + _HEADER_SIZE_IN_BITS) // 64) * 8
+
+
+def _get_fixed_length_part_size(arity: int) -> int:
+ return _calculate_bit_set_width_in_bytes(arity) + 8 * arity
+
+
+def _mix_k1(k1: int) -> int:
+ k1 = (k1 * _MURMUR_C1) & 0xFFFFFFFF
+ k1 = ((k1 << 15) | (k1 >> 17)) & 0xFFFFFFFF
+ k1 = (k1 * _MURMUR_C2) & 0xFFFFFFFF
+ return k1
+
+
+def _mix_h1(h1: int, k1: int) -> int:
+ h1 = (h1 ^ k1) & 0xFFFFFFFF
+ h1 = ((h1 << 13) | (h1 >> 19)) & 0xFFFFFFFF
+ h1 = (h1 * 5 + 0xE6546B64) & 0xFFFFFFFF
+ return h1
+
+
+def _fmix(h1: int, length: int) -> int:
+ # Finalization mix - force all bits of a hash block to avalanche
+ h1 = (h1 ^ length) & 0xFFFFFFFF
+ h1 ^= h1 >> 16
+ h1 = (h1 * 0x85EBCA6B) & 0xFFFFFFFF
+ h1 ^= h1 >> 13
+ h1 = (h1 * 0xC2B2AE35) & 0xFFFFFFFF
+ h1 ^= h1 >> 16
+ return h1
+
+
+def _hash_bytes_by_words(data: bytes, seed: int = _DEFAULT_SEED) -> int:
+ n = len(data)
+ length_aligned = n - (n % 4)
+ h1 = seed
+ for i in range(0, length_aligned, 4):
+ k1 = struct.unpack_from("<I", data, i)[0]
+ k1 = _mix_k1(k1)
+ h1 = _mix_h1(h1, k1)
+ return _fmix(h1, n)
+
+
+def _bucket_from_hash(hash_unsigned: int, num_buckets: int) -> int:
+ if hash_unsigned >= 0x80000000:
+ hash_signed = hash_unsigned - 0x100000000
+ else:
+ hash_signed = hash_unsigned
+ rem = hash_signed - math.trunc(hash_signed / num_buckets) * num_buckets
+ return abs(rem)
+
+
+def _type_name_for_bucket(field_type: Any) -> str:
+ if hasattr(field_type, "type") and isinstance(getattr(field_type, "type"),
str):
+ return getattr(field_type, "type").upper()
+ s = str(field_type).upper()
+ if "BIGINT" in s or "LONG" in s:
+ return "BIGINT"
+ if "TINYINT" in s:
+ return "TINYINT"
+ if "SMALLINT" in s:
+ return "SMALLINT"
+ if "INT" in s or "INTEGER" in s:
+ return "INT"
+ if "STRING" in s or "VARCHAR" in s or "CHAR" in s:
+ return "STRING"
+ if "FLOAT" in s:
+ return "FLOAT"
+ if "DOUBLE" in s:
+ return "DOUBLE"
+ if "BOOLEAN" in s or "BOOL" in s:
+ return "BOOLEAN"
+ return s
+
+
+def _to_binary_row_bytes(type_names: List[str], values: Tuple[Any, ...]) ->
bytes:
Review Comment:
Use `GenericRowSerializer`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]