szetszwo commented on code in PR #4891:
URL: https://github.com/apache/ozone/pull/4891#discussion_r1229704234
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java:
##########
@@ -589,23 +589,46 @@ public CompletableFuture<?> link(DataStream stream,
LogEntryProto entry) {
if (stream == null) {
return JavaUtils.completeExceptionally(new IllegalStateException(
"DataStream is null"));
+ } else if (!(stream instanceof LocalStream)) {
+ return JavaUtils.completeExceptionally(new IllegalStateException(
+ "Unexpected DataStream " + stream.getClass()));
}
final DataChannel dataChannel = stream.getDataChannel();
if (dataChannel.isOpen()) {
return JavaUtils.completeExceptionally(new IllegalStateException(
"DataStream: " + stream + " is not closed properly"));
}
- final ContainerCommandRequestProto request;
if (dataChannel instanceof KeyValueStreamDataChannel) {
Review Comment:
Let's change it to
```java
if (!(dataChannel instanceof KeyValueStreamDataChannel)) {
return JavaUtils.completeExceptionally(new IllegalStateException(
"Unexpected DataChannel " + dataChannel.getClass()));
}
```
Then, the code don't need to have so many indentation levels.
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/StreamDataChannelBase.java:
##########
@@ -23,23 +23,32 @@
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.ratis.statemachine.StateMachine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
+import java.util.concurrent.atomic.AtomicBoolean;
import static
org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil.onFailure;
/**
* For write state machine data.
*/
-abstract class StreamDataChannelBase implements StateMachine.DataChannel {
+public abstract class StreamDataChannelBase
Review Comment:
Don't add `public`.
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java:
##########
@@ -198,7 +205,18 @@ void assertOpen() throws IOException {
@Override
public void close() throws IOException {
if (closed.compareAndSet(false, true)) {
- putBlockRequest.set(closeBuffers(buffers, super::writeFileChannel));
+ try {
+ putBlockRequest.set(closeBuffers(buffers, super::writeFileChannel));
+ } finally {
+ super.close();
+ }
+ }
+ }
+
+ @Override
+ protected void cleanupInternal() throws IOException {
+ buffers.cleanUpAll();
+ if (!closed.get()) {
Review Comment:
We should use `if (closed.compareAndSet(false, true)) {`
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/LocalStream.java:
##########
@@ -18,19 +18,18 @@
package org.apache.hadoop.ozone.container.common.transport.server.ratis;
+import org.apache.hadoop.ozone.container.keyvalue.impl.StreamDataChannelBase;
import org.apache.ratis.statemachine.StateMachine;
-import java.io.IOException;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
class LocalStream implements StateMachine.DataStream {
- private final StateMachine.DataChannel dataChannel;
+ private final StreamDataChannelBase dataChannel;
Review Comment:
Use `KeyValueStreamDataChannel` instead. Then, we can keep
`StreamDataChannelBase` package private.
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java:
##########
@@ -131,6 +131,13 @@ ReferenceCountedObject<ByteBuf> pollAll() {
refs.forEach(ReferenceCountedObject::release);
});
}
+
+ void cleanUpAll() {
+ final int size = deque.size();
+ for (int i = 0; i < size; i++) {
+ Optional.ofNullable(poll()).ifPresent(ReferenceCountedObject::release);
+ }
+ }
Review Comment:
Let's call it `releaseAll()` and check `isEmpty()`. Also, `poll()` won't
return null.
```java
void releaseAll() {
while (!deque.isEmpty()) {
poll().release();
}
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]