This is an automated email from the ASF dual-hosted git repository.

jinglun pushed a commit to branch HADOOP-19236-original
in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit 30ba5c87e8978c5651e64c649d783b8db57a6425
Author: lijinglun <lijing...@bytedance.com>
AuthorDate: Thu Oct 17 21:36:43 2024 +0800

    Integration of TOS: Add ObjectOutputStream, MagicOutputStream.
---
 .../hadoop/fs/tosfs/commit/MagicOutputStream.java  | 146 +++++++++
 .../org/apache/hadoop/fs/tosfs/conf/ConfKeys.java  |  39 +++
 .../hadoop/fs/tosfs/object/ObjectOutputStream.java | 331 +++++++++++++++++++++
 .../fs/tosfs/object/staging/FileStagingPart.java   | 176 +++++++++++
 .../fs/tosfs/object/staging/StagingPart.java       |  79 +++++
 .../hadoop/fs/tosfs/object/staging/State.java      |  23 ++
 6 files changed, 794 insertions(+)

diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/MagicOutputStream.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/MagicOutputStream.java
new file mode 100644
index 00000000000..d78d9466079
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/MagicOutputStream.java
@@ -0,0 +1,146 @@
+/*
+ * ByteDance Volcengine EMR, Copyright 2022.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.commit;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.tosfs.object.ObjectOutputStream;
+import org.apache.hadoop.fs.tosfs.object.ObjectStorage;
+import org.apache.hadoop.fs.tosfs.object.ObjectStorageFactory;
+import org.apache.hadoop.fs.tosfs.object.ObjectUtils;
+import org.apache.hadoop.fs.tosfs.object.Part;
+import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+public class MagicOutputStream extends ObjectOutputStream {
+
+  private final FileSystem fs;
+  private final Path pendingPath;
+  private boolean closeStorage = false;
+
+  public MagicOutputStream(FileSystem fs, ExecutorService threadPool, Configuration conf,
+      Path magic) {
+    this(fs,
+        ObjectStorageFactory.create(magic.toUri().getScheme(), magic.toUri().getHost(), conf),
+        threadPool,
+        conf,
+        magic);
+    closeStorage = true;
+  }
+
+  public MagicOutputStream(FileSystem fs, ObjectStorage storage, ExecutorService threadPool,
+      Configuration conf, Path magic) {
+    super(storage, threadPool, conf, magic, false);
+    this.fs = fs;
+    this.pendingPath = createPendingPath(magic);
+  }
+
+  static String toDestKey(Path magicPath) {
+    Preconditions.checkArgument(isMagic(magicPath), "Destination path is not magic %s", magicPath);
+    String magicKey = ObjectUtils.pathToKey(magicPath);
+    List<String> splits = Lists.newArrayList(magicKey.split("/"));
+
+    // Break the full splits list into three collections: <parentSplits>, __magic, <childrenSplits>
+    int magicIndex = splits.indexOf(CommitUtils.MAGIC);
+    Preconditions.checkArgument(magicIndex >= 0, "Cannot locate %s in path %s",
+        CommitUtils.MAGIC, magicPath);
+    List<String> parentSplits = splits.subList(0, magicIndex);
+    List<String> childrenSplits = splits.subList(magicIndex + 1, splits.size());
+    Preconditions.checkArgument(!childrenSplits.isEmpty(),
+        "No path found under %s for path %s", CommitUtils.MAGIC, magicPath);
+
+    // Generate the destination splits which will be joined into the destination object key.
+    List<String> destSplits = Lists.newArrayList(parentSplits);
+    if (childrenSplits.contains(CommitUtils.BASE)) {
+      // Break the <childrenSplits> into three collections: <baseParentSplits>, __base,
+      // <baseChildrenSplits>, and add all <baseChildrenSplits> into the destination splits.
+      int baseIndex = childrenSplits.indexOf(CommitUtils.BASE);
+      Preconditions.checkArgument(baseIndex >= 0, "Cannot locate %s in path %s",
+          CommitUtils.BASE, magicPath);
+      List<String> baseChildrenSplits =
+          childrenSplits.subList(baseIndex + 1, childrenSplits.size());
+      Preconditions.checkArgument(!baseChildrenSplits.isEmpty(),
+          "No path found under %s for magic path %s", CommitUtils.BASE, magicPath);
+      destSplits.addAll(baseChildrenSplits);
+    } else {
+      // Just add the last element of the <childrenSplits> into the destination splits.
+      String filename = childrenSplits.get(childrenSplits.size() - 1);
+      destSplits.add(filename);
+    }
+
+    return StringUtils.join(destSplits, "/");
+  }
+
+  @Override
+  protected String createDestKey(Path magicPath) {
+    return toDestKey(magicPath);
+  }
+
+  @Override
+  protected void finishUpload(String destKey, String uploadId, List<Part> parts)
+      throws IOException {
+    Pending pending = Pending.builder()
+        .setBucket(storage().bucket().name())
+        .setUploadId(uploadId)
+        .setLength(parts.stream().mapToLong(Part::size).sum())
+        .setDestKey(destKey)
+        .setCreatedTimestamp(System.currentTimeMillis())
+        .addParts(parts)
+        .build();
+
+    persist(pendingPath, pending.serialize());
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    super.close();
+    if (closeStorage) {
+      storage().close();
+    }
+  }
+
+  protected void persist(Path p, byte[] data) throws IOException {
+    CommitUtils.save(fs, p, data);
+  }
+
+  public String pendingKey() {
+    return ObjectUtils.pathToKey(pendingPath);
+  }
+
+  private static Path createPendingPath(Path magic) {
+    return new Path(magic.getParent(),
+        String.format("%s%s", magic.getName(), CommitUtils.PENDING_SUFFIX));
+  }
+
+  // .pending and .pendingset files are not typical magic files.
+  private static boolean isInternalFile(Path p) {
+    return p.toString().endsWith(CommitUtils.PENDINGSET_SUFFIX)
+        || p.toString().endsWith(CommitUtils.PENDING_SUFFIX);
+  }
+
+  public static boolean isMagic(Path p) {
+    Preconditions.checkNotNull(p, "path cannot be null.");
+    String path = p.toUri().getPath();
+    List<String> splits = Arrays.stream(path.split("/"))
+        .filter(StringUtils::isNoneEmpty)
+        .collect(Collectors.toList());
+    return splits.contains(CommitUtils.MAGIC) && !isInternalFile(p);
+  }
+}
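
(Not part of the patch: a quick illustration of the magic-path rewriting done by toDestKey
above. It assumes CommitUtils.MAGIC is "__magic" and CommitUtils.BASE is "__base", following
the Hadoop magic-committer convention; CommitUtils itself is not included in this commit, so
the snippet re-implements the mapping standalone.)

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ToDestKeyExample {
  // Standalone re-implementation of the mapping, for illustration only.
  static String toDestKey(String magicKey) {
    List<String> splits = new ArrayList<>(Arrays.asList(magicKey.split("/")));
    int magicIndex = splits.indexOf("__magic");   // CommitUtils.MAGIC (assumed value)
    List<String> dest = new ArrayList<>(splits.subList(0, magicIndex));
    List<String> children = splits.subList(magicIndex + 1, splits.size());
    int baseIndex = children.indexOf("__base");   // CommitUtils.BASE (assumed value)
    if (baseIndex >= 0) {
      // Keep the whole relative layout below __base.
      dest.addAll(children.subList(baseIndex + 1, children.size()));
    } else {
      // Keep only the final file name.
      dest.add(children.get(children.size() - 1));
    }
    return String.join("/", dest);
  }

  public static void main(String[] args) {
    // Prints: warehouse/table/part-0000
    System.out.println(toDestKey("warehouse/table/__magic/job-1/task-1/__base/part-0000"));
    // Prints: warehouse/table/part-0000
    System.out.println(toDestKey("warehouse/table/__magic/job-1/part-0000"));
  }
}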
diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/conf/ConfKeys.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/conf/ConfKeys.java
index b607c69bf67..1ef8e83d63d 100644
--- a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/conf/ConfKeys.java
+++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/conf/ConfKeys.java
@@ -18,6 +18,10 @@
 
 package org.apache.hadoop.fs.tosfs.conf;
 
+import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
+
+import java.util.List;
+
 public class ConfKeys {
 
   /**
@@ -61,6 +65,30 @@ public class ConfKeys {
   public static final String MULTIPART_COPY_THRESHOLD = "fs.tos.multipart.copy-threshold";
   public static final long MULTIPART_COPY_THRESHOLD_DEFAULT = 5L << 20;
 
+  /**
+   * The threshold that controls whether to enable multipart upload while writing data to the
+   * given object storage. If the written data size is less than the threshold, the data is
+   * written via a simple PUT instead of a multipart upload. E.g. fs.tos.multipart.threshold.
+   */
+  public static final String MULTIPART_THRESHOLD = "fs.tos.multipart.threshold";
+  public static final long MULTIPART_THRESHOLD_DEFAULT = 10 << 20;
+
+  /**
+   * The max number of bytes of staging data to buffer in memory before flushing to the staging
+   * file. Buffering decreases the random writes on the local staging disk dramatically when
+   * writing plenty of small files.
+   */
+  public static final String MULTIPART_STAGING_BUFFER_SIZE = "fs.tos.multipart.staging-buffer-size";
+  public static final int MULTIPART_STAGING_BUFFER_SIZE_DEFAULT = 4 << 10;
+
+  /**
+   * The multipart upload part staging dir(s) of the given object storage.
+   * e.g. fs.tos.multipart.staging-dir.
+   * Separate multiple staging dirs with commas.
+   */
+  public static final String MULTIPART_STAGING_DIR = "fs.tos.multipart.staging-dir";
+  public static final String MULTIPART_STAGING_DIR_DEFAULT = defaultDir("multipart-staging-dir");
+
   /**
    * The batch size of deleting multiple objects per request for the given object storage.
    * e.g. fs.tos.delete.batch-size
@@ -90,4 +118,15 @@ public class ConfKeys {
    */
   public static final String OBJECT_STREAM_RANGE_SIZE = "proton.objectstorage.stream.range-size";
   public static final long OBJECT_STREAM_RANGE_SIZE_DEFAULT = Long.MAX_VALUE;
+
+  public static String defaultDir(String basename) {
+    String tmpdir = System.getProperty("java.io.tmpdir");
+    Preconditions.checkNotNull(tmpdir, "System property 'java.io.tmpdir' cannot be null");
+
+    if (tmpdir.endsWith("/")) {
+      return String.format("%s%s", tmpdir, basename);
+    } else {
+      return String.format("%s/%s", tmpdir, basename);
+    }
+  }
 }
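
(Also not part of the patch: a hedged sketch of setting these keys on a Hadoop Configuration.
Only the key names and defaults come from this file; the sizes and paths are invented for
illustration.)

import org.apache.hadoop.conf.Configuration;

public class TosWriteConfExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Writes smaller than 16 MiB go through a single PUT; larger ones use multipart upload.
    conf.setLong("fs.tos.multipart.threshold", 16L << 20);
    // Buffer up to 64 KiB in memory before spilling a part to its staging file.
    conf.setInt("fs.tos.multipart.staging-buffer-size", 64 << 10);
    // Spread staging files across two local disks (comma-separated list).
    conf.set("fs.tos.multipart.staging-dir", "/data1/tos-staging,/data2/tos-staging");
    System.out.println(conf.get("fs.tos.multipart.staging-dir"));
  }
}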
diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/ObjectOutputStream.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/ObjectOutputStream.java
new file mode 100644
index 00000000000..5f5781bf4aa
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/ObjectOutputStream.java
@@ -0,0 +1,331 @@
+/*
+ * ByteDance Volcengine EMR, Copyright 2022.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.object;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.tosfs.conf.ConfKeys;
+import org.apache.hadoop.fs.tosfs.object.staging.FileStagingPart;
+import org.apache.hadoop.fs.tosfs.object.staging.StagingPart;
+import org.apache.hadoop.fs.tosfs.util.CommonUtils;
+import org.apache.hadoop.fs.tosfs.util.UUIDUtils;
+import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.SequenceInputStream;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+public class ObjectOutputStream extends OutputStream {
+  private static final Logger LOG = LoggerFactory.getLogger(ObjectOutputStream.class);
+
+  private final ObjectStorage storage;
+  private final ExecutorService uploadPool;
+  private long totalWroteSize;
+  private final String destKey;
+  private final String destScheme;
+  private final long multiUploadThreshold;
+  private final long byteSizePerPart;
+  private final int stagingBufferSize;
+  private final boolean allowPut;
+  private final List<File> stagingDirs;
+  private final List<StagingPart> stagingParts = Lists.newArrayList();
+
+  // For multipart uploads.
+  private final AtomicInteger partNumGetter = new AtomicInteger(0);
+  private MultipartUpload multipartUpload = null;
+  private final List<CompletableFuture<Part>> results = Lists.newArrayList();
+
+  private StagingPart curPart;
+  private final AtomicBoolean closed = new AtomicBoolean(false);
+
+  public ObjectOutputStream(ObjectStorage storage, ExecutorService threadPool, Configuration conf,
+      Path dest, boolean allowPut) {
+    this.storage = storage;
+    this.uploadPool = threadPool;
+    this.destScheme = dest.toUri().getScheme();
+    this.totalWroteSize = 0;
+    this.destKey = createDestKey(dest);
+    this.multiUploadThreshold =
+        conf.getLong(ConfKeys.MULTIPART_THRESHOLD, ConfKeys.MULTIPART_THRESHOLD_DEFAULT);
+    this.byteSizePerPart = conf.getLong(ConfKeys.MULTIPART_SIZE, ConfKeys.MULTIPART_SIZE_DEFAULT);
+    this.stagingBufferSize = conf.getInt(ConfKeys.MULTIPART_STAGING_BUFFER_SIZE,
+        ConfKeys.MULTIPART_STAGING_BUFFER_SIZE_DEFAULT);
+    this.allowPut = allowPut;
+    this.stagingDirs = createStagingDirs(conf, destScheme);
+
+    if (!allowPut) {
+      this.multipartUpload = storage.createMultipartUpload(destKey);
+    }
+  }
+
+  private static List<File> createStagingDirs(Configuration conf, String scheme) {
+    String[] dirs =
+        conf.getStrings(ConfKeys.MULTIPART_STAGING_DIR, ConfKeys.MULTIPART_STAGING_DIR_DEFAULT);
+    Preconditions.checkArgument(dirs != null && dirs.length > 0, "'%s' cannot be an empty list",
+        ConfKeys.MULTIPART_STAGING_DIR);
+
+    List<File> stagingDirs = new ArrayList<>();
+    for (String dir : dirs) {
+      // Create the directory if it does not exist.
+      File stagingDir = new File(dir);
+      if (!stagingDir.exists() && stagingDir.mkdirs()) {
+        Preconditions.checkArgument(stagingDir.setWritable(true, false),
+            "Failed to change staging dir permission to writable, please check %s with value %s",
+            ConfKeys.MULTIPART_STAGING_DIR, dir);
+        Preconditions.checkArgument(stagingDir.setReadable(true, false),
+            "Failed to change staging dir permission to readable, please check %s with value %s",
+            ConfKeys.MULTIPART_STAGING_DIR, dir);
+      } else {
+        Preconditions.checkArgument(stagingDir.exists(),
+            "Failed to create staging dir, please check %s with value %s",
+            ConfKeys.MULTIPART_STAGING_DIR, dir);
+        Preconditions.checkArgument(stagingDir.isDirectory(),
+            "Staging dir should be a directory, please check %s with value %s",
+            ConfKeys.MULTIPART_STAGING_DIR, dir);
+      }
+      stagingDirs.add(stagingDir);
+    }
+    return stagingDirs;
+  }
+
+  private File chooseStagingDir() {
+    // Choose a random directory from the staging dirs as the candidate staging dir.
+    return stagingDirs.get(ThreadLocalRandom.current().nextInt(stagingDirs.size()));
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    write(new byte[]{(byte) b}, 0, 1);
+  }
+
+  protected String createDestKey(Path dest) {
+    return ObjectUtils.pathToKey(dest);
+  }
+
+  @Override
+  public synchronized void write(byte[] buf, int off, int len) throws IOException {
+    if (len == 0) {
+      return;
+    }
+    Preconditions.checkArgument(off >= 0 && off < buf.length,
+        "Invalid offset - off: %s, len: %s, bufferSize: %s", off, len, buf.length);
+    Preconditions.checkArgument(len >= 0 && off + len <= buf.length,
+        "Invalid length - off: %s, len: %s, bufferSize: %s", off, len, buf.length);
+    Preconditions.checkState(!closed.get(), "OutputStream is closed.");
+
+    while (len > 0) {
+      if (curPart == null) {
+        curPart = newStagingPart();
+      }
+
+      Preconditions.checkArgument(curPart.size() <= byteSizePerPart,
+          "Invalid staging size (%s) which is greater than part size (%s)",
+          curPart.size(), byteSizePerPart);
+
+      // size is the remaining length to fill a complete upload part.
+      int size = (int) Math.min(byteSizePerPart - curPart.size(), len);
+      curPart.write(buf, off, size);
+
+      off += size;
+      len -= size;
+      totalWroteSize += size;
+
+      // Switch to the next staging part if the current staging part is full.
+      if (curPart.size() >= byteSizePerPart) {
+        curPart.complete();
+
+        // Upload this part if the multipart upload was triggered.
+        if (multipartUpload != null) {
+          CompletableFuture<Part> result =
+              asyncUploadPart(curPart, partNumGetter.incrementAndGet());
+          results.add(result);
+        }
+
+        // Reset the stagingOut.
+        curPart = null;
+      }
+
+      // Trigger the multipart upload once the configured threshold is reached.
+      if (multipartUpload == null && totalWroteSize >= multiUploadThreshold) {
+        multipartUpload = storage.createMultipartUpload(destKey);
+        Preconditions.checkState(byteSizePerPart >= multipartUpload.minPartSize(),
+            "Configured upload part size %s must be greater than or equal to the minimal part"
+                + " size %s, please check the configure key %s.",
+            byteSizePerPart, multipartUpload.minPartSize(), ConfKeys.MULTIPART_SIZE);
+
+        // Upload the accumulated staging files whose length >= byteSizePerPart.
+        for (StagingPart stagingPart : stagingParts) {
+          if (stagingPart.size() >= byteSizePerPart) {
+            CompletableFuture<Part> result =
+                asyncUploadPart(stagingPart, partNumGetter.incrementAndGet());
+            results.add(result);
+          }
+        }
+      }
+    }
+  }
+
+  private CompletableFuture<Part> asyncUploadPart(final StagingPart stagingPart,
+      final int partNum) {
+    final MultipartUpload immutableUpload = multipartUpload;
+    return CompletableFuture.supplyAsync(() -> uploadPart(stagingPart, partNum), uploadPool)
+        .whenComplete((part, err) -> {
+          stagingPart.cleanup();
+          if (err != null) {
+            LOG.error("Failed to upload part, multipartUpload: {}, partNum: {}, stagingPart: {}",
+                immutableUpload, partNum, stagingPart, err);
+          }
+        });
+  }
+
+  private CompletableFuture<Part> asyncUploadEmptyPart(final int partNum) {
+    final MultipartUpload immutableUpload = multipartUpload;
+    return CompletableFuture.supplyAsync(
+        () -> storage.uploadPart(
+            immutableUpload.key(),
+            immutableUpload.uploadId(),
+            partNum,
+            () -> new ByteArrayInputStream(new byte[0]),
+            0),
+        uploadPool)
+        .whenComplete((part, err) -> {
+          if (err != null) {
+            LOG.error("Failed to upload empty part, multipartUpload: {}, partNum: {}",
+                immutableUpload, partNum, err);
+          }
+        });
+  }
+
+  private Part uploadPart(StagingPart stagingPart, int partNum) {
+    Preconditions.checkNotNull(storage, "Object storage cannot be null.");
+    Preconditions.checkNotNull(multipartUpload, "Multipart upload is not initialized.");
+    return storage.uploadPart(multipartUpload.key(), multipartUpload.uploadId(),
+        partNum, stagingPart::newIn, stagingPart.size());
+  }
+
+  protected void finishUpload(String key, String uploadId, List<Part> parts) throws IOException {
+    storage.completeUpload(key, uploadId, parts);
+  }
+
+  private void simplePut() throws IOException {
+    if (curPart != null) {
+      curPart.complete();
+    }
+    storage.put(
+        destKey,
+        () -> stagingParts()
+            .stream()
+            .map(StagingPart::newIn)
+            .reduce(SequenceInputStream::new)
+            .orElseGet(() -> new ByteArrayInputStream(new byte[0])),
+        stagingParts().stream().mapToLong(StagingPart::size).sum());
+    // Reset the staging output stream.
+    curPart = null;
+  }
+
+  synchronized List<Part> waitForPartsUpload() {
+    Preconditions.checkArgument(multipartUpload != null, "Multipart upload cannot be null");
+    Preconditions.checkArgument(!results.isEmpty(), "Upload parts cannot be empty");
+    // Wait for all the upload parts to finish.
+    return results.stream()
+        .map(CompletableFuture::join)
+        .sorted(Comparator.comparing(Part::num))
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    if (!closed.compareAndSet(false, true)) {
+      return;
+    }
+
+    try {
+      // Use the simple PUT API if the written bytes have not reached the multipart threshold.
+      if (multipartUpload == null && allowPut) {
+        simplePut();
+        return;
+      }
+      Preconditions.checkNotNull(multipartUpload,
+          "MultipartUpload cannot be null since allowPut was disabled.");
+
+      // Use the multipart upload API to upload the parts.
+      if (totalWroteSize <= 0) {
+        // Write an empty part for this zero-byte file.
+        CompletableFuture<Part> result = asyncUploadEmptyPart(partNumGetter.incrementAndGet());
+        results.add(result);
+      } else if (curPart != null) {
+        curPart.complete();
+        // Submit the last part to the upload thread pool.
+        CompletableFuture<Part> result = asyncUploadPart(curPart, partNumGetter.incrementAndGet());
+        results.add(result);
+        // Reset the staging output stream.
+        curPart = null;
+      }
+
+      // Finish the multipart upload.
+      finishUpload(multipartUpload.key(), multipartUpload.uploadId(), waitForPartsUpload());
+
+    } catch (Exception e) {
+      LOG.error("Encountered error when closing the output stream", e);
+      if (multipartUpload != null) {
+        CommonUtils.runQuietly(
+            () -> storage.abortMultipartUpload(multipartUpload.key(), multipartUpload.uploadId()));
+      }
+      throw e;
+    } finally {
+      // Clean up all the staging parts.
+      deleteStagingPart(stagingParts);
+    }
+  }
+
+  public long totalWroteSize() {
+    return totalWroteSize;
+  }
+
+  public ObjectStorage storage() {
+    return storage;
+  }
+
+  public List<StagingPart> stagingParts() {
+    return stagingParts;
+  }
+
+  public String destKey() {
+    return destKey;
+  }
+
+  public MultipartUpload upload() {
+    return multipartUpload;
+  }
+
+  private void deleteStagingPart(List<StagingPart> parts) {
+    for (StagingPart part : parts) {
+      part.cleanup();
+    }
+  }
+
+  private StagingPart newStagingPart() {
+    String stagingPath = String.format("%s/staging-%s.tmp", chooseStagingDir(), UUIDUtils.random());
+    StagingPart part = new FileStagingPart(stagingPath, stagingBufferSize);
+    stagingParts.add(part);
+    return part;
+  }
+}
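
(An illustrative usage sketch, not part of the patch: it wires ObjectOutputStream together the
same way MagicOutputStream's convenience constructor does. The "tos" scheme, bucket name and
paths are invented, and ObjectStorageFactory.create and ObjectStorage.close are assumed to
behave as they are used elsewhere in this commit.)

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.tosfs.object.ObjectOutputStream;
import org.apache.hadoop.fs.tosfs.object.ObjectStorage;
import org.apache.hadoop.fs.tosfs.object.ObjectStorageFactory;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ObjectOutputStreamExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    ExecutorService uploadPool = Executors.newFixedThreadPool(4);
    ObjectStorage storage = ObjectStorageFactory.create("tos", "my-bucket", conf);
    Path dest = new Path("tos://my-bucket/warehouse/part-0000");
    // allowPut = true: data below the multipart threshold becomes one PUT on close();
    // larger streams spill to staging parts and are uploaded as multipart parts.
    try (ObjectOutputStream out = new ObjectOutputStream(storage, uploadPool, conf, dest, true)) {
      out.write(new byte[]{1, 2, 3});
    } finally {
      uploadPool.shutdown();
      storage.close();
    }
  }
}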
diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/staging/FileStagingPart.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/staging/FileStagingPart.java
new file mode 100644
index 00000000000..7f70c0820e3
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/staging/FileStagingPart.java
@@ -0,0 +1,176 @@
+/*
+ * ByteDance Volcengine EMR, Copyright 2022.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.object.staging;
+
+import org.apache.hadoop.fs.tosfs.util.CommonUtils;
+import org.apache.hadoop.thirdparty.com.google.common.base.MoreObjects;
+import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+public class FileStagingPart implements StagingPart {
+  private static final Logger LOG = LoggerFactory.getLogger(FileStagingPart.class);
+
+  private final Path path;
+  private final int stagingBufferSize;
+  private final StagingFileOutputStream out;
+  private State state = State.WRITABLE;
+
+  public FileStagingPart(String filePath, int stagingBufferSize) {
+    this.path = Paths.get(filePath);
+    this.stagingBufferSize = stagingBufferSize;
+    this.out = new StagingFileOutputStream(path, stagingBufferSize);
+  }
+
+  @Override
+  public synchronized void write(byte[] b, int off, int len) throws IOException {
+    Preconditions.checkState(state == State.WRITABLE,
+        "Cannot write the part since it's not writable now, state: %s", state);
+    out.write(b, off, len);
+  }
+
+  @Override
+  public synchronized void complete() throws IOException {
+    Preconditions.checkState(state == State.WRITABLE,
+        "Cannot complete the part since it's not writable now, state: %s", state);
+    out.close();
+    state = State.READABLE;
+  }
+
+  @Override
+  public synchronized InputStream newIn() {
+    Preconditions.checkState(state == State.READABLE,
+        "Cannot read the part since it's not readable now, state: %s.", state);
+    return out.newIn();
+  }
+
+  @Override
+  public synchronized long size() {
+    return out.size();
+  }
+
+  @Override
+  public synchronized State state() {
+    return state;
+  }
+
+  @Override
+  public synchronized void cleanup() {
+    if (state != State.CLEANED) {
+      try {
+        // Close the stream quietly.
+        CommonUtils.runQuietly(out::close, false);
+
+        // Delete the staging file if it exists.
+        Files.deleteIfExists(path);
+      } catch (Exception e) {
+        LOG.error("Failed to delete staging file, stagingFile: {}", path, e);
+      } finally {
+        state = State.CLEANED;
+      }
+    }
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("path", path)
+        .add("stagingBufferSize", stagingBufferSize)
+        .add("wroteByteSize", size())
+        .toString();
+  }
+
+  private static class StagingFileOutputStream extends OutputStream {
+    private final Path path;
+    private byte[] buffer;
+    private boolean memBuffered;
+    private int writePos;
+    private OutputStream out;
+
+    private StagingFileOutputStream(Path path, int stagingBufferSize) {
+      this.path = path;
+      this.buffer = new byte[stagingBufferSize];
+      this.memBuffered = true;
+      this.writePos = 0;
+    }
+
+    private int size() {
+      return writePos;
+    }
+
+    public InputStream newIn() {
+      // Just wrap the bytes as a byte array input stream if the staging data is still in the
+      // in-memory buffer.
+      if (memBuffered) {
+        return new ByteArrayInputStream(buffer, 0, writePos);
+      }
+
+      // Otherwise, create a buffered file input stream.
+      try {
+        return new BufferedInputStream(Files.newInputStream(path));
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+      write(new byte[]{(byte) b}, 0, 1);
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+      if (memBuffered && writePos + len > buffer.length) {
+        flushMemToFile();
+      }
+
+      if (memBuffered) {
+        System.arraycopy(b, off, buffer, writePos, len);
+      } else {
+        out.write(b, off, len);
+      }
+
+      writePos += len;
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (out != null) {
+        out.close();
+        out = null;
+      }
+    }
+
+    private void flushMemToFile() throws IOException {
+      // Flush the buffered data to the new file OutputStream.
+      out = new BufferedOutputStream(Files.newOutputStream(path));
+      out.write(buffer, 0, writePos);
+      memBuffered = false;
+      buffer = null;
+    }
+  }
+}
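
(Illustrative only, not part of the patch: the staging lifecycle implemented by FileStagingPart
and StagingFileOutputStream above. Writes below the buffer size stay in memory, a larger write
spills to the staging file, and newIn() serves whichever copy exists. The temp path and sizes
are invented.)

import org.apache.hadoop.fs.tosfs.object.staging.FileStagingPart;
import org.apache.hadoop.fs.tosfs.object.staging.StagingPart;

import java.io.InputStream;

public class FileStagingPartExample {
  public static void main(String[] args) throws Exception {
    StagingPart part = new FileStagingPart("/tmp/staging-example.tmp", 4096);
    part.write(new byte[1024]);   // fits in the 4 KiB in-memory buffer
    part.write(new byte[8192]);   // exceeds the buffer, spills to /tmp/staging-example.tmp
    part.complete();              // WRITABLE -> READABLE; no more writes allowed
    try (InputStream in = part.newIn()) {
      System.out.println("staged bytes: " + part.size()); // prints: staged bytes: 9216
    }
    part.cleanup();               // deletes the staging file; READABLE -> CLEANED
  }
}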
it + * should be idempotent and quiet (without throwing IO error). + */ + void cleanup(); +} diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/staging/State.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/staging/State.java new file mode 100644 index 00000000000..418baa6d9b1 --- /dev/null +++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/staging/State.java @@ -0,0 +1,23 @@ +/* + * ByteDance Volcengine EMR, Copyright 2022. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.tosfs.object.staging; + +public enum State { + WRITABLE, + READABLE, + CLEANED +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org