This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 58d144369c HDDS-10240. Cleanup zero-copy EC (#7340)
58d144369c is described below
commit 58d144369cd8955cd3214293c43d55411fbbace7
Author: Scolley <[email protected]>
AuthorDate: Mon Nov 4 02:06:32 2024 +0800
HDDS-10240. Cleanup zero-copy EC (#7340)
---
.../org/apache/hadoop/ozone/OzoneConfigKeys.java | 3 -
.../common/src/main/resources/ozone-default.xml | 8 -
.../transport/server/GrpcXceiverService.java | 19 +-
.../common/transport/server/XceiverServerGrpc.java | 9 +-
.../replication/GrpcReplicationService.java | 35 +-
.../container/replication/ReplicationServer.java | 29 +-
.../replication/SendContainerRequestHandler.java | 6 +-
.../replication/TestGrpcReplicationService.java | 8 +-
.../TestGrpcReplicationServiceWithZeroCopy.java | 31 --
.../client/rpc/AbstractTestECKeyOutputStream.java | 492 ---------------------
.../ozone/client/rpc/TestECKeyOutputStream.java | 466 ++++++++++++++++++-
.../rpc/TestECKeyOutputStreamWithZeroCopy.java | 31 --
12 files changed, 479 insertions(+), 658 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index df0fdc59a4..5719803b94 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -201,9 +201,6 @@ public final class OzoneConfigKeys {
"ozone.client.ec.grpc.write.timeout";
public static final String OZONE_CLIENT_EC_GRPC_WRITE_TIMEOUT_DEFAULT =
"30s";
- public static final String OZONE_EC_GRPC_ZERO_COPY_ENABLED =
- "ozone.ec.grpc.zerocopy.enabled";
- public static final boolean OZONE_EC_GRPC_ZERO_COPY_ENABLED_DEFAULT = true;
/**
* Ozone administrator users delimited by comma.
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index e2231a5c38..bc90a87b11 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -4536,14 +4536,6 @@
to existing buckets till this operation is completed.
</description>
</property>
- <property>
- <name>ozone.ec.grpc.zerocopy.enabled</name>
- <value>true</value>
- <tag>OZONE, DATANODE</tag>
- <description>
- Specify if zero-copy should be enabled for EC GRPC protocol.
- </description>
- </property>
<property>
<name>ozone.om.max.buckets</name>
<value>100000</value>
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java
index 9c3f29d0f0..5f1914402d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.ozone.container.common.transport.server;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc;
-import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import org.apache.ratis.grpc.util.ZeroCopyMessageMarshaller;
import org.apache.ratis.thirdparty.com.google.protobuf.MessageLite;
@@ -31,7 +30,6 @@ import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.InputStream;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc.getSendMethod;
@@ -45,28 +43,20 @@ public class GrpcXceiverService extends
LOG = LoggerFactory.getLogger(GrpcXceiverService.class);
private final ContainerDispatcher dispatcher;
- private final boolean zeroCopyEnabled;
private final ZeroCopyMessageMarshaller<ContainerCommandRequestProto>
zeroCopyMessageMarshaller = new ZeroCopyMessageMarshaller<>(
ContainerCommandRequestProto.getDefaultInstance());
- public GrpcXceiverService(ContainerDispatcher dispatcher,
- boolean zeroCopyEnabled) {
+ public GrpcXceiverService(ContainerDispatcher dispatcher) {
this.dispatcher = dispatcher;
- this.zeroCopyEnabled = zeroCopyEnabled;
}
/**
- * Bind service with zerocopy marshaller equipped for the `send` API if
- * zerocopy is enabled.
+ * Bind service with zerocopy marshaller equipped for the `send` API.
* @return service definition.
*/
public ServerServiceDefinition bindServiceWithZeroCopy() {
ServerServiceDefinition orig = super.bindService();
- if (!zeroCopyEnabled) {
- LOG.info("Zerocopy is not enabled.");
- return orig;
- }
ServerServiceDefinition.Builder builder =
ServerServiceDefinition.builder(orig.getServiceDescriptor().getName());
@@ -117,10 +107,7 @@ public class GrpcXceiverService extends
isClosed.set(true);
responseObserver.onError(e);
} finally {
- InputStream popStream = zeroCopyMessageMarshaller.popStream(request);
- if (popStream != null) {
- IOUtils.close(LOG, popStream);
- }
+ zeroCopyMessageMarshaller.release(request);
}
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
index 42daaa94be..624f153e87 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
@@ -67,9 +67,6 @@ import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContextBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_EC_GRPC_ZERO_COPY_ENABLED;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_EC_GRPC_ZERO_COPY_ENABLED_DEFAULT;
-
/**
* Creates a Grpc server endpoint that acts as the communication layer for
* Ozone containers.
@@ -135,13 +132,9 @@ public final class XceiverServerGrpc implements XceiverServerSpi {
eventLoopGroup = new NioEventLoopGroup(poolSize / 10, factory);
channelType = NioServerSocketChannel.class;
}
- final boolean zeroCopyEnabled = conf.getBoolean(
- OZONE_EC_GRPC_ZERO_COPY_ENABLED,
- OZONE_EC_GRPC_ZERO_COPY_ENABLED_DEFAULT);
LOG.info("GrpcServer channel type {}", channelType.getSimpleName());
- GrpcXceiverService xceiverService = new GrpcXceiverService(dispatcher,
- zeroCopyEnabled);
+ GrpcXceiverService xceiverService = new GrpcXceiverService(dispatcher);
NettyServerBuilder nettyServerBuilder = NettyServerBuilder.forPort(port)
.maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE)
.bossEventLoopGroup(eventLoopGroup)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java
index 6bc237207b..26cd0d82a9 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java
@@ -19,7 +19,6 @@
package org.apache.hadoop.ozone.container.replication;
import java.io.IOException;
-import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashSet;
import java.util.Set;
@@ -59,37 +58,24 @@ public class GrpcReplicationService extends
private final ContainerReplicationSource source;
private final ContainerImporter importer;
- private final boolean zeroCopyEnabled;
-
private final ZeroCopyMessageMarshaller<SendContainerRequest>
sendContainerZeroCopyMessageMarshaller;
private final ZeroCopyMessageMarshaller<CopyContainerRequestProto>
copyContainerZeroCopyMessageMarshaller;
- public GrpcReplicationService(ContainerReplicationSource source,
- ContainerImporter importer, boolean zeroCopyEnabled) {
+ public GrpcReplicationService(ContainerReplicationSource source, ContainerImporter importer) {
this.source = source;
this.importer = importer;
- this.zeroCopyEnabled = zeroCopyEnabled;
-
- if (zeroCopyEnabled) {
- sendContainerZeroCopyMessageMarshaller = new ZeroCopyMessageMarshaller<>(
- SendContainerRequest.getDefaultInstance());
- copyContainerZeroCopyMessageMarshaller = new ZeroCopyMessageMarshaller<>(
- CopyContainerRequestProto.getDefaultInstance());
- } else {
- sendContainerZeroCopyMessageMarshaller = null;
- copyContainerZeroCopyMessageMarshaller = null;
- }
+
+ sendContainerZeroCopyMessageMarshaller = new ZeroCopyMessageMarshaller<>(
+ SendContainerRequest.getDefaultInstance());
+ copyContainerZeroCopyMessageMarshaller = new ZeroCopyMessageMarshaller<>(
+ CopyContainerRequestProto.getDefaultInstance());
}
public ServerServiceDefinition bindServiceWithZeroCopy() {
ServerServiceDefinition orig = super.bindService();
- if (!zeroCopyEnabled) {
- LOG.info("Zerocopy is not enabled.");
- return orig;
- }
Set<String> methodNames = new HashSet<>();
ServerServiceDefinition.Builder builder =
@@ -155,14 +141,7 @@ public class GrpcReplicationService extends
} finally {
// output may have already been closed, ignore such errors
IOUtils.cleanupWithLogger(LOG, outputStream);
-
- if (copyContainerZeroCopyMessageMarshaller != null) {
- InputStream popStream =
- copyContainerZeroCopyMessageMarshaller.popStream(request);
- if (popStream != null) {
- IOUtils.cleanupWithLogger(LOG, popStream);
- }
- }
+ copyContainerZeroCopyMessageMarshaller.release(request);
}
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
index b4e92a4a60..6ca474bdd8 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
@@ -99,13 +99,12 @@ public class ReplicationServer {
new LinkedBlockingQueue<>(replicationQueueLimit),
threadFactory);
- init(replicationConfig.isZeroCopyEnable());
+ init();
}
- public void init(boolean enableZeroCopy) {
+ public void init() {
GrpcReplicationService grpcReplicationService = new GrpcReplicationService(
- new OnDemandContainerReplicationSource(controller), importer,
- enableZeroCopy);
+ new OnDemandContainerReplicationSource(controller), importer);
NettyServerBuilder nettyServerBuilder = NettyServerBuilder.forPort(port)
.maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE)
.addService(ServerInterceptors.intercept(
@@ -203,11 +202,6 @@ public class ReplicationServer {
static final String REPLICATION_OUTOFSERVICE_FACTOR_KEY =
PREFIX + "." + OUTOFSERVICE_FACTOR_KEY;
- public static final String ZEROCOPY_ENABLE_KEY = "zerocopy.enabled";
- private static final boolean ZEROCOPY_ENABLE_DEFAULT = true;
- private static final String ZEROCOPY_ENABLE_DEFAULT_VALUE =
- "true";
-
/**
* The maximum number of replication commands a single datanode can execute
* simultaneously.
@@ -249,15 +243,6 @@ public class ReplicationServer {
)
private double outOfServiceFactor = OUTOFSERVICE_FACTOR_DEFAULT;
- @Config(key = ZEROCOPY_ENABLE_KEY,
- type = ConfigType.BOOLEAN,
- defaultValue = ZEROCOPY_ENABLE_DEFAULT_VALUE,
- tags = {DATANODE, SCM},
- description = "Specify if zero-copy should be enabled for " +
- "replication protocol."
- )
- private boolean zeroCopyEnable = ZEROCOPY_ENABLE_DEFAULT;
-
public double getOutOfServiceFactor() {
return outOfServiceFactor;
}
@@ -291,14 +276,6 @@ public class ReplicationServer {
this.replicationQueueLimit = limit;
}
- public boolean isZeroCopyEnable() {
- return zeroCopyEnable;
- }
-
- public void setZeroCopyEnable(boolean zeroCopyEnable) {
- this.zeroCopyEnable = zeroCopyEnable;
- }
-
@PostConstruct
public void validate() {
if (replicationMaxStreams < 1) {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java
index 506a96fe05..40b4dec349 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java
@@ -30,7 +30,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -105,10 +104,7 @@ class SendContainerRequestHandler
onError(t);
} finally {
if (marshaller != null) {
- InputStream popStream = marshaller.popStream(req);
- if (popStream != null) {
- IOUtils.cleanupWithLogger(LOG, popStream);
- }
+ marshaller.release(req);
}
}
}
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationService.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationService.java
index 03901b99be..bca98d1e7d 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationService.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationService.java
@@ -83,17 +83,15 @@ class TestGrpcReplicationService {
@BeforeEach
public void setUp() throws Exception {
- init(false);
+ init();
}
- public void init(boolean isZeroCopy) throws Exception {
+ public void init() throws Exception {
conf = new OzoneConfiguration();
ReplicationServer.ReplicationConfig replicationConfig =
conf.getObject(ReplicationServer.ReplicationConfig.class);
- replicationConfig.setZeroCopyEnable(isZeroCopy);
-
SecurityConfig secConf = new SecurityConfig(conf);
ContainerSet containerSet = new ContainerSet(1000);
@@ -230,7 +228,7 @@ class TestGrpcReplicationService {
};
ContainerImporter importer = mock(ContainerImporter.class);
GrpcReplicationService subject =
- new GrpcReplicationService(source, importer, false);
+ new GrpcReplicationService(source, importer);
CopyContainerRequestProto request = CopyContainerRequestProto.newBuilder()
.setContainerID(1)
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationServiceWithZeroCopy.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationServiceWithZeroCopy.java
deleted file mode 100644
index 00891cf3e2..0000000000
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationServiceWithZeroCopy.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ozone.container.replication;
-
-import org.junit.jupiter.api.BeforeEach;
-
-/**
- * Tests {@link GrpcReplicationService}.
- */
-class TestGrpcReplicationServiceWithZeroCopy
- extends TestGrpcReplicationService {
- @BeforeEach
- public void setUp() throws Exception {
- init(true);
- }
-}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/AbstractTestECKeyOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/AbstractTestECKeyOutputStream.java
deleted file mode 100644
index 3063e2587e..0000000000
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/AbstractTestECKeyOutputStream.java
+++ /dev/null
@@ -1,492 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.ozone.client.rpc;
-
-import org.apache.commons.lang3.NotImplementedException;
-import org.apache.hadoop.hdds.HddsConfigKeys;
-import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
-import org.apache.hadoop.hdds.client.ECReplicationConfig;
-import org.apache.hadoop.hdds.client.RatisReplicationConfig;
-import org.apache.hadoop.hdds.client.ReplicationConfig;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.conf.StorageUnit;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.OzoneClientConfig;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
-import org.apache.hadoop.hdds.utils.IOUtils;
-import org.apache.hadoop.ozone.ClientConfigForTesting;
-import org.apache.hadoop.ozone.MiniOzoneCluster;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.ozone.client.BucketArgs;
-import org.apache.hadoop.ozone.client.ObjectStore;
-import org.apache.hadoop.ozone.client.OzoneBucket;
-import org.apache.hadoop.ozone.client.OzoneClient;
-import org.apache.hadoop.ozone.client.OzoneClientFactory;
-import org.apache.hadoop.ozone.client.OzoneKey;
-import org.apache.hadoop.ozone.client.OzoneKeyDetails;
-import org.apache.hadoop.ozone.client.OzoneVolume;
-import org.apache.hadoop.ozone.client.io.ECKeyOutputStream;
-import org.apache.hadoop.ozone.client.io.KeyOutputStream;
-import org.apache.hadoop.ozone.client.io.OzoneInputStream;
-import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
-import org.apache.hadoop.ozone.container.TestHelper;
-import org.apache.ozone.test.GenericTestUtils;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertInstanceOf;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.fail;
-
-/**
- * Tests key output stream.
- */
-abstract class AbstractTestECKeyOutputStream {
- private static MiniOzoneCluster cluster;
- private static OzoneConfiguration conf = new OzoneConfiguration();
- private static OzoneClient client;
- private static ObjectStore objectStore;
- private static int chunkSize;
- private static int flushSize;
- private static int maxFlushSize;
- private static int blockSize;
- private static String volumeName;
- private static String bucketName;
- private static String keyString;
- private static int dataBlocks = 3;
- private static int inputSize = dataBlocks * chunkSize;
- private static byte[][] inputChunks = new byte[dataBlocks][chunkSize];
-
- /**
- * Create a MiniDFSCluster for testing.
- */
- protected static void init(boolean zeroCopyEnabled) throws Exception {
- chunkSize = 1024 * 1024;
- flushSize = 2 * chunkSize;
- maxFlushSize = 2 * flushSize;
- blockSize = 2 * maxFlushSize;
-
- OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
- clientConfig.setChecksumType(ContainerProtos.ChecksumType.NONE);
- clientConfig.setStreamBufferFlushDelay(false);
- conf.setFromObject(clientConfig);
-
- // If SCM detects dead node too quickly, then container would be moved to
- // closed state and all in progress writes will get exception. To avoid
- // that, we are just keeping higher timeout and none of the tests depending
- // on deadnode detection timeout currently.
- conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 30, TimeUnit.SECONDS);
- conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 60, TimeUnit.SECONDS);
- conf.setTimeDuration("hdds.ratis.raft.server.rpc.slowness.timeout", 300,
- TimeUnit.SECONDS);
- conf.setTimeDuration(
- "hdds.ratis.raft.server.notification.no-leader.timeout", 300,
- TimeUnit.SECONDS);
- conf.setQuietMode(false);
- conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
- StorageUnit.MB);
- conf.setTimeDuration(HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL, 500,
- TimeUnit.MILLISECONDS);
- conf.setTimeDuration(HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL, 1,
- TimeUnit.SECONDS);
- conf.setBoolean(OzoneConfigKeys.OZONE_EC_GRPC_ZERO_COPY_ENABLED,
- zeroCopyEnabled);
- conf.setInt(ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT, 10);
- // "Enable" hsync to verify that hsync would be blocked by ECKeyOutputStream
- conf.setBoolean(OzoneConfigKeys.OZONE_HBASE_ENHANCEMENTS_ALLOWED, true);
- conf.setBoolean("ozone.client.hbase.enhancements.allowed", true);
- conf.setBoolean(OzoneConfigKeys.OZONE_FS_HSYNC_ENABLED, true);
-
- ClientConfigForTesting.newBuilder(StorageUnit.BYTES)
- .setBlockSize(blockSize)
- .setChunkSize(chunkSize)
- .setStreamBufferFlushSize(flushSize)
- .setStreamBufferMaxSize(maxFlushSize)
- .applyTo(conf);
-
- cluster = MiniOzoneCluster.newBuilder(conf)
- .setNumDatanodes(10)
- .build();
- cluster.waitForClusterToBeReady();
- client = OzoneClientFactory.getRpcClient(conf);
- objectStore = client.getObjectStore();
- keyString = UUID.randomUUID().toString();
- volumeName = "testeckeyoutputstream";
- bucketName = volumeName;
- objectStore.createVolume(volumeName);
- objectStore.getVolume(volumeName).createBucket(bucketName);
- initInputChunks();
- }
-
- @BeforeAll
- public static void init() throws Exception {
- init(false);
- }
-
- /**
- * Shutdown MiniDFSCluster.
- */
- @AfterAll
- public static void shutdown() {
- IOUtils.closeQuietly(client);
- if (cluster != null) {
- cluster.shutdown();
- }
- }
-
- @Test
- public void testCreateKeyWithECReplicationConfig() throws Exception {
- try (OzoneOutputStream key = TestHelper
- .createKey(keyString, new ECReplicationConfig(3, 2,
- ECReplicationConfig.EcCodec.RS, chunkSize), inputSize,
- objectStore, volumeName, bucketName)) {
- assertInstanceOf(ECKeyOutputStream.class, key.getOutputStream());
- }
- }
-
- @Test
- public void testCreateKeyWithOutBucketDefaults() throws Exception {
- OzoneVolume volume = objectStore.getVolume(volumeName);
- OzoneBucket bucket = volume.getBucket(bucketName);
- try (OzoneOutputStream out = bucket.createKey("myKey", inputSize)) {
- assertInstanceOf(KeyOutputStream.class, out.getOutputStream());
- for (byte[] inputChunk : inputChunks) {
- out.write(inputChunk);
- }
- }
- }
-
- @Test
- public void testCreateKeyWithBucketDefaults() throws Exception {
- String myBucket = UUID.randomUUID().toString();
- OzoneVolume volume = objectStore.getVolume(volumeName);
- final BucketArgs.Builder bucketArgs = BucketArgs.newBuilder();
- bucketArgs.setDefaultReplicationConfig(
- new DefaultReplicationConfig(
- new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
- chunkSize)));
-
- volume.createBucket(myBucket, bucketArgs.build());
- OzoneBucket bucket = volume.getBucket(myBucket);
-
- try (OzoneOutputStream out = bucket.createKey(keyString, inputSize)) {
- assertInstanceOf(ECKeyOutputStream.class, out.getOutputStream());
- for (byte[] inputChunk : inputChunks) {
- out.write(inputChunk);
- }
- }
- byte[] buf = new byte[chunkSize];
- try (OzoneInputStream in = bucket.readKey(keyString)) {
- for (byte[] inputChunk : inputChunks) {
- int read = in.read(buf, 0, chunkSize);
- assertEquals(chunkSize, read);
- assertArrayEquals(buf, inputChunk);
- }
- }
- }
-
- @Test
- public void testOverwriteECKeyWithRatisKey() throws Exception {
- String myBucket = UUID.randomUUID().toString();
- OzoneVolume volume = objectStore.getVolume(volumeName);
- final BucketArgs.Builder bucketArgs = BucketArgs.newBuilder();
- volume.createBucket(myBucket, bucketArgs.build());
- OzoneBucket bucket = volume.getBucket(myBucket);
- createKeyAndCheckReplicationConfig(keyString, bucket,
- new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
- chunkSize));
-
- //Overwrite with RATIS/THREE
- createKeyAndCheckReplicationConfig(keyString, bucket,
- RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE));
-
- //Overwrite with RATIS/ONE
- createKeyAndCheckReplicationConfig(keyString, bucket,
- RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE));
- }
-
- @Test
- public void testOverwriteRatisKeyWithECKey() throws Exception {
- String myBucket = UUID.randomUUID().toString();
- OzoneVolume volume = objectStore.getVolume(volumeName);
- final BucketArgs.Builder bucketArgs = BucketArgs.newBuilder();
- volume.createBucket(myBucket, bucketArgs.build());
- OzoneBucket bucket = volume.getBucket(myBucket);
-
- createKeyAndCheckReplicationConfig(keyString, bucket,
- RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE));
- // Overwrite with EC key
- createKeyAndCheckReplicationConfig(keyString, bucket,
- new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
- chunkSize));
- }
-
- private void createKeyAndCheckReplicationConfig(String keyName,
- OzoneBucket bucket, ReplicationConfig replicationConfig)
- throws IOException {
- try (OzoneOutputStream out = bucket
- .createKey(keyName, inputSize, replicationConfig, new HashMap<>())) {
- for (byte[] inputChunk : inputChunks) {
- out.write(inputChunk);
- }
- }
- OzoneKeyDetails key = bucket.getKey(keyName);
- assertEquals(replicationConfig, key.getReplicationConfig());
- }
-
- @Test
- public void testCreateRatisKeyAndWithECBucketDefaults() throws Exception {
- OzoneBucket bucket = getOzoneBucket();
- try (OzoneOutputStream out = bucket.createKey(
- "testCreateRatisKeyAndWithECBucketDefaults", 2000,
- RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE),
- new HashMap<>())) {
- assertInstanceOf(KeyOutputStream.class, out.getOutputStream());
- for (byte[] inputChunk : inputChunks) {
- out.write(inputChunk);
- }
- }
- }
-
- @Test
- public void test13ChunksInSingleWriteOp() throws IOException {
- testMultipleChunksInSingleWriteOp(13);
- }
-
- @Test
- public void testChunksInSingleWriteOpWithOffset() throws IOException {
- testMultipleChunksInSingleWriteOp(11, 25, 19);
- }
-
- @Test
- public void test15ChunksInSingleWriteOp() throws IOException {
- testMultipleChunksInSingleWriteOp(15);
- }
-
- @Test
- public void test20ChunksInSingleWriteOp() throws IOException {
- testMultipleChunksInSingleWriteOp(20);
- }
-
- @Test
- public void test21ChunksInSingleWriteOp() throws IOException {
- testMultipleChunksInSingleWriteOp(21);
- }
-
- private void testMultipleChunksInSingleWriteOp(int offset,
- int bufferChunks, int numChunks)
- throws IOException {
- byte[] inputData = getInputBytes(offset, bufferChunks, numChunks);
- final OzoneBucket bucket = getOzoneBucket();
- String keyName =
- String.format("testMultipleChunksInSingleWriteOpOffset" +
- "%dBufferChunks%dNumChunks", offset, bufferChunks,
- numChunks);
- try (OzoneOutputStream out = bucket.createKey(keyName, 4096,
- new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
- chunkSize), new HashMap<>())) {
- out.write(inputData, offset, numChunks * chunkSize);
- }
-
- validateContent(offset, numChunks * chunkSize, inputData, bucket,
- bucket.getKey(keyName));
- }
-
- private void testMultipleChunksInSingleWriteOp(int numChunks)
- throws IOException {
- testMultipleChunksInSingleWriteOp(0, numChunks, numChunks);
- }
-
- @Test
- public void testECContainerKeysCountAndNumContainerReplicas()
- throws IOException, InterruptedException, TimeoutException {
- byte[] inputData = getInputBytes(1);
- final OzoneBucket bucket = getOzoneBucket();
- ContainerOperationClient containerOperationClient =
- new ContainerOperationClient(conf);
-
- ECReplicationConfig repConfig = new ECReplicationConfig(
- 3, 2, ECReplicationConfig.EcCodec.RS, chunkSize);
- // Close all EC pipelines so we must get a fresh pipeline and hence
- // container for this test.
- PipelineManager pm =
- cluster.getStorageContainerManager().getPipelineManager();
- for (Pipeline p : pm.getPipelines(repConfig)) {
- pm.closePipeline(p, true);
- }
-
- String keyName = UUID.randomUUID().toString();
- try (OzoneOutputStream out = bucket.createKey(keyName, 4096,
- repConfig, new HashMap<>())) {
- out.write(inputData);
- }
- OzoneKeyDetails key = bucket.getKey(keyName);
- long currentKeyContainerID =
- key.getOzoneKeyLocations().get(0).getContainerID();
-
- GenericTestUtils.waitFor(() -> {
- try {
- return (containerOperationClient.getContainer(currentKeyContainerID)
- .getNumberOfKeys() == 1) && (containerOperationClient
- .getContainerReplicas(currentKeyContainerID).size() == 5);
- } catch (IOException exception) {
- fail("Unexpected exception " + exception);
- return false;
- }
- }, 100, 10000);
- validateContent(inputData, bucket, key);
- }
-
- private void validateContent(byte[] inputData, OzoneBucket bucket,
- OzoneKey key) throws IOException {
- validateContent(0, inputData.length, inputData, bucket, key);
- }
-
- private void validateContent(int offset, int length, byte[] inputData,
- OzoneBucket bucket,
- OzoneKey key) throws IOException {
- try (OzoneInputStream is = bucket.readKey(key.getName())) {
- byte[] fileContent = new byte[length];
- assertEquals(length, is.read(fileContent));
- assertEquals(new String(Arrays.copyOfRange(inputData, offset,
- offset + length), UTF_8),
- new String(fileContent, UTF_8));
- }
- }
-
- private OzoneBucket getOzoneBucket() throws IOException {
- String myBucket = UUID.randomUUID().toString();
- OzoneVolume volume = objectStore.getVolume(volumeName);
- final BucketArgs.Builder bucketArgs = BucketArgs.newBuilder();
- bucketArgs.setDefaultReplicationConfig(
- new DefaultReplicationConfig(
- new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
- chunkSize)));
-
- volume.createBucket(myBucket, bucketArgs.build());
- return volume.getBucket(myBucket);
- }
-
- private static void initInputChunks() {
- for (int i = 0; i < dataBlocks; i++) {
- inputChunks[i] = getBytesWith(i + 1, chunkSize);
- }
- }
-
- private static byte[] getBytesWith(int singleDigitNumber, int total) {
- StringBuilder builder = new StringBuilder(singleDigitNumber);
- for (int i = 1; i <= total; i++) {
- builder.append(singleDigitNumber);
- }
- return builder.toString().getBytes(UTF_8);
- }
-
- @Test
- public void testWriteShouldSucceedWhenDNKilled() throws Exception {
- int numChunks = 3;
- byte[] inputData = getInputBytes(numChunks);
- final OzoneBucket bucket = getOzoneBucket();
- String keyName = "testWriteShouldSucceedWhenDNKilled" + numChunks;
- DatanodeDetails nodeToKill = null;
- try {
- try (OzoneOutputStream out = bucket.createKey(keyName, 1024,
- new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
- chunkSize), new HashMap<>())) {
- ECKeyOutputStream ecOut = (ECKeyOutputStream) out.getOutputStream();
- out.write(inputData);
- // Kill a node from first pipeline
- nodeToKill = ecOut.getStreamEntries()
- .get(0).getPipeline().getFirstNode();
- cluster.shutdownHddsDatanode(nodeToKill);
-
- out.write(inputData);
-
- // Wait for flushing thread to finish its work.
- final long checkpoint = System.currentTimeMillis();
- ecOut.insertFlushCheckpoint(checkpoint);
- GenericTestUtils.waitFor(() -> ecOut.getFlushCheckpoint() ==
checkpoint,
- 100, 10000);
-
- // Check the second blockGroup pipeline to make sure that the failed
- // node is not selected.
- assertThat(ecOut.getStreamEntries().get(1).getPipeline().getNodes())
- .doesNotContain(nodeToKill);
- }
-
- try (OzoneInputStream is = bucket.readKey(keyName)) {
- // We wrote "inputData" twice, so do two reads and ensure the correct
- // data comes back.
- for (int i = 0; i < 2; i++) {
- byte[] fileContent = new byte[inputData.length];
- assertEquals(inputData.length, is.read(fileContent));
- assertEquals(new String(inputData, UTF_8),
- new String(fileContent, UTF_8));
- }
- }
- } finally {
- cluster.restartHddsDatanode(nodeToKill, true);
- }
- }
-
- private byte[] getInputBytes(int numChunks) {
- return getInputBytes(0, numChunks, numChunks);
- }
-
- private byte[] getInputBytes(int offset, int bufferChunks, int numChunks) {
- byte[] inputData = new byte[offset + bufferChunks * chunkSize];
- for (int i = 0; i < numChunks; i++) {
- int start = offset + (i * chunkSize);
- Arrays.fill(inputData, start, start + chunkSize - 1,
- String.valueOf(i % 9).getBytes(UTF_8)[0]);
- }
- return inputData;
- }
-
- @Test
- public void testBlockedHflushAndHsync() throws Exception {
- // Expect ECKeyOutputStream hflush and hsync calls to throw exception
- try (OzoneOutputStream oOut = TestHelper.createKey(
- keyString, new ECReplicationConfig(3, 2,
ECReplicationConfig.EcCodec.RS, chunkSize),
- inputSize, objectStore, volumeName, bucketName)) {
- assertInstanceOf(ECKeyOutputStream.class, oOut.getOutputStream());
- KeyOutputStream kOut = (KeyOutputStream) oOut.getOutputStream();
-
- assertThrows(NotImplementedException.class, () -> kOut.hflush());
- assertThrows(NotImplementedException.class, () -> kOut.hsync());
- }
- }
-
-}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
index c5147ecfb0..5743866f2d 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
@@ -17,15 +17,471 @@
package org.apache.hadoop.ozone.client.rpc;
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.StorageUnit;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.utils.IOUtils;
+import org.apache.hadoop.ozone.ClientConfigForTesting;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.client.BucketArgs;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.OzoneKey;
+import org.apache.hadoop.ozone.client.OzoneKeyDetails;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.io.ECKeyOutputStream;
+import org.apache.hadoop.ozone.client.io.KeyOutputStream;
+import org.apache.hadoop.ozone.client.io.OzoneInputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.container.TestHelper;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL;
+import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.fail;
/**
- * Tests key output stream without zero-copy enabled.
+ * Tests key output stream.
*/
-public class TestECKeyOutputStream extends
- AbstractTestECKeyOutputStream {
+public class TestECKeyOutputStream {
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration conf = new OzoneConfiguration();
+  private static OzoneClient client;
+  private static ObjectStore objectStore;
+  private static int chunkSize;
+  private static int flushSize;
+  private static int maxFlushSize;
+  private static int blockSize;
+  private static String volumeName;
+  private static String bucketName;
+  private static String keyString;
+  private static int dataBlocks = 3;
+  // inputSize and inputChunks depend on chunkSize, which is only assigned in
+  // init(). Initializing them here would read chunkSize while it is still 0.
+  private static int inputSize;
+  private static byte[][] inputChunks;
+
+  /**
+   * Creates the MiniOzoneCluster, client, volume and bucket shared by all
+   * tests in this class.
+   *
+   * <p>Sizes are derived from a 1 MiB chunk: the flush size is 2 chunks, the
+   * max flush size is 2 flush sizes and the block size is 2 max flush sizes.
+   *
+   * @throws Exception if the cluster, client, volume or bucket cannot be
+   *     set up
+   */
   @BeforeAll
-  public static void init() throws Exception {
-    init(false);
+  protected static void init() throws Exception {
+    chunkSize = 1024 * 1024;
+    flushSize = 2 * chunkSize;
+    maxFlushSize = 2 * flushSize;
+    blockSize = 2 * maxFlushSize;
+    // Must come after the chunkSize assignment above.
+    inputSize = dataBlocks * chunkSize;
+    inputChunks = new byte[dataBlocks][];
+
+    OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+    clientConfig.setChecksumType(ContainerProtos.ChecksumType.NONE);
+    clientConfig.setStreamBufferFlushDelay(false);
+    conf.setFromObject(clientConfig);
+
+    // If SCM detects a dead node too quickly, the container is moved to the
+    // closed state and all in-progress writes get an exception. To avoid
+    // that, keep the timeouts high; none of the tests currently depend on
+    // the dead node detection timeout.
+    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 30, TimeUnit.SECONDS);
+    conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 60, TimeUnit.SECONDS);
+    conf.setTimeDuration("hdds.ratis.raft.server.rpc.slowness.timeout", 300,
+        TimeUnit.SECONDS);
+    conf.setTimeDuration(
+        "hdds.ratis.raft.server.notification.no-leader.timeout", 300,
+        TimeUnit.SECONDS);
+    conf.setQuietMode(false);
+    conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
+        StorageUnit.MB);
+    conf.setTimeDuration(HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL, 500,
+        TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL, 1,
+        TimeUnit.SECONDS);
+    conf.setInt(ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT, 10);
+    // "Enable" hsync to verify that hsync would be blocked by
+    // ECKeyOutputStream.
+    conf.setBoolean(OzoneConfigKeys.OZONE_HBASE_ENHANCEMENTS_ALLOWED, true);
+    conf.setBoolean("ozone.client.hbase.enhancements.allowed", true);
+    conf.setBoolean(OzoneConfigKeys.OZONE_FS_HSYNC_ENABLED, true);
+
+    ClientConfigForTesting.newBuilder(StorageUnit.BYTES)
+        .setBlockSize(blockSize)
+        .setChunkSize(chunkSize)
+        .setStreamBufferFlushSize(flushSize)
+        .setStreamBufferMaxSize(maxFlushSize)
+        .applyTo(conf);
+
+    cluster = MiniOzoneCluster.newBuilder(conf)
+        .setNumDatanodes(10)
+        .build();
+    cluster.waitForClusterToBeReady();
+    client = OzoneClientFactory.getRpcClient(conf);
+    objectStore = client.getObjectStore();
+    keyString = UUID.randomUUID().toString();
+    volumeName = "testeckeyoutputstream";
+    bucketName = volumeName;
+    objectStore.createVolume(volumeName);
+    objectStore.getVolume(volumeName).createBucket(bucketName);
+    initInputChunks();
   }
