This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 385c4ec6ca HDDS-10934. Refactor TestOzoneRpcClient hierarchy (#6747)
385c4ec6ca is described below
commit 385c4ec6ca2f20b43477050f918b7a644f7569f5
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Fri May 31 22:07:29 2024 +0200
HDDS-10934. Refactor TestOzoneRpcClient hierarchy (#6747)
---
.../dev-support/findbugsExcludeFile.xml | 12 -
...lientAbstract.java => OzoneRpcClientTests.java} | 387 +++++++++++++++++----
.../client/rpc/TestOzoneAtRestEncryption.java | 8 -
.../ozone/client/rpc/TestOzoneRpcClient.java | 16 +-
.../client/rpc/TestOzoneRpcClientWithRatis.java | 347 +-----------------
.../ozone/client/rpc/TestSecureOzoneRpcClient.java | 125 ++-----
6 files changed, 354 insertions(+), 541 deletions(-)
diff --git a/hadoop-ozone/integration-test/dev-support/findbugsExcludeFile.xml
b/hadoop-ozone/integration-test/dev-support/findbugsExcludeFile.xml
index 92ceb203b1..632e9fc2f4 100644
--- a/hadoop-ozone/integration-test/dev-support/findbugsExcludeFile.xml
+++ b/hadoop-ozone/integration-test/dev-support/findbugsExcludeFile.xml
@@ -60,18 +60,6 @@
<Class name="org.apache.hadoop.ozone.client.rpc.TestKeyInputStream"/>
<Bug pattern="SR_NOT_CHECKED" />
</Match>
- <Match>
- <Class
name="org.apache.hadoop.ozone.client.rpc.TestOzoneRpcClientAbstract"/>
- <Bug pattern="DLS_DEAD_LOCAL_STORE" />
- </Match>
- <Match>
- <Class
name="org.apache.hadoop.ozone.client.rpc.TestOzoneRpcClientAbstract"/>
- <Bug pattern="NP_NULL_ON_SOME_PATH" />
- </Match>
- <Match>
- <Class
name="org.apache.hadoop.ozone.client.rpc.TestOzoneRpcClientAbstract"/>
- <Bug pattern="RV_RETURN_VALUE_IGNORED" />
- </Match>
<Match>
<Class name="org.apache.hadoop.hdds.scm.storage.TestCommitWatcher"/>
<Bug pattern="URF_UNREAD_FIELD" />
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/OzoneRpcClientTests.java
similarity index 93%
rename from
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
rename to
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/OzoneRpcClientTests.java
index 8e22335759..f3c9227126 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/OzoneRpcClientTests.java
@@ -17,8 +17,13 @@
package org.apache.hadoop.ozone.client.rpc;
+import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.security.PrivilegedExceptionAction;
@@ -30,12 +35,15 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
@@ -65,12 +73,14 @@ import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import
org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.utils.FaultInjector;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.ozone.ClientConfigForTesting;
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.OzoneAcl;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.OzoneTestUtils;
import org.apache.hadoop.ozone.client.BucketArgs;
@@ -86,6 +96,7 @@ import
org.apache.hadoop.ozone.client.OzoneMultipartUploadPartListParts;
import org.apache.hadoop.ozone.client.OzoneSnapshot;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.VolumeArgs;
+import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
@@ -119,6 +130,7 @@ import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
import org.apache.hadoop.ozone.om.protocol.S3Auth;
+import org.apache.hadoop.ozone.om.ratis.OzoneManagerStateMachine;
import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType;
import org.apache.hadoop.ozone.security.acl.OzoneAclConfig;
import org.apache.hadoop.ozone.security.acl.OzoneObj;
@@ -159,6 +171,7 @@ import static
org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.REA
import static
org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.WRITE;
import static org.apache.ozone.test.GenericTestUtils.getTestStartTime;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -181,13 +194,10 @@ import org.junit.jupiter.params.provider.MethodSource;
/**
* This is an abstract class to test all the public facing APIs of Ozone
- * Client, w/o OM Ratis server.
- * {@link TestOzoneRpcClient} tests the Ozone Client by submitting the
- * requests directly to OzoneManager. {@link TestOzoneRpcClientWithRatis}
- * tests the Ozone Client by submitting requests to OM's Ratis server.
+ * Client.
*/
@TestMethodOrder(MethodOrderer.MethodName.class)
-public abstract class TestOzoneRpcClientAbstract {
+abstract class OzoneRpcClientTests {
private static MiniOzoneCluster cluster = null;
private static OzoneClient ozClient = null;
@@ -215,9 +225,12 @@ public abstract class TestOzoneRpcClientAbstract {
/**
* Create a MiniOzoneCluster for testing.
* @param conf Configurations to start the cluster.
- * @throws Exception
*/
static void startCluster(OzoneConfiguration conf) throws Exception {
+ startCluster(conf, MiniOzoneCluster.newBuilder(conf));
+ }
+
+ static void startCluster(OzoneConfiguration conf, MiniOzoneCluster.Builder
builder) throws Exception {
// Reduce long wait time in MiniOzoneClusterImpl#waitForHddsDatanodesStop
// for testZReadKeyWithUnhealthyContainerReplica.
conf.set("ozone.scm.stale.node.interval", "10s");
@@ -227,7 +240,7 @@ public abstract class TestOzoneRpcClientAbstract {
.setDataStreamMinPacketSize(1)
.applyTo(conf);
- cluster = MiniOzoneCluster.newBuilder(conf)
+ cluster = builder
.setNumDatanodes(14)
.build();
cluster.waitForClusterToBeReady();
@@ -255,39 +268,24 @@ public abstract class TestOzoneRpcClientAbstract {
}
}
- public static void setCluster(MiniOzoneCluster cluster) {
- TestOzoneRpcClientAbstract.cluster = cluster;
- }
-
- public static void setOzClient(OzoneClient ozClient) {
- TestOzoneRpcClientAbstract.ozClient = ozClient;
+ private static void setOzClient(OzoneClient ozClient) {
+ OzoneRpcClientTests.ozClient = ozClient;
}
- public static void setOzoneManager(OzoneManager ozoneManager) {
- TestOzoneRpcClientAbstract.ozoneManager = ozoneManager;
- }
-
- public static void setStorageContainerLocationClient(
- StorageContainerLocationProtocolClientSideTranslatorPB
- storageContainerLocationClient) {
- TestOzoneRpcClientAbstract.storageContainerLocationClient =
- storageContainerLocationClient;
- }
-
- public static void setStore(ObjectStore store) {
- TestOzoneRpcClientAbstract.store = store;
+ private static void setStore(ObjectStore store) {
+ OzoneRpcClientTests.store = store;
}
public static ObjectStore getStore() {
- return TestOzoneRpcClientAbstract.store;
+ return store;
}
public static OzoneClient getClient() {
- return TestOzoneRpcClientAbstract.ozClient;
+ return ozClient;
}
public static MiniOzoneCluster getCluster() {
- return TestOzoneRpcClientAbstract.cluster;
+ return cluster;
}
/**
* Test OM Proxy Provider.
@@ -1007,7 +1005,7 @@ public abstract class TestOzoneRpcClientAbstract {
store.deleteVolume(volumeName);
}
- private void verifyReplication(String volumeName, String bucketName,
+ protected void verifyReplication(String volumeName, String bucketName,
String keyName, ReplicationConfig replication)
throws IOException {
OmKeyArgs keyArgs = new OmKeyArgs.Builder()
@@ -1280,8 +1278,6 @@ public abstract class TestOzoneRpcClientAbstract {
String volumeName = UUID.randomUUID().toString();
String bucketName = UUID.randomUUID().toString();
String keyName = UUID.randomUUID().toString();
- long blockSize = (long) ozoneManager.getConfiguration().getStorageSize(
- OZONE_SCM_BLOCK_SIZE, OZONE_SCM_BLOCK_SIZE_DEFAULT, StorageUnit.BYTES);
store.createVolume(volumeName);
OzoneVolume volume = store.getVolume(volumeName);
@@ -1733,7 +1729,7 @@ public abstract class TestOzoneRpcClientAbstract {
thread1.start();
thread2.start();
- latch.await(600, TimeUnit.SECONDS);
+ assertTrue(latch.await(600, TimeUnit.SECONDS));
assertThat(failCount.get())
.withFailMessage("testPutKeyRatisThreeNodesParallel failed")
@@ -1885,6 +1881,7 @@ public abstract class TestOzoneRpcClientAbstract {
break;
}
}
+ assertNotNull(datanodeService);
KeyValueContainerData containerData =
(KeyValueContainerData)(datanodeService.getDatanodeStateMachine()
.getContainer().getContainerSet().getContainer(containerID)
@@ -2179,35 +2176,24 @@ public abstract class TestOzoneRpcClientAbstract {
OzoneBucket bucket = volume.getBucket(bucketName);
createTestKey(bucket, fromKeyName, value);
BucketLayout bucketLayout = bucket.getBucketLayout();
- OMException oe = null;
- String toKeyName = "";
if (!bucketLayout.isFileSystemOptimized()) {
// Rename to an empty string should fail only in non FSO buckets
- try {
- bucket.renameKey(fromKeyName, toKeyName);
- } catch (OMException e) {
- oe = e;
- }
+ OMException oe = assertThrows(OMException.class, () ->
bucket.renameKey(fromKeyName, ""));
assertEquals(ResultCodes.INVALID_KEY_NAME, oe.getResult());
} else {
// Rename to an empty key in FSO should be okay, as we are handling the
// empty dest key on the server side and the source key name will be used
- bucket.renameKey(fromKeyName, toKeyName);
+ bucket.renameKey(fromKeyName, "");
OzoneKey emptyRenameKey = bucket.getKey(fromKeyName);
assertEquals(fromKeyName, emptyRenameKey.getName());
}
- toKeyName = UUID.randomUUID().toString();
+ String toKeyName = UUID.randomUUID().toString();
bucket.renameKey(fromKeyName, toKeyName);
// Lookup for old key should fail.
- try {
- bucket.getKey(fromKeyName);
- } catch (OMException e) {
- oe = e;
- }
- assertEquals(KEY_NOT_FOUND, oe.getResult());
+ assertKeyRenamedEx(bucket, fromKeyName);
OzoneKey key = bucket.getKey(toKeyName);
assertEquals(toKeyName, key.getName());
@@ -3290,11 +3276,7 @@ public abstract class TestOzoneRpcClientAbstract {
Map<Integer, String> partsMap = new LinkedHashMap<>();
partsMap.put(1, omMultipartCommitUploadPartInfo.getETag());
- OmMultipartUploadCompleteInfo omMultipartUploadCompleteInfo =
- bucket.completeMultipartUpload(keyName,
- uploadID, partsMap);
-
- assertNotNull(omMultipartCommitUploadPartInfo);
+ completeMultipartUpload(bucket, keyName, uploadID, partsMap);
byte[] fileContent = new byte[data.length];
try (OzoneInputStream inputStream = bucket.readKey(keyName)) {
@@ -3985,14 +3967,8 @@ public abstract class TestOzoneRpcClientAbstract {
assertEquals(keyName, key.getName());
}
- private void assertKeyRenamedEx(OzoneBucket bucket, String keyName)
- throws Exception {
- OMException oe = null;
- try {
- bucket.getKey(keyName);
- } catch (OMException e) {
- oe = e;
- }
+ private void assertKeyRenamedEx(OzoneBucket bucket, String keyName) {
+ OMException oe = assertThrows(OMException.class, () ->
bucket.getKey(keyName));
assertEquals(KEY_NOT_FOUND, oe.getResult());
}
@@ -4262,8 +4238,7 @@ public abstract class TestOzoneRpcClientAbstract {
omKeyInfo.getKeyLocationVersions().size());
// ensure flush double buffer for deleted Table
- cluster.getOzoneManager().getOmRatisServer().getOmStateMachine()
- .awaitDoubleBufferFlush();
+ cluster.getOzoneManager().awaitDoubleBufferFlush();
if (expectedCount == 1) {
List<? extends Table.KeyValue<String, RepeatedOmKeyInfo>> rangeKVs
@@ -4452,4 +4427,290 @@ public abstract class TestOzoneRpcClientAbstract {
assertFalse(snapshotIter.hasNext());
}
+
+ /**
+ * Tests get the information of key with network topology awareness enabled.
+ */
+ @Test
+ void testGetKeyAndFileWithNetworkTopology() throws IOException {
+ String volumeName = UUID.randomUUID().toString();
+ String bucketName = UUID.randomUUID().toString();
+
+ String value = "sample value";
+ getStore().createVolume(volumeName);
+ OzoneVolume volume = getStore().getVolume(volumeName);
+ volume.createBucket(bucketName);
+ OzoneBucket bucket = volume.getBucket(bucketName);
+ String keyName = UUID.randomUUID().toString();
+
+ // Write data into a key
+ try (OzoneOutputStream out = bucket.createKey(keyName,
+ value.getBytes(UTF_8).length, ReplicationConfig.fromTypeAndFactor(
+ ReplicationType.RATIS, THREE), new HashMap<>())) {
+ out.write(value.getBytes(UTF_8));
+ }
+
+ // Since the rpc client is outside of cluster, then getFirstNode should be
+ // equal to getClosestNode.
+ OmKeyArgs.Builder builder = new OmKeyArgs.Builder();
+ builder.setVolumeName(volumeName).setBucketName(bucketName)
+ .setKeyName(keyName);
+
+ // read key with topology aware read enabled
+ try (OzoneInputStream is = bucket.readKey(keyName)) {
+ byte[] b = new byte[value.getBytes(UTF_8).length];
+ is.read(b);
+ assertArrayEquals(b, value.getBytes(UTF_8));
+ }
+
+ // read file with topology aware read enabled
+ try (OzoneInputStream is = bucket.readKey(keyName)) {
+ byte[] b = new byte[value.getBytes(UTF_8).length];
+ is.read(b);
+ assertArrayEquals(b, value.getBytes(UTF_8));
+ }
+
+ // read key with topology aware read disabled
+ OzoneConfiguration conf = getCluster().getConf();
+ conf.setBoolean(OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY,
+ false);
+ try (OzoneClient newClient = OzoneClientFactory.getRpcClient(conf)) {
+ ObjectStore newStore = newClient.getObjectStore();
+ OzoneBucket newBucket =
+ newStore.getVolume(volumeName).getBucket(bucketName);
+ try (OzoneInputStream is = newBucket.readKey(keyName)) {
+ byte[] b = new byte[value.getBytes(UTF_8).length];
+ is.read(b);
+ assertArrayEquals(b, value.getBytes(UTF_8));
+ }
+
+ // read file with topology aware read disabled
+ try (OzoneInputStream is = newBucket.readFile(keyName)) {
+ byte[] b = new byte[value.getBytes(UTF_8).length];
+ is.read(b);
+ assertArrayEquals(b, value.getBytes(UTF_8));
+ }
+ }
+ }
+
+ @ParameterizedTest
+ @MethodSource("replicationConfigs")
+ void testMultiPartUploadWithStream(ReplicationConfig replicationConfig)
+ throws IOException, NoSuchAlgorithmException {
+ String volumeName = UUID.randomUUID().toString();
+ String bucketName =
replicationConfig.getReplicationType().name().toLowerCase(Locale.ROOT) +
"-bucket";
+ String keyName = replicationConfig.getReplication();
+
+ byte[] sampleData = new byte[1024 * 8];
+
+ int valueLength = sampleData.length;
+
+ getStore().createVolume(volumeName);
+ OzoneVolume volume = getStore().getVolume(volumeName);
+ volume.createBucket(bucketName);
+ OzoneBucket bucket = volume.getBucket(bucketName);
+
+ OmMultipartInfo multipartInfo = bucket.initiateMultipartUpload(keyName,
+ replicationConfig);
+
+ assertNotNull(multipartInfo);
+ String uploadID = multipartInfo.getUploadID();
+ assertNotNull(multipartInfo.getUploadID());
+
+ OzoneDataStreamOutput ozoneStreamOutput = bucket.createMultipartStreamKey(
+ keyName, valueLength, 1, uploadID);
+ ozoneStreamOutput.write(ByteBuffer.wrap(sampleData), 0,
+ valueLength);
+ ozoneStreamOutput.getMetadata().put(OzoneConsts.ETAG,
+
DatatypeConverter.printHexBinary(MessageDigest.getInstance(OzoneConsts.MD5_HASH)
+ .digest(sampleData)).toLowerCase());
+ ozoneStreamOutput.close();
+
+ OzoneMultipartUploadPartListParts parts =
+ bucket.listParts(keyName, uploadID, 0, 1);
+
+ assertEquals(1, parts.getPartInfoList().size());
+
+ OzoneMultipartUploadPartListParts.PartInfo partInfo =
+ parts.getPartInfoList().get(0);
+ assertEquals(valueLength, partInfo.getSize());
+
+ }
+
+ @Test
+ public void testUploadWithStreamAndMemoryMappedBuffer() throws IOException {
+ // create a local dir
+ final String dir = GenericTestUtils.getTempPath(
+ getClass().getSimpleName());
+ GenericTestUtils.assertDirCreation(new File(dir));
+
+ // create a local file
+ final int chunkSize = 1024;
+ final byte[] data = new byte[8 * chunkSize];
+ ThreadLocalRandom.current().nextBytes(data);
+ final File file = new File(dir, "data");
+ try (FileOutputStream out = new FileOutputStream(file)) {
+ out.write(data);
+ }
+
+ // create a volume
+ final String volumeName = "vol-" + UUID.randomUUID();
+ getStore().createVolume(volumeName);
+ final OzoneVolume volume = getStore().getVolume(volumeName);
+
+ // create a bucket
+ final String bucketName = "buck-" + UUID.randomUUID();
+ final BucketArgs bucketArgs = BucketArgs.newBuilder()
+ .setDefaultReplicationConfig(
+ new DefaultReplicationConfig(ReplicationConfig.fromTypeAndFactor(
+ ReplicationType.RATIS, THREE)))
+ .build();
+ volume.createBucket(bucketName, bucketArgs);
+ final OzoneBucket bucket = volume.getBucket(bucketName);
+
+ // upload a key from the local file using memory-mapped buffers
+ final String keyName = "key-" + UUID.randomUUID();
+ try (RandomAccessFile raf = new RandomAccessFile(file, "r");
+ OzoneDataStreamOutput out = bucket.createStreamKey(
+ keyName, data.length)) {
+ final FileChannel channel = raf.getChannel();
+ long off = 0;
+ for (long len = raf.length(); len > 0;) {
+ final long writeLen = Math.min(len, chunkSize);
+ final ByteBuffer mapped = channel.map(FileChannel.MapMode.READ_ONLY,
+ off, writeLen);
+ out.write(mapped);
+ off += writeLen;
+ len -= writeLen;
+ }
+ }
+
+ // verify the key details
+ final OzoneKeyDetails keyDetails = bucket.getKey(keyName);
+ assertEquals(keyName, keyDetails.getName());
+ assertEquals(data.length, keyDetails.getDataSize());
+
+ // verify the key content
+ final byte[] buffer = new byte[data.length];
+ try (OzoneInputStream in = keyDetails.getContent()) {
+ for (int off = 0; off < data.length;) {
+ final int n = in.read(buffer, off, data.length - off);
+ if (n < 0) {
+ break;
+ }
+ off += n;
+ }
+ }
+ assertArrayEquals(data, buffer);
+ }
+
+ @Test
+ public void testParallelDeleteBucketAndCreateKey() throws IOException,
+ InterruptedException, TimeoutException {
+ assumeThat(getCluster().getOzoneManager().isRatisEnabled()).isTrue();
+
+ String volumeName = UUID.randomUUID().toString();
+ String bucketName = UUID.randomUUID().toString();
+
+ String value = "sample value";
+ getStore().createVolume(volumeName);
+ OzoneVolume volume = getStore().getVolume(volumeName);
+ volume.createBucket(bucketName);
+ String keyName = UUID.randomUUID().toString();
+
+ GenericTestUtils.LogCapturer omSMLog = GenericTestUtils.LogCapturer
+ .captureLogs(OzoneManagerStateMachine.LOG);
+ OzoneManagerStateMachine omSM = getCluster().getOzoneManager()
+ .getOmRatisServer().getOmStateMachine();
+
+ Thread thread1 = new Thread(() -> {
+ try {
+ volume.deleteBucket(bucketName);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ Thread thread2 = new Thread(() -> {
+ try {
+ getClient().getProxy().createKey(volumeName, bucketName, keyName,
+ 0, ReplicationType.RATIS, ONE, new HashMap<>());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ OMRequestHandlerPauseInjector injector =
+ new OMRequestHandlerPauseInjector();
+ omSM.getHandler().setInjector(injector);
+ thread1.start();
+ thread2.start();
+ // Wait long enough for createKey's preExecute to finish executing
+ GenericTestUtils.waitFor(() -> {
+ return
getCluster().getOzoneManager().getOmServerProtocol().getLastRequestToSubmit().getCmdType().equals(
+ OzoneManagerProtocolProtos.Type.CreateKey);
+ }, 100, 10000);
+ injector.resume();
+
+ try {
+ thread1.join();
+ thread2.join();
+ } catch (InterruptedException ex) {
+ throw new RuntimeException(ex);
+ }
+
+ omSM.getHandler().setInjector(null);
+ // Generate more write requests to OM
+ String newBucketName = UUID.randomUUID().toString();
+ volume.createBucket(newBucketName);
+ OzoneBucket bucket = volume.getBucket(newBucketName);
+ for (int i = 0; i < 10; i++) {
+ bucket.createKey("key-" + i, value.getBytes(UTF_8).length,
+ ReplicationType.RATIS, ONE, new HashMap<>());
+ }
+
+ assertThat(omSMLog.getOutput()).contains("Failed to write, Exception
occurred");
+ }
+
+ private static class OMRequestHandlerPauseInjector extends FaultInjector {
+ private CountDownLatch ready;
+ private CountDownLatch wait;
+
+ OMRequestHandlerPauseInjector() {
+ init();
+ }
+
+ @Override
+ public void init() {
+ this.ready = new CountDownLatch(1);
+ this.wait = new CountDownLatch(1);
+ }
+
+ @Override
+ public void pause() throws IOException {
+ ready.countDown();
+ try {
+ wait.await();
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void resume() {
+ // Make sure injector pauses before resuming.
+ try {
+ ready.await();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ fail("resume interrupted");
+ }
+ wait.countDown();
+ }
+
+ @Override
+ public void reset() throws IOException {
+ init();
+ }
+ }
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java
index 5288bcb3cf..b3d38fe8bc 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java
@@ -177,12 +177,6 @@ class TestOzoneAtRestEncryption {
cluster.getStorageContainerLocationClient();
ozoneManager = cluster.getOzoneManager();
ozoneManager.setMinMultipartUploadPartSize(MPU_PART_MIN_SIZE);
- TestOzoneRpcClient.setCluster(cluster);
- TestOzoneRpcClient.setOzClient(ozClient);
- TestOzoneRpcClient.setOzoneManager(ozoneManager);
- TestOzoneRpcClient.setStorageContainerLocationClient(
- storageContainerLocationClient);
- TestOzoneRpcClient.setStore(store);
// create test key
createKey(TEST_KEY, cluster.getOzoneManager().getKmsProvider(), conf);
@@ -216,8 +210,6 @@ class TestOzoneAtRestEncryption {
static void reInitClient() throws IOException {
ozClient = OzoneClientFactory.getRpcClient(conf);
store = ozClient.getObjectStore();
- TestOzoneRpcClient.setOzClient(ozClient);
- TestOzoneRpcClient.setStore(store);
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
index 527b3bb121..0c5db29fd5 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
@@ -23,27 +23,22 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Timeout;
/**
- * This class is to test all the public facing APIs of Ozone Client.
+ * Test Ozone Client with OM Ratis disabled.
*/
@Timeout(300)
-public class TestOzoneRpcClient extends TestOzoneRpcClientAbstract {
+class TestOzoneRpcClient extends OzoneRpcClientTests {
- /**
- * Create a MiniOzoneCluster for testing.
- * <p>
- * Ozone is made active by setting OZONE_ENABLED = true
- *
- * @throws IOException
- */
@BeforeAll
public static void init() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
+ conf.setBoolean(OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY, false);
conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, 1);
conf.setBoolean(OzoneConfigKeys.OZONE_ACL_ENABLED, true);
conf.set(OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS,
@@ -51,9 +46,6 @@ public class TestOzoneRpcClient extends
TestOzoneRpcClientAbstract {
startCluster(conf);
}
- /**
- * Close OzoneClient and shutdown MiniOzoneCluster.
- */
@AfterAll
public static void shutdown() throws IOException {
shutdownCluster();
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
index c4a452e168..95d7ba6218 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
@@ -18,77 +18,23 @@
package org.apache.hadoop.ozone.client.rpc;
-import java.io.File;
-import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.HashMap;
-import java.util.Locale;
-import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeoutException;
-import javax.xml.bind.DatatypeConverter;
-import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
-import org.apache.hadoop.hdds.client.ReplicationConfig;
-import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.utils.FaultInjector;
import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.client.BucketArgs;
-import org.apache.hadoop.ozone.client.ObjectStore;
-import org.apache.hadoop.ozone.client.OzoneBucket;
-import org.apache.hadoop.ozone.client.OzoneClient;
-import org.apache.hadoop.ozone.client.OzoneClientFactory;
-import org.apache.hadoop.ozone.client.OzoneKeyDetails;
-import org.apache.hadoop.ozone.client.OzoneMultipartUploadPartListParts;
-import org.apache.hadoop.ozone.client.OzoneVolume;
-import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
-import org.apache.hadoop.ozone.client.io.OzoneInputStream;
-import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.om.OMConfigKeys;
-import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
-import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
-import org.apache.hadoop.ozone.om.ratis.OzoneManagerStateMachine;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
-import org.apache.ozone.test.GenericTestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.MethodSource;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.hadoop.hdds.client.ReplicationFactor.ONE;
-import static org.apache.hadoop.hdds.client.ReplicationFactor.THREE;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.fail;
/**
- * This class is to test all the public facing APIs of Ozone Client with an
- * active OM Ratis server.
+ * Test Ozone Client with OM Ratis enabled.
*/
-public class TestOzoneRpcClientWithRatis extends TestOzoneRpcClientAbstract {
- private static OzoneConfiguration conf;
- /**
- * Create a MiniOzoneCluster for testing.
- * Ozone is made active by setting OZONE_ENABLED = true.
- * Ozone OM Ratis server is made active by setting
- * OZONE_OM_RATIS_ENABLE = true;
- */
+class TestOzoneRpcClientWithRatis extends OzoneRpcClientTests {
+
@BeforeAll
public static void init() throws Exception {
- conf = new OzoneConfiguration();
+ OzoneConfiguration conf = new OzoneConfiguration();
conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, 1);
conf.setBoolean(ScmConfigKeys.OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE,
false);
@@ -101,294 +47,9 @@ public class TestOzoneRpcClientWithRatis extends
TestOzoneRpcClientAbstract {
startCluster(conf);
}
- /**
- * Close OzoneClient and shutdown MiniOzoneCluster.
- */
@AfterAll
public static void shutdown() throws IOException {
shutdownCluster();
}
- /**
- * Tests get the information of key with network topology awareness enabled.
- */
- @Test
- void testGetKeyAndFileWithNetworkTopology() throws IOException {
- String volumeName = UUID.randomUUID().toString();
- String bucketName = UUID.randomUUID().toString();
-
- String value = "sample value";
- getStore().createVolume(volumeName);
- OzoneVolume volume = getStore().getVolume(volumeName);
- volume.createBucket(bucketName);
- OzoneBucket bucket = volume.getBucket(bucketName);
- String keyName = UUID.randomUUID().toString();
-
- // Write data into a key
- try (OzoneOutputStream out = bucket.createKey(keyName,
- value.getBytes(UTF_8).length, ReplicationConfig.fromTypeAndFactor(
- ReplicationType.RATIS, THREE), new HashMap<>())) {
- out.write(value.getBytes(UTF_8));
- }
-
- // Since the rpc client is outside of cluster, then getFirstNode should be
- // equal to getClosestNode.
- OmKeyArgs.Builder builder = new OmKeyArgs.Builder();
- builder.setVolumeName(volumeName).setBucketName(bucketName)
- .setKeyName(keyName);
-
- // read key with topology aware read enabled
- try (OzoneInputStream is = bucket.readKey(keyName)) {
- byte[] b = new byte[value.getBytes(UTF_8).length];
- is.read(b);
- assertArrayEquals(b, value.getBytes(UTF_8));
- }
-
- // read file with topology aware read enabled
- try (OzoneInputStream is = bucket.readKey(keyName)) {
- byte[] b = new byte[value.getBytes(UTF_8).length];
- is.read(b);
- assertArrayEquals(b, value.getBytes(UTF_8));
- }
-
- // read key with topology aware read disabled
- conf.setBoolean(OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY,
- false);
- try (OzoneClient newClient = OzoneClientFactory.getRpcClient(conf)) {
- ObjectStore newStore = newClient.getObjectStore();
- OzoneBucket newBucket =
- newStore.getVolume(volumeName).getBucket(bucketName);
- try (OzoneInputStream is = newBucket.readKey(keyName)) {
- byte[] b = new byte[value.getBytes(UTF_8).length];
- is.read(b);
- assertArrayEquals(b, value.getBytes(UTF_8));
- }
-
- // read file with topology aware read disabled
- try (OzoneInputStream is = newBucket.readFile(keyName)) {
- byte[] b = new byte[value.getBytes(UTF_8).length];
- is.read(b);
- assertArrayEquals(b, value.getBytes(UTF_8));
- }
- }
- }
-
- @ParameterizedTest
- @MethodSource("replicationConfigs")
- void testMultiPartUploadWithStream(ReplicationConfig replicationConfig)
- throws IOException, NoSuchAlgorithmException {
- String volumeName = UUID.randomUUID().toString();
-    String bucketName = replicationConfig.getReplicationType().name().toLowerCase(Locale.ROOT) + "-bucket";
- String keyName = replicationConfig.getReplication();
-
- byte[] sampleData = new byte[1024 * 8];
-
- int valueLength = sampleData.length;
-
- getStore().createVolume(volumeName);
- OzoneVolume volume = getStore().getVolume(volumeName);
- volume.createBucket(bucketName);
- OzoneBucket bucket = volume.getBucket(bucketName);
-
- OmMultipartInfo multipartInfo = bucket.initiateMultipartUpload(keyName,
- replicationConfig);
-
- assertNotNull(multipartInfo);
- String uploadID = multipartInfo.getUploadID();
- assertNotNull(multipartInfo.getUploadID());
-
- OzoneDataStreamOutput ozoneStreamOutput = bucket.createMultipartStreamKey(
- keyName, valueLength, 1, uploadID);
- ozoneStreamOutput.write(ByteBuffer.wrap(sampleData), 0,
- valueLength);
- ozoneStreamOutput.getMetadata().put(OzoneConsts.ETAG,
-        DatatypeConverter.printHexBinary(MessageDigest.getInstance(OzoneConsts.MD5_HASH)
- .digest(sampleData)).toLowerCase());
- ozoneStreamOutput.close();
-
- OzoneMultipartUploadPartListParts parts =
- bucket.listParts(keyName, uploadID, 0, 1);
-
- assertEquals(1, parts.getPartInfoList().size());
-
- OzoneMultipartUploadPartListParts.PartInfo partInfo =
- parts.getPartInfoList().get(0);
- assertEquals(valueLength, partInfo.getSize());
-
- }
-
- @Test
- public void testUploadWithStreamAndMemoryMappedBuffer() throws IOException {
- // create a local dir
- final String dir = GenericTestUtils.getTempPath(
- getClass().getSimpleName());
- GenericTestUtils.assertDirCreation(new File(dir));
-
- // create a local file
- final int chunkSize = 1024;
- final byte[] data = new byte[8 * chunkSize];
- ThreadLocalRandom.current().nextBytes(data);
- final File file = new File(dir, "data");
- try (FileOutputStream out = new FileOutputStream(file)) {
- out.write(data);
- }
-
- // create a volume
- final String volumeName = "vol-" + UUID.randomUUID();
- getStore().createVolume(volumeName);
- final OzoneVolume volume = getStore().getVolume(volumeName);
-
- // create a bucket
- final String bucketName = "buck-" + UUID.randomUUID();
- final BucketArgs bucketArgs = BucketArgs.newBuilder()
- .setDefaultReplicationConfig(
- new DefaultReplicationConfig(ReplicationConfig.fromTypeAndFactor(
- ReplicationType.RATIS, THREE)))
- .build();
- volume.createBucket(bucketName, bucketArgs);
- final OzoneBucket bucket = volume.getBucket(bucketName);
-
- // upload a key from the local file using memory-mapped buffers
- final String keyName = "key-" + UUID.randomUUID();
- try (RandomAccessFile raf = new RandomAccessFile(file, "r");
- OzoneDataStreamOutput out = bucket.createStreamKey(
- keyName, data.length)) {
- final FileChannel channel = raf.getChannel();
- long off = 0;
- for (long len = raf.length(); len > 0;) {
- final long writeLen = Math.min(len, chunkSize);
- final ByteBuffer mapped = channel.map(FileChannel.MapMode.READ_ONLY,
- off, writeLen);
- out.write(mapped);
- off += writeLen;
- len -= writeLen;
- }
- }
-
- // verify the key details
- final OzoneKeyDetails keyDetails = bucket.getKey(keyName);
- assertEquals(keyName, keyDetails.getName());
- assertEquals(data.length, keyDetails.getDataSize());
-
- // verify the key content
- final byte[] buffer = new byte[data.length];
- try (OzoneInputStream in = keyDetails.getContent()) {
- for (int off = 0; off < data.length;) {
- final int n = in.read(buffer, off, data.length - off);
- if (n < 0) {
- break;
- }
- off += n;
- }
- }
- assertArrayEquals(data, buffer);
- }
-
- @Test
- public void testParallelDeleteBucketAndCreateKey() throws IOException,
- InterruptedException, TimeoutException {
- String volumeName = UUID.randomUUID().toString();
- String bucketName = UUID.randomUUID().toString();
-
- String value = "sample value";
- getStore().createVolume(volumeName);
- OzoneVolume volume = getStore().getVolume(volumeName);
- volume.createBucket(bucketName);
- String keyName = UUID.randomUUID().toString();
-
- GenericTestUtils.LogCapturer omSMLog = GenericTestUtils.LogCapturer
- .captureLogs(OzoneManagerStateMachine.LOG);
- OzoneManagerStateMachine omSM = getCluster().getOzoneManager()
- .getOmRatisServer().getOmStateMachine();
-
- Thread thread1 = new Thread(() -> {
- try {
- volume.deleteBucket(bucketName);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- });
-
- Thread thread2 = new Thread(() -> {
- try {
- getClient().getProxy().createKey(volumeName, bucketName, keyName,
- 0, ReplicationType.RATIS, ONE, new HashMap<>());
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- });
-
- OMRequestHandlerPauseInjector injector =
- new OMRequestHandlerPauseInjector();
- omSM.getHandler().setInjector(injector);
- thread1.start();
- thread2.start();
- // Wait long enough for createKey's preExecute to finish executing
- GenericTestUtils.waitFor(() -> {
-      return getCluster().getOzoneManager().getOmServerProtocol().getLastRequestToSubmit().getCmdType().equals(
- Type.CreateKey);
- }, 100, 10000);
- injector.resume();
-
- try {
- thread1.join();
- thread2.join();
- } catch (InterruptedException ex) {
- throw new RuntimeException(ex);
- }
-
- omSM.getHandler().setInjector(null);
- // Generate more write requests to OM
- String newBucketName = UUID.randomUUID().toString();
- volume.createBucket(newBucketName);
- OzoneBucket bucket = volume.getBucket(newBucketName);
- for (int i = 0; i < 10; i++) {
- bucket.createKey("key-" + i, value.getBytes(UTF_8).length,
- ReplicationType.RATIS, ONE, new HashMap<>());
- }
-
-    assertThat(omSMLog.getOutput()).contains("Failed to write, Exception occurred");
- }
-
- private static class OMRequestHandlerPauseInjector extends FaultInjector {
- private CountDownLatch ready;
- private CountDownLatch wait;
-
- OMRequestHandlerPauseInjector() {
- init();
- }
-
- @Override
- public void init() {
- this.ready = new CountDownLatch(1);
- this.wait = new CountDownLatch(1);
- }
-
- @Override
- public void pause() throws IOException {
- ready.countDown();
- try {
- wait.await();
- } catch (InterruptedException e) {
- throw new IOException(e);
- }
- }
-
- @Override
- public void resume() {
- // Make sure injector pauses before resuming.
- try {
- ready.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- fail("resume interrupted");
- }
- wait.countDown();
- }
-
- @Override
- public void reset() throws IOException {
- init();
- }
- }
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestSecureOzoneRpcClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestSecureOzoneRpcClient.java
index 0f69245a6d..57de4901ab 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestSecureOzoneRpcClient.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestSecureOzoneRpcClient.java
@@ -19,14 +19,10 @@
package org.apache.hadoop.ozone.client.rpc;
import org.apache.hadoop.hdds.HddsConfigKeys;
-import org.apache.hadoop.hdds.client.ReplicationConfig;
-import org.apache.hadoop.hdds.client.ReplicationFactor;
-import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.container.ContainerInfo;
-import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClientTestImpl;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.TableIterator;
@@ -35,10 +31,7 @@ import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
import org.apache.hadoop.ozone.ClientVersion;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneBucket;
-import org.apache.hadoop.ozone.client.OzoneClient;
-import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.OzoneKey;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.BucketArgs;
@@ -52,8 +45,6 @@ import org.apache.hadoop.ozone.om.S3SecretManager;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
-import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
-import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateVolumeRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.InfoVolumeRequest;
@@ -81,36 +72,18 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
/**
- * This class is to test all the public facing APIs of Ozone Client.
+ * Test Ozone Client with block tokens enabled.
*/
-public class TestSecureOzoneRpcClient extends TestOzoneRpcClient {
+class TestSecureOzoneRpcClient extends OzoneRpcClientTests {
- private static MiniOzoneCluster cluster = null;
- private static OzoneClient ozClient = null;
- private static ObjectStore store = null;
- private static OzoneManager ozoneManager;
- private static StorageContainerLocationProtocolClientSideTranslatorPB
- storageContainerLocationClient;
-
- private static File testDir;
- private static OzoneConfiguration conf;
-
- /**
- * Create a MiniOzoneCluster for testing.
- * <p>
- * Ozone is made active by setting OZONE_ENABLED = true
- *
- * @throws IOException
- */
@BeforeAll
public static void init() throws Exception {
- testDir = GenericTestUtils.getTestDir(
+ File testDir = GenericTestUtils.getTestDir(
TestSecureOzoneRpcClient.class.getSimpleName());
OzoneManager.setTestSecureOmFlag(true);
- conf = new OzoneConfiguration();
+ OzoneConfiguration conf = new OzoneConfiguration();
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, 1);
conf.setBoolean(HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED, true);
@@ -125,24 +98,11 @@ public class TestSecureOzoneRpcClient extends TestOzoneRpcClient {
// constructed.
conf.set(OMConfigKeys.OZONE_DEFAULT_BUCKET_LAYOUT,
OMConfigKeys.OZONE_BUCKET_LAYOUT_OBJECT_STORE);
- cluster = MiniOzoneCluster.newBuilder(conf)
- .setNumDatanodes(14)
+ MiniOzoneCluster.Builder builder = MiniOzoneCluster.newBuilder(conf)
.setCertificateClient(certificateClientTest)
- .setSecretKeyClient(new SecretKeyTestClient())
- .build();
- cluster.getOzoneManager().startSecretManager();
- cluster.waitForClusterToBeReady();
- ozClient = OzoneClientFactory.getRpcClient(conf);
- store = ozClient.getObjectStore();
- storageContainerLocationClient =
- cluster.getStorageContainerLocationClient();
- ozoneManager = cluster.getOzoneManager();
- TestOzoneRpcClient.setCluster(cluster);
- TestOzoneRpcClient.setOzClient(ozClient);
- TestOzoneRpcClient.setOzoneManager(ozoneManager);
- TestOzoneRpcClient.setStorageContainerLocationClient(
- storageContainerLocationClient);
- TestOzoneRpcClient.setStore(store);
+ .setSecretKeyClient(new SecretKeyTestClient());
+ startCluster(conf, builder);
+ getCluster().getOzoneManager().startSecretManager();
}
/**
@@ -163,22 +123,23 @@ public class TestSecureOzoneRpcClient extends TestOzoneRpcClient {
String volumeName = UUID.randomUUID().toString();
String bucketName = UUID.randomUUID().toString();
Instant testStartTime = getTestStartTime();
+ OzoneManager ozoneManager = getCluster().getOzoneManager();
OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
String value = "sample value";
- store.createVolume(volumeName);
- OzoneVolume volume = store.getVolume(volumeName);
+ getStore().createVolume(volumeName);
+ OzoneVolume volume = getStore().getVolume(volumeName);
volume.createBucket(bucketName,
new BucketArgs.Builder().setBucketLayout(bucketLayout).build());
OzoneBucket bucket = volume.getBucket(bucketName);
+    RatisReplicationConfig replication = RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE);
for (int i = 0; i < 10; i++) {
String keyName = UUID.randomUUID().toString();
long committedBytes = ozoneManager.getMetrics().getDataCommittedBytes();
try (OzoneOutputStream out = bucket.createKey(keyName,
- value.getBytes(UTF_8).length, ReplicationType.RATIS,
- ReplicationFactor.ONE, new HashMap<>())) {
+ value.getBytes(UTF_8).length, replication, new HashMap<>())) {
out.write(value.getBytes(UTF_8));
}
@@ -216,10 +177,7 @@ public class TestSecureOzoneRpcClient extends TestOzoneRpcClient {
}
}
-
- assertTrue(verifyRatisReplication(volumeName, bucketName,
- keyName, ReplicationType.RATIS,
- ReplicationFactor.ONE));
+ verifyReplication(volumeName, bucketName, keyName, replication);
assertEquals(value, new String(fileContent, UTF_8));
assertFalse(key.getCreationTime().isBefore(testStartTime));
assertFalse(key.getModificationTime().isBefore(testStartTime));
@@ -254,7 +212,7 @@ public class TestSecureOzoneRpcClient extends TestOzoneRpcClient {
String accessKey = UserGroupInformation.getCurrentUser().getUserName();
- S3SecretManager s3SecretManager = cluster.getOzoneManager()
+ S3SecretManager s3SecretManager = getCluster().getOzoneManager()
.getS3SecretManager();
// Add secret to S3Secret table.
@@ -274,9 +232,9 @@ public class TestSecureOzoneRpcClient extends TestOzoneRpcClient {
.setSignature(signature).setStringToSign(strToSign))
.build();
- GenericTestUtils.waitFor(() -> cluster.getOzoneManager().isLeaderReady(),
+    GenericTestUtils.waitFor(() -> getCluster().getOzoneManager().isLeaderReady(),
100, 120000);
- OMResponse omResponse = cluster.getOzoneManager().getOmServerProtocol()
+    OMResponse omResponse = getCluster().getOzoneManager().getOmServerProtocol()
.submitRequest(null, writeRequest);
// Verify response.
@@ -294,7 +252,7 @@ public class TestSecureOzoneRpcClient extends TestOzoneRpcClient {
.setSignature(signature).setStringToSign(strToSign))
.build();
- omResponse = cluster.getOzoneManager().getOmServerProtocol()
+ omResponse = getCluster().getOzoneManager().getOmServerProtocol()
.submitRequest(null, readRequest);
// Verify response.
@@ -311,63 +269,24 @@ public class TestSecureOzoneRpcClient extends TestOzoneRpcClient {
.storeSecret(accessKey, S3SecretValue.of(accessKey, "dummy"));
// Write request with invalid credentials.
- omResponse = cluster.getOzoneManager().getOmServerProtocol()
+ omResponse = getCluster().getOzoneManager().getOmServerProtocol()
.submitRequest(null, writeRequest);
assertEquals(Status.INVALID_TOKEN, omResponse.getStatus());
// Read request with invalid credentials.
- omResponse = cluster.getOzoneManager().getOmServerProtocol()
+ omResponse = getCluster().getOzoneManager().getOmServerProtocol()
.submitRequest(null, readRequest);
assertEquals(Status.INVALID_TOKEN, omResponse.getStatus());
}
- private boolean verifyRatisReplication(String volumeName, String bucketName,
- String keyName, ReplicationType type, ReplicationFactor factor)
- throws IOException {
- OmKeyArgs keyArgs = new OmKeyArgs.Builder()
- .setVolumeName(volumeName)
- .setBucketName(bucketName)
- .setKeyName(keyName)
- .build();
- HddsProtos.ReplicationType replicationType =
- HddsProtos.ReplicationType.valueOf(type.toString());
- HddsProtos.ReplicationFactor replicationFactor =
- HddsProtos.ReplicationFactor.valueOf(factor.getValue());
- OmKeyInfo keyInfo = ozoneManager.lookupKey(keyArgs);
- for (OmKeyLocationInfo info:
- keyInfo.getLatestVersionLocations().getLocationList()) {
- ContainerInfo container =
- storageContainerLocationClient.getContainer(info.getContainerID());
- if (!ReplicationConfig.getLegacyFactor(container.getReplicationConfig())
- .equals(replicationFactor) || (
- container.getReplicationType() != replicationType)) {
- return false;
- }
- }
- return true;
- }
-
@Test
@Override
// Restart DN doesn't work with security enabled.
public void testZReadKeyWithUnhealthyContainerReplica() {
}
- /**
- * Close OzoneClient and shutdown MiniOzoneCluster.
- */
@AfterAll
public static void shutdown() throws IOException {
- if (ozClient != null) {
- ozClient.close();
- }
-
- if (storageContainerLocationClient != null) {
- storageContainerLocationClient.close();
- }
-
- if (cluster != null) {
- cluster.shutdown();
- }
+ shutdownCluster();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]