This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new c7ddf9b8a6 HDDS-8848. Clean up datanode memory in case of an ozone
stream write error (#4891)
c7ddf9b8a6 is described below
commit c7ddf9b8a65a50482dd3644e42cc3c97a7dc0176
Author: hao guo <[email protected]>
AuthorDate: Thu Jun 15 15:46:05 2023 +0800
HDDS-8848. Clean up datanode memory in case of an ozone stream write error
(#4891)
---
.../server/ratis/ContainerStateMachine.java | 37 ++++++++++++++++++----
.../common/transport/server/ratis/LocalStream.java | 19 ++++++-----
.../keyvalue/impl/KeyValueStreamDataChannel.java | 19 ++++++++++-
.../keyvalue/impl/StreamDataChannelBase.java | 34 +++++++++++++++++++-
4 files changed, 90 insertions(+), 19 deletions(-)
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 0af9657b69..e8ab1b0af8 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -589,6 +589,9 @@ public class ContainerStateMachine extends BaseStateMachine
{
if (stream == null) {
return JavaUtils.completeExceptionally(new IllegalStateException(
"DataStream is null"));
+ } else if (!(stream instanceof LocalStream)) {
+ return JavaUtils.completeExceptionally(new IllegalStateException(
+ "Unexpected DataStream " + stream.getClass()));
}
final DataChannel dataChannel = stream.getDataChannel();
if (dataChannel.isOpen()) {
@@ -596,16 +599,36 @@ public class ContainerStateMachine extends
BaseStateMachine {
"DataStream: " + stream + " is not closed properly"));
}
- final ContainerCommandRequestProto request;
- if (dataChannel instanceof KeyValueStreamDataChannel) {
- request = ((KeyValueStreamDataChannel) dataChannel).getPutBlockRequest();
- } else {
+ if (!(dataChannel instanceof KeyValueStreamDataChannel)) {
return JavaUtils.completeExceptionally(new IllegalStateException(
"Unexpected DataChannel " + dataChannel.getClass()));
}
- return runCommandAsync(request, entry).whenComplete(
- (res, e) -> LOG.debug("link {}, entry: {}, request: {}",
- res.getResult(), entry, request));
+
+ final KeyValueStreamDataChannel kvStreamDataChannel =
+ (KeyValueStreamDataChannel) dataChannel;
+
+ final ContainerCommandRequestProto request =
+ kvStreamDataChannel.getPutBlockRequest();
+
+ return runCommandAsync(request, entry).whenComplete((response, e) -> {
+ if (e != null) {
+ LOG.warn("Failed to link logEntry {} for request {}",
+ TermIndex.valueOf(entry), request, e);
+ }
+ if (response != null) {
+ final ContainerProtos.Result result = response.getResult();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{} to link logEntry {} for request {}, response: {}",
+ result, TermIndex.valueOf(entry), request, response);
+ }
+ if (result == ContainerProtos.Result.SUCCESS) {
+ kvStreamDataChannel.setLinked();
+ return;
+ }
+ }
+ // failed to link, cleanup
+ kvStreamDataChannel.cleanUp();
+ });
}
private ExecutorService getChunkExecutor(WriteChunkRequestProto req) {
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/LocalStream.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/LocalStream.java
index 8daa7185b6..2473fdeb0e 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/LocalStream.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/LocalStream.java
@@ -18,11 +18,11 @@
package org.apache.hadoop.ozone.container.common.transport.server.ratis;
+import
org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel;
import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.JavaUtils;
-import java.io.IOException;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
class LocalStream implements StateMachine.DataStream {
@@ -41,14 +41,13 @@ class LocalStream implements StateMachine.DataStream {
+ /**
+ * Cleans up the underlying data channel asynchronously on {@code executor}.
+ * Only {@link KeyValueStreamDataChannel} is supported: the future completes
+ * with its cleanUp() result (true iff the channel was already linked).
+ * Any other channel type yields an exceptionally completed future.
+ */
@Override
public CompletableFuture<?> cleanUp() {
- return CompletableFuture.supplyAsync(() -> {
- try {
- dataChannel.close();
- return true;
- } catch (IOException e) {
- throw new CompletionException("Failed to close data channel", e);
- }
- });
+ // Guard: cleanUp() is only defined on KeyValueStreamDataChannel.
+ if (!(dataChannel instanceof KeyValueStreamDataChannel)) {
+ return JavaUtils.completeExceptionally(new IllegalStateException(
+ "Unexpected DataChannel " + dataChannel.getClass()));
+ }
+ return CompletableFuture
+ .supplyAsync(((KeyValueStreamDataChannel) dataChannel)::cleanUp,
+ executor);
}
@Override
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java
index 99dc40f5d0..e34a1e273c 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java
@@ -131,6 +131,12 @@ public class KeyValueStreamDataChannel extends
StreamDataChannelBase {
refs.forEach(ReferenceCountedObject::release);
});
}
+
+ /**
+ * Releases every buffer still queued in {@code deque}, draining it.
+ * Called from cleanupInternal() on the stream cleanup path.
+ * NOTE(review): termination relies on poll() removing the head of
+ * {@code deque} — confirm against its definition.
+ */
+ void cleanUpAll() {
+ while (!deque.isEmpty()) {
+ poll().release();
+ }
+ }
}
interface WriteMethod {
@@ -198,7 +204,18 @@ public class KeyValueStreamDataChannel extends
StreamDataChannelBase {
+ /**
+ * Closes this channel at most once (guarded by {@code closed}).
+ * The result of closeBuffers(buffers, writeFileChannel) is stored in
+ * {@code putBlockRequest}; super.close() runs even if closeBuffers throws.
+ */
@Override
public void close() throws IOException {
if (closed.compareAndSet(false, true)) {
- putBlockRequest.set(closeBuffers(buffers, super::writeFileChannel));
+ // finally: release the underlying channel even when flushing fails.
+ try {
+ putBlockRequest.set(closeBuffers(buffers, super::writeFileChannel));
+ } finally {
+ super.close();
+ }
+ }
+ }
+
+ /**
+ * Cleanup hook invoked by StreamDataChannelBase#cleanUp(): releases all
+ * buffered data, then closes the underlying channel unless close() has
+ * already done so (both share the {@code closed} flag). Unlike close(),
+ * this does not call closeBuffers, so no PutBlock request is produced.
+ */
+ @Override
+ protected void cleanupInternal() throws IOException {
+ buffers.cleanUpAll();
+ // Skip super.close() if close() already ran.
+ if (closed.compareAndSet(false, true)) {
super.close();
}
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/StreamDataChannelBase.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/StreamDataChannelBase.java
index 4b1a255e93..58fc2c348b 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/StreamDataChannelBase.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/StreamDataChannelBase.java
@@ -23,6 +23,8 @@ import
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerExcep
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.ratis.statemachine.StateMachine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileNotFoundException;
@@ -30,16 +32,23 @@ import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
+import java.util.concurrent.atomic.AtomicBoolean;
import static
org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil.onFailure;
/**
* For write state machine data.
*/
-abstract class StreamDataChannelBase implements StateMachine.DataChannel {
+abstract class StreamDataChannelBase
+ implements StateMachine.DataChannel {
+ static final Logger LOG = LoggerFactory.getLogger(
+ StreamDataChannelBase.class);
+
private final RandomAccessFile randomAccessFile;
private final File file;
+ private final AtomicBoolean linked = new AtomicBoolean();
+ private final AtomicBoolean cleaned = new AtomicBoolean();
private final ContainerData containerData;
private final ContainerMetrics metrics;
@@ -85,6 +94,29 @@ abstract class StreamDataChannelBase implements
StateMachine.DataChannel {
return getChannel().isOpen();
}
+ /**
+ * Marks this channel as linked; ContainerStateMachine calls this when the
+ * link command returns SUCCESS. Afterwards {@link #cleanUp()} returns true
+ * without cleaning anything up.
+ */
+ public void setLinked() {
+ linked.set(true);
+ }
+
+ /**
+ * Cleans up this channel unless its data has already been linked
+ * (see {@link #setLinked()}). The cleanup itself runs at most once: the
+ * {@code cleaned} flag guards {@link #cleanupInternal()}, and an
+ * {@link IOException} from it is logged rather than propagated.
+ *
+ * @return true iff {@link StateMachine.DataChannel} is already linked;
+ * false otherwise, whether or not this call performed the cleanup.
+ */
+ public boolean cleanUp() {
+ if (linked.get()) {
+ // already linked, nothing to do.
+ return true;
+ }
+ if (cleaned.compareAndSet(false, true)) {
+ // Release resources via the subclass hook.
+ // NOTE(review): nothing here deletes the underlying file; confirm
+ // whether a partially written file should also be removed.
+ try {
+ cleanupInternal();
+ } catch (IOException e) {
+ // Parameterized message; SLF4J logs the trailing Throwable with its
+ // stack trace.
+ LOG.warn("Failed to close {}", this, e);
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Subclass hook that releases this channel's resources. Called at most once,
+ * by {@link #cleanUp()}, which logs rather than propagates an IOException.
+ */
+ protected abstract void cleanupInternal() throws IOException;
+
@Override
public void close() throws IOException {
try {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]