This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 017f4ccbc8 [python] Fix file name prefix in postpone mode. (#6668)
017f4ccbc8 is described below
commit 017f4ccbc812269ed4b532536c059d0088c13e13
Author: umi <[email protected]>
AuthorDate: Thu Nov 27 11:43:55 2025 +0800
[python] Fix file name prefix in postpone mode. (#6668)
---
paimon-python/dev/lint-python.sh | 8 +-
paimon-python/pypaimon/common/core_options.py | 5 +
.../pypaimon/tests/rest/rest_simple_test.py | 48 ------
.../pypaimon/tests/write/table_write_test.py | 177 ++++++++++++++++++++-
paimon-python/pypaimon/write/file_store_write.py | 19 ++-
paimon-python/pypaimon/write/table_write.py | 9 +-
paimon-python/pypaimon/write/write_builder.py | 4 +-
paimon-python/pypaimon/write/writer/blob_writer.py | 11 +-
.../pypaimon/write/writer/data_blob_writer.py | 11 +-
paimon-python/pypaimon/write/writer/data_writer.py | 6 +-
10 files changed, 225 insertions(+), 73 deletions(-)
diff --git a/paimon-python/dev/lint-python.sh b/paimon-python/dev/lint-python.sh
index fb57bfdd41..469ee56c9d 100755
--- a/paimon-python/dev/lint-python.sh
+++ b/paimon-python/dev/lint-python.sh
@@ -107,10 +107,14 @@ function collect_checks() {
function get_all_supported_checks() {
_OLD_IFS=$IFS
IFS=$'\n'
- SUPPORT_CHECKS=()
+    SUPPORT_CHECKS=("flake8_check" "pytest_check" "mixed_check") # control the calling sequence
for fun in $(declare -F); do
if [[ `regexp_match "$fun" "_check$"` = true ]]; then
- SUPPORT_CHECKS+=("${fun:11}")
+ check_name="${fun:11}"
+ # Only add if not already in SUPPORT_CHECKS
+            if [[ ! `contains_element "${SUPPORT_CHECKS[*]}" "$check_name"` = true ]]; then
+ SUPPORT_CHECKS+=("$check_name")
+ fi
fi
done
IFS=$_OLD_IFS
diff --git a/paimon-python/pypaimon/common/core_options.py b/paimon-python/pypaimon/common/core_options.py
index 028f757b77..0686132979 100644
--- a/paimon-python/pypaimon/common/core_options.py
+++ b/paimon-python/pypaimon/common/core_options.py
@@ -50,6 +50,7 @@ class CoreOptions(str, Enum):
FILE_BLOB_AS_DESCRIPTOR = "blob-as-descriptor"
TARGET_FILE_SIZE = "target-file-size"
BLOB_TARGET_FILE_SIZE = "blob.target-file-size"
+ DATA_FILE_PREFIX = "data-file.prefix"
# Scan options
SCAN_FALLBACK_BRANCH = "scan.fallback-branch"
INCREMENTAL_BETWEEN_TIMESTAMP = "incremental-between-timestamp"
@@ -64,6 +65,10 @@ class CoreOptions(str, Enum):
DATA_FILE_EXTERNAL_PATHS_STRATEGY = "data-file.external-paths.strategy"
DATA_FILE_EXTERNAL_PATHS_SPECIFIC_FS = "data-file.external-paths.specific-fs"
+ @staticmethod
+ def data_file_prefix(options: dict) -> str:
+ return options.get(CoreOptions.DATA_FILE_PREFIX, "data-")
+
@staticmethod
def blob_as_descriptor(options: dict) -> bool:
         return options.get(CoreOptions.FILE_BLOB_AS_DESCRIPTOR, "false").lower() == 'true'
diff --git a/paimon-python/pypaimon/tests/rest/rest_simple_test.py b/paimon-python/pypaimon/tests/rest/rest_simple_test.py
index 19aec430fd..1b62d5a11f 100644
--- a/paimon-python/pypaimon/tests/rest/rest_simple_test.py
+++ b/paimon-python/pypaimon/tests/rest/rest_simple_test.py
@@ -15,8 +15,6 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
-import glob
-import os
import pyarrow as pa
@@ -566,52 +564,6 @@ class RESTSimpleTest(RESTBaseTest):
expected = pa.Table.from_pydict(data_expected, schema=self.pa_schema)
self.assertEqual(actual, expected)
- def test_postpone_write(self):
-        schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['user_id'], primary_keys=['user_id', 'dt'],
- options={'bucket': -2})
- self.rest_catalog.create_table('default.test_postpone', schema, False)
- table = self.rest_catalog.get_table('default.test_postpone')
-
- expect = pa.Table.from_pydict(self.data, schema=self.pa_schema)
-
- write_builder = table.new_batch_write_builder()
- table_write = write_builder.new_write()
- table_commit = write_builder.new_commit()
- table_write.write_arrow(expect)
- commit_messages = table_write.prepare_commit()
- table_commit.commit(commit_messages)
- table_write.close()
- table_commit.close()
-
-        self.assertTrue(os.path.exists(self.warehouse + "/default/test_postpone/snapshot/LATEST"))
-        self.assertTrue(os.path.exists(self.warehouse + "/default/test_postpone/snapshot/snapshot-1"))
-        self.assertTrue(os.path.exists(self.warehouse + "/default/test_postpone/manifest"))
-        self.assertEqual(len(glob.glob(self.warehouse + "/default/test_postpone/manifest/*")), 3)
-        self.assertEqual(len(glob.glob(self.warehouse + "/default/test_postpone/user_id=2/bucket-postpone/*.avro")), 1)
-
- def test_postpone_read_write(self):
-        schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['user_id'], primary_keys=['user_id', 'dt'],
- options={'bucket': -2})
- self.rest_catalog.create_table('default.test_postpone', schema, False)
- table = self.rest_catalog.get_table('default.test_postpone')
-
- expect = pa.Table.from_pydict(self.data, schema=self.pa_schema)
-
- write_builder = table.new_batch_write_builder()
- table_write = write_builder.new_write()
- table_commit = write_builder.new_commit()
- table_write.write_arrow(expect)
- commit_messages = table_write.prepare_commit()
- table_commit.commit(commit_messages)
- table_write.close()
- table_commit.close()
-
- read_builder = table.new_read_builder()
- table_read = read_builder.new_read()
- splits = read_builder.new_scan().plan().splits()
- actual = table_read.to_arrow(splits)
- self.assertTrue(not actual)
-
def test_create_drop_database_table(self):
# test create database
self.rest_catalog.create_database("db1", False)
diff --git a/paimon-python/pypaimon/tests/write/table_write_test.py b/paimon-python/pypaimon/tests/write/table_write_test.py
index 21b76731ac..04c9610a4b 100644
--- a/paimon-python/pypaimon/tests/write/table_write_test.py
+++ b/paimon-python/pypaimon/tests/write/table_write_test.py
@@ -15,7 +15,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
-
+import glob
import os
import shutil
@@ -153,3 +153,178 @@ class TableWriteTest(unittest.TestCase):
splits = read_builder.new_scan().plan().splits()
actual = table_read.to_arrow(splits).sort_by('user_id')
self.assertEqual(self.expected, actual)
+
+ def test_postpone_read_write(self):
+        schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['user_id'], primary_keys=['user_id', 'dt'],
+ options={'bucket': -2})
+ self.catalog.create_table('default.test_postpone', schema, False)
+ table = self.catalog.get_table('default.test_postpone')
+ data = {
+ 'user_id': [1, 2, 3, 4],
+ 'item_id': [1001, 1002, 1003, 1004],
+ 'behavior': ['a', 'b', 'c', None],
+ 'dt': ['p1', 'p1', 'p2', 'p1'],
+ }
+ expect = pa.Table.from_pydict(data, schema=self.pa_schema)
+
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ table_write.write_arrow(expect)
+ commit_messages = table_write.prepare_commit()
+ table_commit.commit(commit_messages)
+ table_write.close()
+ table_commit.close()
+
+        self.assertTrue(os.path.exists(self.warehouse + "/default.db/test_postpone/snapshot/LATEST"))
+        self.assertTrue(os.path.exists(self.warehouse + "/default.db/test_postpone/snapshot/snapshot-1"))
+        self.assertTrue(os.path.exists(self.warehouse + "/default.db/test_postpone/manifest"))
+        self.assertEqual(len(glob.glob(self.warehouse + "/default.db/test_postpone/manifest/*")), 3)
+        self.assertEqual(len(glob.glob(self.warehouse + "/default.db/test_postpone/user_id=2/bucket-postpone/*.avro")),
+ 1)
+ read_builder = table.new_read_builder()
+ table_read = read_builder.new_read()
+ splits = read_builder.new_scan().plan().splits()
+ actual = table_read.to_arrow(splits)
+ self.assertTrue(not actual)
+
+ def test_data_file_prefix_postpone(self):
+ """Test that generated data file names follow the expected prefix
format."""
+        schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['user_id'], primary_keys=['user_id', 'dt'],
+ options={'bucket': -2})
+        self.catalog.create_table('default.test_file_prefix_postpone', schema, False)
+ table = self.catalog.get_table('default.test_file_prefix_postpone')
+
+ # Write some data to generate files
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+
+ data = {
+ 'user_id': [1, 2],
+ 'item_id': [1001, 1002],
+ 'behavior': ['a', 'b'],
+ 'dt': ['p1', 'p1'],
+ }
+ pa_table = pa.Table.from_pydict(data, schema=self.pa_schema)
+ table_write.write_arrow(pa_table)
+
+ commit_messages = table_write.prepare_commit()
+ table_commit.commit(commit_messages)
+ table_write.close()
+ table_commit.close()
+
+ # Find generated data files
+        table_path = os.path.join(self.warehouse, 'default.db', 'test_file_prefix_postpone')
+ data_files = []
+ for root, dirs, files in os.walk(table_path):
+ for file in files:
+                if file.endswith('.parquet') or file.endswith('.avro') or file.endswith('.orc'):
+ data_files.append(file)
+
+ # Verify at least one data file was created
+ self.assertGreater(len(data_files), 0, "No data files were generated")
+
+        # Verify file name format: {table_prefix}-u-{commit_user}-s-{random_number}-w-{uuid}-0.{format}
+        # Expected pattern: data--u-{user}-s-{random}-w-{uuid}-0.{format}
+ expected_pattern = r'^data--u-.+-s-\d+-w-.+-0\.avro$'
+
+ for file_name in data_files:
+ self.assertRegex(file_name, expected_pattern,
+ f"File name '{file_name}' does not match expected
prefix format")
+
+ # Additional checks for specific components
+ parts = file_name.split('-')
+            self.assertEqual('data', parts[0], f"File prefix should start with 'data', got '{parts[0]}'")
+            self.assertEqual('u', parts[2], f"Second part should be 'u', got '{parts[2]}'")
+            self.assertEqual('s', parts[8], f"Fourth part should be 's', got '{parts[8]}'")
+            self.assertEqual('w', parts[10], f"Sixth part should be 'w', got '{parts[10]}'")
+
+ def test_data_file_prefix_default(self):
+ """Test that generated data file names follow the expected prefix
format."""
+ schema = Schema.from_pyarrow_schema(self.pa_schema,
partition_keys=['user_id'])
+ self.catalog.create_table('default.test_file_prefix_default', schema,
False)
+ table = self.catalog.get_table('default.test_file_prefix_default')
+
+ # Write some data to generate files
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+
+ data = {
+ 'user_id': [1, 2],
+ 'item_id': [1001, 1002],
+ 'behavior': ['a', 'b'],
+ 'dt': ['p1', 'p1'],
+ }
+ pa_table = pa.Table.from_pydict(data, schema=self.pa_schema)
+ table_write.write_arrow(pa_table)
+
+ commit_messages = table_write.prepare_commit()
+ table_commit.commit(commit_messages)
+ table_write.close()
+ table_commit.close()
+
+ # Find generated data files
+        table_path = os.path.join(self.warehouse, 'default.db', 'test_file_prefix_default')
+ data_files = []
+ for root, dirs, files in os.walk(table_path):
+ for file in files:
+                if file.endswith('.parquet') or file.endswith('.avro') or file.endswith('.orc'):
+ data_files.append(file)
+
+ # Verify at least one data file was created
+ self.assertGreater(len(data_files), 0, "No data files were generated")
+
+ expected_pattern = r'^data-.+-0\.parquet$'
+
+ for file_name in data_files:
+ self.assertRegex(file_name, expected_pattern,
+ f"File name '{file_name}' does not match expected
prefix format")
+
+ # Additional checks for specific components
+ parts = file_name.split('-')
+            self.assertEqual('data', parts[0], f"File prefix should start with 'data', got '{parts[0]}'")
+
+ def test_data_file_prefix(self):
+ """Test that generated data file names follow the expected prefix
format."""
+ schema = Schema.from_pyarrow_schema(self.pa_schema,
partition_keys=['user_id'],
+ options={'data-file.prefix':
'test_prefix'})
+ self.catalog.create_table('default.test_file_prefix', schema, False)
+ table = self.catalog.get_table('default.test_file_prefix')
+
+ # Write some data to generate files
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+
+ data = {
+ 'user_id': [1, 2],
+ 'item_id': [1001, 1002],
+ 'behavior': ['a', 'b'],
+ 'dt': ['p1', 'p1'],
+ }
+ pa_table = pa.Table.from_pydict(data, schema=self.pa_schema)
+ table_write.write_arrow(pa_table)
+
+ commit_messages = table_write.prepare_commit()
+ table_commit.commit(commit_messages)
+ table_write.close()
+ table_commit.close()
+
+ # Find generated data files
+        table_path = os.path.join(self.warehouse, 'default.db', 'test_file_prefix')
+ data_files = []
+ for root, dirs, files in os.walk(table_path):
+ for file in files:
+                if file.endswith('.parquet') or file.endswith('.avro') or file.endswith('.orc'):
+ data_files.append(file)
+
+ # Verify at least one data file was created
+ self.assertGreater(len(data_files), 0, "No data files were generated")
+
+ expected_pattern = r'^test_prefix.+-0\.parquet$'
+
+ for file_name in data_files:
+ self.assertRegex(file_name, expected_pattern,
+ f"File name '{file_name}' does not match expected
prefix format")
diff --git a/paimon-python/pypaimon/write/file_store_write.py b/paimon-python/pypaimon/write/file_store_write.py
index c100b64966..119bef6115 100644
--- a/paimon-python/pypaimon/write/file_store_write.py
+++ b/paimon-python/pypaimon/write/file_store_write.py
@@ -15,10 +15,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
+import random
from typing import Dict, List, Tuple
import pyarrow as pa
+from pypaimon.common.core_options import CoreOptions
from pypaimon.write.commit_message import CommitMessage
from pypaimon.write.writer.append_only_data_writer import AppendOnlyDataWriter
from pypaimon.write.writer.data_blob_writer import DataBlobWriter
@@ -30,7 +32,7 @@ from pypaimon.table.bucket_mode import BucketMode
class FileStoreWrite:
"""Base class for file store write operations."""
- def __init__(self, table):
+ def __init__(self, table, commit_user):
from pypaimon.table.file_store_table import FileStoreTable
self.table: FileStoreTable = table
@@ -38,15 +40,21 @@ class FileStoreWrite:
self.max_seq_numbers: dict = {}
self.write_cols = None
self.commit_identifier = 0
+ self.options = dict(table.options)
+ if (CoreOptions.BUCKET in table.options and
+ self.table.bucket_mode() == BucketMode.POSTPONE_MODE):
+ self.options[CoreOptions.DATA_FILE_PREFIX] = \
+                (f"{CoreOptions.data_file_prefix(table.options)}-u-{commit_user}"
+ f"-s-{random.randint(0, 2 ** 31 - 2)}-w-")
def write(self, partition: Tuple, bucket: int, data: pa.RecordBatch):
key = (partition, bucket)
if key not in self.data_writers:
-            self.data_writers[key] = self._create_data_writer(partition, bucket)
+            self.data_writers[key] = self._create_data_writer(partition, bucket, self.options)
writer = self.data_writers[key]
writer.write(data)
- def _create_data_writer(self, partition: Tuple, bucket: int) -> DataWriter:
+    def _create_data_writer(self, partition: Tuple, bucket: int, options: Dict[str, str]) -> DataWriter:
def max_seq_number():
return self._seq_number_stats(partition).get(bucket, 1)
@@ -57,13 +65,15 @@ class FileStoreWrite:
partition=partition,
bucket=bucket,
max_seq_number=0,
+ options=options
)
elif self.table.is_primary_key_table:
return KeyValueDataWriter(
table=self.table,
partition=partition,
bucket=bucket,
- max_seq_number=max_seq_number())
+ max_seq_number=max_seq_number(),
+ options=options)
else:
             seq_number = 0 if self.table.bucket_mode() == BucketMode.BUCKET_UNAWARE else max_seq_number()
return AppendOnlyDataWriter(
@@ -71,6 +81,7 @@ class FileStoreWrite:
partition=partition,
bucket=bucket,
max_seq_number=seq_number,
+ options=options,
write_cols=self.write_cols
)
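The override above only applies when the table is a postpone-mode primary key table (bucket = -2); for other tables the options pass through unchanged. A rough sketch of the prefix string it builds, with an illustrative commit user (the real value is supplied by the write builder):

    import random
    import uuid

    base_prefix = "data-"              # CoreOptions.data_file_prefix default
    commit_user = str(uuid.uuid4())    # illustrative; normally comes from the builder
    seq_id = random.randint(0, 2 ** 31 - 2)

    postpone_prefix = f"{base_prefix}-u-{commit_user}-s-{seq_id}-w-"
    # e.g. "data--u-<commit_user>-s-<seq_id>-w-"; the data writers then append
    # "{uuid}-0.{format}" to this prefix when rolling a new file.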
diff --git a/paimon-python/pypaimon/write/table_write.py b/paimon-python/pypaimon/write/table_write.py
index 7f415a0ba7..0ac73356a3 100644
--- a/paimon-python/pypaimon/write/table_write.py
+++ b/paimon-python/pypaimon/write/table_write.py
@@ -27,13 +27,14 @@ from pypaimon.write.file_store_write import FileStoreWrite
class TableWrite:
- def __init__(self, table):
+ def __init__(self, table, commit_user):
from pypaimon.table.file_store_table import FileStoreTable
self.table: FileStoreTable = table
         self.table_pyarrow_schema = PyarrowFieldParser.from_paimon_schema(self.table.table_schema.fields)
- self.file_store_write = FileStoreWrite(self.table)
+ self.file_store_write = FileStoreWrite(self.table, commit_user)
self.row_key_extractor = self.table.create_row_key_extractor()
+ self.commit_user = commit_user
def write_arrow(self, table: pa.Table):
batches_iterator = table.to_batches()
@@ -79,8 +80,8 @@ class TableWrite:
class BatchTableWrite(TableWrite):
- def __init__(self, table):
- super().__init__(table)
+ def __init__(self, table, commit_user):
+ super().__init__(table, commit_user)
self.batch_committed = False
def prepare_commit(self) -> List[CommitMessage]:
diff --git a/paimon-python/pypaimon/write/write_builder.py b/paimon-python/pypaimon/write/write_builder.py
index 8c9ed725f5..fdda0c54b5 100644
--- a/paimon-python/pypaimon/write/write_builder.py
+++ b/paimon-python/pypaimon/write/write_builder.py
@@ -53,7 +53,7 @@ class WriteBuilder(ABC):
class BatchWriteBuilder(WriteBuilder):
def new_write(self) -> BatchTableWrite:
- return BatchTableWrite(self.table)
+ return BatchTableWrite(self.table, self.commit_user)
def new_commit(self) -> BatchTableCommit:
         commit = BatchTableCommit(self.table, self.commit_user, self.static_partition)
@@ -62,7 +62,7 @@ class BatchWriteBuilder(WriteBuilder):
class StreamWriteBuilder(WriteBuilder):
def new_write(self) -> StreamTableWrite:
- return StreamTableWrite(self.table)
+ return StreamTableWrite(self.table, self.commit_user)
def new_commit(self) -> StreamTableCommit:
         commit = StreamTableCommit(self.table, self.commit_user, self.static_partition)
diff --git a/paimon-python/pypaimon/write/writer/blob_writer.py b/paimon-python/pypaimon/write/writer/blob_writer.py
index 92c7e6ea1d..c8d0d45076 100644
--- a/paimon-python/pypaimon/write/writer/blob_writer.py
+++ b/paimon-python/pypaimon/write/writer/blob_writer.py
@@ -19,7 +19,7 @@
import logging
import uuid
import pyarrow as pa
-from typing import Optional, Tuple
+from typing import Optional, Tuple, Dict
from pypaimon.common.core_options import CoreOptions
from pypaimon.write.writer.append_only_data_writer import AppendOnlyDataWriter
@@ -32,8 +32,10 @@ CHECK_ROLLING_RECORD_CNT = 1000
class BlobWriter(AppendOnlyDataWriter):
-    def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: int, blob_column: str):
-        super().__init__(table, partition, bucket, max_seq_number, [blob_column])
+    def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: int, blob_column: str,
+ options: Dict[str, str] = None):
+ super().__init__(table, partition, bucket, max_seq_number,
+ options, write_cols=[blob_column])
# Override file format to "blob"
self.file_format = CoreOptions.FILE_FORMAT_BLOB
@@ -95,7 +97,8 @@ class BlobWriter(AppendOnlyDataWriter):
self.sequence_generator.next()
def open_current_writer(self):
-        file_name = f"data-{self.file_uuid}-{self.file_count}.{self.file_format}"
+ file_name = (f"{CoreOptions.data_file_prefix(self.options)}"
+ f"{self.file_uuid}-{self.file_count}.{self.file_format}")
self.file_count += 1 # Increment counter for next file
file_path = self._generate_file_path(file_name)
self.current_file_path = file_path
diff --git a/paimon-python/pypaimon/write/writer/data_blob_writer.py b/paimon-python/pypaimon/write/writer/data_blob_writer.py
index 26ba1bee7b..6b81525c7f 100644
--- a/paimon-python/pypaimon/write/writer/data_blob_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_blob_writer.py
@@ -19,7 +19,7 @@
import logging
import uuid
from datetime import datetime
-from typing import List, Optional, Tuple
+from typing import List, Optional, Tuple, Dict
import pyarrow as pa
@@ -75,8 +75,8 @@ class DataBlobWriter(DataWriter):
# Constant for checking rolling condition periodically
CHECK_ROLLING_RECORD_CNT = 1000
-    def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: int):
-        super().__init__(table, partition, bucket, max_seq_number)
+    def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: int, options: Dict[str, str] = None):
+ super().__init__(table, partition, bucket, max_seq_number, options)
# Determine blob column from table schema
self.blob_column_name = self._get_blob_columns_from_schema()
@@ -100,7 +100,8 @@ class DataBlobWriter(DataWriter):
partition=self.partition,
bucket=self.bucket,
max_seq_number=max_seq_number,
- blob_column=self.blob_column_name
+ blob_column=self.blob_column_name,
+ options=options
)
logger.info(f"Initialized DataBlobWriter with blob column:
{self.blob_column_name}")
@@ -245,7 +246,7 @@ class DataBlobWriter(DataWriter):
if data.num_rows == 0:
return None
- file_name = f"data-{uuid.uuid4()}-0.{self.file_format}"
+        file_name = f"{CoreOptions.data_file_prefix(self.options)}{uuid.uuid4()}-0.{self.file_format}"
file_path = self._generate_file_path(file_name)
# Write file based on format
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py
index 56487094ed..079b8d26d6 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -34,7 +34,7 @@ from pypaimon.table.row.generic_row import GenericRow
class DataWriter(ABC):
"""Base class for data writers that handle PyArrow tables directly."""
-    def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: int,
+    def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: int, options: Dict[str, str] = None,
write_cols: Optional[List[str]] = None):
from pypaimon.table.file_store_table import FileStoreTable
@@ -46,7 +46,7 @@ class DataWriter(ABC):
         self.trimmed_primary_keys_fields = self.table.trimmed_primary_keys_fields
self.trimmed_primary_keys = self.table.trimmed_primary_keys
- options = self.table.options
+ self.options = options
         self.target_file_size = CoreOptions.target_file_size(options, self.table.is_primary_key_table)
# POSTPONE_BUCKET uses AVRO format, otherwise default to PARQUET
default_format = (
@@ -158,7 +158,7 @@ class DataWriter(ABC):
def _write_data_to_file(self, data: pa.Table):
if data.num_rows == 0:
return
- file_name = f"data-{uuid.uuid4()}-0.{self.file_format}"
+        file_name = f"{CoreOptions.data_file_prefix(self.options)}{uuid.uuid4()}-0.{self.file_format}"
file_path = self._generate_file_path(file_name)
is_external_path = self.external_path_provider is not None
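With the options now threaded into DataWriter, the file name is assembled from whichever prefix was resolved for this writer rather than the previous hard-coded "data-" literal. A hedged sketch of the resulting assembly (build_file_name is a hypothetical helper, not the writer's actual method):

    import uuid

    def build_file_name(options: dict, file_format: str) -> str:
        # Same lookup as CoreOptions.data_file_prefix: honor "data-file.prefix"
        # if the table (or the postpone-mode override) sets it, else use "data-".
        prefix = options.get("data-file.prefix", "data-")
        return f"{prefix}{uuid.uuid4()}-0.{file_format}"

    # build_file_name({}, "parquet")                                  -> "data-<uuid>-0.parquet"
    # build_file_name({"data-file.prefix": "test_prefix"}, "parquet") -> "test_prefix<uuid>-0.parquet"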