This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ff62c57b59 [python][ray] Abort worker writes on failure (#8124)
ff62c57b59 is described below
commit ff62c57b59d1ea41bd6c31abdaf3c254da745fbe
Author: QuakeWang <[email protected]>
AuthorDate: Mon Jun 8 13:27:09 2026 +0800
[python][ray] Abort worker writes on failure (#8124)
Ray write tasks previously called `close()` on the worker-side `TableWrite`
when write, prepare, or close failed. That is unsafe because close can flush
pending data, and `prepare_commit()` can materialize normal, blob, or
vector files before a later failure prevents the driver commit.
This PR makes worker-side Ray writes abort on failure and propagates
abort through `TableWrite` and `FileStoreWrite`. It also keeps dedicated
blob/vector metadata reachable from the parent writer so abort can
delete files that were already produced before the commit path failed.
---
.../pypaimon/tests/data_evolution_formats_test.py | 120 +++++++++++++++++++++
paimon-python/pypaimon/tests/ray_sink_test.py | 71 +++++++++++-
paimon-python/pypaimon/write/file_store_write.py | 9 ++
paimon-python/pypaimon/write/ray_datasink.py | 17 ++-
paimon-python/pypaimon/write/table_write.py | 3 +
.../pypaimon/write/writer/data_vector_writer.py | 5 +-
paimon-python/pypaimon/write/writer/data_writer.py | 16 +--
.../write/writer/dedicated_format_writer.py | 17 +--
8 files changed, 238 insertions(+), 20 deletions(-)
diff --git a/paimon-python/pypaimon/tests/data_evolution_formats_test.py
b/paimon-python/pypaimon/tests/data_evolution_formats_test.py
index f89f3290f1..b30560c400 100644
--- a/paimon-python/pypaimon/tests/data_evolution_formats_test.py
+++ b/paimon-python/pypaimon/tests/data_evolution_formats_test.py
@@ -46,6 +46,10 @@ class DataEvolutionFormatsTest(unittest.TestCase):
def tearDownClass(cls):
shutil.rmtree(cls.tempdir, ignore_errors=True)
+ @staticmethod
+ def _file_path(file_meta):
+ return file_meta.external_path if file_meta.external_path else
file_meta.file_path
+
# ------------------------------------------------------------------
# Parquet-format data evolution
# ------------------------------------------------------------------
@@ -236,6 +240,41 @@ class DataEvolutionFormatsTest(unittest.TestCase):
self.assertEqual(actual.column('id').to_pylist(), [1, 2, 3])
self.assertEqual(actual.column('payload').to_pylist(), blobs)
+ def test_blob_abort_deletes_uncommitted_files(self):
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('payload', pa.large_binary()),
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema, options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ })
+ self.catalog.create_table('default.fmt_blob_abort_cleanup', schema,
False)
+ table = self.catalog.get_table('default.fmt_blob_abort_cleanup')
+
+ writer = table.new_batch_write_builder().new_write()
+ writer.write_arrow(pa.Table.from_pydict({
+ 'id': [1, 2, 3],
+ 'payload': [b'a', b'b', b'c'],
+ }, schema=pa_schema))
+ commit_messages = writer.prepare_commit()
+
+ all_files = [nf for msg in commit_messages for nf in msg.new_files]
+ parquet_files = [f for f in all_files if
f.file_name.endswith('.parquet')]
+ blob_files = [f for f in all_files if f.file_name.endswith('.blob')]
+ self.assertGreater(len(parquet_files), 0)
+ self.assertGreater(len(blob_files), 0)
+ for file_meta in all_files:
+ self.assertTrue(table.file_io.exists(self._file_path(file_meta)))
+
+ writer.abort()
+
+ for file_meta in all_files:
+ self.assertFalse(
+ table.file_io.exists(self._file_path(file_meta)),
+ f"Expected abort to delete {file_meta.file_name}",
+ )
+
def test_blob_column_subset_evolution(self):
"""Write normal+blob cols in one commit, overwrite normal col in
another, merge-read."""
pa_schema = pa.schema([
@@ -563,6 +602,87 @@ class DataEvolutionFormatsTest(unittest.TestCase):
# Vector (vortex) file format for embedding columns
# ------------------------------------------------------------------
+ def test_vector_abort_deletes_uncommitted_files(self):
+ pa_schema = pa.schema([
+ ('id', pa.int64()),
+ ('embed', pa.list_(pa.float32(), 3)),
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema, options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ 'vector.file.format': 'parquet',
+ })
+ self.catalog.create_table('default.fmt_vector_abort_cleanup', schema,
False)
+ table = self.catalog.get_table('default.fmt_vector_abort_cleanup')
+
+ writer = table.new_batch_write_builder().new_write()
+ writer.write_arrow(pa.table({
+ 'id': pa.array([1, 2, 3], type=pa.int64()),
+ 'embed': pa.FixedSizeListArray.from_arrays(
+ pa.array([0.1, 0.2, 0.3,
+ 0.4, 0.5, 0.6,
+ 0.7, 0.8, 0.9], type=pa.float32()), 3),
+ }))
+ commit_messages = writer.prepare_commit()
+
+ all_files = [nf for msg in commit_messages for nf in msg.new_files]
+ normal_files = [f for f in all_files if not
DataFileMeta.is_vector_file(f.file_name)]
+ vector_files = [f for f in all_files if
DataFileMeta.is_vector_file(f.file_name)]
+ self.assertGreater(len(normal_files), 0)
+ self.assertGreater(len(vector_files), 0)
+ for file_meta in all_files:
+ self.assertTrue(table.file_io.exists(self._file_path(file_meta)))
+
+ writer.abort()
+
+ for file_meta in all_files:
+ self.assertFalse(
+ table.file_io.exists(self._file_path(file_meta)),
+ f"Expected abort to delete {file_meta.file_name}",
+ )
+
+ def test_vector_close_failure_after_prepare_raises(self):
+ from unittest.mock import patch
+
+ pa_schema = pa.schema([
+ ('id', pa.int64()),
+ ('embed', pa.list_(pa.float32(), 3)),
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema, options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ 'vector.file.format': 'parquet',
+ })
+ self.catalog.create_table('default.fmt_vector_close_failure', schema,
False)
+ table = self.catalog.get_table('default.fmt_vector_close_failure')
+
+ writer = table.new_batch_write_builder().new_write()
+ writer.write_arrow(pa.table({
+ 'id': pa.array([1, 2, 3], type=pa.int64()),
+ 'embed': pa.FixedSizeListArray.from_arrays(
+ pa.array([0.1, 0.2, 0.3,
+ 0.4, 0.5, 0.6,
+ 0.7, 0.8, 0.9], type=pa.float32()), 3),
+ }))
+ commit_messages = writer.prepare_commit()
+
+ all_files = [nf for msg in commit_messages for nf in msg.new_files]
+ for file_meta in all_files:
+ self.assertTrue(table.file_io.exists(self._file_path(file_meta)))
+
+ data_writer = next(iter(writer.file_store_write.data_writers.values()))
+ with patch.object(
+ data_writer, '_close_current_writers',
+ side_effect=RuntimeError("Close error")):
+ with self.assertRaisesRegex(RuntimeError, "Close error"):
+ writer.close()
+
+ for file_meta in all_files:
+ self.assertFalse(
+ table.file_io.exists(self._file_path(file_meta)),
+ f"Expected abort to delete {file_meta.file_name}",
+ )
+
@unittest.skipIf(sys.version_info < (3, 11), "vortex-data requires Python
>= 3.11")
@unittest.skipUnless(
__import__('importlib').util.find_spec('vortex') is not None,
diff --git a/paimon-python/pypaimon/tests/ray_sink_test.py
b/paimon-python/pypaimon/tests/ray_sink_test.py
index a6d761df5a..afc86b571e 100644
--- a/paimon-python/pypaimon/tests/ray_sink_test.py
+++ b/paimon-python/pypaimon/tests/ray_sink_test.py
@@ -65,6 +65,16 @@ class RaySinkTest(unittest.TestCase):
if os.path.exists(self.temp_dir):
shutil.rmtree(self.temp_dir)
+ @staticmethod
+ def _data_files_under(table):
+ table_path = table.file_io.to_filesystem_path(table.table_path)
+ data_files = []
+ for root, _, files in os.walk(table_path):
+ for file_name in files:
+ if file_name.endswith(('.parquet', '.blob')) or '.vector.' in
file_name:
+ data_files.append(os.path.join(root, file_name))
+ return data_files
+
def test_init_and_serialization(self):
"""Test initialization, serialization, and table name."""
datasink = PaimonDatasink(self.table, overwrite=False)
@@ -272,7 +282,66 @@ class RaySinkTest(unittest.TestCase):
})
with self.assertRaises(Exception):
datasink.write([data_table], ctx)
- mock_write.close.assert_called_once()
+ mock_write.abort.assert_called_once()
+ mock_write.close.assert_not_called()
+
+ with patch.object(self.table, 'new_batch_write_builder') as
mock_builder:
+ mock_write_builder = Mock()
+ mock_write_builder.overwrite.return_value = mock_write_builder
+ mock_write = Mock()
+ mock_write.prepare_commit.return_value = [Mock(spec=CommitMessage)]
+ mock_write.close.side_effect = Exception("Close error")
+ mock_write_builder.new_write.return_value = mock_write
+ mock_builder.return_value = mock_write_builder
+
+ data_table = pa.table({
+ 'id': [1],
+ 'name': ['Alice'],
+ 'value': [1.1]
+ })
+ with self.assertRaises(Exception):
+ datasink.write([data_table], ctx)
+ mock_write.prepare_commit.assert_called_once()
+ mock_write.abort.assert_called_once()
+
+ def
test_write_does_not_return_prepared_messages_when_dedicated_close_aborts(self):
+ from pypaimon.write.writer.dedicated_format_writer import
DedicatedFormatWriter
+
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('payload', pa.large_binary()),
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema, options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ })
+ table_identifier = "test_db.test_blob_close_failure"
+ self.catalog.create_table(table_identifier, schema, False)
+ table = self.catalog.get_table(table_identifier)
+
+ datasink = PaimonDatasink(table, overwrite=False)
+ datasink.on_write_start()
+ ctx = Mock(spec=TaskContext)
+ data_table = pa.Table.from_pydict({
+ 'id': [1, 2, 3],
+ 'payload': [b'a', b'b', b'c'],
+ }, schema=pa_schema)
+
+ original_close_current_writers =
DedicatedFormatWriter._close_current_writers
+ close_current_calls = {'count': 0}
+
+ def fail_during_close(writer):
+ close_current_calls['count'] += 1
+ if close_current_calls['count'] == 1:
+ return original_close_current_writers(writer)
+ raise RuntimeError("Close error")
+
+ with patch.object(DedicatedFormatWriter, '_close_current_writers',
fail_during_close):
+ with self.assertRaisesRegex(RuntimeError, "Close error"):
+ datasink.write([data_table], ctx)
+
+ self.assertEqual(close_current_calls['count'], 2)
+ self.assertEqual([], self._data_files_under(table))
def test_on_write_complete(self):
from ray.data.datasource.datasink import WriteResult
diff --git a/paimon-python/pypaimon/write/file_store_write.py
b/paimon-python/pypaimon/write/file_store_write.py
index ee3b96d8d7..58ceb1c83c 100644
--- a/paimon-python/pypaimon/write/file_store_write.py
+++ b/paimon-python/pypaimon/write/file_store_write.py
@@ -251,6 +251,15 @@ class FileStoreWrite:
writer.close()
self.data_writers.clear()
+ def abort(self):
+ """Abort all data writers and clean up files produced by this write."""
+ for writer in self.data_writers.values():
+ try:
+ writer.abort()
+ except Exception as e:
+ logger.warning("Failed to abort data writer.", exc_info=e)
+ self.data_writers.clear()
+
def _seq_number_stats(self, partition: Tuple) -> Dict[int, int]:
buckets = self.max_seq_numbers.get(partition)
if buckets is None:
diff --git a/paimon-python/pypaimon/write/ray_datasink.py
b/paimon-python/pypaimon/write/ray_datasink.py
index 6d48906f9f..18b7f024f0 100644
--- a/paimon-python/pypaimon/write/ray_datasink.py
+++ b/paimon-python/pypaimon/write/ray_datasink.py
@@ -136,11 +136,20 @@ class PaimonDatasink(_DatasinkBase):
commit_messages = table_write.prepare_commit()
commit_messages_list.extend(commit_messages)
- finally:
- if table_write is not None:
- table_write.close()
- return commit_messages_list
+ table_write.close()
+ table_write = None
+ return commit_messages_list
+ except Exception:
+ if table_write is not None:
+ try:
+ table_write.abort()
+ except Exception as abort_error:
+ logger.warning(
+ f"Error aborting worker-side table_write:
{abort_error}",
+ exc_info=abort_error
+ )
+ raise
@staticmethod
def _extract_write_returns(write_result: Any):
diff --git a/paimon-python/pypaimon/write/table_write.py
b/paimon-python/pypaimon/write/table_write.py
index 411ddd9ceb..91eafa7536 100644
--- a/paimon-python/pypaimon/write/table_write.py
+++ b/paimon-python/pypaimon/write/table_write.py
@@ -135,6 +135,9 @@ class TableWrite:
def close(self):
self.file_store_write.close()
+ def abort(self):
+ self.file_store_write.abort()
+
def _validate_pyarrow_schema(self, data_schema: pa.Schema):
if data_schema == self.table_pyarrow_schema:
return
diff --git a/paimon-python/pypaimon/write/writer/data_vector_writer.py
b/paimon-python/pypaimon/write/writer/data_vector_writer.py
index e63a916ed4..f959a35b45 100644
--- a/paimon-python/pypaimon/write/writer/data_vector_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_vector_writer.py
@@ -143,6 +143,7 @@ class DataVectorWriter(DataWriter):
except Exception as e:
logger.error("Exception occurs when closing writer. Cleaning up.",
exc_info=e)
self.abort()
+ raise
finally:
self.closed = True
self.pending_normal_data = None
@@ -151,7 +152,7 @@ class DataVectorWriter(DataWriter):
if self.vector_writer is not None:
self.vector_writer.abort()
self.pending_normal_data = None
- self.committed_files.clear()
+ super().abort()
def _split_data(self, data: pa.RecordBatch) -> Tuple[pa.RecordBatch,
pa.RecordBatch]:
normal_data = (
@@ -187,11 +188,11 @@ class DataVectorWriter(DataWriter):
if self.vector_writer is not None:
vector_metas = self.vector_writer.prepare_commit()
- self.vector_writer.committed_files.clear()
if vector_metas:
if normal_meta is not None:
self._validate_consistency(normal_meta, vector_metas)
self.committed_files.extend(vector_metas)
+ self.vector_writer.committed_files.clear()
self.pending_normal_data = None
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py
b/paimon-python/pypaimon/write/writer/data_writer.py
index e237bf9f37..75a6359a09 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -138,8 +138,15 @@ class DataWriter(ABC):
Abort all writers and clean up resources. This method should be called
when an error occurs
during writing. It deletes any files that were written and cleans up
resources.
"""
- # Delete any files that were written (data + changelog)
- for file_meta in self.committed_files + self.committed_changelog_files:
+ self._delete_committed_files(self.committed_files +
self.committed_changelog_files)
+
+ # Clean up resources
+ self.pending_data = None
+ self.committed_files.clear()
+ self.committed_changelog_files.clear()
+
+ def _delete_committed_files(self, file_metas: List[DataFileMeta]):
+ for file_meta in file_metas:
try:
path_to_delete = file_meta.external_path if
file_meta.external_path else file_meta.file_path
if path_to_delete:
@@ -151,11 +158,6 @@ class DataWriter(ABC):
path_to_delete = file_meta.external_path if
file_meta.external_path else file_meta.file_path
logger.warning(f"Failed to delete file {path_to_delete} during
abort: {e}")
- # Clean up resources
- self.pending_data = None
- self.committed_files.clear()
- self.committed_changelog_files.clear()
-
@abstractmethod
def _process_data(self, data: pa.RecordBatch) -> pa.RecordBatch:
"""Process incoming data (e.g., add system fields, sort). Must be
implemented by subclasses."""
diff --git a/paimon-python/pypaimon/write/writer/dedicated_format_writer.py
b/paimon-python/pypaimon/write/writer/dedicated_format_writer.py
index c223c1c62b..fb5d75b2a8 100644
--- a/paimon-python/pypaimon/write/writer/dedicated_format_writer.py
+++ b/paimon-python/pypaimon/write/writer/dedicated_format_writer.py
@@ -252,6 +252,7 @@ class DedicatedFormatWriter(DataWriter):
except Exception as e:
logger.error("Exception occurs when closing writer. Cleaning up.",
exc_info=e)
self.abort()
+ raise
finally:
self.closed = True
self.pending_normal_data = None
@@ -264,7 +265,13 @@ class DedicatedFormatWriter(DataWriter):
self.vector_writer.abort()
if self._external_storage_writer:
self._external_storage_writer.abort()
+ committed_non_blob_files = [
+ file_meta for file_meta in self.committed_files
+ if not DataFileMeta.is_blob_file(file_meta.file_name)
+ ]
+ self._delete_committed_files(committed_non_blob_files)
self.pending_normal_data = None
+ self.pending_data = None
self.committed_files.clear()
def _split_data(self, data: pa.RecordBatch) -> Tuple[
@@ -370,6 +377,7 @@ class DedicatedFormatWriter(DataWriter):
normal_meta = None
if self.pending_normal_data is not None and
self.pending_normal_data.num_rows > 0:
normal_meta =
self._write_normal_data_to_file(self.pending_normal_data)
+ self.committed_files.append(normal_meta)
blob_metas = []
for blob_column in self.blob_file_column_names:
@@ -377,18 +385,15 @@ class DedicatedFormatWriter(DataWriter):
if normal_meta is not None:
self._validate_consistency(normal_meta, writer_metas,
blob_column)
blob_metas.extend(writer_metas)
+ self.committed_files.extend(blob_metas)
vector_metas = []
if self.vector_writer is not None:
vector_metas = self.vector_writer.prepare_commit()
- self.vector_writer.committed_files.clear()
if vector_metas and normal_meta is not None:
self._validate_consistency(normal_meta, vector_metas, 'vector')
-
- if normal_meta is not None:
- self.committed_files.append(normal_meta)
- self.committed_files.extend(blob_metas)
- self.committed_files.extend(vector_metas)
+ self.committed_files.extend(vector_metas)
+ self.vector_writer.committed_files.clear()
self.pending_normal_data = None