This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit 226454dfa3758ea2bd95fbb5bab7d49ffc410d24
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Fri Nov 11 08:59:46 2022 -0800

    HDDS-7478. [Ozone-Streaming] NPE in when creating a file with o3fs. (#3949)
    
    (cherry picked from commit c17be7013ade5920ef2ae056b48b8b1d8b04d136)
---
 .../impl/TestKeyValueStreamDataChannel.java        |   2 +-
 .../fs/ozone/TestOzoneFileSystemWithStreaming.java | 158 +++++++++++++++++++++
 .../client/rpc/TestBlockDataStreamOutput.java      |   2 -
 .../fs/ozone/BasicOzoneClientAdapterImpl.java      |  23 +--
 .../ozone/BasicRootedOzoneClientAdapterImpl.java   |  23 ++-
 5 files changed, 174 insertions(+), 34 deletions(-)

diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestKeyValueStreamDataChannel.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestKeyValueStreamDataChannel.java
index d252b1cb1b..ddbd4b39f4 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestKeyValueStreamDataChannel.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestKeyValueStreamDataChannel.java
@@ -199,7 +199,7 @@ public class TestKeyValueStreamDataChannel {
 
     @Override
     public CompletableFuture<DataStreamReply> writeAsync(
-        ByteBuffer src, WriteOption... writeOptions) {
+        ByteBuffer src, Iterable<WriteOption> writeOptions) {
       final int written;
       try {
         written = writeBuffers(
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemWithStreaming.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemWithStreaming.java
new file mode 100644
index 0000000000..f2aa527598
--- /dev/null
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemWithStreaming.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.ozone;
+
+import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.TestDataUtil;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.junit.AfterClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.junit.rules.Timeout;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_ENABLED;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLED;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OFS_URI_SCHEME;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_ROOT;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_SCHEME;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DEFAULT_BUCKET_LAYOUT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY;
+
+/**
+ * Ozone file system tests with Streaming.
+ */
+public class TestOzoneFileSystemWithStreaming {
+  @Rule
+  public Timeout timeout = Timeout.seconds(300);
+
+  private static MiniOzoneCluster cluster;
+  private static OzoneBucket bucket;
+
+  private final OzoneConfiguration conf = new OzoneConfiguration();
+
+  {
+    try {
+      init();
+    } catch (Exception e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  private void init() throws Exception {
+    final int chunkSize = 16 << 10;
+    final int flushSize = 2 * chunkSize;
+    final int maxFlushSize = 2 * flushSize;
+    final int blockSize = 2 * maxFlushSize;
+    final BucketLayout layout = BucketLayout.FILE_SYSTEM_OPTIMIZED;
+
+    conf.setBoolean(DFS_CONTAINER_RATIS_DATASTREAM_ENABLED, true);
+    conf.setBoolean(OZONE_FS_DATASTREAM_ENABLED, true);
+    conf.setBoolean(OZONE_OM_RATIS_ENABLE_KEY, false);
+    conf.set(OZONE_DEFAULT_BUCKET_LAYOUT, layout.name());
+    cluster = MiniOzoneCluster.newBuilder(conf)
+        .setNumDatanodes(5)
+        .setTotalPipelineNumLimit(10)
+        .setBlockSize(blockSize)
+        .setChunkSize(chunkSize)
+        .setStreamBufferFlushSize(flushSize)
+        .setStreamBufferMaxSize(maxFlushSize)
+        .setDataStreamBufferFlushize(maxFlushSize)
+        .setStreamBufferSizeUnit(StorageUnit.BYTES)
+        .setDataStreamMinPacketSize(chunkSize)
+        .setDataStreamStreamWindowSize(5 * chunkSize)
+        .build();
+    cluster.waitForClusterToBeReady();
+
+    // create a volume and a bucket to be used by OzoneFileSystem
+    bucket = TestDataUtil.createVolumeAndBucket(cluster, layout);
+  }
+
+  @AfterClass
+  public static void teardown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testO3fsCreateFile() throws Exception {
+    // Set the fs.defaultFS
+    final String rootPath = String.format("%s://%s.%s/",
+        OZONE_URI_SCHEME, bucket.getName(), bucket.getVolumeName());
+    conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
+
+    final Path file = new Path("/file");
+
+    try (FileSystem fs = FileSystem.get(conf)) {
+      runTestCreateFile(fs, file);
+    }
+  }
+
+  @Test
+  public void testOfsCreateFile() throws Exception {
+    // Set the fs.defaultFS
+    final String rootPath = String.format("%s://%s/",
+        OZONE_OFS_URI_SCHEME, conf.get(OZONE_OM_ADDRESS_KEY));
+    conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
+
+    final String dir = OZONE_ROOT + bucket.getVolumeName()
+        + OZONE_URI_DELIMITER + bucket.getName();
+    final Path file = new Path(dir, "file");
+
+    try (FileSystem fs = FileSystem.get(conf)) {
+      runTestCreateFile(fs, file);
+    }
+  }
+
+  static void runTestCreateFile(FileSystem fs, Path file) throws Exception {
+    final byte[] bytes = new byte[1 << 20];
+    ThreadLocalRandom.current().nextBytes(bytes);
+
+    ContractTestUtils.createFile(fs, file, true, bytes);
+
+    final byte[] buffer = new byte[4 << 10];
+    int offset = 0;
+    try (FSDataInputStream in = fs.open(file)) {
+      for (; ;) {
+        final int n = in.read(buffer, 0, buffer.length);
+        if (n <= 0) {
+          break;
+        }
+        for (int i = 0; i < n; i++) {
+          Assertions.assertEquals(bytes[offset + i], buffer[i]);
+        }
+        offset += n;
+      }
+    }
+    Assertions.assertEquals(bytes.length, offset);
+  }
+}
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
index c8a0115a80..f232a9298e 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
@@ -49,7 +49,6 @@ import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
-import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
 import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
 
 /**
@@ -91,7 +90,6 @@ public class TestBlockDataStreamOutput {
     OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
     conf.setFromObject(clientConfig);
 
-    conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, 
TimeUnit.MILLISECONDS);
     conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
     conf.setQuietMode(false);
     conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
diff --git 
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
 
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
index a1584b1a95..36c6c0ffc7 100644
--- 
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
+++ 
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
@@ -272,22 +272,13 @@ public class BasicOzoneClientAdapterImpl implements 
OzoneClientAdapter {
       boolean overWrite, boolean recursive) throws IOException {
     incrementCounter(Statistic.OBJECTS_CREATED, 1);
     try {
-      OzoneDataStreamOutput ozoneDataStreamOutput = null;
-      if (replication == ReplicationFactor.ONE.getValue()
-          || replication == ReplicationFactor.THREE.getValue()) {
-
-        ReplicationConfig customReplicationConfig =
-            ReplicationConfig.adjustReplication(bucketReplicationConfig,
-                replication, config);
-        ozoneDataStreamOutput = bucket
-            .createStreamFile(key, 0, customReplicationConfig, overWrite,
-                recursive);
-      } else {
-        ozoneDataStreamOutput = bucket.createStreamFile(
-            key, 0, bucketReplicationConfig, overWrite, recursive);
-      }
-      return new OzoneFSDataStreamOutput(
-          ozoneDataStreamOutput.getByteBufStreamOutput());
+      final ReplicationConfig replicationConfig
+          = OzoneClientUtils.resolveClientSideReplicationConfig(
+          replication, clientConfiguredReplicationConfig,
+          getReplicationConfigWithRefreshCheck(), config);
+      final OzoneDataStreamOutput out = bucket.createStreamFile(
+          key, 0, replicationConfig, overWrite, recursive);
+      return new OzoneFSDataStreamOutput(out.getByteBufStreamOutput());
     } catch (OMException ex) {
       if (ex.getResult() == OMException.ResultCodes.FILE_ALREADY_EXISTS
           || ex.getResult() == OMException.ResultCodes.NOT_A_FILE) {
diff --git 
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
 
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
index 9d1bc4980d..ac3d38b958 100644
--- 
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
+++ 
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
@@ -406,21 +406,14 @@ public class BasicRootedOzoneClientAdapterImpl
     String key = ofsPath.getKeyName();
     try {
       // Hadoop CopyCommands class always sets recursive to true
-      OzoneBucket bucket = getBucket(ofsPath, recursive);
-      OzoneDataStreamOutput ozoneDataStreamOutput = null;
-      if (replication == ReplicationFactor.ONE.getValue()
-          || replication == ReplicationFactor.THREE.getValue()) {
-
-        ozoneDataStreamOutput = bucket.createStreamFile(key, 0,
-            ReplicationConfig.adjustReplication(
-                clientConfiguredReplicationConfig, replication, config),
-            overWrite, recursive);
-      } else {
-        ozoneDataStreamOutput = bucket.createStreamFile(
-            key, 0, clientConfiguredReplicationConfig, overWrite, recursive);
-      }
-      return new OzoneFSDataStreamOutput(
-          ozoneDataStreamOutput.getByteBufStreamOutput());
+      final OzoneBucket bucket = getBucket(ofsPath, recursive);
+      final ReplicationConfig replicationConfig
+          = OzoneClientUtils.resolveClientSideReplicationConfig(
+          replication, clientConfiguredReplicationConfig,
+          bucket.getReplicationConfig(), config);
+      final OzoneDataStreamOutput out = bucket.createStreamFile(
+          key, 0, replicationConfig, overWrite, recursive);
+      return new OzoneFSDataStreamOutput(out.getByteBufStreamOutput());
     } catch (OMException ex) {
       if (ex.getResult() == OMException.ResultCodes.FILE_ALREADY_EXISTS
           || ex.getResult() == OMException.ResultCodes.NOT_A_FILE) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to