This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git

The following commit(s) were added to refs/heads/master by this push:
     new 93b8bcd308 [python] Refactor Test to standardize naming and structure (#6252)
93b8bcd308 is described below

commit 93b8bcd308b41e71768f8359034b2828f4e213f6
Author: umi <55790489+discivig...@users.noreply.github.com>
AuthorDate: Mon Sep 15 12:01:57 2025 +0800

    [python] Refactor Test to standardize naming and structure (#6252)
---
 ...e_store_commit.py => file_store_commit_test.py} |   0
 .../pypaimon/tests/filesystem_catalog_test.py      |   2 +-
 .../pypaimon/tests/predicate_push_down_test.py     | 151 ---------------
 paimon-python/pypaimon/tests/predicates_test.py    | 100 ++++++++++
 paimon-python/pypaimon/tests/pvfs_test.py          |   4 +-
 .../pypaimon/tests/py36/ao_predicate_test.py       |  30 +--
 ...ad_write_test.py => rest_ao_read_write_test.py} |  23 +--
 .../pypaimon/tests/reader_append_only_test.py      |  16 +-
 .../{reader_basic_test.py => reader_base_test.py}  | 208 ++++++++++++++++-----
 .../pypaimon/tests/reader_primary_key_test.py      |  12 +-
 paimon-python/pypaimon/tests/rest/__init__.py      |  17 ++
 .../pypaimon/tests/{ => rest}/api_test.py          |   4 +-
 .../rest_base_test.py}                             |   4 +-
 .../rest_catalog_commit_snapshot_test.py}          |   0
 .../rest_read_write_test.py}                       |  36 ++--
 .../pypaimon/tests/{ => rest}/rest_server.py       |   0
 .../rest_simple_test.py}                           |   4 +-
 paimon-python/pypaimon/tests/schema_test.py        |  79 --------
 paimon-python/pypaimon/tests/writer_test.py        |  94 ----------
 19 files changed, 343 insertions(+), 441 deletions(-)

diff --git a/paimon-python/pypaimon/tests/test_file_store_commit.py b/paimon-python/pypaimon/tests/file_store_commit_test.py
similarity index 100%
rename from paimon-python/pypaimon/tests/test_file_store_commit.py
rename to paimon-python/pypaimon/tests/file_store_commit_test.py
diff --git a/paimon-python/pypaimon/tests/filesystem_catalog_test.py b/paimon-python/pypaimon/tests/filesystem_catalog_test.py
index 0e9900efb4..d6b9433cba 100644
--- a/paimon-python/pypaimon/tests/filesystem_catalog_test.py
+++ b/paimon-python/pypaimon/tests/filesystem_catalog_test.py
@@ -29,7 +29,7 @@ from pypaimon import Schema
 from pypaimon.table.file_store_table import FileStoreTable
 
 
-class FileSystemCatalogTestCase(unittest.TestCase):
+class FileSystemCatalogTest(unittest.TestCase):
     def setUp(self):
         self.temp_dir = tempfile.mkdtemp(prefix="unittest_")
 
