This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new dfe20f8764 HDDS-8635. Automatically select createKey and
createStreamKey in OzoneFileSystem. (#4763)
dfe20f8764 is described below
commit dfe20f8764904044aea02aadc013c8c560973360
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Sun May 28 11:54:46 2023 +0800
HDDS-8635. Automatically select createKey and createStreamKey in
OzoneFileSystem. (#4763)
---
.../org/apache/hadoop/ozone/OzoneConfigKeys.java | 8 +
.../common/src/main/resources/ozone-default.xml | 9 +
.../ozone/client/io/SelectorOutputStream.java | 189 +++++++++++++++++++++
.../ozone/client/io/TestSelectorOutputStream.java | 159 +++++++++++++++++
.../fs/ozone/TestOzoneFileSystemWithStreaming.java | 76 +++++++--
.../hadoop/fs/ozone/BasicOzoneFileSystem.java | 35 +++-
.../fs/ozone/BasicRootedOzoneFileSystem.java | 34 +++-
.../apache/hadoop/fs/ozone/OzoneFileSystem.java | 3 +-
.../hadoop/fs/ozone/RootedOzoneFileSystem.java | 3 +-
.../apache/hadoop/fs/ozone/OzoneFileSystem.java | 3 +-
.../hadoop/fs/ozone/RootedOzoneFileSystem.java | 3 +-
11 files changed, 492 insertions(+), 30 deletions(-)
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 587f45db5d..85bc566902 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -111,6 +111,14 @@ public final class OzoneConfigKeys {
public static final boolean OZONE_FS_DATASTREAM_ENABLED_DEFAULT
= false;
+ /**
+ * A size threshold to auto select datastream to write files in
+ * OzoneFileSystem when {@link #OZONE_FS_DATASTREAM_ENABLED} is true:
+ * a file is written via datastream only if more than this many bytes
+ * are written to it before the first flush/hflush/hsync/close;
+ * otherwise the regular (non-datastream) write path is used.
+ */
+ public static final String OZONE_FS_DATASTREAM_AUTO_THRESHOLD
+ = "ozone.fs.datastream.auto.threshold";
+ public static final String OZONE_FS_DATASTREAM_AUTO_THRESHOLD_DEFAULT
+ = "4MB";
+
/**
* Flag to enable hsync/hflush.
*/
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml
b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index fa0dee1df6..0de5c43034 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -3697,6 +3697,15 @@
To enable/disable filesystem write via ratis streaming.
</description>
</property>
+ <property>
+ <name>ozone.fs.datastream.auto.threshold</name>
+ <value>4MB</value>
+ <tag>OZONE, DATANODE</tag>
+ <description>
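+ A size threshold to auto select datastream to write files
+ in OzoneFileSystem when filesystem datastream write is enabled:
+ a file is written via datastream only if more than this many bytes
+ are written to it before the first flush/hflush/hsync/close.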
+ </description>
+ </property>
<property>
<name>ozone.fs.hsync.enabled</name>
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/client/io/SelectorOutputStream.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/client/io/SelectorOutputStream.java
new file mode 100644
index 0000000000..6f2ad0bfa8
--- /dev/null
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/client/io/SelectorOutputStream.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import org.apache.hadoop.fs.Syncable;
+import org.apache.ratis.util.function.CheckedFunction;
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Objects;
+
+/**
+ * An {@link OutputStream} which first writes data to a buffer up to the
+ * capacity, and then selects the {@link Underlying} {@link OutputStream}
+ * by the number of bytes written.
+ * When {@link #flush()}, {@link #hflush()}, {@link #hsync()}
+ * or {@link #close()} is invoked,
+ * it forces the {@link OutputStream} selection and flushes the buffer
+ * to the selected stream.
+ * <p>
+ * This class, like many {@link OutputStream} subclasses, is NOT threadsafe.
+ * <p>
+ * NOTE(review): this class implements {@link Syncable} but not
+ * {@code org.apache.hadoop.fs.StreamCapabilities}, so capabilities of the
+ * selected stream cannot be queried through this wrapper -- confirm
+ * whether callers rely on capability checks.
+ *
+ * @param <OUT> The underlying {@link OutputStream} type.
+ */
+public class SelectorOutputStream<OUT extends OutputStream>
+    extends OutputStream implements Syncable {
+  /** A buffer backed by a byte[]. */
+  static final class ByteArrayBuffer {
+    /** The buffered bytes; set to null once handed over to a stream. */
+    private byte[] array;
+    /** Write offset of {@link #array}. */
+    private int offset = 0;
+
+    private ByteArrayBuffer(int capacity) {
+      this.array = new byte[capacity];
+    }
+
+    /**
+     * Assert that the buffer has not been released and that it has room
+     * for {@code outstandingBytes} more bytes.
+     */
+    private void assertRemaining(int outstandingBytes) {
+      Objects.requireNonNull(array, "array == null");
+
+      final int remaining = array.length - offset;
+      if (remaining < 0) {
+        throw new IllegalStateException("remaining = " + remaining + " < 0");
+      }
+      if (remaining < outstandingBytes) {
+        throw new IllegalArgumentException("Buffer overflow: remaining = "
+            + remaining + " < outstandingBytes = " + outstandingBytes);
+      }
+    }
+
+    void write(byte b) {
+      assertRemaining(1);
+      array[offset] = b;
+      offset++;
+    }
+
+    void write(byte[] src, int srcOffset, int length) {
+      Objects.requireNonNull(src, "src == null");
+      assertRemaining(length);
+      System.arraycopy(src, srcOffset, array, offset, length);
+      offset += length;
+    }
+
+    /**
+     * Select an {@link OutputStream} if {@code force} is set, or if the
+     * buffered bytes plus {@code outstandingBytes} exceed the capacity.
+     * On selection, the buffered bytes are written to the selected stream
+     * and this buffer is released.
+     *
+     * @param outstandingBytes the number of bytes about to be written.
+     * @param force select even if the buffer still has enough room.
+     * @param selector select a stream by the total number of bytes.
+     * @return the selected stream, or null if no selection was made.
+     */
+    <S extends OutputStream> S selectAndClose(
+        int outstandingBytes, boolean force,
+        CheckedFunction<Integer, S, IOException> selector)
+        throws IOException {
+      assertRemaining(0);
+      // Use long arithmetic: offset + outstandingBytes may overflow an int,
+      // which would wrongly keep buffering instead of selecting.
+      final long required = (long) offset + outstandingBytes;
+      if (force || required > array.length) {
+        final S out = selector.apply(
+            (int) Math.min(required, Integer.MAX_VALUE));
+        out.write(array, 0, offset);
+        array = null;
+        return out;
+      }
+      return null;
+    }
+  }
+
+  /** To select the underlying {@link OutputStream}. */
+  final class Underlying {
+    /** Select an {@link OutputStream} by the number of bytes. */
+    private final CheckedFunction<Integer, OUT, IOException> selector;
+    /** The selected stream; null until a selection has been made. */
+    private OUT out;
+
+    private Underlying(CheckedFunction<Integer, OUT, IOException> selector) {
+      this.selector = selector;
+    }
+
+    /**
+     * @return the selected stream (selecting one now if required),
+     *         or null if the bytes should still go to the buffer.
+     */
+    private OUT select(int outstandingBytes, boolean force)
+        throws IOException {
+      if (out == null) {
+        out = buffer.selectAndClose(outstandingBytes, force, selector);
+      }
+      return out;
+    }
+  }
+
+  private final ByteArrayBuffer buffer;
+  private final Underlying underlying;
+
+  /**
+   * Construct a {@link SelectorOutputStream} which first writes to a buffer.
+   * Once the buffer has become full, select an {@link OutputStream}.
+   *
+   * @param selectionThreshold The buffer capacity; must be non-negative.
+   * @param selector Use bytes-written to select an {@link OutputStream}.
+   * @throws IllegalArgumentException if selectionThreshold is negative.
+   */
+  public SelectorOutputStream(int selectionThreshold,
+      CheckedFunction<Integer, OUT, IOException> selector) {
+    if (selectionThreshold < 0) {
+      throw new IllegalArgumentException(
+          "selectionThreshold = " + selectionThreshold + " < 0");
+    }
+    this.buffer = new ByteArrayBuffer(selectionThreshold);
+    this.underlying = new Underlying(
+        Objects.requireNonNull(selector, "selector == null"));
+  }
+
+  /**
+   * @return the selected {@link OutputStream},
+   *         or null if no selection has been made yet.
+   */
+  public OUT getUnderlying() {
+    return underlying.out;
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    final OUT out = underlying.select(1, false);
+    if (out != null) {
+      out.write(b);
+    } else {
+      buffer.write((byte) b);
+    }
+  }
+
+  @Override
+  public void write(@Nonnull byte[] array, int off, int len)
+      throws IOException {
+    Objects.requireNonNull(array, "array == null");
+    // Validate up front, as specified by OutputStream#write(byte[], int, int),
+    // so that an invalid call cannot trigger the stream selection.
+    if (off < 0 || len < 0 || len > array.length - off) {
+      throw new IndexOutOfBoundsException("off = " + off + ", len = " + len
+          + ", array.length = " + array.length);
+    }
+    final OUT selected = underlying.select(len, false);
+    if (selected != null) {
+      selected.write(array, off, len);
+    } else {
+      buffer.write(array, off, len);
+    }
+  }
+
+  /** Force the selection, flushing the buffered bytes to the stream. */
+  private OUT select() throws IOException {
+    return underlying.select(0, true);
+  }
+
+  @Override
+  public void flush() throws IOException {
+    select().flush();
+  }
+
+  /**
+   * @throws IllegalStateException if the selected stream is not
+   *         {@link Syncable}.
+   */
+  @Override
+  public void hflush() throws IOException {
+    final OUT out = select();
+    if (out instanceof Syncable) {
+      ((Syncable)out).hflush();
+    } else {
+      throw new IllegalStateException(
+          "Failed to hflush: The underlying OutputStream ("
+              + out.getClass() + ") is not Syncable.");
+    }
+  }
+
+  /**
+   * @throws IllegalStateException if the selected stream is not
+   *         {@link Syncable}.
+   */
+  @Override
+  public void hsync() throws IOException {
+    final OUT out = select();
+    if (out instanceof Syncable) {
+      ((Syncable)out).hsync();
+    } else {
+      throw new IllegalStateException(
+          "Failed to hsync: The underlying OutputStream ("
+              + out.getClass() + ") is not Syncable.");
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    select().close();
+  }
+}
diff --git
a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/client/io/TestSelectorOutputStream.java
b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/client/io/TestSelectorOutputStream.java
new file mode 100644
index 0000000000..e436ad8277
--- /dev/null
+++
b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/client/io/TestSelectorOutputStream.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import org.apache.hadoop.fs.Syncable;
+import org.apache.ratis.util.MemoizedSupplier;
+import org.apache.ratis.util.function.CheckedConsumer;
+import org.apache.ratis.util.function.CheckedFunction;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.function.Supplier;
+
+/**
+ * Test {@link SelectorOutputStream}.
+ */
+@Timeout(30)
+public class TestSelectorOutputStream {
+  static final Logger LOG = LoggerFactory.getLogger(
+      TestSelectorOutputStream.class);
+
+  /** The operations which force {@link SelectorOutputStream} to select. */
+  enum Op {
+    FLUSH(SelectorOutputStream::flush),
+    HFLUSH(SelectorOutputStream::hflush),
+    HSYNC(SelectorOutputStream::hsync),
+    CLOSE(SelectorOutputStream::close);
+
+    private final CheckedConsumer<SelectorOutputStream<?>, IOException> method;
+
+    Op(CheckedConsumer<SelectorOutputStream<?>, IOException> method) {
+      this.method = method;
+    }
+
+    void accept(SelectorOutputStream<OutputStream> out) throws IOException {
+      method.accept(out);
+    }
+  }
+
+  /** A {@link ByteArrayOutputStream} which is also {@link Syncable}. */
+  static class SyncableOutputStreamForTesting
+      extends ByteArrayOutputStream implements Syncable {
+    @Override
+    public void hflush() {
+      LOG.info("hflush");
+    }
+
+    @Override
+    public void hsync() {
+      LOG.info("hsync");
+    }
+  }
+
+  static Supplier<OutputStream> getOutputStreamSupplier(boolean isSyncable) {
+    return isSyncable ? SyncableOutputStreamForTesting::new
+        : ByteArrayOutputStream::new;
+  }
+
+  /** @return the bytes produced by {@code out.write(i)} for i = 0..n-1. */
+  static byte[] getExpectedBytes(int n) {
+    final byte[] bytes = new byte[n];
+    for (int i = 0; i < n; i++) {
+      bytes[i] = (byte) i;
+    }
+    return bytes;
+  }
+
+  static void runTestSelector(int threshold, int byteToWrite,
+      Op op) throws Exception {
+    runTestSelector(threshold, byteToWrite, op, false);
+  }
+
+  /**
+   * Write {@code byteToWrite} bytes one at a time and then apply {@code op}.
+   * Verify which stream is selected, and that all the bytes,
+   * including the previously buffered ones, reach the selected stream.
+   */
+  static void runTestSelector(int threshold, int byteToWrite,
+      Op op, boolean isSyncable) throws Exception {
+    LOG.info("run: threshold={}, byteToWrite={}, op={}, isSyncable? {}",
+        threshold, byteToWrite, op, isSyncable);
+    final MemoizedSupplier<OutputStream> belowThreshold
+        = MemoizedSupplier.valueOf(getOutputStreamSupplier(isSyncable));
+    final MemoizedSupplier<OutputStream> aboveThreshold
+        = MemoizedSupplier.valueOf(getOutputStreamSupplier(isSyncable));
+    final CheckedFunction<Integer, OutputStream, IOException> selector
+        = byteWritten -> byteWritten <= threshold ?
+        belowThreshold.get() : aboveThreshold.get();
+
+    final SelectorOutputStream<OutputStream> out = new SelectorOutputStream<>(
+        threshold, selector);
+    for (int i = 0; i < byteToWrite; i++) {
+      out.write(i);
+    }
+
+    // check auto selection
+    final boolean isAbove = byteToWrite > threshold;
+    Assertions.assertFalse(belowThreshold.isInitialized());
+    Assertions.assertEquals(isAbove, aboveThreshold.isInitialized());
+
+    final boolean isBelow = !isAbove;
+    if (op != null) {
+      op.accept(out);
+      Assertions.assertEquals(isBelow, belowThreshold.isInitialized());
+      Assertions.assertEquals(isAbove, aboveThreshold.isInitialized());
+
+      // check that all the data has been passed to the selected stream
+      final OutputStream selected = isAbove ? aboveThreshold.get()
+          : belowThreshold.get();
+      Assertions.assertSame(selected, out.getUnderlying());
+      Assertions.assertArrayEquals(getExpectedBytes(byteToWrite),
+          ((ByteArrayOutputStream) selected).toByteArray());
+    }
+  }
+
+  @Test
+  public void testFlush() throws Exception {
+    runTestSelector(10, 2, Op.FLUSH);
+    runTestSelector(10, 10, Op.FLUSH);
+    runTestSelector(10, 20, Op.FLUSH);
+  }
+
+  @Test
+  public void testClose() throws Exception {
+    runTestSelector(10, 2, Op.CLOSE);
+    runTestSelector(10, 10, Op.CLOSE);
+    runTestSelector(10, 20, Op.CLOSE);
+  }
+
+  @Test
+  public void testWriteArray() throws Exception {
+    final ByteArrayOutputStream selected = new ByteArrayOutputStream();
+    final CheckedFunction<Integer, OutputStream, IOException> selector
+        = byteWritten -> selected;
+    final SelectorOutputStream<OutputStream> out = new SelectorOutputStream<>(
+        10, selector);
+    final byte[] data = getExpectedBytes(25);
+
+    // 8 bytes fit in the buffer: no selection yet
+    out.write(data, 0, 8);
+    Assertions.assertNull(out.getUnderlying());
+
+    // 8 + 17 > 10: select, then forward both buffered and new bytes
+    out.write(data, 8, 17);
+    Assertions.assertSame(selected, out.getUnderlying());
+
+    out.close();
+    Assertions.assertArrayEquals(data, selected.toByteArray());
+  }
+
+  @Test
+  public void testHflushSyncable() throws Exception {
+    runTestSelector(10, 2, Op.HFLUSH, true);
+    runTestSelector(10, 10, Op.HFLUSH, true);
+    runTestSelector(10, 20, Op.HFLUSH, true);
+  }
+
+  @Test
+  public void testHflushNonSyncable() {
+    for (int byteToWrite : new int[] {2, 20}) {
+      final IllegalStateException thrown = Assertions.assertThrows(
+          IllegalStateException.class,
+          () -> runTestSelector(10, byteToWrite, Op.HFLUSH, false));
+      LOG.info("thrown", thrown);
+      Assertions.assertTrue(thrown.getMessage().contains("not Syncable"));
+    }
+  }
+
+  @Test
+  public void testHSyncSyncable() throws Exception {
+    runTestSelector(10, 2, Op.HSYNC, true);
+    runTestSelector(10, 10, Op.HSYNC, true);
+    runTestSelector(10, 20, Op.HSYNC, true);
+  }
+
+  @Test
+  public void testHSyncNonSyncable() {
+    for (int byteToWrite : new int[] {2, 20}) {
+      final IllegalStateException thrown = Assertions.assertThrows(
+          IllegalStateException.class,
+          () -> runTestSelector(10, byteToWrite, Op.HSYNC, false));
+      LOG.info("thrown", thrown);
+      Assertions.assertTrue(thrown.getMessage().contains("not Syncable"));
+    }
+  }
+}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemWithStreaming.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemWithStreaming.java
index 34b824b5db..5d06854682 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemWithStreaming.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemWithStreaming.java
@@ -18,27 +18,34 @@
package org.apache.hadoop.fs.ozone;
+import java.io.IOException;
+import java.io.OutputStream;
import java.util.concurrent.ThreadLocalRandom;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.TestDataUtil;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.io.SelectorOutputStream;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static
org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_ENABLED;
+import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_DATASTREAM_AUTO_THRESHOLD;
import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLED;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OFS_URI_SCHEME;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_ROOT;
@@ -54,6 +61,9 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
*/
@Timeout(value = 300)
public class TestOzoneFileSystemWithStreaming {
+ private static final Logger LOG = LoggerFactory.getLogger(
+ TestOzoneFileSystemWithStreaming.class);
+ /**
+ * Datastream auto-select threshold used by this test: 2 << 20 bytes (2 MiB).
+ * The tests write files of 1, 2 and 3 MiB, i.e. below, at and above it.
+ */
+ private static final int AUTO_THRESHOLD = 2 << 20;
private static MiniOzoneCluster cluster;
private static OzoneBucket bucket;
@@ -71,7 +81,8 @@ public class TestOzoneFileSystemWithStreaming {
CONF.setBoolean(DFS_CONTAINER_RATIS_DATASTREAM_ENABLED, true);
CONF.setBoolean(OZONE_FS_DATASTREAM_ENABLED, true);
- CONF.setBoolean(OZONE_OM_RATIS_ENABLE_KEY, false);
+ CONF.set(OZONE_FS_DATASTREAM_AUTO_THRESHOLD, AUTO_THRESHOLD + "B");
+ CONF.setBoolean(OZONE_OM_RATIS_ENABLE_KEY, true);
CONF.set(OZONE_DEFAULT_BUCKET_LAYOUT, layout.name());
cluster = MiniOzoneCluster.newBuilder(CONF)
.setNumDatanodes(5)
@@ -107,10 +118,11 @@ public class TestOzoneFileSystemWithStreaming {
OZONE_URI_SCHEME, bucket.getName(), bucket.getVolumeName());
CONF.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
- final Path file = new Path("/file");
-
try (FileSystem fs = FileSystem.get(CONF)) {
- runTestCreateFile(fs, file);
+ for (int i = 1; i <= 3; i++) {
+ final Path file = new Path("/file" + i);
+ runTestCreateFile(fs, file, i << 20);
+ }
}
}
@@ -123,18 +135,62 @@ public class TestOzoneFileSystemWithStreaming {
final String dir = OZONE_ROOT + bucket.getVolumeName()
+ OZONE_URI_DELIMITER + bucket.getName();
- final Path file = new Path(dir, "file");
try (FileSystem fs = FileSystem.get(CONF)) {
- runTestCreateFile(fs, file);
+ for (int i = 1; i <= 3; i++) {
+ final Path file = new Path(dir, "file" + i);
+ runTestCreateFile(fs, file, i << 20);
+ }
+ }
+ }
+
+  /**
+   * Write {@code data} to {@code path} and verify which underlying stream
+   * the {@link SelectorOutputStream} selected, both before and after close.
+   * The stream is closed by try-with-resources so that it is not leaked
+   * when an assertion fails before close.
+   */
+  static void createFile(FileSystem fs, Path path, boolean overwrite,
+      byte[] data) throws IOException {
+    final boolean belowThreshold = data.length <= AUTO_THRESHOLD;
+    final SelectorOutputStream<?> selector;
+    try (FSDataOutputStream out = fs.create(path, overwrite)) {
+      out.write(data);
+
+      final OutputStream wrapped = out.getWrappedStream();
+      LOG.info("wrapped: {}", wrapped.getClass());
+      Assertions.assertEquals(SelectorOutputStream.class, wrapped.getClass());
+      selector = (SelectorOutputStream<?>) wrapped;
+      LOG.info("data.length={}, threshold={}, belowThreshold? {}",
+          data.length, AUTO_THRESHOLD, belowThreshold);
+      assertUnderlying(selector, belowThreshold);
+    }
+
+    final OutputStream underlying = selector.getUnderlying();
+    Assertions.assertNotNull(underlying);
+    LOG.info("underlying after close: {}", underlying.getClass());
+    if (belowThreshold) {
+      Assertions.assertTrue(underlying instanceof OzoneFSOutputStream);
+    } else {
+      Assertions.assertEquals(OzoneFSDataStreamOutput.class,
+          underlying.getClass());
+    }
+  }
+
+ /**
+ * Assert the selector state before close: nothing has been selected yet
+ * when the data fits within the threshold; otherwise the selected stream
+ * is an {@link OzoneFSDataStreamOutput}.
+ */
+ static void assertUnderlying(SelectorOutputStream<?> selector,
+ boolean belowThreshold) {
+ final OutputStream underlying = selector.getUnderlying();
+ LOG.info("underlying before close: {}", underlying != null ?
+ underlying.getClass() : null);
+ if (belowThreshold) {
+ Assertions.assertNull(underlying);
+ } else {
+ Assertions.assertNotNull(underlying);
+ Assertions.assertEquals(OzoneFSDataStreamOutput.class,
+ underlying.getClass());
}
}
- static void runTestCreateFile(FileSystem fs, Path file) throws Exception {
- final byte[] bytes = new byte[1 << 20];
+ static void runTestCreateFile(FileSystem fs, Path file, int size)
+ throws Exception {
+ final byte[] bytes = new byte[size];
ThreadLocalRandom.current().nextBytes(bytes);
- ContractTestUtils.createFile(fs, file, true, bytes);
+ createFile(fs, file, true, bytes);
final byte[] buffer = new byte[4 << 10];
int offset = 0;
diff --git
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java
index 3a60346d94..3dd8276394 100644
---
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java
+++
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java
@@ -43,12 +43,14 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.client.io.SelectorOutputStream;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Progressable;
import org.apache.http.client.utils.URIBuilder;
+import org.apache.ratis.util.function.CheckedFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -111,6 +113,9 @@ public class BasicOzoneFileSystem extends FileSystem {
OZONE_FS_LISTING_PAGE_SIZE_DEFAULT;
private boolean hsyncEnabled = OZONE_FS_HSYNC_ENABLED_DEFAULT;
+ /** Whether file writes may use datastream (OZONE_FS_DATASTREAM_ENABLED). */
+ private boolean isRatisStreamingEnabled
+ = OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLED_DEFAULT;
+ /**
+ * Size in bytes (OZONE_FS_DATASTREAM_AUTO_THRESHOLD): a file is written via
+ * datastream only if more than this many bytes are written before the
+ * output stream is selected; see selectOutputStream.
+ * NOTE(review): the configured long size is cast to int when read, so a
+ * value above Integer.MAX_VALUE would wrap -- consider validating it.
+ */
+ private int streamingAutoThreshold;
private static final Pattern URL_SCHEMA_PATTERN =
Pattern.compile("([^\\.]+)\\.([^\\.]+)\\.{0,1}(.*)");
@@ -131,6 +136,13 @@ public class BasicOzoneFileSystem extends FileSystem {
listingPageSize = OzoneClientUtils.limitValue(listingPageSize,
OZONE_FS_LISTING_PAGE_SIZE,
OZONE_FS_MAX_LISTING_PAGE_SIZE);
+ isRatisStreamingEnabled = conf.getBoolean(
+ OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLED,
+ OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLED_DEFAULT);
+ streamingAutoThreshold = (int) OzoneConfiguration.of(conf).getStorageSize(
+ OzoneConfigKeys.OZONE_FS_DATASTREAM_AUTO_THRESHOLD,
+ OzoneConfigKeys.OZONE_FS_DATASTREAM_AUTO_THRESHOLD_DEFAULT,
+ StorageUnit.BYTES);
hsyncEnabled = conf.getBoolean(
OzoneConfigKeys.OZONE_FS_HSYNC_ENABLED,
OZONE_FS_HSYNC_ENABLED_DEFAULT);
@@ -281,21 +293,32 @@ public class BasicOzoneFileSystem extends FileSystem {
replication, flags.contains(CreateFlag.OVERWRITE), false);
}
+  /**
+   * Create the output stream for a key once the number of bytes written
+   * is known: a datastream output when datastream is enabled and
+   * {@code byteWritten} exceeds {@code streamingAutoThreshold};
+   * otherwise a regular {@link OzoneFSOutputStream}.
+   */
+  private OutputStream selectOutputStream(String key, short replication,
+      boolean overwrite, boolean recursive, int byteWritten)
+      throws IOException {
+    final boolean useStreaming = isRatisStreamingEnabled
+        && byteWritten > streamingAutoThreshold;
+    if (!useStreaming) {
+      return createFSOutputStream(
+          adapter.createFile(key, replication, overwrite, recursive));
+    }
+    return adapter.createStreamFile(key, replication, overwrite, recursive);
+  }
+
+ /**
+ * Create the {@link FSDataOutputStream} for a file.
+ * When datastream is enabled, the key is not created here: the returned
+ * {@link SelectorOutputStream} buffers up to streamingAutoThreshold bytes
+ * and calls selectOutputStream later, when the buffer would overflow or
+ * on flush/hflush/hsync/close.
+ * NOTE(review): errors raised by adapter.createFile/createStreamFile
+ * (presumably e.g. an existing key when overwrite is false) are therefore
+ * reported by a later write/flush/close rather than by this method --
+ * confirm this is acceptable for FileSystem#create callers.
+ */
private FSDataOutputStream createOutputStream(String key, short replication,
boolean overwrite, boolean recursive) throws IOException {
- boolean isRatisStreamingEnabled = getConf().getBoolean(
- OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLED,
- OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLED_DEFAULT);
if (isRatisStreamingEnabled) {
- return new FSDataOutputStream(adapter.createStreamFile(key,
- replication, overwrite, recursive), statistics);
+ // Defer choosing between createStreamFile and createFile until the
+ // number of bytes written is known (see selectOutputStream).
+ final CheckedFunction<Integer, OutputStream, IOException> selector
+ = byteWritten -> selectOutputStream(
+ key, replication, overwrite, recursive, byteWritten);
+ return new FSDataOutputStream(new SelectorOutputStream<>(
+ streamingAutoThreshold, selector), statistics);
}
+
return new FSDataOutputStream(createFSOutputStream(
adapter.createFile(key,
replication, overwrite, recursive)), statistics);
}
- protected OutputStream createFSOutputStream(
+ /**
+ * Hook for subclasses to wrap the {@link OzoneFSOutputStream} created for
+ * a file; this base implementation returns it unchanged.
+ */
+ protected OzoneFSOutputStream createFSOutputStream(
OzoneFSOutputStream outputStream) {
return outputStream;
}
diff --git
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java
index cffb36c144..ca8a0b0d68 100644
---
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java
+++
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java
@@ -46,12 +46,14 @@ import org.apache.hadoop.ozone.OFSPath;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.io.SelectorOutputStream;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Progressable;
import org.apache.http.client.utils.URIBuilder;
+import org.apache.ratis.util.function.CheckedFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -114,6 +116,9 @@ public class BasicRootedOzoneFileSystem extends FileSystem {
OZONE_FS_LISTING_PAGE_SIZE_DEFAULT;
private boolean hsyncEnabled = OZONE_FS_HSYNC_ENABLED_DEFAULT;
+ /** Whether file writes may use datastream (OZONE_FS_DATASTREAM_ENABLED). */
+ private boolean isRatisStreamingEnabled
+ = OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLED_DEFAULT;
+ /**
+ * Size in bytes (OZONE_FS_DATASTREAM_AUTO_THRESHOLD): a file is written via
+ * datastream only if more than this many bytes are written before the
+ * output stream is selected; see selectOutputStream.
+ * NOTE(review): the configured long size is cast to int when read, so a
+ * value above Integer.MAX_VALUE would wrap -- consider validating it.
+ */
+ private int streamingAutoThreshold;
private static final String URI_EXCEPTION_TEXT =
"URL should be one of the following formats: " +
@@ -130,6 +135,13 @@ public class BasicRootedOzoneFileSystem extends FileSystem
{
listingPageSize = OzoneClientUtils.limitValue(listingPageSize,
OZONE_FS_LISTING_PAGE_SIZE,
OZONE_FS_MAX_LISTING_PAGE_SIZE);
+ isRatisStreamingEnabled = conf.getBoolean(
+ OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLED,
+ OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLED_DEFAULT);
+ streamingAutoThreshold = (int) OzoneConfiguration.of(conf).getStorageSize(
+ OzoneConfigKeys.OZONE_FS_DATASTREAM_AUTO_THRESHOLD,
+ OzoneConfigKeys.OZONE_FS_DATASTREAM_AUTO_THRESHOLD_DEFAULT,
+ StorageUnit.BYTES);
hsyncEnabled = conf.getBoolean(
OzoneConfigKeys.OZONE_FS_HSYNC_ENABLED,
OZONE_FS_HSYNC_ENABLED_DEFAULT);
@@ -263,21 +275,31 @@ public class BasicRootedOzoneFileSystem extends
FileSystem {
replication, flags.contains(CreateFlag.OVERWRITE), false);
}
+  /**
+   * Create the output stream for a key once the number of bytes written
+   * is known: a datastream output when datastream is enabled and
+   * {@code byteWritten} exceeds {@code streamingAutoThreshold};
+   * otherwise a regular {@link OzoneFSOutputStream}.
+   */
+  private OutputStream selectOutputStream(String key, short replication,
+      boolean overwrite, boolean recursive, int byteWritten)
+      throws IOException {
+    final boolean useStreaming = isRatisStreamingEnabled
+        && byteWritten > streamingAutoThreshold;
+    if (!useStreaming) {
+      return createFSOutputStream(
+          adapter.createFile(key, replication, overwrite, recursive));
+    }
+    return adapter.createStreamFile(key, replication, overwrite, recursive);
+  }
+
+ /**
+ * Create the {@link FSDataOutputStream} for a file.
+ * When datastream is enabled, the key is not created here: the returned
+ * {@link SelectorOutputStream} buffers up to streamingAutoThreshold bytes
+ * and calls selectOutputStream later, when the buffer would overflow or
+ * on flush/hflush/hsync/close.
+ * NOTE(review): errors raised by adapter.createFile/createStreamFile
+ * (presumably e.g. an existing key when overwrite is false) are therefore
+ * reported by a later write/flush/close rather than by this method --
+ * confirm this is acceptable for FileSystem#create callers.
+ */
private FSDataOutputStream createOutputStream(String key, short replication,
boolean overwrite, boolean recursive) throws IOException {
- boolean isRatisStreamingEnabled = getConf().getBoolean(
- OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLED,
- OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLED_DEFAULT);
if (isRatisStreamingEnabled) {
- return new FSDataOutputStream(adapter.createStreamFile(key,
- replication, overwrite, recursive), statistics);
+ // Defer choosing between createStreamFile and createFile until the
+ // number of bytes written is known (see selectOutputStream).
+ final CheckedFunction<Integer, OutputStream, IOException> selector
+ = byteWritten -> selectOutputStream(
+ key, replication, overwrite, recursive, byteWritten);
+ return new FSDataOutputStream(new SelectorOutputStream<>(
+ streamingAutoThreshold, selector), statistics);
}
return new FSDataOutputStream(createFSOutputStream(
adapter.createFile(key,
replication, overwrite, recursive)), statistics);
}
- protected OutputStream createFSOutputStream(
+ /**
+ * Hook for subclasses to wrap the {@link OzoneFSOutputStream} created for
+ * a file; this base implementation returns it unchanged.
+ */
+ protected OzoneFSOutputStream createFSOutputStream(
OzoneFSOutputStream outputStream) {
return outputStream;
}
diff --git
a/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
b/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
index e1e6c6f523..8fa46d7a53 100644
---
a/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
+++
b/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.fs.ozone;
import java.io.IOException;
import java.io.InputStream;
-import java.io.OutputStream;
import java.net.URI;
import org.apache.hadoop.crypto.key.KeyProvider;
@@ -104,7 +103,7 @@ public class OzoneFileSystem extends BasicOzoneFileSystem
}
+ /**
+ * Wrap the stream in a {@link CapableOzoneFSOutputStream},
+ * passing {@code isHsyncEnabled()}.
+ */
@Override
- protected OutputStream createFSOutputStream(
+ protected OzoneFSOutputStream createFSOutputStream(
OzoneFSOutputStream outputStream) {
return new CapableOzoneFSOutputStream(outputStream, isHsyncEnabled());
}
diff --git
a/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
b/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
index ea3fc8e9c7..8243e04f9c 100644
---
a/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
+++
b/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.security.token.DelegationTokenIssuer;
import java.io.IOException;
import java.io.InputStream;
-import java.io.OutputStream;
import java.net.URI;
/**
@@ -102,7 +101,7 @@ public class RootedOzoneFileSystem extends
BasicRootedOzoneFileSystem
}
+ /**
+ * Wrap the stream in a {@link CapableOzoneFSOutputStream},
+ * passing {@code isHsyncEnabled()}.
+ */
@Override
- protected OutputStream createFSOutputStream(
+ protected OzoneFSOutputStream createFSOutputStream(
OzoneFSOutputStream outputStream) {
return new CapableOzoneFSOutputStream(outputStream, isHsyncEnabled());
}
diff --git
a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
index e1e6c6f523..8fa46d7a53 100644
---
a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
+++
b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.fs.ozone;
import java.io.IOException;
import java.io.InputStream;
-import java.io.OutputStream;
import java.net.URI;
import org.apache.hadoop.crypto.key.KeyProvider;
@@ -104,7 +103,7 @@ public class OzoneFileSystem extends BasicOzoneFileSystem
}
+ /**
+ * Wrap the stream in a {@link CapableOzoneFSOutputStream},
+ * passing {@code isHsyncEnabled()}.
+ */
@Override
- protected OutputStream createFSOutputStream(
+ protected OzoneFSOutputStream createFSOutputStream(
OzoneFSOutputStream outputStream) {
return new CapableOzoneFSOutputStream(outputStream, isHsyncEnabled());
}
diff --git
a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
index 699ccffefd..8808e28e88 100644
---
a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
+++
b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.security.token.DelegationTokenIssuer;
import java.io.IOException;
import java.io.InputStream;
-import java.io.OutputStream;
import java.net.URI;
/**
@@ -102,7 +101,7 @@ public class RootedOzoneFileSystem extends
BasicRootedOzoneFileSystem
}
+ /**
+ * Wrap the stream in a {@link CapableOzoneFSOutputStream},
+ * passing {@code isHsyncEnabled()}.
+ */
@Override
- protected OutputStream createFSOutputStream(
+ protected OzoneFSOutputStream createFSOutputStream(
OzoneFSOutputStream outputStream) {
return new CapableOzoneFSOutputStream(outputStream, isHsyncEnabled());
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]