This is an automated email from the ASF dual-hosted git repository.
Gabriel39 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 5db03a49c54 [fix](external-write) Delete the actual data file on write
failure to avoid orphan files (#64678)
5db03a49c54 is described below
commit 5db03a49c54ba29707208e95da0b34f07627ca9e
Author: daidai <[email protected]>
AuthorDate: Mon Jun 29 17:02:13 2026 +0800
[fix](external-write) Delete the actual data file on write failure to avoid
orphan files (#64678)
When a partition writer fails to close (e.g. on an HDFS sync error), it
tries to clean up the partial file using `_file_name`, but the file is
actually created with `_get_target_file_name()`
("{_file_name}-{_file_name_index}{ext}"). Because the two paths do not
match, the cleanup is a no-op and an orphan data file is left on the storage.
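Use `_get_target_file_name()` for the deletion so it matches the created
path, for both the Iceberg and Hive partition writers.
For the Hive writer, a failed close additionally records any pending S3
multipart upload in a partition update, and `HMSTransaction.rollback()` now
aborts such uploads even when no committer has been created yet.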
---
.../writer/iceberg/viceberg_partition_writer.cpp | 3 +-
be/src/exec/sink/writer/vhive_partition_writer.cpp | 78 ++++++++---
be/src/exec/sink/writer/vhive_partition_writer.h | 2 +
.../doris/datasource/hive/HMSTransaction.java | 30 +++--
.../datasource/hive/HMSTransactionPathTest.java | 145 ++++++++++++++++++++-
5 files changed, 226 insertions(+), 32 deletions(-)
diff --git a/be/src/exec/sink/writer/iceberg/viceberg_partition_writer.cpp
b/be/src/exec/sink/writer/iceberg/viceberg_partition_writer.cpp
index 434488266bb..5f28c41c24d 100644
--- a/be/src/exec/sink/writer/iceberg/viceberg_partition_writer.cpp
+++ b/be/src/exec/sink/writer/iceberg/viceberg_partition_writer.cpp
@@ -124,7 +124,8 @@ Status VIcebergPartitionWriter::close(const Status& status)
{
}
bool status_ok = result_status.ok() && status.ok();
if (!status_ok && _fs != nullptr) {
- auto path = fmt::format("{}/{}", _write_info.write_path, _file_name);
+ // delete the actual created file, otherwise an orphan file is left
behind
+ auto path = fmt::format("{}/{}", _write_info.write_path,
_get_target_file_name());
Status st = _fs->delete_file(path);
if (!st.ok()) {
LOG(WARNING) << fmt::format("Delete file {} failed, reason: {}",
path, st.to_string());
diff --git a/be/src/exec/sink/writer/vhive_partition_writer.cpp
b/be/src/exec/sink/writer/vhive_partition_writer.cpp
index 4bfc728c8c9..5e2582ceb5f 100644
--- a/be/src/exec/sink/writer/vhive_partition_writer.cpp
+++ b/be/src/exec/sink/writer/vhive_partition_writer.cpp
@@ -131,11 +131,16 @@ Status VHivePartitionWriter::close(const Status& status) {
}
}
bool status_ok = result_status.ok() && status.ok();
- if (!status_ok && _fs != nullptr) {
- auto path = fmt::format("{}/{}", _write_info.write_path, _file_name);
- Status st = _fs->delete_file(path);
- if (!st.ok()) {
- LOG(WARNING) << fmt::format("Delete file {} failed, reason: {}",
path, st.to_string());
+ if (!status_ok) {
+ _add_s3_mpu_pending_upload_for_rollback();
+ if (_fs != nullptr) {
+ // delete the actual created file, otherwise an orphan file is
left behind
+ auto path = fmt::format("{}/{}", _write_info.write_path,
_get_target_file_name());
+ Status st = _fs->delete_file(path);
+ if (!st.ok()) {
+ LOG(WARNING) << fmt::format("Delete file {} failed, reason:
{}", path,
+ st.to_string());
+ }
}
}
if (status_ok) {
@@ -164,26 +169,59 @@ THivePartitionUpdate
VHivePartitionWriter::_build_partition_update() {
DCHECK(_file_format_transformer != nullptr);
hive_partition_update.__set_file_size(_file_format_transformer->written_len());
- if (_write_info.file_type == TFileType::FILE_S3) {
- DCHECK(_file_writer != nullptr);
- doris::io::S3FileWriter* s3_mpu_file_writer =
- dynamic_cast<doris::io::S3FileWriter*>(_file_writer.get());
- DCHECK(s3_mpu_file_writer != nullptr);
- TS3MPUPendingUpload s3_mpu_pending_upload;
- s3_mpu_pending_upload.__set_bucket(s3_mpu_file_writer->bucket());
- s3_mpu_pending_upload.__set_key(s3_mpu_file_writer->key());
- s3_mpu_pending_upload.__set_upload_id(s3_mpu_file_writer->upload_id());
-
- std::map<int, std::string> etags;
- for (auto& completed_part : s3_mpu_file_writer->completed_parts()) {
- etags.insert({completed_part.part_num, completed_part.etag});
- }
- s3_mpu_pending_upload.__set_etags(etags);
+ TS3MPUPendingUpload s3_mpu_pending_upload;
+ if (_build_s3_mpu_pending_upload(&s3_mpu_pending_upload)) {
hive_partition_update.__set_s3_mpu_pending_uploads({s3_mpu_pending_upload});
}
return hive_partition_update;
}
+bool VHivePartitionWriter::_build_s3_mpu_pending_upload(TS3MPUPendingUpload*
pending_upload) {
+ DCHECK(pending_upload != nullptr);
+ if (_write_info.file_type != TFileType::FILE_S3 || _file_writer ==
nullptr) {
+ return false;
+ }
+
+ doris::io::S3FileWriter* s3_mpu_file_writer =
+ dynamic_cast<doris::io::S3FileWriter*>(_file_writer.get());
+ DCHECK(s3_mpu_file_writer != nullptr);
+ std::string upload_id = s3_mpu_file_writer->upload_id();
+ if (upload_id.empty()) {
+ return false;
+ }
+
+ pending_upload->__set_bucket(s3_mpu_file_writer->bucket());
+ pending_upload->__set_key(s3_mpu_file_writer->key());
+ pending_upload->__set_upload_id(upload_id);
+
+ std::map<int, std::string> etags;
+ for (auto& completed_part : s3_mpu_file_writer->completed_parts()) {
+ etags.insert({completed_part.part_num, completed_part.etag});
+ }
+ pending_upload->__set_etags(etags);
+ return true;
+}
+
+void VHivePartitionWriter::_add_s3_mpu_pending_upload_for_rollback() {
+ TS3MPUPendingUpload s3_mpu_pending_upload;
+ if (!_build_s3_mpu_pending_upload(&s3_mpu_pending_upload)) {
+ return;
+ }
+
+ THivePartitionUpdate hive_partition_update;
+ hive_partition_update.__set_name(_partition_name);
+ hive_partition_update.__set_update_mode(_update_mode);
+ THiveLocationParams location;
+ location.__set_write_path(_write_info.original_write_path);
+ location.__set_target_path(_write_info.target_path);
+ hive_partition_update.__set_location(location);
+ hive_partition_update.__set_file_names({});
+ hive_partition_update.__set_row_count(0);
+ hive_partition_update.__set_file_size(0);
+
hive_partition_update.__set_s3_mpu_pending_uploads({s3_mpu_pending_upload});
+ _state->add_hive_partition_updates(hive_partition_update);
+}
+
std::string VHivePartitionWriter::_get_file_extension(TFileFormatType::type
file_format_type,
TFileCompressType::type
write_compress_type) {
std::string compress_name;
diff --git a/be/src/exec/sink/writer/vhive_partition_writer.h
b/be/src/exec/sink/writer/vhive_partition_writer.h
index e958db505c1..0b124108623 100644
--- a/be/src/exec/sink/writer/vhive_partition_writer.h
+++ b/be/src/exec/sink/writer/vhive_partition_writer.h
@@ -76,6 +76,8 @@ private:
private:
THivePartitionUpdate _build_partition_update();
+ bool _build_s3_mpu_pending_upload(TS3MPUPendingUpload* pending_upload);
+ void _add_s3_mpu_pending_upload_for_rollback();
std::string _get_file_extension(TFileFormatType::type file_format_type,
TFileCompressType::type
write_compress_type);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
index 0e2bd1d531c..0fa6a21f59e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
@@ -165,9 +165,30 @@ public class HMSTransaction implements Transaction {
return new ArrayList<>(mm.values());
}
+ private void
collectUncompletedMpuPendingUploads(List<THivePartitionUpdate> hivePUs) {
+ for (THivePartitionUpdate pu : hivePUs) {
+ if (pu.getS3MpuPendingUploads() != null) {
+ for (TS3MPUPendingUpload s3MPUPendingUpload :
pu.getS3MpuPendingUploads()) {
+ uncompletedMpuPendingUploads.add(
+ new
UncompletedMpuPendingUpload(s3MPUPendingUpload,
pu.getLocation().getWritePath()));
+ }
+ }
+ }
+ }
+
@Override
public void rollback() {
if (hmsCommitter == null) {
+ collectUncompletedMpuPendingUploads(hivePartitionUpdates);
+ if (uncompletedMpuPendingUploads.isEmpty()) {
+ return;
+ }
+ hmsCommitter = new HmsCommitter();
+ try {
+ hmsCommitter.rollback();
+ } finally {
+ hmsCommitter.shutdownExecutorService();
+ }
return;
}
try {
@@ -224,14 +245,7 @@ public class HMSTransaction implements Transaction {
}
List<THivePartitionUpdate> mergedPUs =
mergePartitions(hivePartitionUpdates);
- for (THivePartitionUpdate pu : mergedPUs) {
- if (pu.getS3MpuPendingUploads() != null) {
- for (TS3MPUPendingUpload s3MPUPendingUpload :
pu.getS3MpuPendingUploads()) {
- uncompletedMpuPendingUploads.add(
- new
UncompletedMpuPendingUpload(s3MPUPendingUpload,
pu.getLocation().getWritePath()));
- }
- }
- }
+ collectUncompletedMpuPendingUploads(mergedPUs);
List<Pair<THivePartitionUpdate, HivePartitionStatistics>>
insertExistsPartitions = new ArrayList<>();
for (THivePartitionUpdate pu : mergedPUs) {
TUpdateMode updateMode = pu.getUpdateMode();
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HMSTransactionPathTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HMSTransactionPathTest.java
index 1468d0d5b3c..35bdb686544 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HMSTransactionPathTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HMSTransactionPathTest.java
@@ -23,9 +23,17 @@ import org.apache.doris.filesystem.FileEntry;
import org.apache.doris.filesystem.FileIterator;
import org.apache.doris.filesystem.FileSystem;
import org.apache.doris.filesystem.Location;
+import org.apache.doris.filesystem.UploadPartResult;
import org.apache.doris.filesystem.local.LocalFileSystem;
+import org.apache.doris.filesystem.spi.ObjFileSystem;
+import org.apache.doris.filesystem.spi.ObjStorage;
+import org.apache.doris.filesystem.spi.RemoteObject;
+import org.apache.doris.filesystem.spi.RemoteObjects;
+import org.apache.doris.filesystem.spi.RequestBody;
import org.apache.doris.fs.SpiSwitchingFileSystem;
import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.thrift.THiveLocationParams;
+import org.apache.doris.thrift.THivePartitionUpdate;
import org.apache.doris.thrift.TS3MPUPendingUpload;
import org.junit.After;
@@ -173,6 +181,12 @@ public class HMSTransactionPathTest {
return new HMSTransaction(null, spiFs, Runnable::run);
}
+ private static void setEmptyStagingDirectory(HMSTransaction tx) throws
Exception {
+ Field stagingDirField =
HMSTransaction.class.getDeclaredField("stagingDirectory");
+ stagingDirField.setAccessible(true);
+ stagingDirField.set(tx, java.util.Optional.empty());
+ }
+
private static class FakeFileSystem implements FileSystem {
IOException listDirectoriesThrows;
IOException listFilesThrows;
@@ -302,9 +316,7 @@ public class HMSTransactionPathTest {
uploads.add(upload);
// stagingDirectory is only initialized inside commit(); initialize it
here to avoid NPE in rollback().
- Field stagingDirField =
HMSTransaction.class.getDeclaredField("stagingDirectory");
- stagingDirField.setAccessible(true);
- stagingDirField.set(tx, java.util.Optional.empty());
+ setEmptyStagingDirectory(tx);
// Instantiate HmsCommitter (package-private inner class) for this
transaction.
Class<?> committerClass =
Arrays.stream(HMSTransaction.class.getDeclaredClasses())
@@ -323,4 +335,131 @@ public class HMSTransactionPathTest {
// After rollback, uncompletedMpuPendingUploads must be cleared.
Assert.assertTrue("uncompletedMpuPendingUploads must be cleared after
rollback", uploads.isEmpty());
}
+
+ @Test
+ public void testRollbackAbortsPendingMpuBeforeCommitterCreated() throws
Exception {
+ TrackingObjStorage storage = new TrackingObjStorage();
+ HMSTransaction tx = createTransaction(new TestObjFileSystem(storage));
+
+ TS3MPUPendingUpload mpu = new TS3MPUPendingUpload();
+ mpu.setBucket("test-bucket");
+ mpu.setKey("warehouse/table/data-0.parquet");
+ mpu.setUploadId("upload-id-1");
+
+ THiveLocationParams location = new THiveLocationParams();
+ location.setWritePath("s3://test-bucket/warehouse/table");
+
+ THivePartitionUpdate update = new THivePartitionUpdate();
+ update.setLocation(location);
+ update.setFileNames(Collections.emptyList());
+ update.setRowCount(0);
+ update.setFileSize(0);
+ update.setS3MpuPendingUploads(Collections.singletonList(mpu));
+ tx.updateHivePartitionUpdates(Collections.singletonList(update));
+ setEmptyStagingDirectory(tx);
+
+ tx.rollback();
+
+
Assert.assertEquals(Collections.singletonList("s3://test-bucket/warehouse/table/data-0.parquet"),
+ storage.abortedPaths);
+ Assert.assertEquals(Collections.singletonList("upload-id-1"),
storage.abortedUploadIds);
+ }
+
+ private static class TestObjFileSystem extends ObjFileSystem {
+ TestObjFileSystem(ObjStorage<?> storage) {
+ super(storage);
+ }
+
+ @Override
+ public void mkdirs(Location location) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void delete(Location location, boolean recursive) throws
IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void rename(Location src, Location dst) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public FileIterator list(Location location) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public DorisInputFile newInputFile(Location location) throws
IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public DorisOutputFile newOutputFile(Location location) throws
IOException {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private static class TrackingObjStorage implements ObjStorage<Object> {
+ private final List<String> abortedPaths = new ArrayList<>();
+ private final List<String> abortedUploadIds = new ArrayList<>();
+
+ @Override
+ public Object getClient() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public RemoteObjects listObjects(String remotePath, String
continuationToken) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public RemoteObject headObject(String remotePath) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void putObject(String remotePath, RequestBody requestBody)
throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void deleteObject(String remotePath) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void copyObject(String srcPath, String dstPath) throws
IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String initiateMultipartUpload(String remotePath) throws
IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public UploadPartResult uploadPart(String remotePath, String uploadId,
int partNum,
+ RequestBody body) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void completeMultipartUpload(String remotePath, String uploadId,
+ List<UploadPartResult> parts) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void abortMultipartUpload(String remotePath, String uploadId)
throws IOException {
+ abortedPaths.add(remotePath);
+ abortedUploadIds.add(uploadId);
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]