+
+ /**
+ * Closes the shared client via IOUtils.closeQuietly and, if a cluster was
+ * created, shuts down the MiniOzoneCluster.
+ */
+ @AfterAll
+ public static void shutdown() {
+ IOUtils.closeQuietly(client);
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+  /**
+   * A key created with an explicit RS(3, 2) EC replication config must be
+   * backed by an {@link ECKeyOutputStream}.
+   */
+  @Test
+  public void testCreateKeyWithECReplicationConfig() throws Exception {
+    final ECReplicationConfig ecConfig = new ECReplicationConfig(
+        3, 2, ECReplicationConfig.EcCodec.RS, chunkSize);
+    try (OzoneOutputStream stream = TestHelper.createKey(keyString, ecConfig,
+        inputSize, objectStore, volumeName, bucketName)) {
+      assertInstanceOf(ECKeyOutputStream.class, stream.getOutputStream());
+    }
+  }
+
+  /**
+   * Creates a key in the shared bucket, which is created without a default
+   * replication config, checks that the stream is a {@link KeyOutputStream}
+   * and writes every input chunk to it.
+   */
+  @Test
+  public void testCreateKeyWithOutBucketDefaults() throws Exception {
+    final OzoneBucket plainBucket =
+        objectStore.getVolume(volumeName).getBucket(bucketName);
+    try (OzoneOutputStream stream =
+        plainBucket.createKey("myKey", inputSize)) {
+      assertInstanceOf(KeyOutputStream.class, stream.getOutputStream());
+      for (int i = 0; i < inputChunks.length; i++) {
+        stream.write(inputChunks[i]);
+      }
+    }
+  }
+
+  /**
+   * A key created without an explicit replication config in a bucket whose
+   * default is RS(3, 2) EC must use an {@link ECKeyOutputStream}, and the
+   * chunks written must be read back unchanged.
+   */
+  @Test
+  public void testCreateKeyWithBucketDefaults() throws Exception {
+    // Fresh, randomly named bucket with an RS(3, 2) EC default.
+    final OzoneBucket bucket = getOzoneBucket();
+
+    try (OzoneOutputStream out = bucket.createKey(keyString, inputSize)) {
+      assertInstanceOf(ECKeyOutputStream.class, out.getOutputStream());
+      for (byte[] inputChunk : inputChunks) {
+        out.write(inputChunk);
+      }
+    }
+    byte[] buf = new byte[chunkSize];
+    try (OzoneInputStream in = bucket.readKey(keyString)) {
+      for (byte[] inputChunk : inputChunks) {
+        int read = in.read(buf, 0, chunkSize);
+        assertEquals(chunkSize, read);
+        // Expected value first, so a failure message labels the written
+        // chunk as "expected" and the bytes read back as "actual".
+        assertArrayEquals(inputChunk, buf);
+      }
+    }
+  }
+
+  /**
+   * Writes the same key three times in a fresh bucket: first as RS(3, 2) EC,
+   * then as RATIS/THREE, then as RATIS/ONE. After each write the stored
+   * replication config must match the one used for that write.
+   */
+  @Test
+  public void testOverwriteECKeyWithRatisKey() throws Exception {
+    final String freshBucketName = UUID.randomUUID().toString();
+    final OzoneVolume vol = objectStore.getVolume(volumeName);
+    vol.createBucket(freshBucketName, BucketArgs.newBuilder().build());
+    final OzoneBucket target = vol.getBucket(freshBucketName);
+
+    // Applied in array order: the EC key first, then each Ratis overwrite.
+    final ReplicationConfig[] writeOrder = {
+        new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
+            chunkSize),
+        RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE),
+        RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE),
+    };
+    for (ReplicationConfig config : writeOrder) {
+      createKeyAndCheckReplicationConfig(keyString, target, config);
+    }
+  }
+
+  /**
+   * Writes a key as RATIS/THREE in a fresh bucket and then overwrites it as
+   * RS(3, 2) EC. After each write the stored replication config must match
+   * the one used for that write.
+   */
+  @Test
+  public void testOverwriteRatisKeyWithECKey() throws Exception {
+    final String freshBucketName = UUID.randomUUID().toString();
+    final OzoneVolume vol = objectStore.getVolume(volumeName);
+    vol.createBucket(freshBucketName, BucketArgs.newBuilder().build());
+    final OzoneBucket target = vol.getBucket(freshBucketName);
+
+    final ReplicationConfig ratisThree =
+        RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE);
+    createKeyAndCheckReplicationConfig(keyString, target, ratisThree);
+
+    // Overwrite with EC key
+    final ReplicationConfig ecConfig = new ECReplicationConfig(
+        3, 2, ECReplicationConfig.EcCodec.RS, chunkSize);
+    createKeyAndCheckReplicationConfig(keyString, target, ecConfig);
+  }
+
+  /**
+   * Writes all input chunks to {@code keyName} using the given replication
+   * config, then asserts that the key stored in the bucket reports that same
+   * config.
+   *
+   * @param keyName name of the key to create or overwrite
+   * @param bucket bucket that holds the key
+   * @param replicationConfig config to write with and to expect afterwards
+   * @throws IOException if creating, writing or looking up the key fails
+   */
+  private void createKeyAndCheckReplicationConfig(String keyName,
+      OzoneBucket bucket, ReplicationConfig replicationConfig)
+      throws IOException {
+    try (OzoneOutputStream stream = bucket.createKey(
+        keyName, inputSize, replicationConfig, new HashMap<>())) {
+      for (int i = 0; i < inputChunks.length; i++) {
+        stream.write(inputChunks[i]);
+      }
+    }
+    final ReplicationConfig stored =
+        bucket.getKey(keyName).getReplicationConfig();
+    assertEquals(replicationConfig, stored);
+  }
+
+  /**
+   * Creates a key with an explicit RATIS/THREE config in a bucket whose
+   * default is EC, checks that the stream is a {@link KeyOutputStream} and
+   * writes every input chunk to it.
+   */
+  @Test
+  public void testCreateRatisKeyAndWithECBucketDefaults() throws Exception {
+    final OzoneBucket ecBucket = getOzoneBucket();
+    final ReplicationConfig ratisThree =
+        RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE);
+    try (OzoneOutputStream stream = ecBucket.createKey(
+        "testCreateRatisKeyAndWithECBucketDefaults", 2000, ratisThree,
+        new HashMap<>())) {
+      assertInstanceOf(KeyOutputStream.class, stream.getOutputStream());
+      for (int i = 0; i < inputChunks.length; i++) {
+        stream.write(inputChunks[i]);
+      }
+    }
+  }
+
+ /**
+ * Runs the single-write round trip with 13 chunks and no offset.
+ */
+ @Test
+ public void test13ChunksInSingleWriteOp() throws IOException {
+ testMultipleChunksInSingleWriteOp(13);
+ }
+
+ /**
+ * Runs the single-write round trip with a byte offset of 11, a buffer sized
+ * for 25 chunks and 19 chunks actually written.
+ */
+ @Test
+ public void testChunksInSingleWriteOpWithOffset() throws IOException {
+ testMultipleChunksInSingleWriteOp(11, 25, 19);
+ }
+
+ /**
+ * Runs the single-write round trip with 15 chunks and no offset.
+ */
+ @Test
+ public void test15ChunksInSingleWriteOp() throws IOException {
+ testMultipleChunksInSingleWriteOp(15);
+ }
+
+ /**
+ * Runs the single-write round trip with 20 chunks and no offset.
+ */
+ @Test
+ public void test20ChunksInSingleWriteOp() throws IOException {
+ testMultipleChunksInSingleWriteOp(20);
+ }
+
+ /**
+ * Runs the single-write round trip with 21 chunks and no offset.
+ */
+ @Test
+ public void test21ChunksInSingleWriteOp() throws IOException {
+ testMultipleChunksInSingleWriteOp(21);
+ }
+
+  /**
+   * Writes {@code numChunks} chunks to a new RS(3, 2) EC key with a single
+   * {@code write} call that starts at {@code offset} in the source buffer,
+   * then validates the key content against the same region of the buffer.
+   *
+   * @param offset byte offset into the source buffer where the write starts
+   * @param bufferChunks number of chunks the source buffer is sized for
+   * @param numChunks number of chunks to fill and write
+   * @throws IOException if writing or reading the key fails
+   */
+  private void testMultipleChunksInSingleWriteOp(int offset,
+      int bufferChunks, int numChunks) throws IOException {
+    final int writeLength = numChunks * chunkSize;
+    final byte[] source = getInputBytes(offset, bufferChunks, numChunks);
+    final OzoneBucket ecBucket = getOzoneBucket();
+    final String name = String.format(
+        "testMultipleChunksInSingleWriteOpOffset"
+            + "%dBufferChunks%dNumChunks",
+        offset, bufferChunks, numChunks);
+    final ECReplicationConfig ecConfig = new ECReplicationConfig(
+        3, 2, ECReplicationConfig.EcCodec.RS, chunkSize);
+    try (OzoneOutputStream stream =
+        ecBucket.createKey(name, 4096, ecConfig, new HashMap<>())) {
+      stream.write(source, offset, writeLength);
+    }
+
+    validateContent(offset, writeLength, source, ecBucket,
+        ecBucket.getKey(name));
+  }
+
+ /**
+ * Convenience overload: no offset, and the buffer holds exactly
+ * {@code numChunks} chunks.
+ *
+ * @param numChunks number of chunks to write in one write call
+ * @throws IOException if the delegated round trip fails
+ */
+ private void testMultipleChunksInSingleWriteOp(int numChunks)
+ throws IOException {
+ testMultipleChunksInSingleWriteOp(0, numChunks, numChunks);
+ }
+
+  /**
+   * Writes a one-chunk RS(3, 2) EC key into a freshly allocated container and
+   * waits until that container reports exactly one key and five replicas,
+   * then validates the key content.
+   */
+  @Test
+  public void testECContainerKeysCountAndNumContainerReplicas()
+      throws IOException, InterruptedException, TimeoutException {
+    byte[] inputData = getInputBytes(1);
+    final OzoneBucket bucket = getOzoneBucket();
+
+    ECReplicationConfig repConfig = new ECReplicationConfig(
+        3, 2, ECReplicationConfig.EcCodec.RS, chunkSize);
+    // Close all EC pipelines so we must get a fresh pipeline and hence
+    // container for this test.
+    PipelineManager pm =
+        cluster.getStorageContainerManager().getPipelineManager();
+    for (Pipeline p : pm.getPipelines(repConfig)) {
+      pm.closePipeline(p, true);
+    }
+
+    String keyName = UUID.randomUUID().toString();
+    try (OzoneOutputStream out = bucket.createKey(keyName, 4096,
+        repConfig, new HashMap<>())) {
+      out.write(inputData);
+    }
+    OzoneKeyDetails key = bucket.getKey(keyName);
+    long currentKeyContainerID =
+        key.getOzoneKeyLocations().get(0).getContainerID();
+
+    // Close the SCM client when polling is done instead of leaking it.
+    try (ContainerOperationClient containerOperationClient =
+        new ContainerOperationClient(conf)) {
+      GenericTestUtils.waitFor(() -> {
+        try {
+          return (containerOperationClient.getContainer(currentKeyContainerID)
+              .getNumberOfKeys() == 1) && (containerOperationClient
+              .getContainerReplicas(currentKeyContainerID).size() == 5);
+        } catch (IOException exception) {
+          // Keep the cause attached so the stack trace is not lost.
+          fail("Unexpected exception " + exception, exception);
+          return false;
+        }
+      }, 100, 10000);
+    }
+    validateContent(inputData, bucket, key);
+  }
+
+ /**
+ * Validates the key against the whole of {@code inputData}, starting at
+ * offset 0.
+ *
+ * @param inputData bytes the key is expected to contain
+ * @param bucket bucket to read the key from
+ * @param key key whose content is checked
+ * @throws IOException if reading the key fails
+ */
+ private void validateContent(byte[] inputData, OzoneBucket bucket,
+ OzoneKey key) throws IOException {
+ validateContent(0, inputData.length, inputData, bucket, key);
+ }
+
+  /**
+   * Reads {@code length} bytes from the start of the key with a single read
+   * call and asserts they equal the region of {@code inputData} that begins
+   * at {@code offset}.
+   *
+   * @param offset start of the expected region within {@code inputData}
+   * @param length number of bytes expected in the key
+   * @param inputData buffer holding the expected bytes
+   * @param bucket bucket to read the key from
+   * @param key key whose content is checked
+   * @throws IOException if reading the key fails
+   */
+  private void validateContent(int offset, int length, byte[] inputData,
+      OzoneBucket bucket, OzoneKey key) throws IOException {
+    try (OzoneInputStream is = bucket.readKey(key.getName())) {
+      byte[] fileContent = new byte[length];
+      assertEquals(length, is.read(fileContent));
+      // Compare raw bytes. Decoding both sides to String first would let
+      // different malformed byte sequences compare as equal.
+      assertArrayEquals(
+          Arrays.copyOfRange(inputData, offset, offset + length),
+          fileContent);
+    }
+  }
+
+  /**
+   * Creates a randomly named bucket in the test volume whose default
+   * replication config is RS(3, 2) EC with the test chunk size.
+   *
+   * @return the newly created bucket
+   * @throws IOException if the volume lookup or bucket creation fails
+   */
+  private OzoneBucket getOzoneBucket() throws IOException {
+    final ECReplicationConfig ecDefault = new ECReplicationConfig(
+        3, 2, ECReplicationConfig.EcCodec.RS, chunkSize);
+    final BucketArgs.Builder builder = BucketArgs.newBuilder();
+    builder.setDefaultReplicationConfig(
+        new DefaultReplicationConfig(ecDefault));
+
+    final String generatedName = UUID.randomUUID().toString();
+    final OzoneVolume parent = objectStore.getVolume(volumeName);
+    parent.createBucket(generatedName, builder.build());
+    return parent.getBucket(generatedName);
+  }
+
+  /**
+   * Fills each slot of {@code inputChunks} with {@code chunkSize} copies of a
+   * digit; slot {@code i} uses the digit {@code i + 1}.
+   */
+  private static void initInputChunks() {
+    int digit = 1;
+    for (int slot = 0; slot < dataBlocks; slot++, digit++) {
+      inputChunks[slot] = getBytesWith(digit, chunkSize);
+    }
+  }
+
+  /**
+   * Returns the UTF-8 bytes of the decimal form of {@code singleDigitNumber}
+   * repeated {@code total} times.
+   *
+   * @param singleDigitNumber number whose decimal text is repeated
+   * @param total number of repetitions; values below 1 give an empty array
+   * @return the encoded bytes
+   */
+  private static byte[] getBytesWith(int singleDigitNumber, int total) {
+    // Size the builder for the expected output length; the constructor
+    // argument is a capacity, not content.
+    StringBuilder builder = new StringBuilder(Math.max(total, 0));
+    for (int i = 1; i <= total; i++) {
+      builder.append(singleDigitNumber);
+    }
+    return builder.toString().getBytes(UTF_8);
+  }
+
+  /**
+   * Writes three chunks, shuts down the first node of the first pipeline,
+   * writes the same three chunks again, and checks that the second stream
+   * entry's pipeline excludes the stopped node and that both writes read
+   * back correctly. The stopped datanode is restarted afterwards.
+   */
+  @Test
+  public void testWriteShouldSucceedWhenDNKilled() throws Exception {
+    int numChunks = 3;
+    byte[] inputData = getInputBytes(numChunks);
+    final OzoneBucket bucket = getOzoneBucket();
+    String keyName = "testWriteShouldSucceedWhenDNKilled" + numChunks;
+    DatanodeDetails nodeToKill = null;
+    try {
+      try (OzoneOutputStream out = bucket.createKey(keyName, 1024,
+          new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
+              chunkSize), new HashMap<>())) {
+        ECKeyOutputStream ecOut = (ECKeyOutputStream) out.getOutputStream();
+        out.write(inputData);
+        // Kill a node from first pipeline
+        nodeToKill = ecOut.getStreamEntries()
+            .get(0).getPipeline().getFirstNode();
+        cluster.shutdownHddsDatanode(nodeToKill);
+
+        out.write(inputData);
+
+        // Wait for flushing thread to finish its work.
+        final long checkpoint = System.currentTimeMillis();
+        ecOut.insertFlushCheckpoint(checkpoint);
+        GenericTestUtils.waitFor(
+            () -> ecOut.getFlushCheckpoint() == checkpoint, 100, 10000);
+
+        // Check the second blockGroup pipeline to make sure that the failed
+        // node is not selected.
+        assertThat(ecOut.getStreamEntries().get(1).getPipeline().getNodes())
+            .doesNotContain(nodeToKill);
+      }
+
+      try (OzoneInputStream is = bucket.readKey(keyName)) {
+        // We wrote "inputData" twice, so do two reads and ensure the correct
+        // data comes back.
+        for (int i = 0; i < 2; i++) {
+          byte[] fileContent = new byte[inputData.length];
+          assertEquals(inputData.length, is.read(fileContent));
+          assertEquals(new String(inputData, UTF_8),
+              new String(fileContent, UTF_8));
+        }
+      }
+    } finally {
+      // nodeToKill is still null if the test failed before a node was
+      // picked; skip the restart so the original failure is not masked.
+      if (nodeToKill != null) {
+        cluster.restartHddsDatanode(nodeToKill, true);
+      }
+    }
+  }
+
+ /**
+ * Convenience overload: no offset, and the buffer holds exactly
+ * {@code numChunks} chunks.
+ *
+ * @param numChunks number of chunks to allocate and fill
+ * @return the populated buffer
+ */
+ private byte[] getInputBytes(int numChunks) {
+ return getInputBytes(0, numChunks, numChunks);
+ }
+
+ /**
+ * Builds a buffer of {@code offset + bufferChunks * chunkSize} bytes and
+ * fills {@code numChunks} chunk-sized regions after {@code offset} with an
+ * ASCII digit; chunk {@code i} uses the digit {@code i % 9}.
+ *
+ * @param offset number of leading bytes left at their default value
+ * @param bufferChunks number of chunks the buffer has room for after offset
+ * @param numChunks number of chunks to fill
+ * @return the populated buffer
+ */
+ private byte[] getInputBytes(int offset, int bufferChunks, int numChunks) {
+ byte[] inputData = new byte[offset + bufferChunks * chunkSize];
+ for (int i = 0; i < numChunks; i++) {
+ int start = offset + (i * chunkSize);
+ // NOTE(review): Arrays.fill's end index is exclusive, so the last byte of
+ // each chunk keeps its default value 0 -- confirm whether that is intended.
+ Arrays.fill(inputData, start, start + chunkSize - 1,
+ String.valueOf(i % 9).getBytes(UTF_8)[0]);
+ }
+ return inputData;
+ }
+
+  /**
+   * With hsync enabled in the configuration, {@code hflush} and
+   * {@code hsync} on an {@link ECKeyOutputStream} must still throw
+   * {@link NotImplementedException}.
+   */
+  @Test
+  public void testBlockedHflushAndHsync() throws Exception {
+    final ECReplicationConfig ecConfig = new ECReplicationConfig(
+        3, 2, ECReplicationConfig.EcCodec.RS, chunkSize);
+    try (OzoneOutputStream stream = TestHelper.createKey(keyString, ecConfig,
+        inputSize, objectStore, volumeName, bucketName)) {
+      assertInstanceOf(ECKeyOutputStream.class, stream.getOutputStream());
+      final KeyOutputStream keyStream =
+          (KeyOutputStream) stream.getOutputStream();
+
+      assertThrows(NotImplementedException.class, keyStream::hflush);
+      assertThrows(NotImplementedException.class, keyStream::hsync);
+    }
+  }
+
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStreamWithZeroCopy.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStreamWithZeroCopy.java
deleted file mode 100644
index 47c94e03cb..0000000000
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStreamWithZeroCopy.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
under
- * the License.
- */
-
-package org.apache.hadoop.ozone.client.rpc;
-
-import org.junit.jupiter.api.BeforeAll;
-
-/**
- * Tests key output stream with zero-copy enabled.
- */
-public class TestECKeyOutputStreamWithZeroCopy extends
- AbstractTestECKeyOutputStream {
- @BeforeAll
- public static void init() throws Exception {
- init(true);
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]