diff --git a/paimon-python/pypaimon/tests/predicate_push_down_test.py b/paimon-python/pypaimon/tests/predicate_push_down_test.py
deleted file mode 100644
index e809295d57..0000000000
--- a/paimon-python/pypaimon/tests/predicate_push_down_test.py
+++ /dev/null
@@ -1,151 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################ - -import os -import shutil -import tempfile -import unittest - -import pyarrow as pa - -from pypaimon import CatalogFactory -from pypaimon.common.predicate_builder import PredicateBuilder -from pypaimon import Schema - - -class PredicatePushDownTest(unittest.TestCase): - @classmethod - def setUpClass(cls): - cls.tempdir = tempfile.mkdtemp() - cls.warehouse = os.path.join(cls.tempdir, 'warehouse') - cls.catalog = CatalogFactory.create({ - 'warehouse': cls.warehouse - }) - cls.catalog.create_database('default', False) - - cls.pa_schema = pa.schema([ - pa.field('key1', pa.int32(), nullable=False), - pa.field('key2', pa.string(), nullable=False), - ('behavior', pa.string()), - pa.field('dt1', pa.string(), nullable=False), - pa.field('dt2', pa.int32(), nullable=False) - ]) - cls.expected = pa.Table.from_pydict({ - 'key1': [1, 2, 3, 4, 5, 7, 8], - 'key2': ['h', 'g', 'f', 'e', 'd', 'b', 'a'], - 'behavior': ['a', 'b-new', 'c', None, 'e', 'g', 'h'], - 'dt1': ['p1', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2'], - 'dt2': [2, 2, 1, 2, 2, 1, 2], - }, schema=cls.pa_schema) - - @classmethod - def tearDownClass(cls): - shutil.rmtree(cls.tempdir, ignore_errors=True) - - def testPkReaderWithFilter(self): - schema = Schema.from_pyarrow_schema(self.pa_schema, - partition_keys=['dt1', 'dt2'], - primary_keys=['key1', 'key2'], - options={'bucket': '1'}) - self.catalog.create_table('default.test_pk_filter', schema, False) - table = self.catalog.get_table('default.test_pk_filter') - - write_builder = table.new_batch_write_builder() - table_write = write_builder.new_write() - table_commit = write_builder.new_commit() - data1 = { - 'key1': [1, 2, 3, 4], - 'key2': ['h', 'g', 'f', 'e'], - 'behavior': ['a', 'b', 'c', None], - 'dt1': ['p1', 'p1', 'p2', 'p1'], - 'dt2': [2, 2, 1, 2], - } - pa_table = pa.Table.from_pydict(data1, schema=self.pa_schema) - table_write.write_arrow(pa_table) - table_commit.commit(table_write.prepare_commit()) - table_write.close() - table_commit.close() - - table_write = write_builder.new_write() - table_commit = write_builder.new_commit() - data1 = { - 'key1': [5, 2, 7, 8], - 'key2': ['d', 'g', 'b', 'a'], - 'behavior': ['e', 'b-new', 'g', 'h'], - 'dt1': ['p2', 'p1', 'p1', 'p2'], - 'dt2': [2, 2, 1, 2] - } - pa_table = pa.Table.from_pydict(data1, schema=self.pa_schema) - table_write.write_arrow(pa_table) - table_commit.commit(table_write.prepare_commit()) - table_write.close() - table_commit.close() - - # test filter by partition - predicate_builder: PredicateBuilder = table.new_read_builder().new_predicate_builder() - p1 = predicate_builder.startswith('dt1', "p1") - p2 = predicate_builder.is_in('dt1', ["p2"]) - p3 = predicate_builder.or_predicates([p1, p2]) - p4 = predicate_builder.equal('dt2', 2) - g1 = predicate_builder.and_predicates([p3, p4]) - # (dt1 startswith 'p1' or dt1 is_in ["p2"]) and dt2 == 2 - read_builder = table.new_read_builder().with_filter(g1) - splits = read_builder.new_scan().plan().splits() - self.assertEqual(len(splits), 2) - self.assertEqual(splits[0].partition.to_dict()["dt2"], 2) - self.assertEqual(splits[1].partition.to_dict()["dt2"], 2) - - # test filter by stats - predicate_builder: PredicateBuilder = table.new_read_builder().new_predicate_builder() - p1 = predicate_builder.equal('key1', 7) - p2 = predicate_builder.is_in('key2', ["e", "f"]) - p3 = predicate_builder.or_predicates([p1, p2]) - p4 = predicate_builder.greater_than('key1', 3) - g1 = predicate_builder.and_predicates([p3, p4]) - # 
(key1 == 7 or key2 is_in ["e", "f"]) and key1 > 3 - read_builder = table.new_read_builder().with_filter(g1) - splits = read_builder.new_scan().plan().splits() - # initial splits meta: - # p1, 2 -> 2g, 2g; 1e, 4h - # p2, 1 -> 3f, 3f - # p2, 2 -> 5a, 8d - # p1, 1 -> 7b, 7b - self.assertEqual(len(splits), 3) - # expect to filter out `p1, 2 -> 2g, 2g` and `p2, 1 -> 3f, 3f` - count = 0 - for split in splits: - if split.partition.values == ["p1", 2]: - count += 1 - self.assertEqual(len(split.files), 1) - min_values = split.files[0].value_stats.min_values.to_dict() - max_values = split.files[0].value_stats.max_values.to_dict() - self.assertTrue(min_values["key1"] == 1 and min_values["key2"] == "e" - and max_values["key1"] == 4 and max_values["key2"] == "h") - elif split.partition.values == ["p2", 2]: - count += 1 - min_values = split.files[0].value_stats.min_values.to_dict() - max_values = split.files[0].value_stats.max_values.to_dict() - self.assertTrue(min_values["key1"] == 5 and min_values["key2"] == "a" - and max_values["key1"] == 8 and max_values["key2"] == "d") - elif split.partition.values == ["p1", 1]: - count += 1 - min_values = split.files[0].value_stats.min_values.to_dict() - max_values = split.files[0].value_stats.max_values.to_dict() - self.assertTrue(min_values["key1"] == max_values["key1"] == 7 - and max_values["key2"] == max_values["key2"] == "b") - self.assertEqual(count, 3) diff --git a/paimon-python/pypaimon/tests/predicates_test.py b/paimon-python/pypaimon/tests/predicates_test.py index ca3caa5276..0ddebc3451 100644 --- a/paimon-python/pypaimon/tests/predicates_test.py +++ b/paimon-python/pypaimon/tests/predicates_test.py @@ -372,3 +372,103 @@ class PredicateTest(unittest.TestCase): predicate = predicate_builder.or_predicates([predicate1, predicate2]) _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[[0, 3, 4]]) + + def testPkReaderWithFilter(self): + pa_schema = pa.schema([ + pa.field('key1', pa.int32(), nullable=False), + pa.field('key2', pa.string(), nullable=False), + ('behavior', pa.string()), + pa.field('dt1', pa.string(), nullable=False), + pa.field('dt2', pa.int32(), nullable=False) + ]) + schema = Schema.from_pyarrow_schema(pa_schema, + partition_keys=['dt1', 'dt2'], + primary_keys=['key1', 'key2'], + options={'bucket': '1'}) + self.catalog.create_table('default.test_pk_filter', schema, False) + table = self.catalog.get_table('default.test_pk_filter') + + write_builder = table.new_batch_write_builder() + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + data1 = { + 'key1': [1, 2, 3, 4], + 'key2': ['h', 'g', 'f', 'e'], + 'behavior': ['a', 'b', 'c', None], + 'dt1': ['p1', 'p1', 'p2', 'p1'], + 'dt2': [2, 2, 1, 2], + } + pa_table = pa.Table.from_pydict(data1, schema=pa_schema) + table_write.write_arrow(pa_table) + table_commit.commit(table_write.prepare_commit()) + table_write.close() + table_commit.close() + + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + data1 = { + 'key1': [5, 2, 7, 8], + 'key2': ['d', 'g', 'b', 'a'], + 'behavior': ['e', 'b-new', 'g', 'h'], + 'dt1': ['p2', 'p1', 'p1', 'p2'], + 'dt2': [2, 2, 1, 2] + } + pa_table = pa.Table.from_pydict(data1, schema=pa_schema) + table_write.write_arrow(pa_table) + table_commit.commit(table_write.prepare_commit()) + table_write.close() + table_commit.close() + + # test filter by partition + predicate_builder: PredicateBuilder = table.new_read_builder().new_predicate_builder() + p1 = 
predicate_builder.startswith('dt1', "p1") + p2 = predicate_builder.is_in('dt1', ["p2"]) + p3 = predicate_builder.or_predicates([p1, p2]) + p4 = predicate_builder.equal('dt2', 2) + g1 = predicate_builder.and_predicates([p3, p4]) + # (dt1 startswith 'p1' or dt1 is_in ["p2"]) and dt2 == 2 + read_builder = table.new_read_builder().with_filter(g1) + splits = read_builder.new_scan().plan().splits() + self.assertEqual(len(splits), 2) + self.assertEqual(splits[0].partition.to_dict()["dt2"], 2) + self.assertEqual(splits[1].partition.to_dict()["dt2"], 2) + + # test filter by stats + predicate_builder: PredicateBuilder = table.new_read_builder().new_predicate_builder() + p1 = predicate_builder.equal('key1', 7) + p2 = predicate_builder.is_in('key2', ["e", "f"]) + p3 = predicate_builder.or_predicates([p1, p2]) + p4 = predicate_builder.greater_than('key1', 3) + g1 = predicate_builder.and_predicates([p3, p4]) + # (key1 == 7 or key2 is_in ["e", "f"]) and key1 > 3 + read_builder = table.new_read_builder().with_filter(g1) + splits = read_builder.new_scan().plan().splits() + # initial splits meta: + # p1, 2 -> 2g, 2g; 1e, 4h + # p2, 1 -> 3f, 3f + # p2, 2 -> 5a, 8d + # p1, 1 -> 7b, 7b + self.assertEqual(len(splits), 3) + # expect to filter out `p1, 2 -> 2g, 2g` and `p2, 1 -> 3f, 3f` + count = 0 + for split in splits: + if split.partition.values == ["p1", 2]: + count += 1 + self.assertEqual(len(split.files), 1) + min_values = split.files[0].value_stats.min_values.to_dict() + max_values = split.files[0].value_stats.max_values.to_dict() + self.assertTrue(min_values["key1"] == 1 and min_values["key2"] == "e" + and max_values["key1"] == 4 and max_values["key2"] == "h") + elif split.partition.values == ["p2", 2]: + count += 1 + min_values = split.files[0].value_stats.min_values.to_dict() + max_values = split.files[0].value_stats.max_values.to_dict() + self.assertTrue(min_values["key1"] == 5 and min_values["key2"] == "a" + and max_values["key1"] == 8 and max_values["key2"] == "d") + elif split.partition.values == ["p1", 1]: + count += 1 + min_values = split.files[0].value_stats.min_values.to_dict() + max_values = split.files[0].value_stats.max_values.to_dict() + self.assertTrue(min_values["key1"] == max_values["key1"] == 7 + and max_values["key2"] == max_values["key2"] == "b") + self.assertEqual(count, 3) diff --git a/paimon-python/pypaimon/tests/pvfs_test.py b/paimon-python/pypaimon/tests/pvfs_test.py index 4aa17706ee..89c96a6228 100644 --- a/paimon-python/pypaimon/tests/pvfs_test.py +++ b/paimon-python/pypaimon/tests/pvfs_test.py @@ -29,10 +29,10 @@ from pypaimon.api.auth import BearTokenAuthProvider from pypaimon.catalog.rest.table_metadata import TableMetadata from pypaimon.schema.data_types import AtomicType, DataField from pypaimon.schema.table_schema import TableSchema -from pypaimon.tests.api_test import RESTCatalogServer +from pypaimon.tests.rest.api_test import RESTCatalogServer -class PVFSTestCase(unittest.TestCase): +class PVFSTest(unittest.TestCase): def setUp(self): self.temp_dir = tempfile.mkdtemp(prefix="unittest_") diff --git a/paimon-python/pypaimon/tests/py36/ao_predicate_test.py b/paimon-python/pypaimon/tests/py36/ao_predicate_test.py index 5e7435a360..92a3e9601a 100644 --- a/paimon-python/pypaimon/tests/py36/ao_predicate_test.py +++ b/paimon-python/pypaimon/tests/py36/ao_predicate_test.py @@ -58,14 +58,14 @@ class PredicatePy36Test(unittest.TestCase): cls.df = df - def testWrongFieldName(self): + def test_wrong_field_name(self): table = self.catalog.get_table('default.test_append') 
predicate_builder = table.new_read_builder().new_predicate_builder() with self.assertRaises(ValueError) as e: predicate_builder.equal('f2', 'a') self.assertEqual(str(e.exception), "The field f2 is not in field list ['f0', 'f1'].") - def testAppendWithDuplicate(self): + def test_append_with_duplicate(self): pa_schema = pa.schema([ ('f0', pa.int64()), ('f1', pa.string()), @@ -98,7 +98,7 @@ class PredicatePy36Test(unittest.TestCase): actual_df = read.to_pandas(scan.plan().splits()) self.assertEqual(len(actual_df), 0) - def testAllFieldTypesWithEqual(self): + def test_all_field_types_with_equal(self): pa_schema = pa.schema([ # int ('_tinyint', pa.int8()), @@ -169,67 +169,67 @@ class PredicatePy36Test(unittest.TestCase): predicate = predicate_builder.equal('_boolean', True) _check_filtered_result(table.new_read_builder().with_filter(predicate), df.loc[[0]]) - def testNotEqualAppend(self): + def test_not_equal_append(self): table = self.catalog.get_table('default.test_append') predicate_builder = table.new_read_builder().new_predicate_builder() predicate = predicate_builder.not_equal('f0', 1) _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[1:4]) - def testLessThanAppend(self): + def test_less_than_append(self): table = self.catalog.get_table('default.test_append') predicate_builder = table.new_read_builder().new_predicate_builder() predicate = predicate_builder.less_than('f0', 3) _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:1]) - def testLessOrEqualAppend(self): + def test_less_or_equal_append(self): table = self.catalog.get_table('default.test_append') predicate_builder = table.new_read_builder().new_predicate_builder() predicate = predicate_builder.less_or_equal('f0', 3) _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:2]) - def testGreaterThanAppend(self): + def test_greater_than_append(self): table = self.catalog.get_table('default.test_append') predicate_builder = table.new_read_builder().new_predicate_builder() predicate = predicate_builder.greater_than('f0', 3) _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[3:4]) - def testGreaterOrEqualAppend(self): + def test_greater_or_equal_append(self): table = self.catalog.get_table('default.test_append') predicate_builder = table.new_read_builder().new_predicate_builder() predicate = predicate_builder.greater_or_equal('f0', 3) _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[2:4]) - def testIsNullAppend(self): + def test_is_null_append(self): table = self.catalog.get_table('default.test_append') predicate_builder = table.new_read_builder().new_predicate_builder() predicate = predicate_builder.is_null('f1') _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[[4]]) - def testIsNotNullAppend(self): + def test_is_not_null_append(self): table = self.catalog.get_table('default.test_append') predicate_builder = table.new_read_builder().new_predicate_builder() predicate = predicate_builder.is_not_null('f1') _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:3]) - def testIsInAppend(self): + def test_is_in_append(self): table = self.catalog.get_table('default.test_append') predicate_builder = table.new_read_builder().new_predicate_builder() predicate = predicate_builder.is_in('f0', [1, 2]) _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:1]) - def testIsNotInAppend(self): 
+ def test_is_not_in_append(self): table = self.catalog.get_table('default.test_append') predicate_builder = table.new_read_builder().new_predicate_builder() predicate = predicate_builder.is_not_in('f0', [1, 2]) _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[2:4]) - def testBetweenAppend(self): + def test_between_append(self): table = self.catalog.get_table('default.test_append') predicate_builder = table.new_read_builder().new_predicate_builder() predicate = predicate_builder.between('f0', 1, 3) _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:2]) - def testAndPredicates(self): + def test_and_predicates(self): table = self.catalog.get_table('default.test_append') predicate_builder = table.new_read_builder().new_predicate_builder() predicate1 = predicate_builder.greater_than('f0', 1) @@ -237,7 +237,7 @@ class PredicatePy36Test(unittest.TestCase): predicate = predicate_builder.and_predicates([predicate1, predicate2]) _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[1:2]) - def testOrPredicates(self): + def test_or_predicates(self): table = self.catalog.get_table('default.test_append') predicate_builder = table.new_read_builder().new_predicate_builder() predicate1 = predicate_builder.greater_than('f0', 3) diff --git a/paimon-python/pypaimon/tests/py36/ao_read_write_test.py b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py similarity index 98% rename from paimon-python/pypaimon/tests/py36/ao_read_write_test.py rename to paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py index fcd05ee6cf..20e6a2c2d8 100644 --- a/paimon-python/pypaimon/tests/py36/ao_read_write_test.py +++ b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py @@ -38,11 +38,12 @@ from pypaimon import Schema from pypaimon.table.row.generic_row import GenericRow, GenericRowSerializer, GenericRowDeserializer from pypaimon.table.row.row_kind import RowKind from pypaimon.tests.py36.pyarrow_compat import table_sort_by -from pypaimon.tests.rest_catalog_base_test import RESTCatalogBaseTest +from pypaimon.tests.rest.rest_base_test import RESTBaseTest + from pypaimon.write.file_store_commit import FileStoreCommit -class RESTTableReadWritePy36Test(RESTCatalogBaseTest): +class RESTReadWritePy36Test(RESTBaseTest): def test_overwrite(self): simple_pa_schema = pa.schema([ @@ -304,7 +305,7 @@ class RESTTableReadWritePy36Test(RESTCatalogBaseTest): self.assertEqual(south_stat.file_count, -1) self.assertEqual(south_stat.file_size_in_bytes, -750) - def testParquetAppendOnlyReader(self): + def test_parquet_append_only_reader(self): schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt']) self.rest_catalog.create_table('default.test_append_only_parquet', schema, False) table = self.rest_catalog.get_table('default.test_append_only_parquet') @@ -314,7 +315,7 @@ class RESTTableReadWritePy36Test(RESTCatalogBaseTest): actual = table_sort_by(self._read_test_table(read_builder), 'user_id') self.assertEqual(actual, self.expected) - def testOrcAppendOnlyReader(self): + def test_orc_append_only_reader(self): schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'], options={'file.format': 'orc'}) self.rest_catalog.create_table('default.test_append_only_orc', schema, False) table = self.rest_catalog.get_table('default.test_append_only_orc') @@ -324,7 +325,7 @@ class RESTTableReadWritePy36Test(RESTCatalogBaseTest): actual = table_sort_by(self._read_test_table(read_builder), 'user_id') 
self.assertEqual(actual, self.expected) - def testAvroAppendOnlyReader(self): + def test_avro_append_only_reader(self): schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'], options={'file.format': 'avro'}) self.rest_catalog.create_table('default.test_append_only_avro', schema, False) table = self.rest_catalog.get_table('default.test_append_only_avro') @@ -444,7 +445,7 @@ class RESTTableReadWritePy36Test(RESTCatalogBaseTest): result = table_read.to_pandas(table_scan.plan().splits()) self.assertEqual(result.to_dict(), test_df.to_dict()) - def testAppendOnlyReaderWithFilter(self): + def test_append_only_reader_with_filter(self): schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt']) self.rest_catalog.create_table('default.test_append_only_filter', schema, False) table = self.rest_catalog.get_table('default.test_append_only_filter') @@ -493,7 +494,7 @@ class RESTTableReadWritePy36Test(RESTCatalogBaseTest): ]) self.assertEqual(table_sort_by(actual, 'user_id'), expected) - def testAppendOnlyReaderWithProjection(self): + def test_append_only_reader_with_projection(self): schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt']) self.rest_catalog.create_table('default.test_append_only_projection', schema, False) table = self.rest_catalog.get_table('default.test_append_only_projection') @@ -504,7 +505,7 @@ class RESTTableReadWritePy36Test(RESTCatalogBaseTest): expected = self.expected.select(['dt', 'user_id']) self.assertEqual(actual, expected) - def testAvroAppendOnlyReaderWithProjection(self): + def test_avro_append_only_reader_with_projection(self): schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'], options={'file.format': 'avro'}) self.rest_catalog.create_table('default.test_avro_append_only_projection', schema, False) table = self.rest_catalog.get_table('default.test_avro_append_only_projection') @@ -515,7 +516,7 @@ class RESTTableReadWritePy36Test(RESTCatalogBaseTest): expected = self.expected.select(['dt', 'user_id']) self.assertEqual(actual, expected) - def testAppendOnlyReaderWithLimit(self): + def test_append_only_reader_with_limit(self): schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt']) self.rest_catalog.create_table('default.test_append_only_limit', schema, False) table = self.rest_catalog.get_table('default.test_append_only_limit') @@ -527,7 +528,7 @@ class RESTTableReadWritePy36Test(RESTCatalogBaseTest): # might be split of "dt=1" or split of "dt=2" self.assertEqual(actual.num_rows, 4) - def testWriteWrongSchema(self): + def test_write_wrong_schema(self): self.rest_catalog.create_table('default.test_wrong_schema', Schema.from_pyarrow_schema(self.pa_schema), False) @@ -551,7 +552,7 @@ class RESTTableReadWritePy36Test(RESTCatalogBaseTest): table_write.write_arrow_batch(record_batch) self.assertTrue(str(e.exception).startswith("Input schema isn't consistent with table schema.")) - def testWriteWideTableLargeData(self): + def test_write_wide_table_large_data(self): logging.basicConfig(level=logging.INFO) catalog = CatalogFactory.create(self.options) diff --git a/paimon-python/pypaimon/tests/reader_append_only_test.py b/paimon-python/pypaimon/tests/reader_append_only_test.py index 795365e48e..db0cbcccd1 100644 --- a/paimon-python/pypaimon/tests/reader_append_only_test.py +++ b/paimon-python/pypaimon/tests/reader_append_only_test.py @@ -51,7 +51,7 @@ class AoReaderTest(unittest.TestCase): 'dt': ['p1', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2', 'p2'], }, schema=cls.pa_schema) - def 
testParquetAppendOnlyReader(self): + def test_parquet_ao_reader(self): schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt']) self.catalog.create_table('default.test_append_only_parquet', schema, False) table = self.catalog.get_table('default.test_append_only_parquet') @@ -61,7 +61,7 @@ class AoReaderTest(unittest.TestCase): actual = self._read_test_table(read_builder).sort_by('user_id') self.assertEqual(actual, self.expected) - def testOrcAppendOnlyReader(self): + def test_orc_ao_reader(self): schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'], options={'file.format': 'orc'}) self.catalog.create_table('default.test_append_only_orc', schema, False) table = self.catalog.get_table('default.test_append_only_orc') @@ -71,7 +71,7 @@ class AoReaderTest(unittest.TestCase): actual = self._read_test_table(read_builder).sort_by('user_id') self.assertEqual(actual, self.expected) - def testAvroAppendOnlyReader(self): + def test_avro_ao_reader(self): schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'], options={'file.format': 'avro'}) self.catalog.create_table('default.test_append_only_avro', schema, False) table = self.catalog.get_table('default.test_append_only_avro') @@ -115,7 +115,7 @@ class AoReaderTest(unittest.TestCase): actual = self._read_test_table(read_builder).sort_by('user_id') self.assertEqual(actual, self.expected) - def test_over1000cols_read(self): + def test_over_1000_cols_read(self): num_rows = 1 num_cols = 10 table_name = "default.testBug" @@ -189,7 +189,7 @@ class AoReaderTest(unittest.TestCase): result = table_read.to_pandas(table_scan.plan().splits()) self.assertEqual(result.to_dict(), test_df.to_dict()) - def testAppendOnlyReaderWithFilter(self): + def test_ao_reader_with_filter(self): schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt']) self.catalog.create_table('default.test_append_only_filter', schema, False) table = self.catalog.get_table('default.test_append_only_filter') @@ -243,7 +243,7 @@ class AoReaderTest(unittest.TestCase): ]) self.assertEqual(actual.sort_by('user_id'), expected) - def testAppendOnlyReaderWithProjection(self): + def test_ao_reader_with_projection(self): schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt']) self.catalog.create_table('default.test_append_only_projection', schema, False) table = self.catalog.get_table('default.test_append_only_projection') @@ -254,7 +254,7 @@ class AoReaderTest(unittest.TestCase): expected = self.expected.select(['dt', 'user_id']) self.assertEqual(actual, expected) - def testAvroAppendOnlyReaderWithProjection(self): + def test_avro_ao_reader_with_projection(self): schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'], options={'file.format': 'avro'}) self.catalog.create_table('default.test_avro_append_only_projection', schema, False) table = self.catalog.get_table('default.test_avro_append_only_projection') @@ -265,7 +265,7 @@ class AoReaderTest(unittest.TestCase): expected = self.expected.select(['dt', 'user_id']) self.assertEqual(actual, expected) - def testAppendOnlyReaderWithLimit(self): + def test_ao_reader_with_limit(self): schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt']) self.catalog.create_table('default.test_append_only_limit', schema, False) table = self.catalog.get_table('default.test_append_only_limit') diff --git a/paimon-python/pypaimon/tests/reader_basic_test.py b/paimon-python/pypaimon/tests/reader_base_test.py similarity index 79% rename from 
paimon-python/pypaimon/tests/reader_basic_test.py rename to paimon-python/pypaimon/tests/reader_base_test.py index e66f7cb94a..ccf06d5597 100644 --- a/paimon-python/pypaimon/tests/reader_basic_test.py +++ b/paimon-python/pypaimon/tests/reader_base_test.py @@ -17,6 +17,7 @@ ################################################################################ import os +import glob import shutil import tempfile import unittest @@ -29,8 +30,9 @@ import pyarrow as pa from pypaimon.table.row.generic_row import GenericRow -from pypaimon.schema.data_types import DataField, AtomicType - +from pypaimon.schema.data_types import (ArrayType, AtomicType, DataField, + MapType, PyarrowFieldParser) +from pypaimon.schema.table_schema import TableSchema from pypaimon import CatalogFactory from pypaimon import Schema from pypaimon.manifest.manifest_file_manager import ManifestFileManager @@ -224,7 +226,52 @@ class ReaderBasicTest(unittest.TestCase): self.assertEqual(min_value_stats, expected_min_values) self.assertEqual(max_value_stats, expected_max_values) - def test_mixed_add_and_delete_entries_same_partition(self): + def test_write_wrong_schema(self): + self.catalog.create_table('default.test_wrong_schema', + Schema.from_pyarrow_schema(self.pa_schema), + False) + table = self.catalog.get_table('default.test_wrong_schema') + + data = { + 'f0': [1, 2, 3], + 'f1': ['a', 'b', 'c'], + } + df = pd.DataFrame(data) + schema = pa.schema([ + ('f0', pa.int64()), + ('f1', pa.string()) + ]) + record_batch = pa.RecordBatch.from_pandas(df, schema) + + write_builder = table.new_batch_write_builder() + table_write = write_builder.new_write() + + with self.assertRaises(ValueError) as e: + table_write.write_arrow_batch(record_batch) + self.assertTrue(str(e.exception).startswith("Input schema isn't consistent with table schema.")) + + def test_reader_iterator(self): + read_builder = self.table.new_read_builder() + table_read = read_builder.new_read() + splits = read_builder.new_scan().plan().splits() + iterator = table_read.to_iterator(splits) + result = [] + value = next(iterator, None) + while value is not None: + result.append(value.get_field(1)) + value = next(iterator, None) + self.assertEqual(result, [1001, 1002, 1003, 1004, 1005]) + + def test_reader_duckDB(self): + read_builder = self.table.new_read_builder() + table_read = read_builder.new_read() + splits = read_builder.new_scan().plan().splits() + duckdb_con = table_read.to_duckdb(splits, 'duckdb_table') + actual = duckdb_con.query("SELECT * FROM duckdb_table").fetchdf() + expect = pd.DataFrame(self.raw_data) + pd.testing.assert_frame_equal(actual.reset_index(drop=True), expect.reset_index(drop=True)) + + def test_mixed_add_and_delete_entries_compute_stats(self): """Test record_count calculation with mixed ADD/DELETE entries in same partition.""" pa_schema = pa.schema([ ('region', pa.string()), @@ -279,7 +326,7 @@ class ReaderBasicTest(unittest.TestCase): self.assertEqual(stat.file_count, 0) self.assertEqual(stat.file_size_in_bytes, 1248) - def test_multiple_partitions_with_different_operations(self): + def test_delete_entries_compute_stats(self): """Test record_count calculation across multiple partitions.""" pa_schema = pa.schema([ ('region', pa.string()), @@ -343,52 +390,7 @@ class ReaderBasicTest(unittest.TestCase): self.assertEqual(south_stat.file_count, -1) self.assertEqual(south_stat.file_size_in_bytes, -750) - def testWriteWrongSchema(self): - self.catalog.create_table('default.test_wrong_schema', - Schema.from_pyarrow_schema(self.pa_schema), - False) - table = 
self.catalog.get_table('default.test_wrong_schema') - - data = { - 'f0': [1, 2, 3], - 'f1': ['a', 'b', 'c'], - } - df = pd.DataFrame(data) - schema = pa.schema([ - ('f0', pa.int64()), - ('f1', pa.string()) - ]) - record_batch = pa.RecordBatch.from_pandas(df, schema) - - write_builder = table.new_batch_write_builder() - table_write = write_builder.new_write() - - with self.assertRaises(ValueError) as e: - table_write.write_arrow_batch(record_batch) - self.assertTrue(str(e.exception).startswith("Input schema isn't consistent with table schema.")) - - def testReaderIterator(self): - read_builder = self.table.new_read_builder() - table_read = read_builder.new_read() - splits = read_builder.new_scan().plan().splits() - iterator = table_read.to_iterator(splits) - result = [] - value = next(iterator, None) - while value is not None: - result.append(value.get_field(1)) - value = next(iterator, None) - self.assertEqual(result, [1001, 1002, 1003, 1004, 1005]) - - def testReaderDuckDB(self): - read_builder = self.table.new_read_builder() - table_read = read_builder.new_read() - splits = read_builder.new_scan().plan().splits() - duckdb_con = table_read.to_duckdb(splits, 'duckdb_table') - actual = duckdb_con.query("SELECT * FROM duckdb_table").fetchdf() - expect = pd.DataFrame(self.raw_data) - pd.testing.assert_frame_equal(actual.reset_index(drop=True), expect.reset_index(drop=True)) - - def test_value_stats_cols_logic(self): + def test_value_stats_cols_param(self): """Test _VALUE_STATS_COLS logic in ManifestFileManager.""" # Create a catalog and table catalog = CatalogFactory.create({ @@ -437,6 +439,112 @@ class ReaderBasicTest(unittest.TestCase): test_name="specific_case" ) + def test_types(self): + data_fields = [ + DataField(0, "f0", AtomicType('TINYINT'), 'desc'), + DataField(1, "f1", AtomicType('SMALLINT'), 'desc'), + DataField(2, "f2", AtomicType('INT'), 'desc'), + DataField(3, "f3", AtomicType('BIGINT'), 'desc'), + DataField(4, "f4", AtomicType('FLOAT'), 'desc'), + DataField(5, "f5", AtomicType('DOUBLE'), 'desc'), + DataField(6, "f6", AtomicType('BOOLEAN'), 'desc'), + DataField(7, "f7", AtomicType('STRING'), 'desc'), + DataField(8, "f8", AtomicType('BINARY(12)'), 'desc'), + DataField(9, "f9", AtomicType('DECIMAL(10, 6)'), 'desc'), + DataField(10, "f10", AtomicType('BYTES'), 'desc'), + DataField(11, "f11", AtomicType('DATE'), 'desc'), + DataField(12, "f12", AtomicType('TIME(0)'), 'desc'), + DataField(13, "f13", AtomicType('TIME(3)'), 'desc'), + DataField(14, "f14", AtomicType('TIME(6)'), 'desc'), + DataField(15, "f15", AtomicType('TIME(9)'), 'desc'), + DataField(16, "f16", AtomicType('TIMESTAMP(0)'), 'desc'), + DataField(17, "f17", AtomicType('TIMESTAMP(3)'), 'desc'), + DataField(18, "f18", AtomicType('TIMESTAMP(6)'), 'desc'), + DataField(19, "f19", AtomicType('TIMESTAMP(9)'), 'desc'), + DataField(20, "arr", ArrayType(True, AtomicType('INT')), 'desc arr1'), + DataField(21, "map1", + MapType(False, AtomicType('INT', False), + MapType(False, AtomicType('INT', False), AtomicType('INT', False))), + 'desc map1'), + ] + table_schema = TableSchema(TableSchema.CURRENT_VERSION, len(data_fields), data_fields, + max(field.id for field in data_fields), + [], [], {}, "") + pa_fields = [] + for field in table_schema.fields: + pa_field = PyarrowFieldParser.from_paimon_field(field) + pa_fields.append(pa_field) + schema = Schema.from_pyarrow_schema( + pa_schema=pa.schema(pa_fields), + partition_keys=table_schema.partition_keys, + primary_keys=table_schema.primary_keys, + options=table_schema.options, + 
comment=table_schema.comment + ) + table_schema2 = TableSchema.from_schema(len(data_fields), schema) + l1 = [] + for field in table_schema.fields: + l1.append(field.to_dict()) + l2 = [] + for field in table_schema2.fields: + l2.append(field.to_dict()) + self.assertEqual(l1, l2) + + def test_write(self): + pa_schema = pa.schema([ + ('f0', pa.int32()), + ('f1', pa.string()), + ('f2', pa.string()) + ]) + catalog = CatalogFactory.create({ + "warehouse": self.warehouse + }) + catalog.create_database("test_write_db", False) + catalog.create_table("test_write_db.test_table", Schema.from_pyarrow_schema(pa_schema), False) + table = catalog.get_table("test_write_db.test_table") + + data = { + 'f0': [1, 2, 3], + 'f1': ['a', 'b', 'c'], + 'f2': ['X', 'Y', 'Z'] + } + expect = pa.Table.from_pydict(data, schema=pa_schema) + + write_builder = table.new_batch_write_builder() + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + table_write.write_arrow(expect) + commit_messages = table_write.prepare_commit() + table_commit.commit(commit_messages) + table_write.close() + table_commit.close() + + self.assertTrue(os.path.exists(self.warehouse + "/test_write_db.db/test_table/snapshot/LATEST")) + self.assertTrue(os.path.exists(self.warehouse + "/test_write_db.db/test_table/snapshot/snapshot-1")) + self.assertTrue(os.path.exists(self.warehouse + "/test_write_db.db/test_table/manifest")) + self.assertTrue(os.path.exists(self.warehouse + "/test_write_db.db/test_table/bucket-0")) + self.assertEqual(len(glob.glob(self.warehouse + "/test_write_db.db/test_table/manifest/*")), 3) + self.assertEqual(len(glob.glob(self.warehouse + "/test_write_db.db/test_table/bucket-0/*.parquet")), 1) + + with open(self.warehouse + '/test_write_db.db/test_table/snapshot/snapshot-1', 'r', encoding='utf-8') as file: + content = ''.join(file.readlines()) + self.assertTrue(content.__contains__('\"totalRecordCount\": 3')) + self.assertTrue(content.__contains__('\"deltaRecordCount\": 3')) + + write_builder = table.new_batch_write_builder() + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + table_write.write_arrow(expect) + commit_messages = table_write.prepare_commit() + table_commit.commit(commit_messages) + table_write.close() + table_commit.close() + + with open(self.warehouse + '/test_write_db.db/test_table/snapshot/snapshot-2', 'r', encoding='utf-8') as file: + content = ''.join(file.readlines()) + self.assertTrue(content.__contains__('\"totalRecordCount\": 6')) + self.assertTrue(content.__contains__('\"deltaRecordCount\": 3')) + def _test_value_stats_cols_case(self, manifest_manager, table, value_stats_cols, expected_fields_count, test_name): """Helper method to test a specific _VALUE_STATS_COLS case.""" diff --git a/paimon-python/pypaimon/tests/reader_primary_key_test.py b/paimon-python/pypaimon/tests/reader_primary_key_test.py index d1a565d4cc..b992595fc9 100644 --- a/paimon-python/pypaimon/tests/reader_primary_key_test.py +++ b/paimon-python/pypaimon/tests/reader_primary_key_test.py @@ -54,7 +54,7 @@ class PkReaderTest(unittest.TestCase): def tearDownClass(cls): shutil.rmtree(cls.tempdir, ignore_errors=True) - def testPkParquetReader(self): + def test_pk_parquet_reader(self): schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'], primary_keys=['user_id', 'dt'], @@ -67,7 +67,7 @@ class PkReaderTest(unittest.TestCase): actual = self._read_test_table(read_builder).sort_by('user_id') self.assertEqual(actual, self.expected) - def 
testPkOrcReader(self): + def test_pk_orc_reader(self): schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'], primary_keys=['user_id', 'dt'], @@ -88,7 +88,7 @@ class PkReaderTest(unittest.TestCase): col_b = self.expected.column(i) self.assertEqual(col_a, col_b) - def testPkAvroReader(self): + def test_pk_avro_reader(self): schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'], primary_keys=['user_id', 'dt'], @@ -148,7 +148,7 @@ class PkReaderTest(unittest.TestCase): }, schema=self.pa_schema) self.assertEqual(actual, expected) - def testPkReaderWithFilter(self): + def test_pk_reader_with_filter(self): schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'], primary_keys=['user_id', 'dt'], @@ -166,11 +166,11 @@ class PkReaderTest(unittest.TestCase): actual = self._read_test_table(read_builder).sort_by('user_id') expected = pa.concat_tables([ self.expected.slice(1, 1), # 2/b - self.expected.slice(5, 1) # 7/g + self.expected.slice(5, 1) # 7/g ]) self.assertEqual(actual, expected) - def testPkReaderWithProjection(self): + def test_pk_reader_with_projection(self): schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'], primary_keys=['user_id', 'dt'], diff --git a/paimon-python/pypaimon/tests/rest/__init__.py b/paimon-python/pypaimon/tests/rest/__init__.py new file mode 100644 index 0000000000..53ed4d36c2 --- /dev/null +++ b/paimon-python/pypaimon/tests/rest/__init__.py @@ -0,0 +1,17 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+""" diff --git a/paimon-python/pypaimon/tests/api_test.py b/paimon-python/pypaimon/tests/rest/api_test.py similarity index 98% rename from paimon-python/pypaimon/tests/api_test.py rename to paimon-python/pypaimon/tests/rest/api_test.py index c63b912635..374765cea7 100644 --- a/paimon-python/pypaimon/tests/api_test.py +++ b/paimon-python/pypaimon/tests/rest/api_test.py @@ -31,10 +31,10 @@ from pypaimon.schema.data_types import (ArrayType, AtomicInteger, AtomicType, DataField, DataTypeParser, MapType, RowType) from pypaimon.schema.table_schema import TableSchema -from pypaimon.tests.rest_server import RESTCatalogServer +from pypaimon.tests.rest.rest_server import RESTCatalogServer -class ApiTestCase(unittest.TestCase): +class ApiTest(unittest.TestCase): def test_parse_data(self): simple_type_test_cases = [ diff --git a/paimon-python/pypaimon/tests/rest_catalog_base_test.py b/paimon-python/pypaimon/tests/rest/rest_base_test.py similarity index 98% rename from paimon-python/pypaimon/tests/rest_catalog_base_test.py rename to paimon-python/pypaimon/tests/rest/rest_base_test.py index 0dc3a129b7..3a83ccb285 100644 --- a/paimon-python/pypaimon/tests/rest_catalog_base_test.py +++ b/paimon-python/pypaimon/tests/rest/rest_base_test.py @@ -37,10 +37,10 @@ from pypaimon.schema.data_types import (ArrayType, AtomicType, DataField, MapType) from pypaimon import Schema from pypaimon.schema.table_schema import TableSchema -from pypaimon.tests.rest_server import RESTCatalogServer +from pypaimon.tests.rest.rest_server import RESTCatalogServer -class RESTCatalogBaseTest(unittest.TestCase): +class RESTBaseTest(unittest.TestCase): def setUp(self): self.temp_dir = tempfile.mkdtemp(prefix="unittest_") self.warehouse = os.path.join(self.temp_dir, 'warehouse') diff --git a/paimon-python/pypaimon/tests/test_rest_catalog_commit_snapshot.py b/paimon-python/pypaimon/tests/rest/rest_catalog_commit_snapshot_test.py similarity index 100% rename from paimon-python/pypaimon/tests/test_rest_catalog_commit_snapshot.py rename to paimon-python/pypaimon/tests/rest/rest_catalog_commit_snapshot_test.py diff --git a/paimon-python/pypaimon/tests/rest_table_read_write_test.py b/paimon-python/pypaimon/tests/rest/rest_read_write_test.py similarity index 96% rename from paimon-python/pypaimon/tests/rest_table_read_write_test.py rename to paimon-python/pypaimon/tests/rest/rest_read_write_test.py index b070d39bbf..dc6c47e778 100644 --- a/paimon-python/pypaimon/tests/rest_table_read_write_test.py +++ b/paimon-python/pypaimon/tests/rest/rest_read_write_test.py @@ -27,10 +27,10 @@ from pypaimon import CatalogFactory from pypaimon.catalog.rest.rest_catalog import RESTCatalog from pypaimon.common.identifier import Identifier from pypaimon import Schema -from pypaimon.tests.rest_catalog_base_test import RESTCatalogBaseTest +from pypaimon.tests.rest.rest_base_test import RESTBaseTest -class RESTTableReadWriteTest(RESTCatalogBaseTest): +class RESTTableReadWriteTest(RESTBaseTest): def test_overwrite(self): simple_pa_schema = pa.schema([ @@ -113,7 +113,7 @@ class RESTTableReadWriteTest(RESTCatalogBaseTest): pd.testing.assert_frame_equal( actual_df2.reset_index(drop=True), df2.reset_index(drop=True)) - def testParquetAppendOnlyReader(self): + def test_parquet_ao_reader(self): schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt']) self.rest_catalog.create_table('default.test_append_only_parquet', schema, False) table = self.rest_catalog.get_table('default.test_append_only_parquet') @@ -123,7 +123,7 @@ class 
RESTTableReadWriteTest(RESTCatalogBaseTest): actual = self._read_test_table(read_builder).sort_by('user_id') self.assertEqual(actual, self.expected) - def testOrcAppendOnlyReader(self): + def test_orc_ao_reader(self): schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'], options={'file.format': 'orc'}) self.rest_catalog.create_table('default.test_append_only_orc', schema, False) table = self.rest_catalog.get_table('default.test_append_only_orc') @@ -133,7 +133,7 @@ class RESTTableReadWriteTest(RESTCatalogBaseTest): actual = self._read_test_table(read_builder).sort_by('user_id') self.assertEqual(actual, self.expected) - def testAvroAppendOnlyReader(self): + def test_avro_ao_reader(self): schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'], options={'file.format': 'avro'}) self.rest_catalog.create_table('default.test_append_only_avro', schema, False) table = self.rest_catalog.get_table('default.test_append_only_avro') @@ -143,7 +143,7 @@ class RESTTableReadWriteTest(RESTCatalogBaseTest): actual = self._read_test_table(read_builder).sort_by('user_id') self.assertEqual(actual, self.expected) - def testAppendOnlyReaderWithFilter(self): + def test_ao_reader_with_filter(self): schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt']) self.rest_catalog.create_table('default.test_append_only_filter', schema, False) table = self.rest_catalog.get_table('default.test_append_only_filter') @@ -197,7 +197,7 @@ class RESTTableReadWriteTest(RESTCatalogBaseTest): ]) self.assertEqual(actual.sort_by('user_id'), expected) - def testAppendOnlyReaderWithProjection(self): + def test_ao_reader_with_projection(self): schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt']) self.rest_catalog.create_table('default.test_append_only_projection', schema, False) table = self.rest_catalog.get_table('default.test_append_only_projection') @@ -208,7 +208,7 @@ class RESTTableReadWriteTest(RESTCatalogBaseTest): expected = self.expected.select(['dt', 'user_id']) self.assertEqual(actual, expected) - def testAvroAppendOnlyReaderWithProjection(self): + def test_avro_ao_reader_with_projection(self): schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'], options={'file.format': 'avro'}) self.rest_catalog.create_table('default.test_avro_append_only_projection', schema, False) table = self.rest_catalog.get_table('default.test_avro_append_only_projection') @@ -219,7 +219,7 @@ class RESTTableReadWriteTest(RESTCatalogBaseTest): expected = self.expected.select(['dt', 'user_id']) self.assertEqual(actual, expected) - def testAppendOnlyReaderWithLimit(self): + def test_ao_reader_with_limit(self): schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt']) self.rest_catalog.create_table('default.test_append_only_limit', schema, False) table = self.rest_catalog.get_table('default.test_append_only_limit') @@ -231,7 +231,7 @@ class RESTTableReadWriteTest(RESTCatalogBaseTest): # might be split of "dt=1" or split of "dt=2" self.assertEqual(actual.num_rows, 4) - def testPkParquetReader(self): + def test_pk_parquet_reader(self): schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'], primary_keys=['user_id', 'dt'], @@ -244,7 +244,7 @@ class RESTTableReadWriteTest(RESTCatalogBaseTest): actual = self._read_test_table(read_builder).sort_by('user_id') self.assertEqual(actual, self.expected) - def testPkOrcReader(self): + def test_pk_orc_reader(self): schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'], 
primary_keys=['user_id', 'dt'], @@ -265,7 +265,7 @@ class RESTTableReadWriteTest(RESTCatalogBaseTest): col_b = self.expected.column(i) self.assertEqual(col_a, col_b) - def testPkAvroReader(self): + def test_pk_avro_reader(self): schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'], primary_keys=['user_id', 'dt'], @@ -281,7 +281,7 @@ class RESTTableReadWriteTest(RESTCatalogBaseTest): actual = self._read_test_table(read_builder).sort_by('user_id') self.assertEqual(actual, self.expected) - def testPkReaderWithFilter(self): + def test_pk_reader_with_filter(self): schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'], primary_keys=['user_id', 'dt'], @@ -303,7 +303,7 @@ class RESTTableReadWriteTest(RESTCatalogBaseTest): ]) self.assertEqual(actual, expected) - def testPkReaderWithProjection(self): + def test_pk_reader_with_projection(self): schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'], primary_keys=['user_id', 'dt'], @@ -317,7 +317,7 @@ class RESTTableReadWriteTest(RESTCatalogBaseTest): expected = self.expected.select(['dt', 'user_id', 'behavior']) self.assertEqual(actual, expected) - def testWriteWrongSchema(self): + def test_write_wrong_schema(self): self.rest_catalog.create_table('default.test_wrong_schema', Schema.from_pyarrow_schema(self.pa_schema), False) @@ -341,7 +341,7 @@ class RESTTableReadWriteTest(RESTCatalogBaseTest): table_write.write_arrow_batch(record_batch) self.assertTrue(str(e.exception).startswith("Input schema isn't consistent with table schema.")) - def testReaderIterator(self): + def test_reader_iterator(self): read_builder = self.table.new_read_builder() table_read = read_builder.new_read() splits = read_builder.new_scan().plan().splits() @@ -353,7 +353,7 @@ class RESTTableReadWriteTest(RESTCatalogBaseTest): value = next(iterator, None) self.assertEqual(result, [1001, 1002, 1003, 1004, 1005, 1006, 1007, 1008]) - def testReaderDuckDB(self): + def test_reader_duckdb(self): read_builder = self.table.new_read_builder() table_read = read_builder.new_read() splits = read_builder.new_scan().plan().splits() @@ -362,7 +362,7 @@ class RESTTableReadWriteTest(RESTCatalogBaseTest): expect = pd.DataFrame(self.raw_data) pd.testing.assert_frame_equal(actual.reset_index(drop=True), expect.reset_index(drop=True)) - def testWriteWideTableLargeData(self): + def test_write_wide_table_large_data(self): logging.basicConfig(level=logging.INFO) catalog = CatalogFactory.create(self.options) diff --git a/paimon-python/pypaimon/tests/rest_server.py b/paimon-python/pypaimon/tests/rest/rest_server.py similarity index 100% rename from paimon-python/pypaimon/tests/rest_server.py rename to paimon-python/pypaimon/tests/rest/rest_server.py diff --git a/paimon-python/pypaimon/tests/rest_table_test.py b/paimon-python/pypaimon/tests/rest/rest_simple_test.py similarity index 98% rename from paimon-python/pypaimon/tests/rest_table_test.py rename to paimon-python/pypaimon/tests/rest/rest_simple_test.py index d86046c80c..95a20345b0 100644 --- a/paimon-python/pypaimon/tests/rest_table_test.py +++ b/paimon-python/pypaimon/tests/rest/rest_simple_test.py @@ -21,13 +21,13 @@ import os import pyarrow as pa from pypaimon import Schema -from pypaimon.tests.rest_catalog_base_test import RESTCatalogBaseTest +from pypaimon.tests.rest.rest_base_test import RESTBaseTest from pypaimon.write.row_key_extractor import (DynamicBucketRowKeyExtractor, FixedBucketRowKeyExtractor, UnawareBucketRowKeyExtractor) -class RESTTableTest(RESTCatalogBaseTest): +class 
RESTSimpleTest(RESTBaseTest): def setUp(self): super().setUp() self.pa_schema = pa.schema([ diff --git a/paimon-python/pypaimon/tests/schema_test.py b/paimon-python/pypaimon/tests/schema_test.py deleted file mode 100644 index 671f837117..0000000000 --- a/paimon-python/pypaimon/tests/schema_test.py +++ /dev/null @@ -1,79 +0,0 @@ -""" -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" - -import unittest - -import pyarrow - -from pypaimon import Schema -from pypaimon.schema.data_types import (ArrayType, AtomicType, DataField, - MapType, PyarrowFieldParser) -from pypaimon.schema.table_schema import TableSchema - - -class SchemaTestCase(unittest.TestCase): - def test_types(self): - data_fields = [ - DataField(0, "f0", AtomicType('TINYINT'), 'desc'), - DataField(1, "f1", AtomicType('SMALLINT'), 'desc'), - DataField(2, "f2", AtomicType('INT'), 'desc'), - DataField(3, "f3", AtomicType('BIGINT'), 'desc'), - DataField(4, "f4", AtomicType('FLOAT'), 'desc'), - DataField(5, "f5", AtomicType('DOUBLE'), 'desc'), - DataField(6, "f6", AtomicType('BOOLEAN'), 'desc'), - DataField(7, "f7", AtomicType('STRING'), 'desc'), - DataField(8, "f8", AtomicType('BINARY(12)'), 'desc'), - DataField(9, "f9", AtomicType('DECIMAL(10, 6)'), 'desc'), - DataField(10, "f10", AtomicType('BYTES'), 'desc'), - DataField(11, "f11", AtomicType('DATE'), 'desc'), - DataField(12, "f12", AtomicType('TIME(0)'), 'desc'), - DataField(13, "f13", AtomicType('TIME(3)'), 'desc'), - DataField(14, "f14", AtomicType('TIME(6)'), 'desc'), - DataField(15, "f15", AtomicType('TIME(9)'), 'desc'), - DataField(16, "f16", AtomicType('TIMESTAMP(0)'), 'desc'), - DataField(17, "f17", AtomicType('TIMESTAMP(3)'), 'desc'), - DataField(18, "f18", AtomicType('TIMESTAMP(6)'), 'desc'), - DataField(19, "f19", AtomicType('TIMESTAMP(9)'), 'desc'), - DataField(20, "arr", ArrayType(True, AtomicType('INT')), 'desc arr1'), - DataField(21, "map1", - MapType(False, AtomicType('INT', False), - MapType(False, AtomicType('INT', False), AtomicType('INT', False))), - 'desc map1'), - ] - table_schema = TableSchema(TableSchema.CURRENT_VERSION, len(data_fields), data_fields, - max(field.id for field in data_fields), - [], [], {}, "") - pa_fields = [] - for field in table_schema.fields: - pa_field = PyarrowFieldParser.from_paimon_field(field) - pa_fields.append(pa_field) - schema = Schema.from_pyarrow_schema( - pa_schema=pyarrow.schema(pa_fields), - partition_keys=table_schema.partition_keys, - primary_keys=table_schema.primary_keys, - options=table_schema.options, - comment=table_schema.comment - ) - table_schema2 = TableSchema.from_schema(len(data_fields), schema) - l1 = [] - for field in table_schema.fields: - l1.append(field.to_dict()) - l2 = [] - for field in table_schema2.fields: - l2.append(field.to_dict()) - self.assertEqual(l1, l2) diff --git 
a/paimon-python/pypaimon/tests/writer_test.py b/paimon-python/pypaimon/tests/writer_test.py deleted file mode 100644 index 34f7c53e70..0000000000 --- a/paimon-python/pypaimon/tests/writer_test.py +++ /dev/null @@ -1,94 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import glob -import os -import shutil -import tempfile -import unittest - -import pyarrow - -from pypaimon import CatalogFactory -from pypaimon import Schema - - -class WriterTest(unittest.TestCase): - - @classmethod - def setUpClass(cls): - cls.temp_dir = tempfile.mkdtemp(prefix="unittest_") - cls.warehouse = os.path.join(cls.temp_dir, 'test_dir') - - @classmethod - def tearDownClass(cls): - shutil.rmtree(cls.temp_dir, ignore_errors=True) - - def test_writer(self): - pa_schema = pyarrow.schema([ - ('f0', pyarrow.int32()), - ('f1', pyarrow.string()), - ('f2', pyarrow.string()) - ]) - catalog = CatalogFactory.create({ - "warehouse": self.warehouse - }) - catalog.create_database("test_db", False) - catalog.create_table("test_db.test_table", Schema.from_pyarrow_schema(pa_schema), False) - table = catalog.get_table("test_db.test_table") - - data = { - 'f0': [1, 2, 3], - 'f1': ['a', 'b', 'c'], - 'f2': ['X', 'Y', 'Z'] - } - expect = pyarrow.Table.from_pydict(data, schema=pa_schema) - - write_builder = table.new_batch_write_builder() - table_write = write_builder.new_write() - table_commit = write_builder.new_commit() - table_write.write_arrow(expect) - commit_messages = table_write.prepare_commit() - table_commit.commit(commit_messages) - table_write.close() - table_commit.close() - - self.assertTrue(os.path.exists(self.warehouse + "/test_db.db/test_table/snapshot/LATEST")) - self.assertTrue(os.path.exists(self.warehouse + "/test_db.db/test_table/snapshot/snapshot-1")) - self.assertTrue(os.path.exists(self.warehouse + "/test_db.db/test_table/manifest")) - self.assertTrue(os.path.exists(self.warehouse + "/test_db.db/test_table/bucket-0")) - self.assertEqual(len(glob.glob(self.warehouse + "/test_db.db/test_table/manifest/*")), 3) - self.assertEqual(len(glob.glob(self.warehouse + "/test_db.db/test_table/bucket-0/*.parquet")), 1) - - with open(self.warehouse + '/test_db.db/test_table/snapshot/snapshot-1', 'r', encoding='utf-8') as file: - content = ''.join(file.readlines()) - self.assertTrue(content.__contains__('\"totalRecordCount\": 3')) - self.assertTrue(content.__contains__('\"deltaRecordCount\": 3')) - - write_builder = table.new_batch_write_builder() - table_write = write_builder.new_write() - table_commit = write_builder.new_commit() - table_write.write_arrow(expect) - commit_messages = table_write.prepare_commit() - table_commit.commit(commit_messages) - table_write.close() - table_commit.close() - - with open(self.warehouse + 
'/test_db.db/test_table/snapshot/snapshot-2', 'r', encoding='utf-8') as file: - content = ''.join(file.readlines()) - self.assertTrue(content.__contains__('\"totalRecordCount\": 6')) - self.assertTrue(content.__contains__('\"deltaRecordCount\": 3'))