This is an automated email from the ASF dual-hosted git repository. captainzmc pushed a commit to branch HDDS-4454 in repository https://gitbox.apache.org/repos/asf/ozone.git
commit 3cc2a7e21be12d0a88897a4a95fc5f80f37cd100 Author: hao guo <[email protected]> AuthorDate: Tue Feb 15 23:35:46 2022 +0800 HDDS-6229. [Ozone-Streaming] Data Channel abstraction on datanode (#3023) --- .../keyvalue/impl/KeyValueStreamDataChannel.java | 56 ++-------------------- ...DataChannel.java => StreamDataChannelBase.java} | 39 ++++++++------- 2 files changed, 27 insertions(+), 68 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java index c0570f5d4d..14ead4ea86 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java @@ -22,69 +22,21 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; import org.apache.hadoop.ozone.container.common.impl.ContainerData; -import org.apache.ratis.statemachine.StateMachine; import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.RandomAccessFile; -import java.nio.ByteBuffer; /** * This class is used to get the DataChannel for streaming. */ -class KeyValueStreamDataChannel implements StateMachine.DataChannel { - private final RandomAccessFile randomAccessFile; - private final File file; - - private final ContainerData containerData; - private final ContainerMetrics metrics; - +class KeyValueStreamDataChannel extends StreamDataChannelBase { KeyValueStreamDataChannel(File file, ContainerData containerData, ContainerMetrics metrics) throws StorageContainerException { - try { - this.file = file; - this.randomAccessFile = new RandomAccessFile(file, "rw"); - } catch (FileNotFoundException e) { - throw new StorageContainerException("BlockFile not exists with " + - "container Id " + containerData.getContainerID() + - " file " + file.getAbsolutePath(), - ContainerProtos.Result.IO_EXCEPTION); - } - this.containerData = containerData; - this.metrics = metrics; - } - - @Override - public void force(boolean metadata) throws IOException { - randomAccessFile.getChannel().force(metadata); - } - - @Override - public int write(ByteBuffer src) throws IOException { - int writeBytes = randomAccessFile.getChannel().write(src); - metrics - .incContainerBytesStats(ContainerProtos.Type.StreamWrite, writeBytes); - containerData.updateWriteStats(writeBytes, false); - return writeBytes; - } - - @Override - public boolean isOpen() { - return randomAccessFile.getChannel().isOpen(); - } - - @Override - public void close() throws IOException { - randomAccessFile.close(); + super(file, containerData, metrics); } @Override - public String toString() { - return "KeyValueStreamDataChannel{" + - "File=" + file.getAbsolutePath() + - ", containerID=" + containerData.getContainerID() + - '}'; + ContainerProtos.Type getType() { + return ContainerProtos.Type.StreamWrite; } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/StreamDataChannelBase.java similarity index 77% copy from hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java copy to hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/StreamDataChannelBase.java index c0570f5d4d..b31e2ccbf4 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/StreamDataChannelBase.java @@ -29,19 +29,21 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; /** - * This class is used to get the DataChannel for streaming. + * For write state machine data. */ -class KeyValueStreamDataChannel implements StateMachine.DataChannel { +abstract class StreamDataChannelBase implements StateMachine.DataChannel { private final RandomAccessFile randomAccessFile; + private final File file; private final ContainerData containerData; private final ContainerMetrics metrics; - KeyValueStreamDataChannel(File file, ContainerData containerData, - ContainerMetrics metrics) + StreamDataChannelBase(File file, ContainerData containerData, + ContainerMetrics metrics) throws StorageContainerException { try { this.file = file; @@ -56,23 +58,20 @@ class KeyValueStreamDataChannel implements StateMachine.DataChannel { this.metrics = metrics; } - @Override - public void force(boolean metadata) throws IOException { - randomAccessFile.getChannel().force(metadata); + abstract ContainerProtos.Type getType(); + + private FileChannel getChannel() { + return randomAccessFile.getChannel(); } @Override - public int write(ByteBuffer src) throws IOException { - int writeBytes = randomAccessFile.getChannel().write(src); - metrics - .incContainerBytesStats(ContainerProtos.Type.StreamWrite, writeBytes); - containerData.updateWriteStats(writeBytes, false); - return writeBytes; + public final void force(boolean metadata) throws IOException { + getChannel().force(metadata); } @Override - public boolean isOpen() { - return randomAccessFile.getChannel().isOpen(); + public final boolean isOpen() { + return getChannel().isOpen(); } @Override @@ -80,9 +79,17 @@ class KeyValueStreamDataChannel implements StateMachine.DataChannel { randomAccessFile.close(); } + @Override + public int write(ByteBuffer src) throws IOException { + final int writeBytes = getChannel().write(src); + metrics.incContainerBytesStats(getType(), writeBytes); + containerData.updateWriteStats(writeBytes, false); + return writeBytes; + } + @Override public String toString() { - return "KeyValueStreamDataChannel{" + + return getClass().getSimpleName() + "{" + "File=" + file.getAbsolutePath() + ", containerID=" + containerData.getContainerID() + '}'; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
