This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ff62c57b59 [python][ray] Abort worker writes on failure (#8124)
ff62c57b59 is described below

commit ff62c57b59d1ea41bd6c31abdaf3c254da745fbe
Author: QuakeWang <[email protected]>
AuthorDate: Mon Jun 8 13:27:09 2026 +0800

    [python][ray] Abort worker writes on failure (#8124)
    
    Ray write tasks previously closed the worker-side `TableWrite` when
    write, prepare, or close failed. That is unsafe because close can flush
    pending data, and `prepare_commit()` can materialize normal, blob, or
    vector files before a later failure prevents the driver commit.
    
    This PR makes worker-side Ray writes abort on failure and propagates
    abort through `TableWrite` and `FileStoreWrite`. It also keeps dedicated
    blob/vector metadata reachable from the parent writer so abort can
    delete files produced before the failed commit path.
---
 .../pypaimon/tests/data_evolution_formats_test.py  | 120 +++++++++++++++++++++
 paimon-python/pypaimon/tests/ray_sink_test.py      |  71 +++++++++++-
 paimon-python/pypaimon/write/file_store_write.py   |   9 ++
 paimon-python/pypaimon/write/ray_datasink.py       |  17 ++-
 paimon-python/pypaimon/write/table_write.py        |   3 +
 .../pypaimon/write/writer/data_vector_writer.py    |   5 +-
 paimon-python/pypaimon/write/writer/data_writer.py |  16 +--
 .../write/writer/dedicated_format_writer.py        |  17 +--
 8 files changed, 238 insertions(+), 20 deletions(-)

diff --git a/paimon-python/pypaimon/tests/data_evolution_formats_test.py b/paimon-python/pypaimon/tests/data_evolution_formats_test.py
index f89f3290f1..b30560c400 100644
--- a/paimon-python/pypaimon/tests/data_evolution_formats_test.py
+++ b/paimon-python/pypaimon/tests/data_evolution_formats_test.py
@@ -46,6 +46,10 @@ class DataEvolutionFormatsTest(unittest.TestCase):
     def tearDownClass(cls):
         shutil.rmtree(cls.tempdir, ignore_errors=True)
 
+    @staticmethod
+    def _file_path(file_meta):
+        return file_meta.external_path if file_meta.external_path else file_meta.file_path
+
     # ------------------------------------------------------------------
     # Parquet-format data evolution
     # ------------------------------------------------------------------
@@ -236,6 +240,41 @@ class DataEvolutionFormatsTest(unittest.TestCase):
         self.assertEqual(actual.column('id').to_pylist(), [1, 2, 3])
         self.assertEqual(actual.column('payload').to_pylist(), blobs)
 
+    def test_blob_abort_deletes_uncommitted_files(self):
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('payload', pa.large_binary()),
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema, options={
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+        })
+        self.catalog.create_table('default.fmt_blob_abort_cleanup', schema, False)
+        table = self.catalog.get_table('default.fmt_blob_abort_cleanup')
+
+        writer = table.new_batch_write_builder().new_write()
+        writer.write_arrow(pa.Table.from_pydict({
+            'id': [1, 2, 3],
+            'payload': [b'a', b'b', b'c'],
+        }, schema=pa_schema))
+        commit_messages = writer.prepare_commit()
+
+        all_files = [nf for msg in commit_messages for nf in msg.new_files]
+        parquet_files = [f for f in all_files if f.file_name.endswith('.parquet')]
+        blob_files = [f for f in all_files if f.file_name.endswith('.blob')]
+        self.assertGreater(len(parquet_files), 0)
+        self.assertGreater(len(blob_files), 0)
+        for file_meta in all_files:
+            self.assertTrue(table.file_io.exists(self._file_path(file_meta)))
+
+        writer.abort()
+
+        for file_meta in all_files:
+            self.assertFalse(
+                table.file_io.exists(self._file_path(file_meta)),
+                f"Expected abort to delete {file_meta.file_name}",
+            )
+
     def test_blob_column_subset_evolution(self):
         """Write normal+blob cols in one commit, overwrite normal col in another, merge-read."""
         pa_schema = pa.schema([
@@ -563,6 +602,87 @@ class DataEvolutionFormatsTest(unittest.TestCase):
     # Vector (vortex) file format for embedding columns
     # ------------------------------------------------------------------
 
+    def test_vector_abort_deletes_uncommitted_files(self):
+        pa_schema = pa.schema([
+            ('id', pa.int64()),
+            ('embed', pa.list_(pa.float32(), 3)),
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema, options={
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+            'vector.file.format': 'parquet',
+        })
+        self.catalog.create_table('default.fmt_vector_abort_cleanup', schema, False)
+        table = self.catalog.get_table('default.fmt_vector_abort_cleanup')
+
+        writer = table.new_batch_write_builder().new_write()
+        writer.write_arrow(pa.table({
+            'id': pa.array([1, 2, 3], type=pa.int64()),
+            'embed': pa.FixedSizeListArray.from_arrays(
+                pa.array([0.1, 0.2, 0.3,
+                          0.4, 0.5, 0.6,
+                          0.7, 0.8, 0.9], type=pa.float32()), 3),
+        }))
+        commit_messages = writer.prepare_commit()
+
+        all_files = [nf for msg in commit_messages for nf in msg.new_files]
+        normal_files = [f for f in all_files if not DataFileMeta.is_vector_file(f.file_name)]
+        vector_files = [f for f in all_files if DataFileMeta.is_vector_file(f.file_name)]
+        self.assertGreater(len(normal_files), 0)
+        self.assertGreater(len(vector_files), 0)
+        for file_meta in all_files:
+            self.assertTrue(table.file_io.exists(self._file_path(file_meta)))
+
+        writer.abort()
+
+        for file_meta in all_files:
+            self.assertFalse(
+                table.file_io.exists(self._file_path(file_meta)),
+                f"Expected abort to delete {file_meta.file_name}",
+            )
+
+    def test_vector_close_failure_after_prepare_raises(self):
+        from unittest.mock import patch
+
+        pa_schema = pa.schema([
+            ('id', pa.int64()),
+            ('embed', pa.list_(pa.float32(), 3)),
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema, options={
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+            'vector.file.format': 'parquet',
+        })
+        self.catalog.create_table('default.fmt_vector_close_failure', schema, False)
+        table = self.catalog.get_table('default.fmt_vector_close_failure')
+
+        writer = table.new_batch_write_builder().new_write()
+        writer.write_arrow(pa.table({
+            'id': pa.array([1, 2, 3], type=pa.int64()),
+            'embed': pa.FixedSizeListArray.from_arrays(
+                pa.array([0.1, 0.2, 0.3,
+                          0.4, 0.5, 0.6,
+                          0.7, 0.8, 0.9], type=pa.float32()), 3),
+        }))
+        commit_messages = writer.prepare_commit()
+
+        all_files = [nf for msg in commit_messages for nf in msg.new_files]
+        for file_meta in all_files:
+            self.assertTrue(table.file_io.exists(self._file_path(file_meta)))
+
+        data_writer = next(iter(writer.file_store_write.data_writers.values()))
+        with patch.object(
+                data_writer, '_close_current_writers',
+                side_effect=RuntimeError("Close error")):
+            with self.assertRaisesRegex(RuntimeError, "Close error"):
+                writer.close()
+
+        for file_meta in all_files:
+            self.assertFalse(
+                table.file_io.exists(self._file_path(file_meta)),
+                f"Expected abort to delete {file_meta.file_name}",
+            )
+
     @unittest.skipIf(sys.version_info < (3, 11), "vortex-data requires Python >= 3.11")
     @unittest.skipUnless(
         __import__('importlib').util.find_spec('vortex') is not None,
diff --git a/paimon-python/pypaimon/tests/ray_sink_test.py b/paimon-python/pypaimon/tests/ray_sink_test.py
index a6d761df5a..afc86b571e 100644
--- a/paimon-python/pypaimon/tests/ray_sink_test.py
+++ b/paimon-python/pypaimon/tests/ray_sink_test.py
@@ -65,6 +65,16 @@ class RaySinkTest(unittest.TestCase):
         if os.path.exists(self.temp_dir):
             shutil.rmtree(self.temp_dir)
 
+    @staticmethod
+    def _data_files_under(table):
+        table_path = table.file_io.to_filesystem_path(table.table_path)
+        data_files = []
+        for root, _, files in os.walk(table_path):
+            for file_name in files:
+                if file_name.endswith(('.parquet', '.blob')) or '.vector.' in file_name:
+                    data_files.append(os.path.join(root, file_name))
+        return data_files
+
     def test_init_and_serialization(self):
         """Test initialization, serialization, and table name."""
         datasink = PaimonDatasink(self.table, overwrite=False)
@@ -272,7 +282,66 @@ class RaySinkTest(unittest.TestCase):
             })
             with self.assertRaises(Exception):
                 datasink.write([data_table], ctx)
-            mock_write.close.assert_called_once()
+            mock_write.abort.assert_called_once()
+            mock_write.close.assert_not_called()
+
+        with patch.object(self.table, 'new_batch_write_builder') as mock_builder:
+            mock_write_builder = Mock()
+            mock_write_builder.overwrite.return_value = mock_write_builder
+            mock_write = Mock()
+            mock_write.prepare_commit.return_value = [Mock(spec=CommitMessage)]
+            mock_write.close.side_effect = Exception("Close error")
+            mock_write_builder.new_write.return_value = mock_write
+            mock_builder.return_value = mock_write_builder
+
+            data_table = pa.table({
+                'id': [1],
+                'name': ['Alice'],
+                'value': [1.1]
+            })
+            with self.assertRaises(Exception):
+                datasink.write([data_table], ctx)
+            mock_write.prepare_commit.assert_called_once()
+            mock_write.abort.assert_called_once()
+
+    def test_write_does_not_return_prepared_messages_when_dedicated_close_aborts(self):
+        from pypaimon.write.writer.dedicated_format_writer import DedicatedFormatWriter
+
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('payload', pa.large_binary()),
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema, options={
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+        })
+        table_identifier = "test_db.test_blob_close_failure"
+        self.catalog.create_table(table_identifier, schema, False)
+        table = self.catalog.get_table(table_identifier)
+
+        datasink = PaimonDatasink(table, overwrite=False)
+        datasink.on_write_start()
+        ctx = Mock(spec=TaskContext)
+        data_table = pa.Table.from_pydict({
+            'id': [1, 2, 3],
+            'payload': [b'a', b'b', b'c'],
+        }, schema=pa_schema)
+
+        original_close_current_writers = DedicatedFormatWriter._close_current_writers
+        close_current_calls = {'count': 0}
+
+        def fail_during_close(writer):
+            close_current_calls['count'] += 1
+            if close_current_calls['count'] == 1:
+                return original_close_current_writers(writer)
+            raise RuntimeError("Close error")
+
+        with patch.object(DedicatedFormatWriter, '_close_current_writers', fail_during_close):
+            with self.assertRaisesRegex(RuntimeError, "Close error"):
+                datasink.write([data_table], ctx)
+
+        self.assertEqual(close_current_calls['count'], 2)
+        self.assertEqual([], self._data_files_under(table))
 
     def test_on_write_complete(self):
         from ray.data.datasource.datasink import WriteResult
diff --git a/paimon-python/pypaimon/write/file_store_write.py b/paimon-python/pypaimon/write/file_store_write.py
index ee3b96d8d7..58ceb1c83c 100644
--- a/paimon-python/pypaimon/write/file_store_write.py
+++ b/paimon-python/pypaimon/write/file_store_write.py
@@ -251,6 +251,15 @@ class FileStoreWrite:
             writer.close()
         self.data_writers.clear()
 
+    def abort(self):
+        """Abort all data writers and clean up files produced by this write."""
+        for writer in self.data_writers.values():
+            try:
+                writer.abort()
+            except Exception as e:
+                logger.warning("Failed to abort data writer.", exc_info=e)
+        self.data_writers.clear()
+
     def _seq_number_stats(self, partition: Tuple) -> Dict[int, int]:
         buckets = self.max_seq_numbers.get(partition)
         if buckets is None:
diff --git a/paimon-python/pypaimon/write/ray_datasink.py b/paimon-python/pypaimon/write/ray_datasink.py
index 6d48906f9f..18b7f024f0 100644
--- a/paimon-python/pypaimon/write/ray_datasink.py
+++ b/paimon-python/pypaimon/write/ray_datasink.py
@@ -136,11 +136,20 @@ class PaimonDatasink(_DatasinkBase):
 
             commit_messages = table_write.prepare_commit()
             commit_messages_list.extend(commit_messages)
-        finally:
-            if table_write is not None:
-                table_write.close()
 
-        return commit_messages_list
+            table_write.close()
+            table_write = None
+            return commit_messages_list
+        except Exception:
+            if table_write is not None:
+                try:
+                    table_write.abort()
+                except Exception as abort_error:
+                    logger.warning(
+                        f"Error aborting worker-side table_write: {abort_error}",
+                        exc_info=abort_error
+                    )
+            raise
 
     @staticmethod
     def _extract_write_returns(write_result: Any):
diff --git a/paimon-python/pypaimon/write/table_write.py b/paimon-python/pypaimon/write/table_write.py
index 411ddd9ceb..91eafa7536 100644
--- a/paimon-python/pypaimon/write/table_write.py
+++ b/paimon-python/pypaimon/write/table_write.py
@@ -135,6 +135,9 @@ class TableWrite:
     def close(self):
         self.file_store_write.close()
 
+    def abort(self):
+        self.file_store_write.abort()
+
     def _validate_pyarrow_schema(self, data_schema: pa.Schema):
         if data_schema == self.table_pyarrow_schema:
             return
diff --git a/paimon-python/pypaimon/write/writer/data_vector_writer.py b/paimon-python/pypaimon/write/writer/data_vector_writer.py
index e63a916ed4..f959a35b45 100644
--- a/paimon-python/pypaimon/write/writer/data_vector_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_vector_writer.py
@@ -143,6 +143,7 @@ class DataVectorWriter(DataWriter):
         except Exception as e:
             logger.error("Exception occurs when closing writer. Cleaning up.", exc_info=e)
             self.abort()
+            raise
         finally:
             self.closed = True
             self.pending_normal_data = None
@@ -151,7 +152,7 @@ class DataVectorWriter(DataWriter):
         if self.vector_writer is not None:
             self.vector_writer.abort()
         self.pending_normal_data = None
-        self.committed_files.clear()
+        super().abort()
 
     def _split_data(self, data: pa.RecordBatch) -> Tuple[pa.RecordBatch, pa.RecordBatch]:
         normal_data = (
@@ -187,11 +188,11 @@ class DataVectorWriter(DataWriter):
 
         if self.vector_writer is not None:
             vector_metas = self.vector_writer.prepare_commit()
-            self.vector_writer.committed_files.clear()
             if vector_metas:
                 if normal_meta is not None:
                     self._validate_consistency(normal_meta, vector_metas)
                 self.committed_files.extend(vector_metas)
+            self.vector_writer.committed_files.clear()
 
         self.pending_normal_data = None
 
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py
index e237bf9f37..75a6359a09 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -138,8 +138,15 @@ class DataWriter(ABC):
         Abort all writers and clean up resources. This method should be called when an error occurs
         during writing. It deletes any files that were written and cleans up resources.
         """
-        # Delete any files that were written (data + changelog)
-        for file_meta in self.committed_files + self.committed_changelog_files:
+        self._delete_committed_files(self.committed_files + self.committed_changelog_files)
+
+        # Clean up resources
+        self.pending_data = None
+        self.committed_files.clear()
+        self.committed_changelog_files.clear()
+
+    def _delete_committed_files(self, file_metas: List[DataFileMeta]):
+        for file_meta in file_metas:
             try:
                 path_to_delete = file_meta.external_path if file_meta.external_path else file_meta.file_path
                 if path_to_delete:
@@ -151,11 +158,6 @@ class DataWriter(ABC):
                 path_to_delete = file_meta.external_path if file_meta.external_path else file_meta.file_path
                 logger.warning(f"Failed to delete file {path_to_delete} during abort: {e}")
 
-        # Clean up resources
-        self.pending_data = None
-        self.committed_files.clear()
-        self.committed_changelog_files.clear()
-
     @abstractmethod
     def _process_data(self, data: pa.RecordBatch) -> pa.RecordBatch:
         """Process incoming data (e.g., add system fields, sort). Must be implemented by subclasses."""
diff --git a/paimon-python/pypaimon/write/writer/dedicated_format_writer.py b/paimon-python/pypaimon/write/writer/dedicated_format_writer.py
index c223c1c62b..fb5d75b2a8 100644
--- a/paimon-python/pypaimon/write/writer/dedicated_format_writer.py
+++ b/paimon-python/pypaimon/write/writer/dedicated_format_writer.py
@@ -252,6 +252,7 @@ class DedicatedFormatWriter(DataWriter):
         except Exception as e:
             logger.error("Exception occurs when closing writer. Cleaning up.", exc_info=e)
             self.abort()
+            raise
         finally:
             self.closed = True
             self.pending_normal_data = None
@@ -264,7 +265,13 @@ class DedicatedFormatWriter(DataWriter):
             self.vector_writer.abort()
         if self._external_storage_writer:
             self._external_storage_writer.abort()
+        committed_non_blob_files = [
+            file_meta for file_meta in self.committed_files
+            if not DataFileMeta.is_blob_file(file_meta.file_name)
+        ]
+        self._delete_committed_files(committed_non_blob_files)
         self.pending_normal_data = None
+        self.pending_data = None
         self.committed_files.clear()
 
     def _split_data(self, data: pa.RecordBatch) -> Tuple[
@@ -370,6 +377,7 @@ class DedicatedFormatWriter(DataWriter):
         normal_meta = None
         if self.pending_normal_data is not None and self.pending_normal_data.num_rows > 0:
             normal_meta = self._write_normal_data_to_file(self.pending_normal_data)
+            self.committed_files.append(normal_meta)
 
         blob_metas = []
         for blob_column in self.blob_file_column_names:
@@ -377,18 +385,15 @@ class DedicatedFormatWriter(DataWriter):
             if normal_meta is not None:
                 self._validate_consistency(normal_meta, writer_metas, blob_column)
             blob_metas.extend(writer_metas)
+        self.committed_files.extend(blob_metas)
 
         vector_metas = []
         if self.vector_writer is not None:
             vector_metas = self.vector_writer.prepare_commit()
-            self.vector_writer.committed_files.clear()
             if vector_metas and normal_meta is not None:
                 self._validate_consistency(normal_meta, vector_metas, 'vector')
-
-        if normal_meta is not None:
-            self.committed_files.append(normal_meta)
-        self.committed_files.extend(blob_metas)
-        self.committed_files.extend(vector_metas)
+            self.committed_files.extend(vector_metas)
+            self.vector_writer.committed_files.clear()
 
         self.pending_normal_data = None
 

Reply via email to