This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new c7ddf9b8a6 HDDS-8848. Clean up datanode memory in case of an ozone stream write error (#4891)
c7ddf9b8a6 is described below

commit c7ddf9b8a65a50482dd3644e42cc3c97a7dc0176
Author: hao guo <[email protected]>
AuthorDate: Thu Jun 15 15:46:05 2023 +0800

    HDDS-8848. Clean up datanode memory in case of an ozone stream write error (#4891)
---
 .../server/ratis/ContainerStateMachine.java        | 37 ++++++++++++++++++----
 .../common/transport/server/ratis/LocalStream.java | 19 ++++++-----
 .../keyvalue/impl/KeyValueStreamDataChannel.java   | 19 ++++++++++-
 .../keyvalue/impl/StreamDataChannelBase.java       | 34 +++++++++++++++++++-
 4 files changed, 90 insertions(+), 19 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 0af9657b69..e8ab1b0af8 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -589,6 +589,9 @@ public class ContainerStateMachine extends BaseStateMachine {
     if (stream == null) {
       return JavaUtils.completeExceptionally(new IllegalStateException(
           "DataStream is null"));
+    } else if (!(stream instanceof LocalStream)) {
+      return JavaUtils.completeExceptionally(new IllegalStateException(
+          "Unexpected DataStream " + stream.getClass()));
     }
     final DataChannel dataChannel = stream.getDataChannel();
     if (dataChannel.isOpen()) {
@@ -596,16 +599,36 @@ public class ContainerStateMachine extends BaseStateMachine {
           "DataStream: " + stream + " is not closed properly"));
     }
 
-    final ContainerCommandRequestProto request;
-    if (dataChannel instanceof KeyValueStreamDataChannel) {
-      request = ((KeyValueStreamDataChannel) dataChannel).getPutBlockRequest();
-    } else {
+    if (!(dataChannel instanceof KeyValueStreamDataChannel)) {
       return JavaUtils.completeExceptionally(new IllegalStateException(
           "Unexpected DataChannel " + dataChannel.getClass()));
     }
-    return runCommandAsync(request, entry).whenComplete(
-        (res, e) -> LOG.debug("link {}, entry: {}, request: {}",
-            res.getResult(), entry, request));
+
+    final KeyValueStreamDataChannel kvStreamDataChannel =
+        (KeyValueStreamDataChannel) dataChannel;
+
+    final ContainerCommandRequestProto request =
+        kvStreamDataChannel.getPutBlockRequest();
+
+    return runCommandAsync(request, entry).whenComplete((response, e) -> {
+      if (e != null) {
+        LOG.warn("Failed to link logEntry {} for request {}",
+            TermIndex.valueOf(entry), request, e);
+      }
+      if (response != null) {
+        final ContainerProtos.Result result = response.getResult();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("{} to link logEntry {} for request {}, response: {}",
+              result, TermIndex.valueOf(entry), request, response);
+        }
+        if (result == ContainerProtos.Result.SUCCESS) {
+          kvStreamDataChannel.setLinked();
+          return;
+        }
+      }
+      // failed to link, cleanup
+      kvStreamDataChannel.cleanUp();
+    });
   }
 
   private ExecutorService getChunkExecutor(WriteChunkRequestProto req) {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/LocalStream.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/LocalStream.java
index 8daa7185b6..2473fdeb0e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/LocalStream.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/LocalStream.java
@@ -18,11 +18,11 @@
 
 package org.apache.hadoop.ozone.container.common.transport.server.ratis;
 
+import org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel;
 import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.JavaUtils;
 
-import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 
 class LocalStream implements StateMachine.DataStream {
@@ -41,14 +41,13 @@ class LocalStream implements StateMachine.DataStream {
 
   @Override
   public CompletableFuture<?> cleanUp() {
-    return CompletableFuture.supplyAsync(() -> {
-      try {
-        dataChannel.close();
-        return true;
-      } catch (IOException e) {
-        throw new CompletionException("Failed to close data channel", e);
-      }
-    });
+    if (!(dataChannel instanceof KeyValueStreamDataChannel)) {
+      return JavaUtils.completeExceptionally(new IllegalStateException(
+          "Unexpected DataChannel " + dataChannel.getClass()));
+    }
+    return CompletableFuture
+        .supplyAsync(((KeyValueStreamDataChannel) dataChannel)::cleanUp,
+            executor);
   }
 
   @Override
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java
index 99dc40f5d0..e34a1e273c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java
@@ -131,6 +131,12 @@ public class KeyValueStreamDataChannel extends StreamDataChannelBase {
         refs.forEach(ReferenceCountedObject::release);
       });
     }
+
+    void cleanUpAll() {
+      while (!deque.isEmpty()) {
+        poll().release();
+      }
+    }
   }
 
   interface WriteMethod {
@@ -198,7 +204,18 @@ public class KeyValueStreamDataChannel extends StreamDataChannelBase {
   @Override
   public void close() throws IOException {
     if (closed.compareAndSet(false, true)) {
-      putBlockRequest.set(closeBuffers(buffers, super::writeFileChannel));
+      try {
+        putBlockRequest.set(closeBuffers(buffers, super::writeFileChannel));
+      } finally {
+        super.close();
+      }
+    }
+  }
+
+  @Override
+  protected void cleanupInternal() throws IOException {
+    buffers.cleanUpAll();
+    if (closed.compareAndSet(false, true)) {
       super.close();
     }
   }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/StreamDataChannelBase.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/StreamDataChannelBase.java
index 4b1a255e93..58fc2c348b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/StreamDataChannelBase.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/StreamDataChannelBase.java
@@ -23,6 +23,8 @@ import 
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerExcep
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.ratis.statemachine.StateMachine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -30,16 +32,23 @@ import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil.onFailure;
 
 /**
  * For write state machine data.
  */
-abstract class StreamDataChannelBase implements StateMachine.DataChannel {
+abstract class StreamDataChannelBase
+    implements StateMachine.DataChannel {
+  static final Logger LOG = LoggerFactory.getLogger(
+      StreamDataChannelBase.class);
+
   private final RandomAccessFile randomAccessFile;
 
   private final File file;
+  private final AtomicBoolean linked = new AtomicBoolean();
+  private final AtomicBoolean cleaned = new AtomicBoolean();
 
   private final ContainerData containerData;
   private final ContainerMetrics metrics;
@@ -85,6 +94,29 @@ abstract class StreamDataChannelBase implements StateMachine.DataChannel {
     return getChannel().isOpen();
   }
 
+  public void setLinked() {
+    linked.set(true);
+  }
+
+  /** @return true iff {@link StateMachine.DataChannel} is already linked. */
+  public boolean cleanUp() {
+    if (linked.get()) {
+      // already linked, nothing to do.
+      return true;
+    }
+    if (cleaned.compareAndSet(false, true)) {
+      // close and then delete the file.
+      try {
+        cleanupInternal();
+      } catch (IOException e) {
+        LOG.warn("Failed to close " + this, e);
+      }
+    }
+    return false;
+  }
+
+  protected abstract void cleanupInternal() throws IOException;
+
   @Override
   public void close() throws IOException {
     try {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to