steveloughran commented on code in PR #4912:
URL: https://github.com/apache/hadoop/pull/4912#discussion_r975295854
##########
hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSBlockOutputStream.java:
##########
@@ -49,71 +48,99 @@ public class AliyunOSSBlockOutputStream extends
OutputStream {
LoggerFactory.getLogger(AliyunOSSBlockOutputStream.class);
private AliyunOSSFileSystemStore store;
private Configuration conf;
- private boolean closed;
+ private final AtomicBoolean closed = new AtomicBoolean(false);
private String key;
- private File blockFile;
- private Map<Integer, File> blockFiles = new HashMap<>();
- private long blockSize;
+ private int blockSize;
private int blockId = 0;
private long blockWritten = 0L;
private String uploadId = null;
private final List<ListenableFuture<PartETag>> partETagsFutures;
+ private final OSSDataBlocks.BlockFactory blockFactory;
+ private final BlockOutputStreamStatistics statistics;
+ private OSSDataBlocks.DataBlock activeBlock;
private final ListeningExecutorService executorService;
- private OutputStream blockStream;
private final byte[] singleByte = new byte[1];
public AliyunOSSBlockOutputStream(Configuration conf,
AliyunOSSFileSystemStore store,
String key,
- Long blockSize,
+ int blockSize,
+ OSSDataBlocks.BlockFactory blockFactory,
+ BlockOutputStreamStatistics statistics,
ExecutorService executorService) throws IOException {
this.store = store;
this.conf = conf;
this.key = key;
this.blockSize = blockSize;
- this.blockFile = newBlockFile();
- this.blockStream =
- new BufferedOutputStream(new FileOutputStream(blockFile));
+ this.blockFactory = blockFactory;
+ this.statistics = statistics;
+ createBlockIfNeeded();
Review Comment:
If you postpone this until the first byte is written, writing zero-byte
marker files becomes more efficient, as it avoids the need to allocate any storage capacity.
##########
hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/OSSDataBlocks.java:
##########
@@ -0,0 +1,1109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.aliyun.oss;
+
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.aliyun.oss.statistics.BlockOutputStreamStatistics;
+import org.apache.hadoop.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.util.DirectBufferPool;
+
+import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
+
+/**
+ * Set of classes to support output streaming into blocks which are then
+ * uploaded to OSS as a single PUT, or as part of a multipart request.
+ */
+final class OSSDataBlocks {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(OSSDataBlocks.class);
+
+ private OSSDataBlocks() {
+ }
+
+ /**
+ * Validate args to a write command. These are the same validation checks
+ * expected for any implementation of {@code OutputStream.write()}.
+ * @param b byte array containing data
+ * @param off offset in array where to start
+ * @param len number of bytes to be written
+ * @throws NullPointerException for a null buffer
+ * @throws IndexOutOfBoundsException if indices are out of range
+ */
+ static void validateWriteArgs(byte[] b, int off, int len)
+ throws IOException {
+ Preconditions.checkNotNull(b);
+ if ((off < 0) || (off > b.length) || (len < 0) ||
+ ((off + len) > b.length) || ((off + len) < 0)) {
+ throw new IndexOutOfBoundsException(
+ "write (b[" + b.length + "], " + off + ", " + len + ')');
+ }
+ }
+
+ /**
+ * Create a factory.
+ * @param owner factory owner
+ * @param name factory name -the option from {@link Constants}.
+ * @return the factory, ready to be initialized.
+ * @throws IllegalArgumentException if the name is unknown.
+ */
+ static BlockFactory createFactory(AliyunOSSFileSystem owner,
+ String name) {
+ switch (name) {
+ case Constants.FAST_UPLOAD_BUFFER_ARRAY:
+ return new ArrayBlockFactory(owner);
+ case Constants.FAST_UPLOAD_BUFFER_DISK:
+ return new DiskBlockFactory(owner);
+ case Constants.FAST_UPLOAD_BYTEBUFFER:
+ return new ByteBufferBlockFactory(owner);
+ case Constants.FAST_UPLOAD_BUFFER_ARRAY_DISK:
Review Comment:
I really like this design, especially if the memory allocation is shared
across all output streams created by a single fs instance.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]