This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 7481790a84 HDDS-11283. Refactor KeyValueStreamDataChannel to avoid
spurious IDE build issues (#7040)
7481790a84 is described below
commit 7481790a841ce9119fe3002ff6b01682265433ab
Author: Ritesh H Shukla <[email protected]>
AuthorDate: Wed Aug 7 03:20:48 2024 -0700
HDDS-11283. Refactor KeyValueStreamDataChannel to avoid spurious IDE build
issues (#7040)
---
.../ozone/container/keyvalue/impl/Buffers.java | 115 +++++++++++++++++++++
.../keyvalue/impl/KeyValueStreamDataChannel.java | 94 +----------------
.../impl/TestKeyValueStreamDataChannel.java | 1 -
3 files changed, 116 insertions(+), 94 deletions(-)
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/Buffers.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/Buffers.java
new file mode 100644
index 0000000000..5411d1ce23
--- /dev/null
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/Buffers.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.keyvalue.impl;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.ratis.RatisHelper;
+import
org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel.WriteMethod;
+import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
+import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
+import org.apache.ratis.util.ReferenceCountedObject;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Keep the last {@link
org.apache.hadoop.ozone.container.keyvalue.impl.Buffers#max} bytes in the buffer
+ * in order to create putBlockRequest
+ * at {@link
#closeBuffers(org.apache.hadoop.ozone.container.keyvalue.impl.Buffers,
WriteMethod)}}.
+ */
+class Buffers {
+ private final Deque<ReferenceCountedObject<ByteBuffer>> deque = new
LinkedList<>();
+ private final int max;
+ private int length;
+
+ Buffers(int max) {
+ this.max = max;
+ }
+
+ private boolean isExtra(int n) {
+ return length - n >= max;
+ }
+
+ private boolean hasExtraBuffer() {
+ return Optional
+ .ofNullable(deque.peek())
+ .map(ReferenceCountedObject::get)
+ .filter(b -> isExtra(b.remaining())).isPresent();
+ }
+
+ /**
+ * @return extra buffers which are safe to be written.
+ */
+ Iterable<ReferenceCountedObject<ByteBuffer>>
offer(ReferenceCountedObject<ByteBuffer> ref) {
+ final ByteBuffer buffer = ref.retain();
+ KeyValueStreamDataChannel.LOG.debug("offer {}", buffer);
+ final boolean offered = deque.offer(ref);
+ Preconditions.checkState(offered, "Failed to offer");
+ length += buffer.remaining();
+
+ return () -> new Iterator<ReferenceCountedObject<ByteBuffer>>() {
+ @Override
+ public boolean hasNext() {
+ return hasExtraBuffer();
+ }
+
+ @Override
+ public ReferenceCountedObject<ByteBuffer> next() {
+ final ReferenceCountedObject<ByteBuffer> polled = poll();
+ length -= polled.get().remaining();
+ Preconditions.checkState(length >= max);
+ return polled;
+ }
+ };
+ }
+
+ ReferenceCountedObject<ByteBuffer> poll() {
+ final ReferenceCountedObject<ByteBuffer> polled =
Objects.requireNonNull(deque.poll());
+ RatisHelper.debug(polled.get(), "polled", KeyValueStreamDataChannel.LOG);
+ return polled;
+ }
+
+ ReferenceCountedObject<ByteBuf> pollAll() {
+ Preconditions.checkState(!deque.isEmpty(), "The deque is empty");
+ final ByteBuffer[] array = new ByteBuffer[deque.size()];
+ final List<ReferenceCountedObject<ByteBuffer>> refs = new
ArrayList<>(deque.size());
+ for (int i = 0; i < array.length; i++) {
+ final ReferenceCountedObject<ByteBuffer> ref = poll();
+ refs.add(ref);
+ array[i] = ref.get();
+ }
+ final ByteBuf buf = Unpooled.wrappedBuffer(array).asReadOnly();
+ return ReferenceCountedObject.wrap(buf, () -> { }, () -> {
+ buf.release();
+ refs.forEach(ReferenceCountedObject::release);
+ });
+ }
+
+ void cleanUpAll() {
+ while (!deque.isEmpty()) {
+ poll().release();
+ }
+ }
+}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java
index 7a08c7ef4e..7500860229 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java
@@ -29,7 +29,6 @@ import
org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
-import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
import org.apache.ratis.util.ReferenceCountedObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,13 +36,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Deque;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
import java.util.Objects;
-import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -54,91 +47,6 @@ public class KeyValueStreamDataChannel extends
StreamDataChannelBase {
public static final Logger LOG =
LoggerFactory.getLogger(KeyValueStreamDataChannel.class);
- /**
- * Keep the last {@link Buffers#max} bytes in the buffer
- * in order to create putBlockRequest
- * at {@link #closeBuffers(Buffers, WriteMethod)}}.
- */
- static class Buffers {
- private final Deque<ReferenceCountedObject<ByteBuffer>> deque
- = new LinkedList<>();
- private final int max;
- private int length;
-
- Buffers(int max) {
- this.max = max;
- }
-
- private boolean isExtra(int n) {
- return length - n >= max;
- }
-
- private boolean hasExtraBuffer() {
- return Optional.ofNullable(deque.peek())
- .map(ReferenceCountedObject::get)
- .filter(b -> isExtra(b.remaining()))
- .isPresent();
- }
-
- /**
- * @return extra buffers which are safe to be written.
- */
- Iterable<ReferenceCountedObject<ByteBuffer>> offer(
- ReferenceCountedObject<ByteBuffer> ref) {
- final ByteBuffer buffer = ref.retain();
- LOG.debug("offer {}", buffer);
- final boolean offered = deque.offer(ref);
- Preconditions.checkState(offered, "Failed to offer");
- length += buffer.remaining();
-
- return () -> new Iterator<ReferenceCountedObject<ByteBuffer>>() {
- @Override
- public boolean hasNext() {
- return hasExtraBuffer();
- }
-
- @Override
- public ReferenceCountedObject<ByteBuffer> next() {
- final ReferenceCountedObject<ByteBuffer> polled = poll();
- length -= polled.get().remaining();
- Preconditions.checkState(length >= max);
- return polled;
- }
- };
- }
-
- ReferenceCountedObject<ByteBuffer> poll() {
- final ReferenceCountedObject<ByteBuffer> polled
- = Objects.requireNonNull(deque.poll());
- RatisHelper.debug(polled.get(), "polled", LOG);
- return polled;
- }
-
- ReferenceCountedObject<ByteBuf> pollAll() {
- Preconditions.checkState(!deque.isEmpty(), "The deque is empty");
- final ByteBuffer[] array = new ByteBuffer[deque.size()];
- final List<ReferenceCountedObject<ByteBuffer>> refs
- = new ArrayList<>(deque.size());
- for (int i = 0; i < array.length; i++) {
- final ReferenceCountedObject<ByteBuffer> ref = poll();
- refs.add(ref);
- array[i] = ref.get();
- }
- final ByteBuf buf = Unpooled.wrappedBuffer(array).asReadOnly();
- return ReferenceCountedObject.wrap(buf, () -> {
- }, () -> {
- buf.release();
- refs.forEach(ReferenceCountedObject::release);
- });
- }
-
- void cleanUpAll() {
- while (!deque.isEmpty()) {
- poll().release();
- }
- }
- }
-
interface WriteMethod {
int applyAsInt(ByteBuffer src) throws IOException;
}
@@ -184,7 +92,7 @@ public class KeyValueStreamDataChannel extends
StreamDataChannelBase {
private static void writeFully(ByteBuffer b, WriteMethod writeMethod)
throws IOException {
- for (; b.remaining() > 0;) {
+ while (b.remaining() > 0) {
final int written = writeMethod.applyAsInt(b);
if (written <= 0) {
throw new IOException("Unable to write");
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestKeyValueStreamDataChannel.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestKeyValueStreamDataChannel.java
index 63045f7613..e6067e5c56 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestKeyValueStreamDataChannel.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestKeyValueStreamDataChannel.java
@@ -25,7 +25,6 @@ import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutBlockRe
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
import org.apache.hadoop.ozone.ClientVersion;
-import
org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel.Buffers;
import
org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel.WriteMethod;
import org.apache.ratis.client.api.DataStreamOutput;
import org.apache.ratis.io.FilePositionCount;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]