This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new dfe20f8764 HDDS-8635. Automatically select createKey and 
createStreamKey in OzoneFileSystem. (#4763)
dfe20f8764 is described below

commit dfe20f8764904044aea02aadc013c8c560973360
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Sun May 28 11:54:46 2023 +0800

    HDDS-8635. Automatically select createKey and createStreamKey in 
OzoneFileSystem. (#4763)
---
 .../org/apache/hadoop/ozone/OzoneConfigKeys.java   |   8 +
 .../common/src/main/resources/ozone-default.xml    |   9 +
 .../ozone/client/io/SelectorOutputStream.java      | 189 +++++++++++++++++++++
 .../ozone/client/io/TestSelectorOutputStream.java  | 159 +++++++++++++++++
 .../fs/ozone/TestOzoneFileSystemWithStreaming.java |  76 +++++++--
 .../hadoop/fs/ozone/BasicOzoneFileSystem.java      |  35 +++-
 .../fs/ozone/BasicRootedOzoneFileSystem.java       |  34 +++-
 .../apache/hadoop/fs/ozone/OzoneFileSystem.java    |   3 +-
 .../hadoop/fs/ozone/RootedOzoneFileSystem.java     |   3 +-
 .../apache/hadoop/fs/ozone/OzoneFileSystem.java    |   3 +-
 .../hadoop/fs/ozone/RootedOzoneFileSystem.java     |   3 +-
 11 files changed, 492 insertions(+), 30 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 587f45db5d..85bc566902 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -111,6 +111,14 @@ public final class OzoneConfigKeys {
   public static final boolean OZONE_FS_DATASTREAM_ENABLED_DEFAULT
       = false;
 
+  /**
+   * A threshold to auto select datastream to write files in OzoneFileSystem.
+   */
+  public static final String OZONE_FS_DATASTREAM_AUTO_THRESHOLD
+      = "ozone.fs.datastream.auto.threshold";
+  public static final String OZONE_FS_DATASTREAM_AUTO_THRESHOLD_DEFAULT
+      = "4MB";
+
   /**
    * Flag to enable hsync/hflush.
    */
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml 
b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index fa0dee1df6..0de5c43034 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -3697,6 +3697,15 @@
       To enable/disable filesystem write via ratis streaming.
     </description>
   </property>
+  <property>
+    <name>ozone.fs.datastream.auto.threshold</name>
+    <value>4MB</value>
+    <tag>OZONE, DATANODE</tag>
+    <description>
+      A threshold to auto select datastream to write files
+      in OzoneFileSystem.
+    </description>
+  </property>
 
   <property>
     <name>ozone.fs.hsync.enabled</name>
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/client/io/SelectorOutputStream.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/client/io/SelectorOutputStream.java
new file mode 100644
index 0000000000..6f2ad0bfa8
--- /dev/null
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/client/io/SelectorOutputStream.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import org.apache.hadoop.fs.Syncable;
+import org.apache.ratis.util.function.CheckedFunction;
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Objects;
+
+/**
+ * An {@link OutputStream} that first writes data to a buffer up to the
+ * capacity and then selects the {@link Underlying} stream by the number of
+ * bytes written. Invoking {@link #flush()}, {@link #hflush()},
+ * {@link #hsync()} or {@link #close()} forces the {@link OutputStream}
+ * selection and writes the buffered data to the selected stream.
+ * <p>
+ * This class, like many {@link OutputStream} subclasses, is NOT threadsafe.
+ *
+ * @param <OUT> The underlying {@link OutputStream} type.
+ */
+public class SelectorOutputStream<OUT extends OutputStream>
+    extends OutputStream implements Syncable {
+  /** A buffer backed by a byte[]. */
+  static final class ByteArrayBuffer {
+    private byte[] array;
+    /** Write offset of {@link #array}. */
+    private int offset = 0;
+
+    private ByteArrayBuffer(int capacity) {
+      this.array = new byte[capacity];
+    }
+
+    private void assertRemaining(int outstandingBytes) {
+      Objects.requireNonNull(array, "array == null");
+
+      final int remaining = array.length - offset;
+      if (remaining < 0) {
+        throw new IllegalStateException("remaining = " + remaining + " <= 0");
+      }
+      if (remaining < outstandingBytes) {
+        throw new IllegalArgumentException("Buffer overflow: remaining = "
+            + remaining + " < outstandingBytes = " + outstandingBytes);
+      }
+    }
+
+    void write(byte b) {
+      assertRemaining(1);
+      array[offset] = b;
+      offset++;
+    }
+
+    void write(byte[] src, int srcOffset, int length) {
+      Objects.requireNonNull(src, "src == null");
+      assertRemaining(length);
+      System.arraycopy(src, srcOffset, array, offset, length);
+      offset += length;
+    }
+
+    <OUT extends OutputStream> OUT selectAndClose(
+        int outstandingBytes, boolean force,
+        CheckedFunction<Integer, OUT, IOException> selector)
+        throws IOException {
+      assertRemaining(0);
+      final int required = offset + outstandingBytes;
+      if (force || required > array.length) {
+        final OUT out = selector.apply(required);
+        out.write(array, 0, offset);
+        array = null;
+        return out;
+      }
+      return null;
+    }
+  }
+
+  /** To select the underlying {@link OutputStream}. */
+  final class Underlying {
+    /** Select an {@link OutputStream} by the number of bytes. */
+    private final CheckedFunction<Integer, OUT, IOException> selector;
+    private OUT out;
+
+    private Underlying(CheckedFunction<Integer, OUT, IOException> selector) {
+      this.selector = selector;
+    }
+
+    private OUT select(int outstandingBytes, boolean force) throws IOException 
{
+      if (out == null) {
+        out = buffer.selectAndClose(outstandingBytes, force, selector);
+      }
+      return out;
+    }
+  }
+
+  private final ByteArrayBuffer buffer;
+  private final Underlying underlying;
+
+  /**
+   * Construct a {@link SelectorOutputStream} which first writes to a buffer.
+   * When a write would overflow the buffer, select an {@link OutputStream}.
+   *
+   * @param selectionThreshold The buffer capacity.
+   * @param selector Use bytes-written to select an {@link OutputStream}.
+   */
+  public SelectorOutputStream(int selectionThreshold,
+      CheckedFunction<Integer, OUT, IOException> selector) {
+    this.buffer = new ByteArrayBuffer(selectionThreshold);
+    this.underlying = new Underlying(selector);
+  }
+
+  public OUT getUnderlying() {
+    return underlying.out;
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    final OUT out = underlying.select(1, false);
+    if (out != null) {
+      out.write(b);
+    } else {
+      buffer.write((byte) b);
+    }
+  }
+
+  @Override
+  public void write(@Nonnull byte[] array, int off, int len)
+      throws IOException {
+    final OUT selected = underlying.select(len, false);
+    if (selected != null) {
+      selected.write(array, off, len);
+    } else {
+      buffer.write(array, off, len);
+    }
+  }
+
+  private OUT select() throws IOException {
+    return underlying.select(0, true);
+  }
+
+  @Override
+  public void flush() throws IOException {
+    select().flush();
+  }
+
+  @Override
+  public void hflush() throws IOException {
+    final OUT out = select();
+    if (out instanceof Syncable) {
+      ((Syncable)out).hflush();
+    } else {
+      throw new IllegalStateException(
+          "Failed to hflush: The underlying OutputStream ("
+              + out.getClass() + ") is not Syncable.");
+    }
+  }
+
+  @Override
+  public void hsync() throws IOException {
+    final OUT out = select();
+    if (out instanceof Syncable) {
+      ((Syncable)out).hsync();
+    } else {
+      throw new IllegalStateException(
+          "Failed to hsync: The underlying OutputStream ("
+              + out.getClass() + ") is not Syncable.");
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    select().close();
+  }
+}
diff --git 
a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/client/io/TestSelectorOutputStream.java
 
b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/client/io/TestSelectorOutputStream.java
new file mode 100644
index 0000000000..e436ad8277
--- /dev/null
+++ 
b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/client/io/TestSelectorOutputStream.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import org.apache.hadoop.fs.Syncable;
+import org.apache.ratis.util.MemoizedSupplier;
+import org.apache.ratis.util.function.CheckedConsumer;
+import org.apache.ratis.util.function.CheckedFunction;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.function.Supplier;
+
+/**
+ * Test {@link SelectorOutputStream}.
+ */
+@Timeout(30)
+public class TestSelectorOutputStream {
+  static final Logger LOG = LoggerFactory.getLogger(
+      TestSelectorOutputStream.class);
+
+  enum Op {
+    FLUSH(SelectorOutputStream::flush),
+    HFLUSH(SelectorOutputStream::hflush),
+    HSYNC(SelectorOutputStream::hsync),
+    CLOSE(SelectorOutputStream::close);
+
+    private final CheckedConsumer<SelectorOutputStream<?>, IOException> method;
+
+    Op(CheckedConsumer<SelectorOutputStream<?>, IOException> method) {
+      this.method = method;
+    }
+
+    void accept(SelectorOutputStream<OutputStream> out) throws IOException {
+      method.accept(out);
+    }
+  }
+
+  static class SyncableOutputStreamForTesting
+      extends ByteArrayOutputStream implements Syncable {
+    @Override
+    public void hflush() {
+      LOG.info("hflush");
+    }
+
+    @Override
+    public void hsync() {
+      LOG.info("hsync");
+    }
+  }
+
+  static Supplier<OutputStream> getOutputStreamSupplier(boolean isSyncable) {
+    return isSyncable ? SyncableOutputStreamForTesting::new
+        : ByteArrayOutputStream::new;
+  }
+
+  static void runTestSelector(int threshold, int byteToWrite,
+      Op op) throws Exception {
+    runTestSelector(threshold, byteToWrite, op, false);
+  }
+
+  static void runTestSelector(int threshold, int byteToWrite,
+      Op op, boolean isSyncable) throws Exception {
+    LOG.info("run: threshold={}, byteToWrite={}, op={}, isSyncable? {}",
+        threshold, byteToWrite, op, isSyncable);
+    final MemoizedSupplier<OutputStream> belowThreshold
+        = MemoizedSupplier.valueOf(getOutputStreamSupplier(isSyncable));
+    final MemoizedSupplier<OutputStream> aboveThreshold
+        = MemoizedSupplier.valueOf(getOutputStreamSupplier(isSyncable));
+    final CheckedFunction<Integer, OutputStream, IOException> selector
+        = byteWritten -> byteWritten <= threshold ?
+        belowThreshold.get() : aboveThreshold.get();
+
+    final SelectorOutputStream<OutputStream> out = new SelectorOutputStream<>(
+        threshold, selector);
+    for (int i = 0; i < byteToWrite; i++) {
+      out.write(i);
+    }
+
+    // check auto selection
+    final boolean isAbove = byteToWrite > threshold;
+    Assertions.assertFalse(belowThreshold.isInitialized());
+    Assertions.assertEquals(isAbove, aboveThreshold.isInitialized());
+
+    final boolean isBelow = !isAbove;
+    if (op != null) {
+      op.accept(out);
+      Assertions.assertEquals(isBelow, belowThreshold.isInitialized());
+      Assertions.assertEquals(isAbove, aboveThreshold.isInitialized());
+    }
+  }
+
+  @Test
+  public void testFlush() throws Exception {
+    runTestSelector(10, 2, Op.FLUSH);
+    runTestSelector(10, 10, Op.FLUSH);
+    runTestSelector(10, 20, Op.FLUSH);
+  }
+
+  @Test
+  public void testClose() throws Exception {
+    runTestSelector(10, 2, Op.CLOSE);
+    runTestSelector(10, 10, Op.CLOSE);
+    runTestSelector(10, 20, Op.CLOSE);
+  }
+
+  @Test
+  public void testHflushSyncable() throws Exception {
+    runTestSelector(10, 2, Op.HFLUSH, true);
+    runTestSelector(10, 10, Op.HFLUSH, true);
+    runTestSelector(10, 20, Op.HFLUSH, true);
+  }
+
+  @Test
+  public void testHflushNonSyncable() {
+    final IllegalStateException thrown = Assertions.assertThrows(
+        IllegalStateException.class,
+        () -> runTestSelector(10, 2, Op.HFLUSH, false));
+    LOG.info("thrown", thrown);
+    Assertions.assertTrue(thrown.getMessage().contains("not Syncable"));
+  }
+
+  @Test
+  public void testHSyncSyncable() throws Exception {
+    runTestSelector(10, 2, Op.HSYNC, true);
+    runTestSelector(10, 10, Op.HSYNC, true);
+    runTestSelector(10, 20, Op.HSYNC, true);
+  }
+
+  @Test
+  public void testHSyncNonSyncable() {
+    final IllegalStateException thrown = Assertions.assertThrows(
+        IllegalStateException.class,
+        () -> runTestSelector(10, 2, Op.HSYNC, false));
+    LOG.info("thrown", thrown);
+    Assertions.assertTrue(thrown.getMessage().contains("not Syncable"));
+  }
+}
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemWithStreaming.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemWithStreaming.java
index 34b824b5db..5d06854682 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemWithStreaming.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemWithStreaming.java
@@ -18,27 +18,34 @@
 
 package org.apache.hadoop.fs.ozone;
 
+import java.io.IOException;
+import java.io.OutputStream;
 import java.util.concurrent.ThreadLocalRandom;
 
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.TestDataUtil;
 import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.io.SelectorOutputStream;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_ENABLED;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_DATASTREAM_AUTO_THRESHOLD;
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLED;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OFS_URI_SCHEME;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_ROOT;
@@ -54,6 +61,9 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
  */
 @Timeout(value = 300)
 public class TestOzoneFileSystemWithStreaming {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      TestOzoneFileSystemWithStreaming.class);
+  private static final int AUTO_THRESHOLD = 2 << 20;
 
   private static MiniOzoneCluster cluster;
   private static OzoneBucket bucket;
@@ -71,7 +81,8 @@ public class TestOzoneFileSystemWithStreaming {
 
     CONF.setBoolean(DFS_CONTAINER_RATIS_DATASTREAM_ENABLED, true);
     CONF.setBoolean(OZONE_FS_DATASTREAM_ENABLED, true);
-    CONF.setBoolean(OZONE_OM_RATIS_ENABLE_KEY, false);
+    CONF.set(OZONE_FS_DATASTREAM_AUTO_THRESHOLD, AUTO_THRESHOLD + "B");
+    CONF.setBoolean(OZONE_OM_RATIS_ENABLE_KEY, true);
     CONF.set(OZONE_DEFAULT_BUCKET_LAYOUT, layout.name());
     cluster = MiniOzoneCluster.newBuilder(CONF)
         .setNumDatanodes(5)
@@ -107,10 +118,11 @@ public class TestOzoneFileSystemWithStreaming {
         OZONE_URI_SCHEME, bucket.getName(), bucket.getVolumeName());
     CONF.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
 
-    final Path file = new Path("/file");
-
     try (FileSystem fs = FileSystem.get(CONF)) {
-      runTestCreateFile(fs, file);
+      for (int i = 1; i <= 3; i++) {
+        final Path file = new Path("/file" + i);
+        runTestCreateFile(fs, file, i << 20);
+      }
     }
   }
 
@@ -123,18 +135,62 @@ public class TestOzoneFileSystemWithStreaming {
 
     final String dir = OZONE_ROOT + bucket.getVolumeName()
         + OZONE_URI_DELIMITER + bucket.getName();
-    final Path file = new Path(dir, "file");
 
     try (FileSystem fs = FileSystem.get(CONF)) {
-      runTestCreateFile(fs, file);
+      for (int i = 1; i <= 3; i++) {
+        final Path file = new Path(dir, "file" + i);
+        runTestCreateFile(fs, file, i << 20);
+      }
+    }
+  }
+
+  static void createFile(FileSystem fs, Path path, boolean overwrite,
+      byte[] data) throws IOException {
+
+    final FSDataOutputStream out = fs.create(path, overwrite);
+    out.write(data);
+
+    final OutputStream wrapped = out.getWrappedStream();
+    LOG.info("wrapped: {}", wrapped.getClass());
+    Assertions.assertEquals(SelectorOutputStream.class, wrapped.getClass());
+    final SelectorOutputStream<?> selector = (SelectorOutputStream<?>) wrapped;
+    final boolean belowThreshold = data.length <= AUTO_THRESHOLD;
+    LOG.info("data.length={}, threshold={}, belowThreshold? {}",
+        data.length, AUTO_THRESHOLD, belowThreshold);
+    assertUnderlying(selector, belowThreshold);
+
+    out.close();
+    final OutputStream underlying = selector.getUnderlying();
+    Assertions.assertNotNull(underlying);
+    LOG.info("underlying after close: {}", underlying.getClass());
+    if (belowThreshold) {
+      Assertions.assertTrue(underlying instanceof OzoneFSOutputStream);
+    } else {
+      Assertions.assertEquals(OzoneFSDataStreamOutput.class,
+          underlying.getClass());
+    }
+  }
+
+  static void assertUnderlying(SelectorOutputStream<?> selector,
+      boolean belowThreshold) {
+    final OutputStream underlying = selector.getUnderlying();
+    LOG.info("underlying before close: {}", underlying != null ?
+        underlying.getClass() : null);
+    if (belowThreshold) {
+      Assertions.assertNull(underlying);
+    } else {
+      Assertions.assertNotNull(underlying);
+      Assertions.assertEquals(OzoneFSDataStreamOutput.class,
+          underlying.getClass());
     }
   }
 
-  static void runTestCreateFile(FileSystem fs, Path file) throws Exception {
-    final byte[] bytes = new byte[1 << 20];
+  static void runTestCreateFile(FileSystem fs, Path file, int size)
+      throws Exception {
+    final byte[] bytes = new byte[size];
     ThreadLocalRandom.current().nextBytes(bytes);
 
-    ContractTestUtils.createFile(fs, file, true, bytes);
+    createFile(fs, file, true, bytes);
 
     final byte[] buffer = new byte[4 << 10];
     int offset = 0;
diff --git 
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java
 
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java
index 3a60346d94..3dd8276394 100644
--- 
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java
+++ 
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java
@@ -43,12 +43,14 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.client.io.SelectorOutputStream;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Progressable;
 import org.apache.http.client.utils.URIBuilder;
+import org.apache.ratis.util.function.CheckedFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -111,6 +113,9 @@ public class BasicOzoneFileSystem extends FileSystem {
       OZONE_FS_LISTING_PAGE_SIZE_DEFAULT;
 
   private boolean hsyncEnabled = OZONE_FS_HSYNC_ENABLED_DEFAULT;
+  private boolean isRatisStreamingEnabled
+      = OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLED_DEFAULT;
+  private int streamingAutoThreshold;
 
   private static final Pattern URL_SCHEMA_PATTERN =
       Pattern.compile("([^\\.]+)\\.([^\\.]+)\\.{0,1}(.*)");
@@ -131,6 +136,13 @@ public class BasicOzoneFileSystem extends FileSystem {
     listingPageSize = OzoneClientUtils.limitValue(listingPageSize,
         OZONE_FS_LISTING_PAGE_SIZE,
         OZONE_FS_MAX_LISTING_PAGE_SIZE);
+    isRatisStreamingEnabled = conf.getBoolean(
+        OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLED,
+        OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLED_DEFAULT);
+    streamingAutoThreshold = (int) OzoneConfiguration.of(conf).getStorageSize(
+        OzoneConfigKeys.OZONE_FS_DATASTREAM_AUTO_THRESHOLD,
+        OzoneConfigKeys.OZONE_FS_DATASTREAM_AUTO_THRESHOLD_DEFAULT,
+        StorageUnit.BYTES);
     hsyncEnabled = conf.getBoolean(
         OzoneConfigKeys.OZONE_FS_HSYNC_ENABLED,
         OZONE_FS_HSYNC_ENABLED_DEFAULT);
@@ -281,21 +293,32 @@ public class BasicOzoneFileSystem extends FileSystem {
         replication, flags.contains(CreateFlag.OVERWRITE), false);
   }
 
+  private OutputStream selectOutputStream(String key, short replication,
+      boolean overwrite, boolean recursive, int byteWritten)
+      throws IOException {
+    return isRatisStreamingEnabled && byteWritten > streamingAutoThreshold ?
+        adapter.createStreamFile(key, replication, overwrite, recursive)
+        : createFSOutputStream(adapter.createFile(
+        key, replication, overwrite, recursive));
+  }
+
   private FSDataOutputStream createOutputStream(String key, short replication,
       boolean overwrite, boolean recursive) throws IOException {
-    boolean isRatisStreamingEnabled = getConf().getBoolean(
-        OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLED,
-        OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLED_DEFAULT);
     if (isRatisStreamingEnabled) {
-      return new FSDataOutputStream(adapter.createStreamFile(key,
-          replication, overwrite, recursive), statistics);
+      // select OutputStream type based on byteWritten
+      final CheckedFunction<Integer, OutputStream, IOException> selector
+          = byteWritten -> selectOutputStream(
+          key, replication, overwrite, recursive, byteWritten);
+      return new FSDataOutputStream(new SelectorOutputStream<>(
+          streamingAutoThreshold, selector), statistics);
     }
+
     return new FSDataOutputStream(createFSOutputStream(
             adapter.createFile(key,
         replication, overwrite, recursive)), statistics);
   }
 
-  protected OutputStream createFSOutputStream(
+  protected OzoneFSOutputStream createFSOutputStream(
       OzoneFSOutputStream outputStream) {
     return outputStream;
   }
diff --git 
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java
 
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java
index cffb36c144..ca8a0b0d68 100644
--- 
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java
+++ 
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java
@@ -46,12 +46,14 @@ import org.apache.hadoop.ozone.OFSPath;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.io.SelectorOutputStream;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Progressable;
 import org.apache.http.client.utils.URIBuilder;
+import org.apache.ratis.util.function.CheckedFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -114,6 +116,9 @@ public class BasicRootedOzoneFileSystem extends FileSystem {
       OZONE_FS_LISTING_PAGE_SIZE_DEFAULT;
 
   private boolean hsyncEnabled = OZONE_FS_HSYNC_ENABLED_DEFAULT;
+  private boolean isRatisStreamingEnabled
+      = OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLED_DEFAULT;
+  private int streamingAutoThreshold;
 
   private static final String URI_EXCEPTION_TEXT =
       "URL should be one of the following formats: " +
@@ -130,6 +135,13 @@ public class BasicRootedOzoneFileSystem extends FileSystem 
{
     listingPageSize = OzoneClientUtils.limitValue(listingPageSize,
         OZONE_FS_LISTING_PAGE_SIZE,
         OZONE_FS_MAX_LISTING_PAGE_SIZE);
+    isRatisStreamingEnabled = conf.getBoolean(
+        OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLED,
+        OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLED_DEFAULT);
+    streamingAutoThreshold = (int) OzoneConfiguration.of(conf).getStorageSize(
+        OzoneConfigKeys.OZONE_FS_DATASTREAM_AUTO_THRESHOLD,
+        OzoneConfigKeys.OZONE_FS_DATASTREAM_AUTO_THRESHOLD_DEFAULT,
+        StorageUnit.BYTES);
     hsyncEnabled = conf.getBoolean(
         OzoneConfigKeys.OZONE_FS_HSYNC_ENABLED,
         OZONE_FS_HSYNC_ENABLED_DEFAULT);
@@ -263,21 +275,31 @@ public class BasicRootedOzoneFileSystem extends 
FileSystem {
         replication, flags.contains(CreateFlag.OVERWRITE), false);
   }
 
+  private OutputStream selectOutputStream(String key, short replication,
+      boolean overwrite, boolean recursive, int byteWritten)
+      throws IOException {
+    return isRatisStreamingEnabled && byteWritten > streamingAutoThreshold ?
+        adapter.createStreamFile(key, replication, overwrite, recursive)
+        : createFSOutputStream(adapter.createFile(
+        key, replication, overwrite, recursive));
+  }
+
   private FSDataOutputStream createOutputStream(String key, short replication,
       boolean overwrite, boolean recursive) throws IOException {
-    boolean isRatisStreamingEnabled = getConf().getBoolean(
-        OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLED,
-        OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLED_DEFAULT);
     if (isRatisStreamingEnabled) {
-      return new FSDataOutputStream(adapter.createStreamFile(key,
-          replication, overwrite, recursive), statistics);
+      // select OutputStream type based on byteWritten
+      final CheckedFunction<Integer, OutputStream, IOException> selector
+          = byteWritten -> selectOutputStream(
+          key, replication, overwrite, recursive, byteWritten);
+      return new FSDataOutputStream(new SelectorOutputStream<>(
+          streamingAutoThreshold, selector), statistics);
     }
     return new FSDataOutputStream(createFSOutputStream(
             adapter.createFile(key,
         replication, overwrite, recursive)), statistics);
   }
 
-  protected OutputStream createFSOutputStream(
+  protected OzoneFSOutputStream createFSOutputStream(
       OzoneFSOutputStream outputStream) {
     return outputStream;
   }
diff --git 
a/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
 
b/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
index e1e6c6f523..8fa46d7a53 100644
--- 
a/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
+++ 
b/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.fs.ozone;
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.OutputStream;
 import java.net.URI;
 
 import org.apache.hadoop.crypto.key.KeyProvider;
@@ -104,7 +103,7 @@ public class OzoneFileSystem extends BasicOzoneFileSystem
   }
 
   @Override
-  protected OutputStream createFSOutputStream(
+  protected OzoneFSOutputStream createFSOutputStream(
           OzoneFSOutputStream outputStream) {
     return new CapableOzoneFSOutputStream(outputStream, isHsyncEnabled());
   }
diff --git 
a/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
 
b/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
index ea3fc8e9c7..8243e04f9c 100644
--- 
a/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
+++ 
b/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.security.token.DelegationTokenIssuer;
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.OutputStream;
 import java.net.URI;
 
 /**
@@ -102,7 +101,7 @@ public class RootedOzoneFileSystem extends 
BasicRootedOzoneFileSystem
   }
 
   @Override
-  protected OutputStream createFSOutputStream(
+  protected OzoneFSOutputStream createFSOutputStream(
       OzoneFSOutputStream outputStream) {
     return new CapableOzoneFSOutputStream(outputStream, isHsyncEnabled());
   }
diff --git 
a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
 
b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
index e1e6c6f523..8fa46d7a53 100644
--- 
a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
+++ 
b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.fs.ozone;
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.OutputStream;
 import java.net.URI;
 
 import org.apache.hadoop.crypto.key.KeyProvider;
@@ -104,7 +103,7 @@ public class OzoneFileSystem extends BasicOzoneFileSystem
   }
 
   @Override
-  protected OutputStream createFSOutputStream(
+  protected OzoneFSOutputStream createFSOutputStream(
           OzoneFSOutputStream outputStream) {
     return new CapableOzoneFSOutputStream(outputStream, isHsyncEnabled());
   }
diff --git 
a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
 
b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
index 699ccffefd..8808e28e88 100644
--- 
a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
+++ 
b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.security.token.DelegationTokenIssuer;
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.OutputStream;
 import java.net.URI;
 
 /**
@@ -102,7 +101,7 @@ public class RootedOzoneFileSystem extends 
BasicRootedOzoneFileSystem
   }
 
   @Override
-  protected OutputStream createFSOutputStream(
+  protected OzoneFSOutputStream createFSOutputStream(
           OzoneFSOutputStream outputStream) {
     return new CapableOzoneFSOutputStream(outputStream, isHsyncEnabled());
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to