This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 7481790a84 HDDS-11283. Refactor KeyValueStreamDataChannel to avoid 
spurious IDE build issues (#7040)
7481790a84 is described below

commit 7481790a841ce9119fe3002ff6b01682265433ab
Author: Ritesh H Shukla <[email protected]>
AuthorDate: Wed Aug 7 03:20:48 2024 -0700

    HDDS-11283. Refactor KeyValueStreamDataChannel to avoid spurious IDE build 
issues (#7040)
---
 .../ozone/container/keyvalue/impl/Buffers.java     | 115 +++++++++++++++++++++
 .../keyvalue/impl/KeyValueStreamDataChannel.java   |  94 +----------------
 .../impl/TestKeyValueStreamDataChannel.java        |   1 -
 3 files changed, 116 insertions(+), 94 deletions(-)

diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/Buffers.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/Buffers.java
new file mode 100644
index 0000000000..5411d1ce23
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/Buffers.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.keyvalue.impl;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.ratis.RatisHelper;
+import 
org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel.WriteMethod;
+import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
+import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
+import org.apache.ratis.util.ReferenceCountedObject;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Keep the last {@link 
org.apache.hadoop.ozone.container.keyvalue.impl.Buffers#max} bytes in the buffer
+ * in order to create putBlockRequest
+ * at {@link 
#closeBuffers(org.apache.hadoop.ozone.container.keyvalue.impl.Buffers, 
WriteMethod)}}.
+ */
+class Buffers {
+  private final Deque<ReferenceCountedObject<ByteBuffer>> deque = new 
LinkedList<>();
+  private final int max;
+  private int length;
+
+  Buffers(int max) {
+    this.max = max;
+  }
+
+  private boolean isExtra(int n) {
+    return length - n >= max;
+  }
+
+  private boolean hasExtraBuffer() {
+    return Optional
+            .ofNullable(deque.peek())
+            .map(ReferenceCountedObject::get)
+            .filter(b -> isExtra(b.remaining())).isPresent();
+  }
+
+  /**
+   * @return extra buffers which are safe to be written.
+   */
+  Iterable<ReferenceCountedObject<ByteBuffer>> 
offer(ReferenceCountedObject<ByteBuffer> ref) {
+    final ByteBuffer buffer = ref.retain();
+    KeyValueStreamDataChannel.LOG.debug("offer {}", buffer);
+    final boolean offered = deque.offer(ref);
+    Preconditions.checkState(offered, "Failed to offer");
+    length += buffer.remaining();
+
+    return () -> new Iterator<ReferenceCountedObject<ByteBuffer>>() {
+      @Override
+      public boolean hasNext() {
+        return hasExtraBuffer();
+      }
+
+      @Override
+      public ReferenceCountedObject<ByteBuffer> next() {
+        final ReferenceCountedObject<ByteBuffer> polled = poll();
+        length -= polled.get().remaining();
+        Preconditions.checkState(length >= max);
+        return polled;
+      }
+    };
+  }
+
+  ReferenceCountedObject<ByteBuffer> poll() {
+    final ReferenceCountedObject<ByteBuffer> polled = 
Objects.requireNonNull(deque.poll());
+    RatisHelper.debug(polled.get(), "polled", KeyValueStreamDataChannel.LOG);
+    return polled;
+  }
+
+  ReferenceCountedObject<ByteBuf> pollAll() {
+    Preconditions.checkState(!deque.isEmpty(), "The deque is empty");
+    final ByteBuffer[] array = new ByteBuffer[deque.size()];
+    final List<ReferenceCountedObject<ByteBuffer>> refs = new 
ArrayList<>(deque.size());
+    for (int i = 0; i < array.length; i++) {
+      final ReferenceCountedObject<ByteBuffer> ref = poll();
+      refs.add(ref);
+      array[i] = ref.get();
+    }
+    final ByteBuf buf = Unpooled.wrappedBuffer(array).asReadOnly();
+    return ReferenceCountedObject.wrap(buf, () -> { }, () -> {
+      buf.release();
+      refs.forEach(ReferenceCountedObject::release);
+    });
+  }
+
+  void cleanUpAll() {
+    while (!deque.isEmpty()) {
+      poll().release();
+    }
+  }
+}
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java
index 7a08c7ef4e..7500860229 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java
@@ -29,7 +29,6 @@ import 
org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
-import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
 import org.apache.ratis.util.ReferenceCountedObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,13 +36,7 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Deque;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -54,91 +47,6 @@ public class KeyValueStreamDataChannel extends 
StreamDataChannelBase {
   public static final Logger LOG =
       LoggerFactory.getLogger(KeyValueStreamDataChannel.class);
 
-  /**
-   * Keep the last {@link Buffers#max} bytes in the buffer
-   * in order to create putBlockRequest
-   * at {@link #closeBuffers(Buffers, WriteMethod)}}.
-   */
-  static class Buffers {
-    private final Deque<ReferenceCountedObject<ByteBuffer>> deque
-        = new LinkedList<>();
-    private final int max;
-    private int length;
-
-    Buffers(int max) {
-      this.max = max;
-    }
-
-    private boolean isExtra(int n) {
-      return length - n >= max;
-    }
-
-    private boolean hasExtraBuffer() {
-      return Optional.ofNullable(deque.peek())
-          .map(ReferenceCountedObject::get)
-          .filter(b -> isExtra(b.remaining()))
-          .isPresent();
-    }
-
-    /**
-     * @return extra buffers which are safe to be written.
-     */
-    Iterable<ReferenceCountedObject<ByteBuffer>> offer(
-        ReferenceCountedObject<ByteBuffer> ref) {
-      final ByteBuffer buffer = ref.retain();
-      LOG.debug("offer {}", buffer);
-      final boolean offered = deque.offer(ref);
-      Preconditions.checkState(offered, "Failed to offer");
-      length += buffer.remaining();
-
-      return () -> new Iterator<ReferenceCountedObject<ByteBuffer>>() {
-        @Override
-        public boolean hasNext() {
-          return hasExtraBuffer();
-        }
-
-        @Override
-        public ReferenceCountedObject<ByteBuffer> next() {
-          final ReferenceCountedObject<ByteBuffer> polled = poll();
-          length -= polled.get().remaining();
-          Preconditions.checkState(length >= max);
-          return polled;
-        }
-      };
-    }
-
-    ReferenceCountedObject<ByteBuffer> poll() {
-      final ReferenceCountedObject<ByteBuffer> polled
-          = Objects.requireNonNull(deque.poll());
-      RatisHelper.debug(polled.get(), "polled", LOG);
-      return polled;
-    }
-
-    ReferenceCountedObject<ByteBuf> pollAll() {
-      Preconditions.checkState(!deque.isEmpty(), "The deque is empty");
-      final ByteBuffer[] array = new ByteBuffer[deque.size()];
-      final List<ReferenceCountedObject<ByteBuffer>> refs
-          = new ArrayList<>(deque.size());
-      for (int i = 0; i < array.length; i++) {
-        final ReferenceCountedObject<ByteBuffer> ref = poll();
-        refs.add(ref);
-        array[i] = ref.get();
-      }
-      final ByteBuf buf = Unpooled.wrappedBuffer(array).asReadOnly();
-      return ReferenceCountedObject.wrap(buf, () -> {
-      }, () -> {
-        buf.release();
-        refs.forEach(ReferenceCountedObject::release);
-      });
-    }
-
-    void cleanUpAll() {
-      while (!deque.isEmpty()) {
-        poll().release();
-      }
-    }
-  }
-
   interface WriteMethod {
     int applyAsInt(ByteBuffer src) throws IOException;
   }
@@ -184,7 +92,7 @@ public class KeyValueStreamDataChannel extends 
StreamDataChannelBase {
 
   private static void writeFully(ByteBuffer b, WriteMethod writeMethod)
       throws IOException {
-    for (; b.remaining() > 0;) {
+    while (b.remaining() > 0) {
       final int written = writeMethod.applyAsInt(b);
       if (written <= 0) {
         throw new IOException("Unable to write");
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestKeyValueStreamDataChannel.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestKeyValueStreamDataChannel.java
index 63045f7613..e6067e5c56 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestKeyValueStreamDataChannel.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestKeyValueStreamDataChannel.java
@@ -25,7 +25,6 @@ import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutBlockRe
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
 import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
 import org.apache.hadoop.ozone.ClientVersion;
-import 
org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel.Buffers;
 import 
org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel.WriteMethod;
 import org.apache.ratis.client.api.DataStreamOutput;
 import org.apache.ratis.io.FilePositionCount;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to