This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new fc8f79524f HDDS-7931. EC: ManagedChannelImpl not cleaned up properly (#4269)
fc8f79524f is described below
commit fc8f79524f95dd17e35d92d5d026982511f983ef
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Thu Feb 16 13:19:32 2023 +0100
HDDS-7931. EC: ManagedChannelImpl not cleaned up properly (#4269)
---
.../hadoop/hdds/scm/storage/BlockInputStream.java | 24 +-
.../hadoop/hdds/scm/storage/BlockOutputStream.java | 4 +-
.../hadoop/hdds/scm/storage/ChunkInputStream.java | 9 +-
.../hdds/scm/storage/DummyChunkInputStream.java | 2 +-
.../hdds/scm/storage/TestBlockInputStream.java | 2 +-
.../ECReconstructionCoordinator.java | 52 +--
.../ozone/client/io/ECBlockOutputStreamEntry.java | 23 +-
.../hadoop/ozone/client/io/ECKeyOutputStream.java | 45 +--
.../hadoop/ozone/client/io/KeyOutputStream.java | 2 +-
.../hadoop/fs/ozone/TestOzoneFileSystem.java | 8 +-
.../hadoop/fs/ozone/TestRootedOzoneFileSystem.java | 6 +-
.../hdds/scm/storage/TestContainerCommandsEC.java | 395 +++++++++++----------
.../ozone/client/rpc/TestBlockOutputStream.java | 2 +
.../rpc/TestBlockOutputStreamFlushDelay.java | 2 +
.../rpc/TestBlockOutputStreamWithFailures.java | 2 +
...estBlockOutputStreamWithFailuresFlushDelay.java | 2 +
.../client/rpc/TestOzoneRpcClientAbstract.java | 44 ++-
.../client/rpc/read/TestChunkInputStream.java | 109 +++---
.../ozone/client/rpc/read/TestInputStreamBase.java | 2 +
.../apache/hadoop/ozone/container/TestHelper.java | 20 +-
20 files changed, 409 insertions(+), 346 deletions(-)
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
index a0d848e251..f01afd76dd 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
@@ -226,7 +226,6 @@ public class BlockInputStream extends
BlockExtendedInputStream {
.build();
}
acquireClient();
- boolean success = false;
List<ChunkInfo> chunks;
try {
if (LOG.isDebugEnabled()) {
@@ -247,18 +246,17 @@ public class BlockInputStream extends
BlockExtendedInputStream {
.getBlock(xceiverClient, blkIDBuilder.build(), token);
chunks = response.getBlockData().getChunksList();
- success = true;
} finally {
- if (!success) {
- xceiverClientFactory.releaseClientForReadData(xceiverClient, false);
- }
+ releaseClient();
}
return chunks;
}
protected void acquireClient() throws IOException {
- xceiverClient = xceiverClientFactory.acquireClientForReadData(pipeline);
+ if (xceiverClientFactory != null && xceiverClient == null) {
+ xceiverClient = xceiverClientFactory.acquireClientForReadData(pipeline);
+ }
}
/**
@@ -320,7 +318,7 @@ public class BlockInputStream extends
BlockExtendedInputStream {
if (isConnectivityIssue(ex)) {
handleReadError(ex);
} else {
- current.releaseClient(false);
+ current.releaseClient();
}
continue;
} else {
@@ -435,7 +433,7 @@ public class BlockInputStream extends
BlockExtendedInputStream {
@Override
public synchronized void close() {
- releaseClient(true);
+ releaseClient();
xceiverClientFactory = null;
final List<ChunkInputStream> inputStreams = this.chunkStreams;
@@ -446,9 +444,9 @@ public class BlockInputStream extends
BlockExtendedInputStream {
}
}
- private void releaseClient(boolean invalidateClient) {
+ private void releaseClient() {
if (xceiverClientFactory != null && xceiverClient != null) {
- xceiverClientFactory.releaseClient(xceiverClient, invalidateClient);
+ xceiverClientFactory.releaseClientForReadData(xceiverClient, false);
xceiverClient = null;
}
}
@@ -487,7 +485,7 @@ public class BlockInputStream extends
BlockExtendedInputStream {
@Override
public synchronized void unbuffer() {
storePosition();
- releaseClient(true);
+ releaseClient();
final List<ChunkInputStream> inputStreams = this.chunkStreams;
if (inputStreams != null) {
@@ -514,11 +512,11 @@ public class BlockInputStream extends
BlockExtendedInputStream {
}
private void handleReadError(IOException cause) throws IOException {
- releaseClient(false);
+ releaseClient();
final List<ChunkInputStream> inputStreams = this.chunkStreams;
if (inputStreams != null) {
for (ChunkInputStream is : inputStreams) {
- is.releaseClient(false);
+ is.releaseClient();
}
}
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index f72d537ba9..66c73cac33 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -554,7 +554,7 @@ public class BlockOutputStream extends OutputStream {
throw e;
} finally {
if (close) {
- cleanup(true);
+ cleanup(false);
}
}
}
@@ -599,7 +599,7 @@ public class BlockOutputStream extends OutputStream {
// Preconditions.checkArgument(buffer.position() == 0);
// bufferPool.checkBufferPoolEmpty();
} else {
- cleanup(true);
+ cleanup(false);
}
}
}
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
index 554d605b3f..e30df34b85 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
@@ -282,13 +282,12 @@ public class ChunkInputStream extends InputStream
@Override
public synchronized void close() {
releaseBuffers();
- releaseClient(true);
+ releaseClient();
}
- protected synchronized void releaseClient(boolean invalidateClient) {
+ protected synchronized void releaseClient() {
if (xceiverClientFactory != null && xceiverClient != null) {
- xceiverClientFactory.releaseClientForReadData(
- xceiverClient, invalidateClient);
+ xceiverClientFactory.releaseClientForReadData(xceiverClient, false);
xceiverClient = null;
}
}
@@ -739,7 +738,7 @@ public class ChunkInputStream extends InputStream
public synchronized void unbuffer() {
storePosition();
releaseBuffers();
- releaseClient(true);
+ releaseClient();
}
@VisibleForTesting
diff --git
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyChunkInputStream.java
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyChunkInputStream.java
index cfbba0df24..78d0c05bfe 100644
---
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyChunkInputStream.java
+++
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyChunkInputStream.java
@@ -82,7 +82,7 @@ public class DummyChunkInputStream extends ChunkInputStream {
}
@Override
- protected void releaseClient(boolean invalidateClient) {
+ protected void releaseClient() {
// no-op
}
diff --git
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
index ef270731d5..4943d6ce1a 100644
---
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
+++
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
@@ -423,7 +423,7 @@ public class TestBlockInputStream {
Assert.assertEquals(len, bytesRead);
verify(refreshPipeline).apply(blockID);
verify(clientFactory).acquireClientForReadData(pipeline);
- verify(clientFactory).releaseClient(client, true);
+ verify(clientFactory).releaseClientForReadData(client, false);
} finally {
subject.close();
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
index 4820fbcecb..943e19a719 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
@@ -271,35 +271,35 @@ public class ECReconstructionCoordinator implements
Closeable {
(int) (configuration.getStreamBufferMaxSize() / configuration
.getStreamBufferSize()),
ByteStringConversion.createByteBufferConversion(false));
- for (int i = 0; i < toReconstructIndexes.size(); i++) {
- int replicaIndex = toReconstructIndexes.get(i);
- DatanodeDetails datanodeDetails =
- targetMap.get(replicaIndex);
- targetBlockStreams[i] = getECBlockOutputstream(blockLocationInfo,
- datanodeDetails, repConfig, replicaIndex, bufferPool,
- configuration);
- bufs[i] = byteBufferPool.getBuffer(false, repConfig.getEcChunkSize());
- // Make sure it's clean. Don't want to reuse the erroneously returned
- // buffers from the pool.
- bufs[i].clear();
- }
-
- sis.setRecoveryIndexes(toReconstructIndexes.stream().map(i -> (i - 1))
- .collect(Collectors.toSet()));
- long length = safeBlockGroupLength;
- while (length > 0) {
- int readLen = sis.recoverChunks(bufs);
- // TODO: can be submitted in parallel
- for (int i = 0; i < bufs.length; i++) {
- CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
- future = targetBlockStreams[i].write(bufs[i]);
- checkFailures(targetBlockStreams[i], future);
+ try {
+ for (int i = 0; i < toReconstructIndexes.size(); i++) {
+ int replicaIndex = toReconstructIndexes.get(i);
+ DatanodeDetails datanodeDetails =
+ targetMap.get(replicaIndex);
+ targetBlockStreams[i] = getECBlockOutputstream(blockLocationInfo,
+ datanodeDetails, repConfig, replicaIndex, bufferPool,
+ configuration);
+ bufs[i] = byteBufferPool.getBuffer(false, repConfig.getEcChunkSize());
+ // Make sure it's clean. Don't want to reuse the erroneously returned
+ // buffers from the pool.
bufs[i].clear();
}
- length -= readLen;
- }
- try {
+ sis.setRecoveryIndexes(toReconstructIndexes.stream().map(i -> (i - 1))
+ .collect(Collectors.toSet()));
+ long length = safeBlockGroupLength;
+ while (length > 0) {
+ int readLen = sis.recoverChunks(bufs);
+ // TODO: can be submitted in parallel
+ for (int i = 0; i < bufs.length; i++) {
+ CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
+ future = targetBlockStreams[i].write(bufs[i]);
+ checkFailures(targetBlockStreams[i], future);
+ bufs[i].clear();
+ }
+ length -= readLen;
+ }
+
for (ECBlockOutputStream targetStream : targetBlockStreams) {
targetStream.executePutBlock(true, true,
blockLocationInfo.getLength(), blockDataGroup);
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
index 5b9170774e..8a8b32faaa 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
import org.apache.hadoop.hdds.scm.storage.BufferPool;
import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.slf4j.Logger;
@@ -103,6 +104,14 @@ public class ECBlockOutputStreamEntry extends
BlockOutputStreamEntry {
}
}
+ @Override
+ void cleanup(boolean invalidateClient) {
+ if (isInitialized()) {
+ IOUtils.close(LOG, blockOutputStreams);
+ blockOutputStreams = null;
+ }
+ }
+
@Override
public OutputStream getOutputStream() {
if (!isInitialized()) {
@@ -132,7 +141,7 @@ public class ECBlockOutputStreamEntry extends
BlockOutputStreamEntry {
}
public void markFailed(Exception e) {
- if (blockOutputStreams[currentStreamIdx] != null) {
+ if (isInitialized() && blockOutputStreams[currentStreamIdx] != null) {
blockOutputStreams[currentStreamIdx].setIoException(e);
}
}
@@ -379,13 +388,17 @@ public class ECBlockOutputStreamEntry extends
BlockOutputStreamEntry {
}
private Stream<ECBlockOutputStream> blockStreams() {
- return Arrays.stream(blockOutputStreams).filter(Objects::nonNull);
+ return isInitialized()
+ ? Arrays.stream(blockOutputStreams).filter(Objects::nonNull)
+ : Stream.empty();
}
private Stream<ECBlockOutputStream> dataStreams() {
- return Arrays.stream(blockOutputStreams)
- .limit(replicationConfig.getData())
- .filter(Objects::nonNull);
+ return isInitialized()
+ ? Arrays.stream(blockOutputStreams)
+ .limit(replicationConfig.getData())
+ .filter(Objects::nonNull)
+ : Stream.empty();
}
public ByteString calculateChecksum() throws IOException {
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
index a19a8f762c..11fe92e5ed 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
@@ -83,7 +83,8 @@ public final class ECKeyOutputStream extends KeyOutputStream {
public static final Logger LOG =
LoggerFactory.getLogger(KeyOutputStream.class);
- private boolean closed;
+ private volatile boolean closed;
+ private volatile boolean closing;
// how much of data is actually written yet to underlying stream
private long offset;
// how much data has been ingested into the stream
@@ -408,6 +409,9 @@ public final class ECKeyOutputStream extends
KeyOutputStream {
private void writeToOutputStream(ECBlockOutputStreamEntry current,
byte[] b, int writeLen, int off, boolean isParity)
throws IOException {
+ if (closing) {
+ throw new IOException("Stream is closing, avoid re-opening streams");
+ }
try {
if (!isParity) {
// In case if exception while writing, this length will be updated back
@@ -438,8 +442,7 @@ public final class ECKeyOutputStream extends
KeyOutputStream {
}
private void markStreamClosed() {
- blockOutputStreamEntryPool.cleanup();
- closed = true;
+ closing = true;
}
private void markStreamAsFailed(Exception e) {
@@ -487,23 +490,23 @@ public final class ECKeyOutputStream extends
KeyOutputStream {
}
closed = true;
try {
- // If stripe buffer is not empty, encode and flush the stripe.
- if (ecChunkBufferCache.getFirstDataCell().position() > 0) {
- generateParityCells();
- addStripeToQueue(ecChunkBufferCache);
- }
- // Send EOF mark to flush thread.
- addStripeToQueue(new EOFDummyStripe());
+ if (!closing) {
+ // If stripe buffer is not empty, encode and flush the stripe.
+ if (ecChunkBufferCache.getFirstDataCell().position() > 0) {
+ generateParityCells();
+ addStripeToQueue(ecChunkBufferCache);
+ }
+ // Send EOF mark to flush thread.
+ addStripeToQueue(new EOFDummyStripe());
- // Wait for all the stripes to be written.
- flushFuture.get();
- flushExecutor.shutdownNow();
+ // Wait for all the stripes to be written.
+ flushFuture.get();
- closeCurrentStreamEntry();
- Preconditions.checkArgument(writeOffset == offset,
- "Expected writeOffset= " + writeOffset
- + " Expected offset=" + offset);
- blockOutputStreamEntryPool.commitKey(offset);
+ Preconditions.checkArgument(writeOffset == offset,
+ "Expected writeOffset= " + writeOffset
+ + " Expected offset=" + offset);
+ blockOutputStreamEntryPool.commitKey(offset);
+ }
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof IOException) {
@@ -516,6 +519,8 @@ public final class ECKeyOutputStream extends
KeyOutputStream {
} catch (InterruptedException e) {
throw new IOException("Flushing thread was interrupted", e);
} finally {
+ flushExecutor.shutdownNow();
+ closeCurrentStreamEntry();
blockOutputStreamEntryPool.cleanup();
}
}
@@ -548,7 +553,7 @@ public final class ECKeyOutputStream extends
KeyOutputStream {
private boolean flushStripeFromQueue() throws IOException {
try {
ECChunkBuffers stripe = ecStripeQueue.take();
- while (!(stripe instanceof EOFDummyStripe)) {
+ while (!closing && !(stripe instanceof EOFDummyStripe)) {
if (stripe instanceof CheckpointDummyStripe) {
flushCheckpoint.set(((CheckpointDummyStripe) stripe).version);
} else {
@@ -639,7 +644,7 @@ public final class ECKeyOutputStream extends
KeyOutputStream {
* @throws IOException if the connection is closed.
*/
private void checkNotClosed() throws IOException {
- if (closed) {
+ if (closing || closed) {
throw new IOException(
": " + FSExceptionMessages.STREAM_IS_CLOSED + " Key: "
+ blockOutputStreamEntryPool.getKeyName());
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index 0a64b9392d..00970788ea 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -335,7 +335,7 @@ public class KeyOutputStream extends OutputStream
implements Syncable {
excludeList.addPipeline(pipelineId);
}
// just clean up the current stream.
- streamEntry.cleanup(!retryFailure);
+ streamEntry.cleanup(retryFailure);
// discard all subsequent blocks the containers and pipelines which
// are in the exclude list so that, the very next retry should never
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java
index 10f326362c..fb7c04df59 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.TestDataUtil;
import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneKeyDetails;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.OzonePrefixPathImpl;
@@ -150,6 +151,7 @@ public class TestOzoneFileSystem {
private static boolean omRatisEnabled;
private static MiniOzoneCluster cluster;
+ private static OzoneClient client;
private static OzoneManagerProtocol writeClient;
private static FileSystem fs;
private static OzoneFileSystem o3fs;
@@ -177,7 +179,8 @@ public class TestOzoneFileSystem {
.build();
cluster.waitForClusterToBeReady();
- writeClient = cluster.getRpcClient().getObjectStore()
+ client = cluster.getRpcClient();
+ writeClient = client.getObjectStore()
.getClientProxy().getOzoneManagerClient();
// create a volume and a bucket to be used by OzoneFileSystem
ozoneBucket = TestDataUtil.createVolumeAndBucket(cluster, bucketLayout);
@@ -199,6 +202,7 @@ public class TestOzoneFileSystem {
@AfterClass
public static void teardown() {
+ IOUtils.closeQuietly(client);
if (cluster != null) {
cluster.shutdown();
}
@@ -1443,7 +1447,7 @@ public class TestOzoneFileSystem {
if (isDirectory) {
key = key + "/";
}
- return cluster.getClient().getObjectStore().getVolume(volumeName)
+ return client.getObjectStore().getVolume(volumeName)
.getBucket(bucketName).getKey(key);
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystem.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystem.java
index 4aefd48fb2..482eb1d47c 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystem.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystem.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.ozone.TestDataUtil;
import org.apache.hadoop.ozone.client.BucketArgs;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneKeyDetails;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.VolumeArgs;
@@ -135,6 +136,7 @@ public class TestRootedOzoneFileSystem {
LoggerFactory.getLogger(TestRootedOzoneFileSystem.class);
private static final float TRASH_INTERVAL = 0.05f; // 3 seconds
+ private static OzoneClient client;
@Parameterized.Parameters
public static Collection<Object[]> data() {
@@ -166,6 +168,7 @@ public class TestRootedOzoneFileSystem {
@Parameterized.AfterParam
public static void teardownParam() {
+ IOUtils.closeQuietly(client);
// Tear down the cluster after EACH set of parameters
if (cluster != null) {
cluster.shutdown();
@@ -257,7 +260,8 @@ public class TestRootedOzoneFileSystem {
.setNumDatanodes(5)
.build();
cluster.waitForClusterToBeReady();
- objectStore = cluster.getClient().getObjectStore();
+ client = cluster.getClient();
+ objectStore = client.getObjectStore();
rootPath = String.format("%s://%s/",
OzoneConsts.OZONE_OFS_URI_SCHEME, conf.get(OZONE_OM_ADDRESS_KEY));
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
index 68dfd6fbb7..96c601cbd0 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
@@ -106,6 +106,7 @@ import static
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_TOKEN_ENABLED
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto.READ;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto.WRITE;
import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY;
+import static org.apache.hadoop.ozone.container.ContainerTestHelper.newWriteChunkRequestBuilder;
/**
* This class tests container commands on EC containers.
@@ -252,88 +253,89 @@ public class TestContainerCommandsEC {
@Test
public void testCreateRecoveryContainer() throws Exception {
- XceiverClientManager xceiverClientManager =
- new XceiverClientManager(config);
- ECReplicationConfig replicationConfig = new ECReplicationConfig(3, 2);
- Pipeline newPipeline =
- scm.getPipelineManager().createPipeline(replicationConfig);
- scm.getPipelineManager().activatePipeline(newPipeline.getId());
- final ContainerInfo container =
- scm.getContainerManager().allocateContainer(replicationConfig, "test");
- Token<ContainerTokenIdentifier> cToken = containerTokenGenerator
- .generateToken(ANY_USER, container.containerID());
- scm.getContainerManager().getContainerStateManager()
- .addContainer(container.getProtobuf());
-
- XceiverClientSpi dnClient = xceiverClientManager.acquireClient(
- createSingleNodePipeline(newPipeline, newPipeline.getNodes().get(0),
- 2));
- try {
- // To create the actual situation, container would have been in closed
- // state at SCM.
+ try (XceiverClientManager xceiverClientManager =
+ new XceiverClientManager(config)) {
+ ECReplicationConfig replicationConfig = new ECReplicationConfig(3, 2);
+ Pipeline newPipeline =
+ scm.getPipelineManager().createPipeline(replicationConfig);
+ scm.getPipelineManager().activatePipeline(newPipeline.getId());
+ final ContainerInfo container = scm.getContainerManager()
+ .allocateContainer(replicationConfig, "test");
+ Token<ContainerTokenIdentifier> cToken = containerTokenGenerator
+ .generateToken(ANY_USER, container.containerID());
scm.getContainerManager().getContainerStateManager()
- .updateContainerState(container.containerID().getProtobuf(),
- HddsProtos.LifeCycleEvent.FINALIZE);
- scm.getContainerManager().getContainerStateManager()
- .updateContainerState(container.containerID().getProtobuf(),
- HddsProtos.LifeCycleEvent.CLOSE);
-
- //Create the recovering container in DN.
- String encodedToken = cToken.encodeToUrlString();
- ContainerProtocolCalls.createRecoveringContainer(dnClient,
- container.containerID().getProtobuf().getId(),
- encodedToken, 4);
-
- BlockID blockID = ContainerTestHelper
- .getTestBlockID(container.containerID().getProtobuf().getId());
- Token<? extends TokenIdentifier> blockToken =
- blockTokenGenerator.generateToken(ANY_USER, blockID,
- EnumSet.of(READ, WRITE), Long.MAX_VALUE);
- byte[] data = "TestData".getBytes(UTF_8);
- ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
- ContainerTestHelper.newWriteChunkRequestBuilder(newPipeline, blockID,
- ChunkBuffer.wrap(ByteBuffer.wrap(data)), 0)
- .setEncodedToken(blockToken.encodeToUrlString())
- .build();
- dnClient.sendCommand(writeChunkRequest);
-
- // Now, explicitly make a putKey request for the block.
- ContainerProtos.ContainerCommandRequestProto putKeyRequest =
- ContainerTestHelper.getPutBlockRequest(newPipeline,
- writeChunkRequest.getWriteChunk());
- dnClient.sendCommand(putKeyRequest);
-
- ContainerProtos.ReadContainerResponseProto readContainerResponseProto =
- ContainerProtocolCalls.readContainer(dnClient,
- container.containerID().getProtobuf().getId(), encodedToken);
- Assert.assertEquals(ContainerProtos.ContainerDataProto.State.RECOVERING,
- readContainerResponseProto.getContainerData().getState());
- // Container at SCM should be still in closed state.
- Assert.assertEquals(HddsProtos.LifeCycleState.CLOSED,
- scm.getContainerManager().getContainerStateManager()
- .getContainer(container.containerID()).getState());
- // close container call
- ContainerProtocolCalls.closeContainer(dnClient,
- container.containerID().getProtobuf().getId(), encodedToken);
- // Make sure we have the container and readable.
- readContainerResponseProto = ContainerProtocolCalls
- .readContainer(dnClient,
- container.containerID().getProtobuf().getId(), encodedToken);
- Assert.assertEquals(ContainerProtos.ContainerDataProto.State.CLOSED,
- readContainerResponseProto.getContainerData().getState());
- ContainerProtos.ReadChunkResponseProto readChunkResponseProto =
- ContainerProtocolCalls.readChunk(dnClient,
- writeChunkRequest.getWriteChunk().getChunkData(), blockID, null,
- blockToken);
- ByteBuffer[] readOnlyByteBuffersArray = BufferUtils
- .getReadOnlyByteBuffersArray(
- readChunkResponseProto.getDataBuffers().getBuffersList());
- Assert.assertEquals(readOnlyByteBuffersArray[0].limit(), data.length);
- byte[] readBuff = new byte[readOnlyByteBuffersArray[0].limit()];
- readOnlyByteBuffersArray[0].get(readBuff, 0, readBuff.length);
- Assert.assertArrayEquals(data, readBuff);
- } finally {
- xceiverClientManager.releaseClient(dnClient, false);
+ .addContainer(container.getProtobuf());
+
+ XceiverClientSpi dnClient = xceiverClientManager.acquireClient(
+ createSingleNodePipeline(newPipeline, newPipeline.getNodes().get(0),
+ 2));
+ try {
+ // To create the actual situation, container would have been in closed
+ // state at SCM.
+ scm.getContainerManager().getContainerStateManager()
+ .updateContainerState(container.containerID().getProtobuf(),
+ HddsProtos.LifeCycleEvent.FINALIZE);
+ scm.getContainerManager().getContainerStateManager()
+ .updateContainerState(container.containerID().getProtobuf(),
+ HddsProtos.LifeCycleEvent.CLOSE);
+
+ //Create the recovering container in DN.
+ String encodedToken = cToken.encodeToUrlString();
+ ContainerProtocolCalls.createRecoveringContainer(dnClient,
+ container.containerID().getProtobuf().getId(),
+ encodedToken, 4);
+
+ BlockID blockID = ContainerTestHelper
+ .getTestBlockID(container.containerID().getProtobuf().getId());
+ Token<? extends TokenIdentifier> blockToken =
+ blockTokenGenerator.generateToken(ANY_USER, blockID,
+ EnumSet.of(READ, WRITE), Long.MAX_VALUE);
+ byte[] data = "TestData".getBytes(UTF_8);
+ ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
+ newWriteChunkRequestBuilder(newPipeline, blockID,
+ ChunkBuffer.wrap(ByteBuffer.wrap(data)), 0)
+ .setEncodedToken(blockToken.encodeToUrlString())
+ .build();
+ dnClient.sendCommand(writeChunkRequest);
+
+ // Now, explicitly make a putKey request for the block.
+ ContainerProtos.ContainerCommandRequestProto putKeyRequest =
+ ContainerTestHelper.getPutBlockRequest(newPipeline,
+ writeChunkRequest.getWriteChunk());
+ dnClient.sendCommand(putKeyRequest);
+
+ ContainerProtos.ReadContainerResponseProto readContainerResponseProto =
+ ContainerProtocolCalls.readContainer(dnClient,
+ container.containerID().getProtobuf().getId(), encodedToken);
+ Assert.assertEquals(ContainerProtos.ContainerDataProto.State.RECOVERING,
+ readContainerResponseProto.getContainerData().getState());
+ // Container at SCM should be still in closed state.
+ Assert.assertEquals(HddsProtos.LifeCycleState.CLOSED,
+ scm.getContainerManager().getContainerStateManager()
+ .getContainer(container.containerID()).getState());
+ // close container call
+ ContainerProtocolCalls.closeContainer(dnClient,
+ container.containerID().getProtobuf().getId(), encodedToken);
+ // Make sure we have the container and readable.
+ readContainerResponseProto = ContainerProtocolCalls
+ .readContainer(dnClient,
+ container.containerID().getProtobuf().getId(), encodedToken);
+ Assert.assertEquals(ContainerProtos.ContainerDataProto.State.CLOSED,
+ readContainerResponseProto.getContainerData().getState());
+ ContainerProtos.ReadChunkResponseProto readChunkResponseProto =
+ ContainerProtocolCalls.readChunk(dnClient,
+ writeChunkRequest.getWriteChunk().getChunkData(), blockID, null,
+ blockToken);
+ ByteBuffer[] readOnlyByteBuffersArray = BufferUtils
+ .getReadOnlyByteBuffersArray(
+ readChunkResponseProto.getDataBuffers().getBuffersList());
+ Assert.assertEquals(readOnlyByteBuffersArray[0].limit(), data.length);
+ byte[] readBuff = new byte[readOnlyByteBuffersArray[0].limit()];
+ readOnlyByteBuffersArray[0].get(readBuff, 0, readBuff.length);
+ Assert.assertArrayEquals(data, readBuff);
+ } finally {
+ xceiverClientManager.releaseClient(dnClient, false);
+ }
}
}
@@ -394,116 +396,136 @@ public class TestContainerCommandsEC {
objectStore.getVolume(volumeName).createBucket(bucketName);
OzoneVolume volume = objectStore.getVolume(volumeName);
OzoneBucket bucket = volume.getBucket(bucketName);
- XceiverClientManager xceiverClientManager =
- new XceiverClientManager(config);
createKeyAndWriteData(keyString, bucket, numInputChunks);
- ECReconstructionCoordinator coordinator =
- new ECReconstructionCoordinator(config, certClient,
- null, ECReconstructionMetrics.create());
-
- ECReconstructionMetrics metrics = coordinator.getECReconstructionMetrics();
- OzoneKeyDetails key = bucket.getKey(keyString);
- long conID = key.getOzoneKeyLocations().get(0).getContainerID();
- Token<ContainerTokenIdentifier> cToken = containerTokenGenerator
- .generateToken(ANY_USER, new ContainerID(conID));
-
- //Close the container first.
- closeContainer(conID);
-
- Pipeline containerPipeline = scm.getPipelineManager().getPipeline(
- scm.getContainerManager().getContainer(ContainerID.valueOf(conID))
- .getPipelineID());
-
- SortedMap<Integer, DatanodeDetails> sourceNodeMap = new TreeMap<>();
- List<DatanodeDetails> nodeSet = containerPipeline.getNodes();
- List<Pipeline> containerToDeletePipeline = new ArrayList<>();
- for (DatanodeDetails srcDn : nodeSet) {
- int replIndex = containerPipeline.getReplicaIndex(srcDn);
- if (missingIndexes.contains(replIndex)) {
- containerToDeletePipeline
- .add(createSingleNodePipeline(containerPipeline, srcDn, replIndex));
- continue;
+ try (
+ XceiverClientManager xceiverClientManager =
+ new XceiverClientManager(config);
+ ECReconstructionCoordinator coordinator =
+ new ECReconstructionCoordinator(config, certClient,
+ null, ECReconstructionMetrics.create())) {
+
+ ECReconstructionMetrics metrics =
+ coordinator.getECReconstructionMetrics();
+ OzoneKeyDetails key = bucket.getKey(keyString);
+ long conID = key.getOzoneKeyLocations().get(0).getContainerID();
+ Token<ContainerTokenIdentifier> cToken = containerTokenGenerator
+ .generateToken(ANY_USER, new ContainerID(conID));
+
+ //Close the container first.
+ closeContainer(conID);
+
+ Pipeline containerPipeline = scm.getPipelineManager().getPipeline(
+ scm.getContainerManager().getContainer(ContainerID.valueOf(conID))
+ .getPipelineID());
+
+ SortedMap<Integer, DatanodeDetails> sourceNodeMap = new TreeMap<>();
+
+ List<DatanodeDetails> nodeSet = containerPipeline.getNodes();
+ List<Pipeline> containerToDeletePipeline = new ArrayList<>();
+ for (DatanodeDetails srcDn : nodeSet) {
+ int replIndex = containerPipeline.getReplicaIndex(srcDn);
+ if (missingIndexes.contains(replIndex)) {
+ containerToDeletePipeline.add(
+ createSingleNodePipeline(containerPipeline, srcDn, replIndex));
+ continue;
+ }
+ sourceNodeMap.put(replIndex, srcDn);
}
- sourceNodeMap.put(replIndex, srcDn);
- }
- //Find nodes outside of pipeline
- List<DatanodeDetails> clusterDnsList =
- cluster.getHddsDatanodes().stream().map(k -> k.getDatanodeDetails())
- .collect(Collectors.toList());
- List<DatanodeDetails> targetNodes = new ArrayList<>();
- for (DatanodeDetails clusterDN : clusterDnsList) {
- if (!nodeSet.contains(clusterDN)) {
- targetNodes.add(clusterDN);
- if (targetNodes.size() == missingIndexes.size()) {
- break;
+ //Find nodes outside of pipeline
+ List<DatanodeDetails> clusterDnsList =
+ cluster.getHddsDatanodes().stream().map(k -> k.getDatanodeDetails())
+ .collect(Collectors.toList());
+ List<DatanodeDetails> targetNodes = new ArrayList<>();
+ for (DatanodeDetails clusterDN : clusterDnsList) {
+ if (!nodeSet.contains(clusterDN)) {
+ targetNodes.add(clusterDN);
+ if (targetNodes.size() == missingIndexes.size()) {
+ break;
+ }
}
}
- }
- Assert.assertEquals(missingIndexes.size(), targetNodes.size());
-
- List<org.apache.hadoop.ozone.container.common.helpers.BlockData[]>
- blockDataArrList = new ArrayList<>();
- for (int j = 0; j < containerToDeletePipeline.size(); j++) {
- org.apache.hadoop.ozone.container.common.helpers.BlockData[] blockData =
- new ECContainerOperationClient(new OzoneConfiguration(), certClient)
- .listBlock(conID, containerToDeletePipeline.get(j).getFirstNode(),
- (ECReplicationConfig) containerToDeletePipeline.get(j)
- .getReplicationConfig(), cToken);
- blockDataArrList.add(blockData);
- // Delete the first index container
- ContainerProtocolCalls.deleteContainer(
- xceiverClientManager.acquireClient(containerToDeletePipeline.get(j)),
- conID, true, cToken.encodeToUrlString());
- }
+ Assert.assertEquals(missingIndexes.size(), targetNodes.size());
+
+ List<org.apache.hadoop.ozone.container.common.helpers.BlockData[]>
+ blockDataArrList = new ArrayList<>();
+ try (ECContainerOperationClient ecContainerOperationClient =
+ new ECContainerOperationClient(config, certClient)) {
+ for (int j = 0; j < containerToDeletePipeline.size(); j++) {
+ Pipeline p = containerToDeletePipeline.get(j);
+ org.apache.hadoop.ozone.container.common.helpers.BlockData[]
+ blockData = ecContainerOperationClient.listBlock(
+ conID, p.getFirstNode(),
+ (ECReplicationConfig) p.getReplicationConfig(),
+ cToken);
+ blockDataArrList.add(blockData);
+ // Delete the first index container
+ XceiverClientSpi client = xceiverClientManager.acquireClient(
+ p);
+ try {
+ ContainerProtocolCalls.deleteContainer(
+ client,
+ conID, true, cToken.encodeToUrlString());
+ } finally {
+ xceiverClientManager.releaseClient(client, false);
+ }
+ }
- //Give the new target to reconstruct the container
- SortedMap<Integer, DatanodeDetails> targetNodeMap = new TreeMap<>();
- for (int k = 0; k < missingIndexes.size(); k++) {
- targetNodeMap.put(missingIndexes.get(k), targetNodes.get(k));
- }
+ //Give the new target to reconstruct the container
+ SortedMap<Integer, DatanodeDetails> targetNodeMap = new TreeMap<>();
+ for (int k = 0; k < missingIndexes.size(); k++) {
+ targetNodeMap.put(missingIndexes.get(k), targetNodes.get(k));
+ }
- coordinator.reconstructECContainerGroup(conID,
- (ECReplicationConfig) containerPipeline.getReplicationConfig(),
- sourceNodeMap, targetNodeMap);
-
- // Assert the original container metadata with the new recovered container.
- Iterator<Map.Entry<Integer, DatanodeDetails>> iterator =
- targetNodeMap.entrySet().iterator();
- int i = 0;
- while (iterator.hasNext()) {
- Map.Entry<Integer, DatanodeDetails> next = iterator.next();
- DatanodeDetails targetDN = next.getValue();
- Map<DatanodeDetails, Integer> indexes = new HashMap<>();
- indexes.put(targetNodeMap.entrySet().iterator().next().getValue(),
- targetNodeMap.entrySet().iterator().next().getKey());
- Pipeline newTargetPipeline =
- Pipeline.newBuilder().setId(PipelineID.randomId())
+ coordinator.reconstructECContainerGroup(conID,
+ (ECReplicationConfig) containerPipeline.getReplicationConfig(),
+ sourceNodeMap, targetNodeMap);
+
+ // Assert the original container metadata with the new recovered one
+ Iterator<Map.Entry<Integer, DatanodeDetails>> iterator =
+ targetNodeMap.entrySet().iterator();
+ int i = 0;
+ while (iterator.hasNext()) {
+ Map.Entry<Integer, DatanodeDetails> next = iterator.next();
+ DatanodeDetails targetDN = next.getValue();
+ Map<DatanodeDetails, Integer> indexes = new HashMap<>();
+ indexes.put(targetNodeMap.entrySet().iterator().next().getValue(),
+ targetNodeMap.entrySet().iterator().next().getKey());
+ Pipeline newTargetPipeline = Pipeline.newBuilder()
+ .setId(PipelineID.randomId())
.setReplicationConfig(containerPipeline.getReplicationConfig())
.setReplicaIndexes(indexes)
.setState(Pipeline.PipelineState.CLOSED)
.setNodes(ImmutableList.of(targetDN)).build();
- org.apache.hadoop.ozone.container.common.helpers.BlockData[]
- reconstructedBlockData =
- new ECContainerOperationClient(new OzoneConfiguration(), certClient)
- .listBlock(conID, newTargetPipeline.getFirstNode(),
- (ECReplicationConfig) newTargetPipeline
- .getReplicationConfig(), cToken);
- Assert.assertEquals(blockDataArrList.get(i).length,
- reconstructedBlockData.length);
- checkBlockData(blockDataArrList.get(i), reconstructedBlockData);
- ContainerProtos.ReadContainerResponseProto readContainerResponseProto =
- ContainerProtocolCalls.readContainer(
- xceiverClientManager.acquireClient(newTargetPipeline), conID,
- cToken.encodeToUrlString());
- Assert.assertEquals(ContainerProtos.ContainerDataProto.State.CLOSED,
- readContainerResponseProto.getContainerData().getState());
- i++;
+ org.apache.hadoop.ozone.container.common.helpers.BlockData[]
+ reconstructedBlockData =
+ ecContainerOperationClient
+ .listBlock(conID, newTargetPipeline.getFirstNode(),
+ (ECReplicationConfig) newTargetPipeline
+ .getReplicationConfig(), cToken);
+ Assert.assertEquals(blockDataArrList.get(i).length,
+ reconstructedBlockData.length);
+ checkBlockData(blockDataArrList.get(i), reconstructedBlockData);
+ XceiverClientSpi client = xceiverClientManager.acquireClient(
+ newTargetPipeline);
+ try {
+ ContainerProtos.ReadContainerResponseProto readContainerResponse =
+ ContainerProtocolCalls.readContainer(
+ client, conID,
+ cToken.encodeToUrlString());
+ Assert.assertEquals(ContainerProtos.ContainerDataProto.State.CLOSED,
+ readContainerResponse.getContainerData().getState());
+ } finally {
+ xceiverClientManager.releaseClient(client, false);
+ }
+ i++;
+ }
+ Assertions.assertEquals(metrics.getReconstructionTotal(), 1L);
+ }
}
- Assertions.assertEquals(metrics.getReconstructionTotal(), 1L);
}
private void createKeyAndWriteData(String keyString, OzoneBucket bucket,
@@ -574,21 +596,22 @@ public class TestContainerCommandsEC {
targetNodeMap.put(3, invalidTargetNode);
Assert.assertThrows(IOException.class, () -> {
- ECReconstructionCoordinator coordinator =
+ try (ECReconstructionCoordinator coordinator =
new ECReconstructionCoordinator(config, certClient,
- null, ECReconstructionMetrics.create());
- coordinator.reconstructECContainerGroup(conID,
- (ECReplicationConfig) containerPipeline.getReplicationConfig(),
- sourceNodeMap, targetNodeMap);
+ null, ECReconstructionMetrics.create())) {
+ coordinator.reconstructECContainerGroup(conID,
+ (ECReplicationConfig) containerPipeline.getReplicationConfig(),
+ sourceNodeMap, targetNodeMap);
+ }
});
final DatanodeDetails targetDNToCheckContainerCLeaned = goodTargetNode;
StorageContainerException ex =
Assert.assertThrows(StorageContainerException.class, () -> {
- ECContainerOperationClient client =
- new ECContainerOperationClient(new OzoneConfiguration(),
- certClient);
- client.listBlock(conID, targetDNToCheckContainerCLeaned,
- new ECReplicationConfig(3, 2), cToken);
+ try (ECContainerOperationClient client =
+ new ECContainerOperationClient(config, certClient)) {
+ client.listBlock(conID, targetDNToCheckContainerCLeaned,
+ new ECReplicationConfig(3, 2), cToken);
+ }
});
Assert.assertEquals("ContainerID 1 does not exist", ex.getMessage());
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
index f99ea72ff7..35cab2689f 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientMetrics;
import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
import org.apache.hadoop.hdds.scm.storage.RatisBlockOutputStream;
+import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.client.ObjectStore;
@@ -123,6 +124,7 @@ public class TestBlockOutputStream {
*/
@AfterAll
public static void shutdown() {
+ IOUtils.close(null, client);
if (cluster != null) {
cluster.shutdown();
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamFlushDelay.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamFlushDelay.java
index 8ca49631e9..2a7b4bcf43 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamFlushDelay.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamFlushDelay.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientMetrics;
import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
import org.apache.hadoop.hdds.scm.storage.RatisBlockOutputStream;
+import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.client.ObjectStore;
@@ -115,6 +116,7 @@ public class TestBlockOutputStreamFlushDelay {
*/
@AfterAll
public static void shutdown() {
+ IOUtils.close(null, client);
if (cluster != null) {
cluster.shutdown();
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
index af128dc527..688fe57e86 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenExcep
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
import org.apache.hadoop.hdds.scm.storage.RatisBlockOutputStream;
+import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.client.ObjectStore;
@@ -150,6 +151,7 @@ public class TestBlockOutputStreamWithFailures {
*/
@AfterEach
public void shutdown() {
+ IOUtils.close(null, client);
if (cluster != null) {
cluster.shutdown();
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailuresFlushDelay.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailuresFlushDelay.java
index 2aa35e01e4..b7c2f4433f 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailuresFlushDelay.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailuresFlushDelay.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenExcep
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
import org.apache.hadoop.hdds.scm.storage.RatisBlockOutputStream;
+import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.client.ObjectStore;
@@ -150,6 +151,7 @@ public class TestBlockOutputStreamWithFailuresFlushDelay {
*/
@AfterEach
public void shutdown() {
+ IOUtils.close(null, client);
if (cluster != null) {
cluster.shutdown();
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
index e9f3a9130f..69f3c92311 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
@@ -1207,25 +1207,29 @@ public abstract class TestOzoneRpcClientAbstract {
Assert.assertEquals(0L, getBucketUsedNamespace(volumeName, bucketName));
RpcClient client = new RpcClient(cluster.getConf(), null);
- String directoryName1 = UUID.randomUUID().toString();
- String directoryName2 = UUID.randomUUID().toString();
-
- client.createDirectory(volumeName, bucketName, directoryName1);
- Assert.assertEquals(1L, getBucketUsedNamespace(volumeName, bucketName));
- // Test create a directory twice will not increase usedNamespace twice
- client.createDirectory(volumeName, bucketName, directoryName2);
- Assert.assertEquals(2L, getBucketUsedNamespace(volumeName, bucketName));
- client.deleteKey(volumeName, bucketName,
- OzoneFSUtils.addTrailingSlashIfNeeded(directoryName1), false);
- Assert.assertEquals(1L, getBucketUsedNamespace(volumeName, bucketName));
- client.deleteKey(volumeName, bucketName,
- OzoneFSUtils.addTrailingSlashIfNeeded(directoryName2), false);
- Assert.assertEquals(0L, getBucketUsedNamespace(volumeName, bucketName));
-
- String multiComponentsDir = "dir1/dir2/dir3/dir4";
- client.createDirectory(volumeName, bucketName, multiComponentsDir);
- Assert.assertEquals(OzoneFSUtils.getFileCount(multiComponentsDir),
- getBucketUsedNamespace(volumeName, bucketName));
+ try {
+ String directoryName1 = UUID.randomUUID().toString();
+ String directoryName2 = UUID.randomUUID().toString();
+
+ client.createDirectory(volumeName, bucketName, directoryName1);
+ Assert.assertEquals(1L, getBucketUsedNamespace(volumeName, bucketName));
+ // Test create a directory twice will not increase usedNamespace twice
+ client.createDirectory(volumeName, bucketName, directoryName2);
+ Assert.assertEquals(2L, getBucketUsedNamespace(volumeName, bucketName));
+ client.deleteKey(volumeName, bucketName,
+ OzoneFSUtils.addTrailingSlashIfNeeded(directoryName1), false);
+ Assert.assertEquals(1L, getBucketUsedNamespace(volumeName, bucketName));
+ client.deleteKey(volumeName, bucketName,
+ OzoneFSUtils.addTrailingSlashIfNeeded(directoryName2), false);
+ Assert.assertEquals(0L, getBucketUsedNamespace(volumeName, bucketName));
+
+ String multiComponentsDir = "dir1/dir2/dir3/dir4";
+ client.createDirectory(volumeName, bucketName, multiComponentsDir);
+ Assert.assertEquals(OzoneFSUtils.getFileCount(multiComponentsDir),
+ getBucketUsedNamespace(volumeName, bucketName));
+ } finally {
+ client.close();
+ }
}
@ParameterizedTest
@@ -1674,6 +1678,8 @@ public abstract class TestOzoneRpcClientAbstract {
RpcClient client = new RpcClient(configuration, null);
try (InputStream is = client.getKey(volumeName, bucketName, keyName)) {
is.read(new byte[100]);
+ } finally {
+ client.close();
}
if (verifyChecksum) {
fail("Reading corrupted data should fail, as verify checksum is " +
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestChunkInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestChunkInputStream.java
index 13c8a5911b..cc675ecce0 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestChunkInputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestChunkInputStream.java
@@ -57,60 +57,61 @@ public class TestChunkInputStream extends TestInputStreamBase {
int dataLength = (2 * BLOCK_SIZE) + (CHUNK_SIZE);
byte[] inputData = writeRandomBytes(keyName, dataLength);
- KeyInputStream keyInputStream = getKeyInputStream(keyName);
-
- BlockInputStream block0Stream =
- (BlockInputStream)keyInputStream.getPartStreams().get(0);
- block0Stream.initialize();
-
- ChunkInputStream chunk0Stream = block0Stream.getChunkStreams().get(0);
-
- // To read 1 byte of chunk data, ChunkInputStream should get one full
- // checksum boundary worth of data from Container and store it in buffers.
- chunk0Stream.read(new byte[1]);
- checkBufferSizeAndCapacity(chunk0Stream.getCachedBuffers(), 1, 0,
- BYTES_PER_CHECKSUM);
-
- // Read > checksum boundary of data from chunk0
- int readDataLen = BYTES_PER_CHECKSUM + (BYTES_PER_CHECKSUM / 2);
- byte[] readData = readDataFromChunk(chunk0Stream, 0, readDataLen);
- validateData(inputData, 0, readData);
-
- // The first checksum boundary size of data was already existing in the
- // ChunkStream buffers. Once that data is read, the next checksum
- // boundary size of data will be fetched again to read the remaining data.
- // Hence there should be 1 checksum boundary size of data stored in the
- // ChunkStreams buffers at the end of the read.
- checkBufferSizeAndCapacity(chunk0Stream.getCachedBuffers(), 1, 0,
- BYTES_PER_CHECKSUM);
-
- // Seek to a position in the third checksum boundary (so that current
- // buffers do not have the seeked position) and read > BYTES_PER_CHECKSUM
- // bytes of data. This should result in 2 * BYTES_PER_CHECKSUM amount of
- // data being read into the buffers. There should be 2 buffers in the
- // stream but the the first buffer should be released after it is read
- // and the second buffer should have BYTES_PER_CHECKSUM capacity.
- readDataLen = BYTES_PER_CHECKSUM + (BYTES_PER_CHECKSUM / 2);
- int offset = 2 * BYTES_PER_CHECKSUM + 1;
- readData = readDataFromChunk(chunk0Stream, offset, readDataLen);
- validateData(inputData, offset, readData);
- checkBufferSizeAndCapacity(chunk0Stream.getCachedBuffers(), 2, 1,
- BYTES_PER_CHECKSUM);
-
-
- // Read the full chunk data - 1 and verify that all chunk data is read into
- // buffers. We read CHUNK_SIZE - 1 as otherwise all the buffers will be
- // released once all chunk data is read.
- readData = readDataFromChunk(chunk0Stream, 0, CHUNK_SIZE - 1);
- validateData(inputData, 0, readData);
- int expectedNumBuffers = CHUNK_SIZE / BYTES_PER_CHECKSUM;
- checkBufferSizeAndCapacity(chunk0Stream.getCachedBuffers(),
- expectedNumBuffers, expectedNumBuffers - 1, BYTES_PER_CHECKSUM);
-
- // Read the last byte of chunk and verify that the buffers are released.
- chunk0Stream.read(new byte[1]);
- Assert.assertNull("ChunkInputStream did not release buffers after " +
- "reaching EOF.", chunk0Stream.getCachedBuffers());
+ try (KeyInputStream keyInputStream = getKeyInputStream(keyName)) {
+
+ BlockInputStream block0Stream =
+ (BlockInputStream)keyInputStream.getPartStreams().get(0);
+ block0Stream.initialize();
+
+ ChunkInputStream chunk0Stream = block0Stream.getChunkStreams().get(0);
+
+ // To read 1 byte of chunk data, ChunkInputStream should get one full
+ // checksum boundary worth of data from Container and store it in buffers.
+ chunk0Stream.read(new byte[1]);
+ checkBufferSizeAndCapacity(chunk0Stream.getCachedBuffers(), 1, 0,
+ BYTES_PER_CHECKSUM);
+
+ // Read > checksum boundary of data from chunk0
+ int readDataLen = BYTES_PER_CHECKSUM + (BYTES_PER_CHECKSUM / 2);
+ byte[] readData = readDataFromChunk(chunk0Stream, 0, readDataLen);
+ validateData(inputData, 0, readData);
+
+ // The first checksum boundary size of data was already existing in the
+ // ChunkStream buffers. Once that data is read, the next checksum
+ // boundary size of data will be fetched again to read the remaining data.
+ // Hence there should be 1 checksum boundary size of data stored in the
+ // ChunkStreams buffers at the end of the read.
+ checkBufferSizeAndCapacity(chunk0Stream.getCachedBuffers(), 1, 0,
+ BYTES_PER_CHECKSUM);
+
+ // Seek to a position in the third checksum boundary (so that current
+ // buffers do not have the seeked position) and read > BYTES_PER_CHECKSUM
+ // bytes of data. This should result in 2 * BYTES_PER_CHECKSUM amount of
+ // data being read into the buffers. There should be 2 buffers in the
+ // stream but the the first buffer should be released after it is read
+ // and the second buffer should have BYTES_PER_CHECKSUM capacity.
+ readDataLen = BYTES_PER_CHECKSUM + (BYTES_PER_CHECKSUM / 2);
+ int offset = 2 * BYTES_PER_CHECKSUM + 1;
+ readData = readDataFromChunk(chunk0Stream, offset, readDataLen);
+ validateData(inputData, offset, readData);
+ checkBufferSizeAndCapacity(chunk0Stream.getCachedBuffers(), 2, 1,
+ BYTES_PER_CHECKSUM);
+
+
+ // Read the full chunk data -1 and verify that all chunk data is read into
+ // buffers. We read CHUNK_SIZE - 1 as otherwise all the buffers will be
+ // released once all chunk data is read.
+ readData = readDataFromChunk(chunk0Stream, 0, CHUNK_SIZE - 1);
+ validateData(inputData, 0, readData);
+ int expectedNumBuffers = CHUNK_SIZE / BYTES_PER_CHECKSUM;
+ checkBufferSizeAndCapacity(chunk0Stream.getCachedBuffers(),
+ expectedNumBuffers, expectedNumBuffers - 1, BYTES_PER_CHECKSUM);
+
+ // Read the last byte of chunk and verify that the buffers are released.
+ chunk0Stream.read(new byte[1]);
+ Assert.assertNull("ChunkInputStream did not release buffers after " +
+ "reaching EOF.", chunk0Stream.getCachedBuffers());
+ }
}
private void testCloseReleasesBuffers() throws Exception {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestInputStreamBase.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestInputStreamBase.java
index 11dab74bba..325ea9ba54 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestInputStreamBase.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestInputStreamBase.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager.ReplicationManagerConfiguration;
+import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.client.ObjectStore;
@@ -140,6 +141,7 @@ public abstract class TestInputStreamBase {
*/
@After
public void shutdown() {
+ IOUtils.close(null, client);
if (cluster != null) {
cluster.shutdown();
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
index 14cd1b66f4..2a7423b15d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
@@ -177,17 +177,17 @@ public final class TestHelper {
public static void validateData(String keyName, byte[] data,
ObjectStore objectStore, String volumeName, String bucketName)
throws Exception {
- byte[] readData = new byte[data.length];
- OzoneInputStream is =
+ try (OzoneInputStream is =
objectStore.getVolume(volumeName).getBucket(bucketName)
- .readKey(keyName);
- is.read(readData);
- MessageDigest sha1 = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
- sha1.update(data);
- MessageDigest sha2 = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
- sha2.update(readData);
- Assert.assertTrue(Arrays.equals(sha1.digest(), sha2.digest()));
- is.close();
+ .readKey(keyName)) {
+ byte[] readData = new byte[data.length];
+ is.read(readData);
+ MessageDigest sha1 = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
+ sha1.update(data);
+ MessageDigest sha2 = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
+ sha2.update(readData);
+ Assert.assertTrue(Arrays.equals(sha1.digest(), sha2.digest()));
+ }
}
public static void waitForContainerClose(OzoneOutputStream outputStream,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]