This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
     new 0d61b363a2  [core] file io support TwoPhaseOutputStream (#6287)
0d61b363a2 is described below

commit 0d61b363a2598b883e5ed3a18bf9d7ebd8d9691c
Author: jerry <lining....@alibaba-inc.com>
AuthorDate: Fri Sep 19 15:48:16 2025 +0800

    [core] file io support TwoPhaseOutputStream (#6287)
---
 .../src/main/java/org/apache/paimon/fs/FileIO.java |  20 ++
 .../org/apache/paimon/fs/MultiPartUploadStore.java |  50 +++
 .../fs/MultiPartUploadTwoPhaseOutputStream.java    | 222 ++++++++++++
 .../paimon/fs/RenamingTwoPhaseOutputStream.java    | 145 ++++++++
 .../org/apache/paimon/fs/TwoPhaseOutputStream.java |  54 +++
 .../fs/RenamingTwoPhaseOutputStreamTest.java       | 141 ++++++++
 .../main/java/org/apache/paimon/oss/OSSFileIO.java |  15 +
 .../org/apache/paimon/oss/OSSMultiPartUpload.java  |  72 ++++
 .../apache/paimon/oss/OssTwoPhaseOutputStream.java |  44 +++
 .../paimon/oss/OssTwoPhaseOutputStreamTest.java    | 382 +++++++++++++++++++++
 .../apache/paimon/s3/HadoopCompliantFileIO.java    |   4 +-
 .../main/java/org/apache/paimon/s3/S3FileIO.java   |  13 +
 .../org/apache/paimon/s3/S3MultiPartUpload.java    | 120 +++++++
 .../apache/paimon/s3/S3TwoPhaseOutputStream.java   |  44 +++
 .../paimon/s3/S3TwoPhaseOutputStreamTest.java      | 368 ++++++++++++++++++++
 15 files changed, 1692 insertions(+), 2 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java index b7037693ee..5e5fe9fbfe 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java @@ -93,6 +93,26 @@ public interface FileIO extends Serializable, Closeable { */ PositionOutputStream newOutputStream(Path path, boolean overwrite) throws IOException; + /** + * Opens a TwoPhaseOutputStream at the indicated Path for transactional writing. + * + * <p>This method creates a stream that supports transactional writes. The written + * data becomes visible only after calling commit on the committer returned by the + * closeForCommit method. + * + * @param path the file target path + * @param overwrite if a file with this name already exists, then if true, the file will be + * overwritten, and if false an error will be thrown. + * @return a TwoPhaseOutputStream that supports transactional writes + * @throws IOException Thrown if the stream could not be opened because of an I/O error, or because a + * file already exists at that path and the write mode indicates to not overwrite the file. + * @throws UnsupportedOperationException if the filesystem does not support transactional writes + */ + default TwoPhaseOutputStream newTwoPhaseOutputStream(Path path, boolean overwrite) + throws IOException { + return new RenamingTwoPhaseOutputStream(this, path, overwrite); + } + /** * Return a file status object that represents the path. * diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadStore.java b/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadStore.java new file mode 100644 index 0000000000..2a18bed8a2 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadStore.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.fs; + +import org.apache.hadoop.fs.Path; + +import java.io.File; + import java.io.IOException; + import java.util.List; + +/** Abstraction over a file system's multipart upload operations, backing {@link MultiPartUploadTwoPhaseOutputStream}. */ +public interface MultiPartUploadStore<T, C> { + + default String pathToObject(Path hadoopPath) { + if (!hadoopPath.isAbsolute()) { + hadoopPath = new Path(workingDirectory(), hadoopPath); + } + + return hadoopPath.toUri().getPath().substring(1); + } + + Path workingDirectory(); + + String startMultiPartUpload(String objectName) throws IOException; + + C completeMultipartUpload( + String objectName, String uploadId, List<T> partETags, long numBytesInParts) + throws IOException; + + T uploadPart(String objectName, String uploadId, int partNumber, File file, long byteLength) + throws IOException; + + void abortMultipartUpload(String objectName, String uploadId) throws IOException; +} diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStream.java b/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStream.java new file mode 100644 index 0000000000..362635b4df --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStream.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.fs; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +/** A {@link TwoPhaseOutputStream} built on multipart upload to support two-phase commit.
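+ *
+ * <p>Written bytes are buffered in memory and uploaded as a part whenever the buffer reaches
+ * {@link #partSizeThreshold()}; {@link #closeForCommit()} uploads the remaining bytes and returns
+ * a committer that completes the multipart upload on commit, or aborts it on discard. A subclass
+ * only chooses the part size; a minimal, illustrative sketch (type arguments are placeholders):
+ *
+ * <pre>{@code
+ * public class MyTwoPhaseOutputStream
+ *         extends MultiPartUploadTwoPhaseOutputStream<PartETag, CompleteResult> {
+ *
+ *     public MyTwoPhaseOutputStream(
+ *             MultiPartUploadStore<PartETag, CompleteResult> store,
+ *             org.apache.hadoop.fs.Path path) throws IOException {
+ *         super(store, path);
+ *     }
+ *
+ *     @Override
+ *     public long partSizeThreshold() {
+ *         return 5L << 20; // buffer 5 MiB before each part upload
+ *     }
+ * }
+ * }</pre>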
*/ +public abstract class MultiPartUploadTwoPhaseOutputStream<T, C> extends TwoPhaseOutputStream { + + private static final Logger LOG = + LoggerFactory.getLogger(MultiPartUploadTwoPhaseOutputStream.class); + + private final ByteArrayOutputStream buffer; + private final List<T> uploadedParts; + private final MultiPartUploadStore<T, C> multiPartUploadStore; + private final String objectName; + + private String uploadId; + private long position; + private boolean closed = false; + + public MultiPartUploadTwoPhaseOutputStream( + MultiPartUploadStore<T, C> multiPartUploadStore, org.apache.hadoop.fs.Path hadoopPath) + throws IOException { + this.multiPartUploadStore = multiPartUploadStore; + this.buffer = new ByteArrayOutputStream(); + this.uploadedParts = new ArrayList<>(); + this.objectName = multiPartUploadStore.pathToObject(hadoopPath); + this.uploadId = multiPartUploadStore.startMultiPartUpload(objectName); + this.position = 0; + } + + public abstract long partSizeThreshold(); + + @Override + public long getPos() throws IOException { + return position; + } + + @Override + public void write(int b) throws IOException { + if (closed) { + throw new IOException("Stream is closed"); + } + buffer.write(b); + position++; + if (buffer.size() >= partSizeThreshold()) { + uploadPart(); + } + } + + @Override + public void write(byte[] b) throws IOException { + write(b, 0, b.length); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + if (closed) { + throw new IOException("Stream is closed"); + } + buffer.write(b, off, len); + position += len; + if (buffer.size() >= partSizeThreshold()) { + uploadPart(); + } + } + + @Override + public void flush() throws IOException { + if (closed) { + throw new IOException("Stream is closed"); + } + } + + @Override + public void close() throws IOException { + if (!closed) { + Committer committer = closeForCommit(); + committer.commit(); + } + } + + @Override + public Committer closeForCommit() throws IOException { + if (closed) { + throw new IOException("Stream is already closed"); + } + closed = true; + + if (buffer.size() > 0) { + uploadPart(); + } + + return new MultiPartUploadCommitter( + multiPartUploadStore, uploadId, uploadedParts, objectName, position); + } + + private void uploadPart() throws IOException { + if (buffer.size() == 0) { + return; + } + + File tempFile = null; + try { + byte[] data = buffer.toByteArray(); + tempFile = Files.createTempFile("multi-part-" + UUID.randomUUID(), ".tmp").toFile(); + try (FileOutputStream fos = new FileOutputStream(tempFile)) { + fos.write(data); + fos.flush(); + } + T partETag = + multiPartUploadStore.uploadPart( + objectName, uploadId, uploadedParts.size() + 1, tempFile, data.length); + uploadedParts.add(partETag); + buffer.reset(); + } catch (Exception e) { + throw new IOException( + "Failed to upload part " + + (uploadedParts.size() + 1) + + " for upload ID: " + + uploadId, + e); + } finally { + if (tempFile != null && tempFile.exists()) { + if (!tempFile.delete()) { + LOG.warn("Failed to delete temporary file: {}", tempFile.getAbsolutePath()); + } + } + } + } + + private static class MultiPartUploadCommitter<T, C> implements Committer { + + private final MultiPartUploadStore<T, C> multiPartUploadStore; + private final String uploadId; + private final String objectName; + private final List<T> uploadedParts; + private final long byteLength; + private boolean committed = false; + private boolean discarded = false; + + public MultiPartUploadCommitter( + MultiPartUploadStore<T, C> 
multiPartUploadStore, + String uploadId, + List<T> uploadedParts, + String objectName, + long byteLength) { + this.multiPartUploadStore = multiPartUploadStore; + this.uploadId = uploadId; + this.objectName = objectName; + this.uploadedParts = new ArrayList<>(uploadedParts); + this.byteLength = byteLength; + } + + @Override + public void commit() throws IOException { + if (committed) { + return; + } + if (discarded) { + throw new IOException("Cannot commit: committer has been discarded"); + } + + try { + multiPartUploadStore.completeMultipartUpload( + objectName, uploadId, uploadedParts, byteLength); + committed = true; + LOG.info( + "Successfully committed multipart upload with ID: {} for objectName: {}", + uploadId, + objectName); + } catch (Exception e) { + throw new IOException("Failed to commit multipart upload with ID: " + uploadId, e); + } + } + + @Override + public void discard() throws IOException { + if (discarded) { + return; + } + + try { + multiPartUploadStore.abortMultipartUpload(objectName, uploadId); + discarded = true; + LOG.info( + "Successfully discarded multipart upload with ID: {} for objectName: {}", + uploadId, + objectName); + } catch (Exception e) { + LOG.warn("Failed to discard multipart upload with ID: {}", uploadId, e); + } + } + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/RenamingTwoPhaseOutputStream.java b/paimon-common/src/main/java/org/apache/paimon/fs/RenamingTwoPhaseOutputStream.java new file mode 100644 index 0000000000..0da827d920 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/fs/RenamingTwoPhaseOutputStream.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.fs; + +import org.apache.paimon.annotation.Public; + +import java.io.IOException; +import java.util.UUID; + +/** + * A {@link TwoPhaseOutputStream} implementation that writes to a temporary file and commits by + * renaming to the target path. This follows HDFS-style commit semantics. 
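+ *
+ * <p>Illustrative usage (variable names are placeholders; mirrors {@code
+ * RenamingTwoPhaseOutputStreamTest}):
+ *
+ * <pre>{@code
+ * TwoPhaseOutputStream out = fileIO.newTwoPhaseOutputStream(targetPath, false);
+ * out.write(data);
+ * TwoPhaseOutputStream.Committer committer = out.closeForCommit();
+ * // nothing is visible at targetPath yet; commit renames the temp file into place
+ * committer.commit();
+ * }</pre>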
+ */ +@Public +public class RenamingTwoPhaseOutputStream extends TwoPhaseOutputStream { + + private final FileIO fileIO; + private final Path targetPath; + private final Path tempPath; + private final PositionOutputStream tempOutputStream; + + public RenamingTwoPhaseOutputStream(FileIO fileIO, Path targetPath, boolean overwrite) + throws IOException { + if (!overwrite && fileIO.exists(targetPath)) { + throw new IOException("File " + targetPath + " already exists."); + } + this.fileIO = fileIO; + this.targetPath = targetPath; + this.tempPath = generateTempPath(targetPath); + + // Create temporary file + this.tempOutputStream = fileIO.newOutputStream(tempPath, overwrite); + } + + @Override + public void write(int b) throws IOException { + tempOutputStream.write(b); + } + + @Override + public void write(byte[] b) throws IOException { + tempOutputStream.write(b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + tempOutputStream.write(b, off, len); + } + + @Override + public void flush() throws IOException { + tempOutputStream.flush(); + } + + @Override + public long getPos() throws IOException { + return tempOutputStream.getPos(); + } + + @Override + public void close() throws IOException { + tempOutputStream.close(); + } + + @Override + public Committer closeForCommit() throws IOException { + close(); + return new TempFileCommitter(fileIO, tempPath, targetPath); + } + + /** + * Generate a temporary file path based on the target path. The temp file will be in the same + * directory as the target with a unique suffix. + */ + private Path generateTempPath(Path targetPath) { + String tempFileName = ".tmp." + UUID.randomUUID(); + return new Path(targetPath.getParent(), tempFileName); + } + + /** Committer implementation that renames temporary file to target path. */ + private static class TempFileCommitter implements Committer { + + private final FileIO fileIO; + private final Path tempPath; + private final Path targetPath; + private boolean committed = false; + private boolean discarded = false; + + public TempFileCommitter(FileIO fileIO, Path tempPath, Path targetPath) { + this.fileIO = fileIO; + this.tempPath = tempPath; + this.targetPath = targetPath; + } + + @Override + public void commit() throws IOException { + if (committed || discarded) { + throw new IOException("Committer has already been used"); + } + + try { + Path parentDir = targetPath.getParent(); + if (parentDir != null && !fileIO.exists(parentDir)) { + fileIO.mkdirs(parentDir); + } + + if (!fileIO.rename(tempPath, targetPath)) { + throw new IOException("Failed to rename " + tempPath + " to " + targetPath); + } + + committed = true; + + } catch (IOException e) { + // Clean up temp file on failure + fileIO.deleteQuietly(tempPath); + throw new IOException( + "Failed to commit temporary file " + tempPath + " to " + targetPath, e); + } + } + + @Override + public void discard() { + if (!committed && !discarded) { + fileIO.deleteQuietly(tempPath); + discarded = true; + } + } + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/TwoPhaseOutputStream.java b/paimon-common/src/main/java/org/apache/paimon/fs/TwoPhaseOutputStream.java new file mode 100644 index 0000000000..401486e91b --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/fs/TwoPhaseOutputStream.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.fs; + +import java.io.IOException; + +/** TwoPhaseOutputStream provides a way to write a file in two phases: write the data, then use the returned committer to make it visible. */ +public abstract class TwoPhaseOutputStream extends PositionOutputStream { + /** + * Closes the stream for writing and returns a committer that can be used to make the written + * data visible. + * + * <p>After calling this method, the stream should not be used for writing anymore. The returned + * committer can be used to commit the data or discard it. + * + * @return A committer that can be used to commit the data + * @throws IOException if an I/O error occurs during closing + */ + public abstract Committer closeForCommit() throws IOException; + + /** A committer interface that can commit or discard the written data. */ + public interface Committer { + + /** + * Commits the written data, making it visible. + * + * @throws IOException if an I/O error occurs during commit + */ + void commit() throws IOException; + + /** + * Discards the written data, cleaning up any temporary files or resources. + * + * @throws IOException if an I/O error occurs during discard + */ + void discard() throws IOException; + } +} diff --git a/paimon-common/src/test/java/org/apache/paimon/fs/RenamingTwoPhaseOutputStreamTest.java b/paimon-common/src/test/java/org/apache/paimon/fs/RenamingTwoPhaseOutputStreamTest.java new file mode 100644 index 0000000000..a68ccb8700 --- /dev/null +++ b/paimon-common/src/test/java/org/apache/paimon/fs/RenamingTwoPhaseOutputStreamTest.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.fs; + +import org.apache.paimon.fs.local.LocalFileIO; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Test for {@link RenamingTwoPhaseOutputStream}.
*/ +public class RenamingTwoPhaseOutputStreamTest { + + @TempDir java.nio.file.Path tempDir; + + private FileIO fileIO; + private Path targetPath; + + @BeforeEach + void setup() { + fileIO = new LocalFileIO(); + targetPath = new Path(tempDir.resolve("target-file.txt").toString()); + } + + @Test + void testSuccessfulCommit() throws IOException { + RenamingTwoPhaseOutputStream stream = + new RenamingTwoPhaseOutputStream(fileIO, targetPath, false); + + // Write some data + String testData = "Hello, World!"; + stream.write(testData.getBytes()); + + // Close for commit + TwoPhaseOutputStream.Committer committer = stream.closeForCommit(); + + // Target file should not exist yet + assertThat(fileIO.exists(targetPath)).isFalse(); + + // Commit the file + committer.commit(); + + // Now target file should exist with correct content + assertThat(fileIO.exists(targetPath)).isTrue(); + + // Read and verify content + byte[] content = Files.readAllBytes(Paths.get(targetPath.toString())); + assertThat(new String(content)).isEqualTo(testData); + } + + @Test + void testDiscard() throws IOException { + RenamingTwoPhaseOutputStream stream = + new RenamingTwoPhaseOutputStream(fileIO, targetPath, false); + + // Write some data + stream.write("Some data".getBytes()); + + // Close for commit + TwoPhaseOutputStream.Committer committer = stream.closeForCommit(); + + // Discard instead of commit + committer.discard(); + + // Target file should not exist + assertThat(fileIO.exists(targetPath)).isFalse(); + } + + @Test + void testCloseWithoutCommit() throws IOException { + RenamingTwoPhaseOutputStream stream = + new RenamingTwoPhaseOutputStream(fileIO, targetPath, false); + // Write some data + stream.write("Some data".getBytes()); + + // Just close (not closeForCommit) + stream.close(); + + // Target file should not exist (temp file cleaned up) + assertThat(fileIO.exists(targetPath)).isFalse(); + } + + @Test + void testDoubleCommitThrows() throws IOException { + RenamingTwoPhaseOutputStream stream = + new RenamingTwoPhaseOutputStream(fileIO, targetPath, false); + + stream.write("data".getBytes()); + TwoPhaseOutputStream.Committer committer = stream.closeForCommit(); + + // First commit should succeed + committer.commit(); + + // Second commit should throw + assertThatThrownBy(committer::commit).isInstanceOf(IOException.class); + } + + @Test + void testPositionTracking() throws IOException { + RenamingTwoPhaseOutputStream stream = + new RenamingTwoPhaseOutputStream(fileIO, targetPath, false); + + assertThat(stream.getPos()).isEqualTo(0); + + stream.write("Hello".getBytes()); + assertThat(stream.getPos()).isEqualTo(5); + + stream.write(" World!".getBytes()); + assertThat(stream.getPos()).isEqualTo(12); + + TwoPhaseOutputStream.Committer committer = stream.closeForCommit(); + committer.commit(); + + // Verify final content + byte[] content = Files.readAllBytes(Paths.get(targetPath.toString())); + assertThat(new String(content)).isEqualTo("Hello World!"); + } +} diff --git a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java index 4c457598e7..7e9d8fc68a 100644 --- a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java +++ b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java @@ -20,6 +20,8 @@ package org.apache.paimon.oss; import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.fs.FileIO; +import 
org.apache.paimon.fs.Path; +import org.apache.paimon.fs.TwoPhaseOutputStream; import org.apache.paimon.options.Options; import org.apache.paimon.utils.IOUtils; @@ -106,6 +108,19 @@ public class OSSFileIO extends HadoopCompliantFileIO { } } + @Override + public TwoPhaseOutputStream newTwoPhaseOutputStream(Path path, boolean overwrite) + throws IOException { + if (!overwrite && this.exists(path)) { + throw new IOException("File " + path + " already exists."); + } + org.apache.hadoop.fs.Path hadoopPath = path(path); + FileSystem fs = getFileSystem(hadoopPath); + return new OssTwoPhaseOutputStream( + new OSSMultiPartUpload((org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem) fs), + hadoopPath); + } + public Options hadoopOptions() { return hadoopOptions; } diff --git a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSMultiPartUpload.java b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSMultiPartUpload.java new file mode 100644 index 0000000000..6ea486f683 --- /dev/null +++ b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSMultiPartUpload.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.oss; + +import org.apache.paimon.fs.MultiPartUploadStore; + +import com.aliyun.oss.model.CompleteMultipartUploadResult; +import com.aliyun.oss.model.PartETag; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem; +import org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystemStore; + +import java.io.File; +import java.io.IOException; +import java.util.List; + +/** Provides the multipart upload by Aliyun OSS. 
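+ * Delegates upload start, part upload, completion, and abort to the underlying {@link
+ * AliyunOSSFileSystemStore}.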
*/ +public class OSSMultiPartUpload + implements MultiPartUploadStore<PartETag, CompleteMultipartUploadResult> { + + private AliyunOSSFileSystem fs; + private AliyunOSSFileSystemStore store; + + public OSSMultiPartUpload(AliyunOSSFileSystem fs) { + this.fs = fs; + this.store = fs.getStore(); + } + + @Override + public Path workingDirectory() { + return fs.getWorkingDirectory(); + } + + @Override + public String startMultiPartUpload(String objectName) throws IOException { + return store.getUploadId(objectName); + } + + @Override + public CompleteMultipartUploadResult completeMultipartUpload( + String objectName, String uploadId, List<PartETag> partETags, long numBytesInParts) { + return store.completeMultipartUpload(objectName, uploadId, partETags); + } + + @Override + public PartETag uploadPart( + String objectName, String uploadId, int partNumber, File file, long byteLength) + throws IOException { + return store.uploadPart(file, objectName, uploadId, partNumber); + } + + @Override + public void abortMultipartUpload(String objectName, String uploadId) { + store.abortMultipartUpload(objectName, uploadId); + } +} diff --git a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OssTwoPhaseOutputStream.java b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OssTwoPhaseOutputStream.java new file mode 100644 index 0000000000..cce331d8d0 --- /dev/null +++ b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OssTwoPhaseOutputStream.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.oss; + +import org.apache.paimon.fs.MultiPartUploadStore; +import org.apache.paimon.fs.MultiPartUploadTwoPhaseOutputStream; + +import com.aliyun.oss.model.CompleteMultipartUploadResult; +import com.aliyun.oss.model.PartETag; + +import java.io.IOException; + +/** OSS implementation of TwoPhaseOutputStream using multipart upload. 
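+ * Buffered bytes are uploaded as a part once they reach the 10 MiB {@link #partSizeThreshold()}.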
*/ +public class OssTwoPhaseOutputStream + extends MultiPartUploadTwoPhaseOutputStream<PartETag, CompleteMultipartUploadResult> { + + public OssTwoPhaseOutputStream( + MultiPartUploadStore<PartETag, CompleteMultipartUploadResult> multiPartUploadStore, + org.apache.hadoop.fs.Path hadoopPath) + throws IOException { + super(multiPartUploadStore, hadoopPath); + } + + @Override + public long partSizeThreshold() { + return 10L << 20; + } +} diff --git a/paimon-filesystems/paimon-oss-impl/src/test/java/org/apache/paimon/oss/OssTwoPhaseOutputStreamTest.java b/paimon-filesystems/paimon-oss-impl/src/test/java/org/apache/paimon/oss/OssTwoPhaseOutputStreamTest.java new file mode 100644 index 0000000000..ff71b571dc --- /dev/null +++ b/paimon-filesystems/paimon-oss-impl/src/test/java/org/apache/paimon/oss/OssTwoPhaseOutputStreamTest.java @@ -0,0 +1,382 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.oss; + +import org.apache.paimon.fs.TwoPhaseOutputStream; + +import com.aliyun.oss.model.CompleteMultipartUploadResult; +import com.aliyun.oss.model.PartETag; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Test for {@link OssTwoPhaseOutputStream}. 
*/ public class OssTwoPhaseOutputStreamTest { + + @TempDir java.nio.file.Path tempDir; + + private OssTwoPhaseOutputStream stream; + private MockOSSMultiPartUpload mockAccessor; + private org.apache.hadoop.fs.Path hadoopPath; + private File targetFile; + + @BeforeEach + void setup() throws IOException { + hadoopPath = new org.apache.hadoop.fs.Path("/test/file.parquet"); + targetFile = tempDir.resolve("target-file.parquet").toFile(); + targetFile.getParentFile().mkdirs(); + + mockAccessor = new MockOSSMultiPartUpload(targetFile); + } + + private OssTwoPhaseOutputStream createStream() throws IOException { + return new OssTwoPhaseOutputStream(mockAccessor, hadoopPath); + } + + @Test + void testLargeDataMultipleParts() throws IOException { + stream = createStream(); + + // Write more data than partSizeThreshold() (10 MiB) to trigger an automatic part upload, + // plus some extra data to ensure there's remaining data for final upload + byte[] largeData = new byte[120 * 1024 * 1024]; // 120MB - will trigger first upload + for (int i = 0; i < largeData.length; i++) { + largeData[i] = (byte) (i % 256); + } + + stream.write(largeData); + + // Write additional data that will remain in buffer for final upload + byte[] extraData = "Additional data for final part".getBytes(); + stream.write(extraData); + + assertThat(stream.getPos()).isEqualTo(largeData.length + extraData.length); + + // Should have triggered automatic part upload + assertThat(mockAccessor.startMultipartUploadCalled).isTrue(); + assertThat(mockAccessor.uploadPartCalls).isEqualTo(1); // One part uploaded automatically + + // Close for commit (uploads remaining data) + TwoPhaseOutputStream.Committer committer = stream.closeForCommit(); + + // Should have uploaded the remaining data as a final part + assertThat(mockAccessor.uploadPartCalls).isEqualTo(2); // Initial + final part + + // Commit + committer.commit(); + + assertThat(mockAccessor.completeMultipartUploadCalled).isTrue(); + + // Verify target file contains all the data + assertThat(targetFile.exists()).isTrue(); + byte[] writtenContent = Files.readAllBytes(targetFile.toPath()); + + // Combine expected data + byte[] expectedData = new byte[largeData.length + extraData.length]; + System.arraycopy(largeData, 0, expectedData, 0, largeData.length); + System.arraycopy(extraData, 0, expectedData, largeData.length, extraData.length); + + assertThat(writtenContent).isEqualTo(expectedData); + } + + @Test + void testDiscard() throws IOException { + stream = createStream(); + + stream.write("Some data".getBytes()); + TwoPhaseOutputStream.Committer committer = stream.closeForCommit(); + + // Discard instead of commit + committer.discard(); + + // Verify abort was called, not complete + assertThat(mockAccessor.abortMultipartUploadCalled).isTrue(); + assertThat(mockAccessor.completeMultipartUploadCalled).isFalse(); + + // Target file should not exist + assertThat(targetFile.exists()).isFalse(); + } + + @Test + void testCommitFailure() throws IOException { + stream = createStream(); + mockAccessor.completeMultipartUploadShouldFail = true; + + stream.write("data".getBytes()); + TwoPhaseOutputStream.Committer committer = stream.closeForCommit(); + + assertThatThrownBy(committer::commit) + .isInstanceOf(IOException.class) + .hasMessageContaining("Failed to commit multipart upload"); + + // Target file should not exist on failed commit + assertThat(targetFile.exists()).isFalse(); + } + + @Test + void testUploadPartFailure() throws IOException { + stream = createStream(); + mockAccessor.uploadPartShouldFail = true;
+ + // Write data and then close to trigger uploadPart during closeForCommit + stream.write("test data".getBytes()); + + assertThatThrownBy(() -> stream.closeForCommit()) + .isInstanceOf(IOException.class) + .hasMessageContaining("Failed to upload part"); + } + + @Test + void testPositionTracking() throws IOException { + stream = createStream(); + + assertThat(stream.getPos()).isEqualTo(0); + + stream.write("Hello".getBytes()); + assertThat(stream.getPos()).isEqualTo(5); + + stream.write(" OSS".getBytes()); + assertThat(stream.getPos()).isEqualTo(9); + + stream.write(" World!".getBytes()); + assertThat(stream.getPos()).isEqualTo(16); + + TwoPhaseOutputStream.Committer committer = stream.closeForCommit(); + committer.commit(); + + assertThat(mockAccessor.completeMultipartUploadCalled).isTrue(); + + // Verify final content + String writtenContent = new String(Files.readAllBytes(targetFile.toPath())); + assertThat(writtenContent).isEqualTo("Hello OSS World!"); + } + + @Test + void testCommitAfterDiscard() throws IOException { + stream = createStream(); + stream.write("data".getBytes()); + TwoPhaseOutputStream.Committer committer = stream.closeForCommit(); + + committer.discard(); + + assertThatThrownBy(committer::commit) + .isInstanceOf(IOException.class) + .hasMessageContaining("Cannot commit: committer has been discarded"); + + // Target file should not exist + assertThat(targetFile.exists()).isFalse(); + } + + /** + * Mock implementation that actually uses local files to simulate OSS multipart upload behavior. + * Extends OSSMultiPartUpload but overrides all methods to avoid initialization issues. + */ + private static class MockOSSMultiPartUpload extends OSSMultiPartUpload { + + boolean startMultipartUploadCalled = false; + int uploadPartCalls = 0; + boolean completeMultipartUploadCalled = false; + int completeMultipartUploadCallCount = 0; + boolean abortMultipartUploadCalled = false; + + boolean uploadPartShouldFail = false; + boolean completeMultipartUploadShouldFail = false; + + private final String mockUploadId = "mock-upload-" + UUID.randomUUID(); + private final List<File> tempPartFiles = new ArrayList<>(); + private final File targetFile; + + @SuppressWarnings("unused") + public MockOSSMultiPartUpload(File targetFile) { + super(createStubFileSystem()); // Create minimal stub to avoid null pointer + this.targetFile = targetFile; + } + + private static org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem createStubFileSystem() { + // Create a minimal stub to avoid NullPointerException during initialization + return new StubAliyunOSSFileSystem(); + } + + @Override + public String pathToObject(org.apache.hadoop.fs.Path hadoopPath) { + return hadoopPath.toUri().getPath().substring(1); + } + + @Override + public String startMultiPartUpload(String objectName) { + startMultipartUploadCalled = true; + return mockUploadId; + } + + @Override + public PartETag uploadPart( + String objectName, String uploadId, int partNumber, File file, long byteLength) + throws IOException { + uploadPartCalls++; + + if (uploadPartShouldFail) { + throw new IOException("Mock upload part failure"); + } + + // Verify file exists and has content + if (!file.exists() || file.length() == 0) { + throw new IOException("Invalid file for upload: " + file); + } + + // Store the part file in a temporary location (simulating storing in OSS) + File partFile = + Files.createTempFile("mock-oss-part-" + partNumber + "-", ".tmp").toFile(); + Files.copy(file.toPath(), partFile.toPath(), StandardCopyOption.REPLACE_EXISTING); +
tempPartFiles.add(partFile); + + MockPartETag mockPartETag = new MockPartETag(partNumber, "mock-etag-" + partNumber); + return mockPartETag; + } + + @Override + public CompleteMultipartUploadResult completeMultipartUpload( + String objectName, + String uploadId, + List<PartETag> partETags, + long numBytesInParts) { + completeMultipartUploadCalled = true; + completeMultipartUploadCallCount++; + + if (completeMultipartUploadShouldFail) { + throw new RuntimeException("Mock complete multipart upload failure"); + } + + // Simulate combining all parts into the final target file + try { + try (FileOutputStream fos = new FileOutputStream(targetFile)) { + for (File partFile : tempPartFiles) { + try (FileInputStream fis = new FileInputStream(partFile)) { + byte[] buffer = new byte[8192]; + int bytesRead; + while ((bytesRead = fis.read(buffer)) != -1) { + fos.write(buffer, 0, bytesRead); + } + } + } + } + } catch (IOException e) { + throw new RuntimeException("Failed to complete multipart upload", e); + } + + // Clean up temp part files + for (File partFile : tempPartFiles) { + partFile.delete(); + } + tempPartFiles.clear(); + + return new MockCompleteMultipartUploadResult(objectName, "mock-final-etag"); + } + + @Override + public void abortMultipartUpload(String objectName, String uploadId) { + abortMultipartUploadCalled = true; + + // Clean up temp part files on abort + for (File partFile : tempPartFiles) { + partFile.delete(); + } + tempPartFiles.clear(); + + // Ensure target file doesn't exist + if (targetFile.exists()) { + targetFile.delete(); + } + } + } + + /** Mock implementation of PartETag. */ + private static class MockPartETag extends PartETag { + private final int partNumber; + private final String eTag; + + public MockPartETag(int partNumber, String eTag) { + super(partNumber, eTag); + this.partNumber = partNumber; + this.eTag = eTag; + } + + @Override + public int getPartNumber() { + return partNumber; + } + + @Override + public String getETag() { + return eTag; + } + } + + /** Mock implementation of CompleteMultipartUploadResult. */ + private static class MockCompleteMultipartUploadResult extends CompleteMultipartUploadResult { + private final String key; + private final String eTag; + + public MockCompleteMultipartUploadResult(String key, String eTag) { + this.key = key; + this.eTag = eTag; + } + + @Override + public String getKey() { + return key; + } + + @Override + public String getETag() { + return eTag; + } + } + + /** + * Minimal stub implementation to avoid NullPointerException during OSSMultiPartUpload initialization. + */ + private static class StubAliyunOSSFileSystem + extends org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem { + private final StubAliyunOSSFileSystemStore stubStore = new StubAliyunOSSFileSystemStore(); + + @Override + public org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystemStore getStore() { + return stubStore; + } + } + + /** Minimal stub implementation for the store.
*/ private static class StubAliyunOSSFileSystemStore + extends org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystemStore { + // Empty stub - we override all methods in MockOSSMultiPartUpload anyway + } +} diff --git a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/HadoopCompliantFileIO.java b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/HadoopCompliantFileIO.java index 9238251033..a662e8a075 100644 --- a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/HadoopCompliantFileIO.java +++ b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/HadoopCompliantFileIO.java @@ -123,11 +123,11 @@ public abstract class HadoopCompliantFileIO implements FileIO { return getFileSystem(hadoopSrc).rename(hadoopSrc, hadoopDst); } - private org.apache.hadoop.fs.Path path(Path path) { + protected org.apache.hadoop.fs.Path path(Path path) { return new org.apache.hadoop.fs.Path(path.toUri()); } - private FileSystem getFileSystem(org.apache.hadoop.fs.Path path) throws IOException { + protected FileSystem getFileSystem(org.apache.hadoop.fs.Path path) throws IOException { if (fsMap == null) { synchronized (this) { if (fsMap == null) { diff --git a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3FileIO.java b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3FileIO.java index a58aa18eac..65017da573 100644 --- a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3FileIO.java +++ b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3FileIO.java @@ -20,6 +20,8 @@ package org.apache.paimon.s3; import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.TwoPhaseOutputStream; import org.apache.paimon.options.Options; import org.apache.hadoop.conf.Configuration; @@ -71,6 +73,17 @@ public class S3FileIO extends HadoopCompliantFileIO { this.hadoopOptions = mirrorCertainHadoopConfig(loadHadoopConfigFromContext(context)); } + @Override + public TwoPhaseOutputStream newTwoPhaseOutputStream(Path path, boolean overwrite) + throws IOException { + org.apache.hadoop.fs.Path hadoopPath = path(path); + S3AFileSystem fs = (S3AFileSystem) getFileSystem(hadoopPath); + if (!overwrite && this.exists(path)) { + throw new IOException("File " + path + " already exists."); + } + return new S3TwoPhaseOutputStream(new S3MultiPartUpload(fs, fs.getConf()), hadoopPath); + } + // add additional config entries from the IO config to the Hadoop config private Options loadHadoopConfigFromContext(CatalogContext context) { Options hadoopConfig = new Options(); diff --git a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3MultiPartUpload.java b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3MultiPartUpload.java new file mode 100644 index 0000000000..4fd3590027 --- /dev/null +++ b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3MultiPartUpload.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.s3; + +import org.apache.paimon.fs.MultiPartUploadStore; + +import com.amazonaws.services.s3.model.CompleteMultipartUploadResult; +import com.amazonaws.services.s3.model.PartETag; +import com.amazonaws.services.s3.model.UploadPartRequest; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.WriteOperationHelper; +import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext; +import org.apache.hadoop.fs.store.audit.AuditSpan; +import org.apache.hadoop.fs.store.audit.AuditSpanSource; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.paimon.utils.Preconditions.checkNotNull; + +/** Provides the multipart upload by Amazon S3. */ +public class S3MultiPartUpload + implements MultiPartUploadStore<PartETag, CompleteMultipartUploadResult> { + + private final S3AFileSystem s3a; + + private final InternalWriteOperationHelper s3accessHelper; + + public S3MultiPartUpload(S3AFileSystem s3a, Configuration conf) { + checkNotNull(s3a); + this.s3accessHelper = + new InternalWriteOperationHelper( + s3a, + checkNotNull(conf), + s3a.createStoreContext().getInstrumentation(), + s3a.getAuditSpanSource(), + s3a.getActiveAuditSpan()); + this.s3a = s3a; + } + + @Override + public Path workingDirectory() { + return s3a.getWorkingDirectory(); + } + + @Override + public String startMultiPartUpload(String objectName) throws IOException { + return s3accessHelper.initiateMultiPartUpload(objectName); + } + + @Override + public CompleteMultipartUploadResult completeMultipartUpload( + String objectName, String uploadId, List<PartETag> partETags, long numBytesInParts) + throws IOException { + return s3accessHelper.completeMPUwithRetries( + objectName, uploadId, partETags, numBytesInParts, new AtomicInteger(0)); + } + + @Override + public PartETag uploadPart( + String objectName, String uploadId, int partNumber, File file, long byteLength) + throws IOException { + final UploadPartRequest uploadRequest = + s3accessHelper.newUploadPartRequest( + objectName, + uploadId, + partNumber, + checkedDownCast(byteLength), + null, + file, + 0L); + return s3accessHelper.uploadPart(uploadRequest).getPartETag(); + } + + @Override + public void abortMultipartUpload(String destKey, String uploadId) throws IOException { + s3accessHelper.abortMultipartUpload(destKey, uploadId, false, null); + } + + private static final class InternalWriteOperationHelper extends WriteOperationHelper { + + InternalWriteOperationHelper( + S3AFileSystem owner, + Configuration conf, + S3AStatisticsContext statisticsContext, + AuditSpanSource auditSpanSource, + AuditSpan auditSpan) { + super(owner, conf, statisticsContext, auditSpanSource, auditSpan); + } + } + + private static int checkedDownCast(long value) { + int downCast = (int) value; + if (downCast != value) { + throw new IllegalArgumentException( + "Cannot downcast long value " + value + " to integer."); + } + return downCast; + } +} diff --git 
a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3TwoPhaseOutputStream.java b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3TwoPhaseOutputStream.java new file mode 100644 index 0000000000..d66b134d5c --- /dev/null +++ b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3TwoPhaseOutputStream.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.s3; + +import org.apache.paimon.fs.MultiPartUploadStore; +import org.apache.paimon.fs.MultiPartUploadTwoPhaseOutputStream; + +import com.amazonaws.services.s3.model.CompleteMultipartUploadResult; +import com.amazonaws.services.s3.model.PartETag; + +import java.io.IOException; + +/** S3 implementation of TwoPhaseOutputStream using multipart upload. */ +public class S3TwoPhaseOutputStream + extends MultiPartUploadTwoPhaseOutputStream<PartETag, CompleteMultipartUploadResult> { + + public S3TwoPhaseOutputStream( + MultiPartUploadStore<PartETag, CompleteMultipartUploadResult> multiPartUploadStore, + org.apache.hadoop.fs.Path hadoopPath) + throws IOException { + super(multiPartUploadStore, hadoopPath); + } + + @Override + public long partSizeThreshold() { + return 5L << 20; + } +} diff --git a/paimon-filesystems/paimon-s3-impl/src/test/java/org/apache/paimon/s3/S3TwoPhaseOutputStreamTest.java b/paimon-filesystems/paimon-s3-impl/src/test/java/org/apache/paimon/s3/S3TwoPhaseOutputStreamTest.java new file mode 100644 index 0000000000..3712916e00 --- /dev/null +++ b/paimon-filesystems/paimon-s3-impl/src/test/java/org/apache/paimon/s3/S3TwoPhaseOutputStreamTest.java @@ -0,0 +1,368 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.s3; + +import org.apache.paimon.fs.TwoPhaseOutputStream; + +import com.amazonaws.services.s3.model.CompleteMultipartUploadResult; +import com.amazonaws.services.s3.model.PartETag; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Test for {@link S3TwoPhaseOutputStream}. */ +class S3TwoPhaseOutputStreamTest { + + @TempDir java.nio.file.Path tempDir; + + private MockS3MultiPartUpload mockAccessor; + private S3TwoPhaseOutputStream stream; + private File targetFile; + private org.apache.hadoop.fs.Path hadoopPath; + + @BeforeEach + void setUp() throws IOException { + hadoopPath = new org.apache.hadoop.fs.Path("/test/file.parquet"); + targetFile = tempDir.resolve("target-file.parquet").toFile(); + targetFile.getParentFile().mkdirs(); + + mockAccessor = new MockS3MultiPartUpload(targetFile); + } + + private S3TwoPhaseOutputStream createStream() throws IOException { + return new S3TwoPhaseOutputStream(mockAccessor, hadoopPath); + } + + @Test + void testLargeDataMultipleParts() throws IOException { + stream = createStream(); + + // Create data larger than partSizeThreshold() (5 MiB) to trigger multiple part uploads + byte[] bigData = new byte[6 * 1024 * 1024]; // 6MB + for (int i = 0; i < bigData.length; i++) { + bigData[i] = (byte) (i % 256); + } + + // Write the large data + stream.write(bigData); + + // This should trigger automatic part uploads during write + assertThat(mockAccessor.startMultipartUploadCalled).isTrue(); + assertThat(mockAccessor.uploadPartCalls).isGreaterThan(0); + + // Add more data to ensure closeForCommit creates another part + String additionalData = "Additional data for final part"; + stream.write(additionalData.getBytes()); + + assertThat(stream.getPos()).isEqualTo(bigData.length + additionalData.length()); + + // Close for commit + TwoPhaseOutputStream.Committer committer = stream.closeForCommit(); + + // Should have uploaded multiple parts + assertThat(mockAccessor.uploadPartCalls).isGreaterThan(1); + assertThat(mockAccessor.completeMultipartUploadCalled).isFalse(); + + // Target file should not exist yet + assertThat(targetFile.exists()).isFalse(); + + // Commit + committer.commit(); + + // Verify complete multipart upload was called + assertThat(mockAccessor.completeMultipartUploadCalled).isTrue(); + assertThat(mockAccessor.abortMultipartUploadCalled).isFalse(); + + // Target file should now exist with correct content + assertThat(targetFile.exists()).isTrue(); + + // Verify the content is correct by reading it back + byte[] writtenContent = Files.readAllBytes(targetFile.toPath()); + assertThat(writtenContent).hasSize(bigData.length + additionalData.length()); + + // Check first part (bigData) + for (int i = 0; i < bigData.length; i++) { + assertThat(writtenContent[i]).isEqualTo(bigData[i]); + } + + // Check additional data at the end + byte[] additionalBytes = additionalData.getBytes(); + for (int i = 0; i < additionalBytes.length; i++) { + assertThat(writtenContent[bigData.length + i]).isEqualTo(additionalBytes[i]); + } + } + + @Test + void
testDiscard() throws IOException { + stream = createStream(); + stream.write("Hello S3 World!".getBytes()); + + TwoPhaseOutputStream.Committer committer = stream.closeForCommit(); + + // Verify initial state + assertThat(mockAccessor.startMultipartUploadCalled).isTrue(); + assertThat(mockAccessor.uploadPartCalls).isEqualTo(1); + assertThat(targetFile.exists()).isFalse(); + + // Discard instead of commit + committer.discard(); + + // Verify abort was called + assertThat(mockAccessor.abortMultipartUploadCalled).isTrue(); + assertThat(mockAccessor.completeMultipartUploadCalled).isFalse(); + + // Target file should not exist + assertThat(targetFile.exists()).isFalse(); + } + + @Test + void testCommitAfterDiscard() throws IOException { + stream = createStream(); + stream.write("data".getBytes()); + TwoPhaseOutputStream.Committer committer = stream.closeForCommit(); + + committer.discard(); + + assertThatThrownBy(() -> committer.commit()) + .isInstanceOf(IOException.class) + .hasMessageContaining("Cannot commit: committer has been discarded"); + } + + @Test + void testSimpleCommit() throws IOException { + stream = createStream(); + + String testData = "Hello S3 World!"; + stream.write(testData.getBytes()); + + assertThat(stream.getPos()).isEqualTo(testData.length()); + + // Close for commit + TwoPhaseOutputStream.Committer committer = stream.closeForCommit(); + + // Target file should not exist yet + assertThat(targetFile.exists()).isFalse(); + + // Commit + committer.commit(); + + // Verify upload completed + assertThat(mockAccessor.startMultipartUploadCalled).isTrue(); + assertThat(mockAccessor.uploadPartCalls).isEqualTo(1); + assertThat(mockAccessor.completeMultipartUploadCalled).isTrue(); + + // Target file should exist with correct content + assertThat(targetFile.exists()).isTrue(); + String writtenContent = new String(Files.readAllBytes(targetFile.toPath())); + assertThat(writtenContent).isEqualTo(testData); + } + + @Test + void testDoubleDiscard() throws IOException { + stream = createStream(); + stream.write("data".getBytes()); + TwoPhaseOutputStream.Committer committer = stream.closeForCommit(); + + committer.discard(); + // Second discard should be safe (no-op) + committer.discard(); + + // Abort should only be called once + assertThat(mockAccessor.abortMultipartUploadCallCount).isEqualTo(1); + + // Target file should not exist + assertThat(targetFile.exists()).isFalse(); + } + + /** + * Mock implementation that uses local files to simulate S3 multipart upload behavior. Extends + * S3MultiPartUpload but overrides all methods to avoid initialization issues.
+ */ + private static class MockS3MultiPartUpload extends S3MultiPartUpload { + private final List<File> tempPartFiles = new ArrayList<>(); + private final File targetFile; + private final String mockUploadId = "mock-upload-id-12345"; + + // Test tracking variables + boolean startMultipartUploadCalled = false; + int uploadPartCalls = 0; + boolean completeMultipartUploadCalled = false; + boolean abortMultipartUploadCalled = false; + int abortMultipartUploadCallCount = 0; + + @SuppressWarnings("unused") + public MockS3MultiPartUpload(File targetFile) { + super(createStubFileSystem(), new Configuration()); + this.targetFile = targetFile; + } + + private static S3AFileSystem createStubFileSystem() { + // Create minimal stub to avoid NullPointerException during initialization + return new StubS3AFileSystem(); + } + + @Override + public String pathToObject(org.apache.hadoop.fs.Path hadoopPath) { + return hadoopPath.toUri().getPath().substring(1); + } + + @Override + public String startMultiPartUpload(String key) { + startMultipartUploadCalled = true; + return mockUploadId; + } + + @Override + public PartETag uploadPart( + String key, String uploadId, int partNumber, File inputFile, long byteLength) + throws IOException { + uploadPartCalls++; + + // Create a temporary copy of the part file + File tempPartFile = Files.createTempFile("s3-part-" + partNumber, ".tmp").toFile(); + try (FileInputStream fis = new FileInputStream(inputFile); + FileOutputStream fos = new FileOutputStream(tempPartFile)) { + byte[] buffer = new byte[8192]; + int bytesRead; + while ((bytesRead = fis.read(buffer)) != -1) { + fos.write(buffer, 0, bytesRead); + } + } + tempPartFiles.add(tempPartFile); + + // Return a mock PartETag + return new PartETag(partNumber, "etag-" + partNumber); + } + + @Override + public CompleteMultipartUploadResult completeMultipartUpload( + String destKey, String uploadId, List<PartETag> partETags, long length) { + completeMultipartUploadCalled = true; + + // Simulate combining all parts into the final target file + try { + try (FileOutputStream fos = new FileOutputStream(targetFile)) { + for (File partFile : tempPartFiles) { + try (FileInputStream fis = new FileInputStream(partFile)) { + byte[] buffer = new byte[8192]; + int bytesRead; + while ((bytesRead = fis.read(buffer)) != -1) { + fos.write(buffer, 0, bytesRead); + } + } + } + } + + // Clean up temp files + for (File partFile : tempPartFiles) { + if (partFile.exists()) { + partFile.delete(); + } + } + tempPartFiles.clear(); + + return new MockCompleteMultipartUploadResult("mock-bucket", destKey, "mock-etag"); + } catch (IOException e) { + throw new RuntimeException("Failed to complete multipart upload", e); + } + } + + @Override + public void abortMultipartUpload(String destKey, String uploadId) { + abortMultipartUploadCalled = true; + abortMultipartUploadCallCount++; + + // Clean up temp files + for (File partFile : tempPartFiles) { + if (partFile.exists()) { + partFile.delete(); + } + } + tempPartFiles.clear(); + + // Clean up target file if it exists + if (targetFile.exists()) { + targetFile.delete(); + } + } + } + + /** Mock implementation of PartETag. */ + private static class MockPartETag extends PartETag { + private final String eTag; + + public MockPartETag(String eTag, int partNumber) { + super(partNumber, eTag); + this.eTag = eTag; + } + + @Override + public String getETag() { + return eTag; + } + } + + /** Mock implementation of CompleteMultipartUploadResult.
*/ private static class MockCompleteMultipartUploadResult extends CompleteMultipartUploadResult { + private final String bucketName; + private final String key; + private final String eTag; + + public MockCompleteMultipartUploadResult(String bucketName, String key, String eTag) { + this.bucketName = bucketName; + this.key = key; + this.eTag = eTag; + } + + @Override + public String getBucketName() { + return bucketName; + } + + @Override + public String getKey() { + return key; + } + + @Override + public String getETag() { + return eTag; + } + } + + /** + * Minimal stub implementation to avoid NullPointerException during S3MultiPartUpload initialization. + */ + private static class StubS3AFileSystem extends S3AFileSystem { + // Minimal stub - no implementation needed for our mock + } +}
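For reference, a minimal, illustrative sketch of how the API added by this commit is used (the path and payload are placeholders; any FileIO works, with the S3/OSS implementations committing via multipart upload and the default falling back to RenamingTwoPhaseOutputStream):

    import org.apache.paimon.fs.FileIO;
    import org.apache.paimon.fs.Path;
    import org.apache.paimon.fs.TwoPhaseOutputStream;
    import org.apache.paimon.fs.local.LocalFileIO;

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    public class TwoPhaseWriteExample {
        public static void main(String[] args) throws IOException {
            FileIO fileIO = new LocalFileIO();
            Path target = new Path("/tmp/two-phase-example.txt");

            TwoPhaseOutputStream out = fileIO.newTwoPhaseOutputStream(target, false);
            TwoPhaseOutputStream.Committer committer = null;
            try {
                out.write("hello".getBytes(StandardCharsets.UTF_8)); // not yet visible at target
                committer = out.closeForCommit(); // phase one: close the stream, keep data pending
                committer.commit(); // phase two: rename the temp file / complete the multipart upload
            } catch (IOException e) {
                if (committer != null) {
                    committer.discard(); // clean up the temp file / abort the multipart upload
                }
                throw e;
            }
        }
    }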