fapifta commented on a change in pull request #2702:
URL: https://github.com/apache/ozone/pull/2702#discussion_r727476347
##########
File path:
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
##########
@@ -17,56 +17,296 @@
*/
package org.apache.hadoop.ozone.client.io;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
import org.apache.hadoop.hdds.scm.storage.BufferPool;
import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.security.token.Token;
import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.ratis.util.Preconditions.assertInstanceOf;
/**
* Helper for {@link ECBlockOutputStream}.
*/
public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry{
- private final boolean isParityStreamEntry;
- private ECBlockOutputStream out;
+ private ECBlockOutputStream[] blockOutputStreams;
+ private final ECReplicationConfig replicationConfig;
+ private Map<DatanodeDetails, Integer> replicaIndicies = new HashMap<>();
+ private long length;
+
+ private int currentStreamIdx = 0;
@SuppressWarnings({"parameternumber", "squid:S00107"})
ECBlockOutputStreamEntry(BlockID blockID, String key,
XceiverClientFactory xceiverClientManager, Pipeline pipeline, long
length,
BufferPool bufferPool, Token<OzoneBlockTokenIdentifier> token,
- OzoneClientConfig config, boolean isParityStream) {
+ OzoneClientConfig config) {
super(blockID, key, xceiverClientManager, pipeline, length, bufferPool,
token, config);
- this.isParityStreamEntry = isParityStream;
+ assertInstanceOf(
+ pipeline.getReplicationConfig(), ECReplicationConfig.class);
+ this.replicationConfig =
+ (ECReplicationConfig) pipeline.getReplicationConfig();
+ this.length = replicationConfig.getData() * length;
+ }
+
+ @Override
+ void checkStream() throws IOException {
+ if (!isInitialized()) {
+ blockOutputStreams =
+ new ECBlockOutputStream[replicationConfig.getRequiredNodes()];
+ }
+ if (blockOutputStreams[currentStreamIdx] == null) {
+ createOutputStream();
+ }
}
@Override
void createOutputStream() throws IOException {
- this.out = new ECBlockOutputStream(getBlockID(), getXceiverClientManager(),
- getPipeline(), getBufferPool(), getConf(), getToken());
+ Pipeline ecPipeline = getPipeline();
+ List<DatanodeDetails> nodes = getPipeline().getNodes();
+ blockOutputStreams[currentStreamIdx] = new ECBlockOutputStream(
+ getBlockID(),
+ getXceiverClientManager(),
+ createSingleECBlockPipeline(
+ ecPipeline, nodes.get(currentStreamIdx), currentStreamIdx + 1),
+ getBufferPool(),
+ getConf(),
+ getToken());
+ }
+
+ @Override
+ public OutputStream getOutputStream() {
+ if (!isInitialized()) {
+ return null;
+ }
+ checkState(blockOutputStreams[currentStreamIdx] != null);
+ return blockOutputStreams[currentStreamIdx];
+ }
+
+ @Override
+ boolean isInitialized() {
+ return blockOutputStreams != null;
+ }
+
+ @Override
+ public long getLength() {
+ return length;
}
- public ECBlockOutputStream getOutputStream() {
- return out;
+ public int getCurrentStreamIdx() {
+ return currentStreamIdx;
+ }
+
+ public void useNextBlockStream() {
+ currentStreamIdx++;
+ }
+
+ public void forceToFirstParityBlock(){
+ currentStreamIdx = replicationConfig.getData();
+ }
+
+ public void resetToFirstEntry(){
+ currentStreamIdx = 0;
+ }
+
+ @Override
+ void incCurrentPosition() {
+ if (isWritingParity()) {
+ return;
+ }
+ super.incCurrentPosition();
+ }
+
+ @Override
+ void incCurrentPosition(long len) {
+ if (isWritingParity()){
+ return;
+ }
+ super.incCurrentPosition(len);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ if (!isInitialized()) {
+ return;
+ }
Review comment:
Not sure what do you mean?
##########
File path:
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
##########
@@ -17,56 +17,296 @@
*/
package org.apache.hadoop.ozone.client.io;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
import org.apache.hadoop.hdds.scm.storage.BufferPool;
import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.security.token.Token;
import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.ratis.util.Preconditions.assertInstanceOf;
/**
* Helper for {@link ECBlockOutputStream}.
*/
public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry{
- private final boolean isParityStreamEntry;
- private ECBlockOutputStream out;
+ private ECBlockOutputStream[] blockOutputStreams;
+ private final ECReplicationConfig replicationConfig;
+ private Map<DatanodeDetails, Integer> replicaIndicies = new HashMap<>();
+ private long length;
+
+ private int currentStreamIdx = 0;
@SuppressWarnings({"parameternumber", "squid:S00107"})
ECBlockOutputStreamEntry(BlockID blockID, String key,
XceiverClientFactory xceiverClientManager, Pipeline pipeline, long
length,
BufferPool bufferPool, Token<OzoneBlockTokenIdentifier> token,
- OzoneClientConfig config, boolean isParityStream) {
+ OzoneClientConfig config) {
super(blockID, key, xceiverClientManager, pipeline, length, bufferPool,
token, config);
- this.isParityStreamEntry = isParityStream;
+ assertInstanceOf(
+ pipeline.getReplicationConfig(), ECReplicationConfig.class);
+ this.replicationConfig =
+ (ECReplicationConfig) pipeline.getReplicationConfig();
+ this.length = replicationConfig.getData() * length;
+ }
+
+ @Override
+ void checkStream() throws IOException {
+ if (!isInitialized()) {
+ blockOutputStreams =
+ new ECBlockOutputStream[replicationConfig.getRequiredNodes()];
+ }
+ if (blockOutputStreams[currentStreamIdx] == null) {
+ createOutputStream();
+ }
}
@Override
void createOutputStream() throws IOException {
- this.out = new ECBlockOutputStream(getBlockID(), getXceiverClientManager(),
- getPipeline(), getBufferPool(), getConf(), getToken());
+ Pipeline ecPipeline = getPipeline();
+ List<DatanodeDetails> nodes = getPipeline().getNodes();
+ blockOutputStreams[currentStreamIdx] = new ECBlockOutputStream(
+ getBlockID(),
+ getXceiverClientManager(),
+ createSingleECBlockPipeline(
+ ecPipeline, nodes.get(currentStreamIdx), currentStreamIdx + 1),
+ getBufferPool(),
+ getConf(),
+ getToken());
+ }
+
+ @Override
+ public OutputStream getOutputStream() {
+ if (!isInitialized()) {
+ return null;
+ }
+ checkState(blockOutputStreams[currentStreamIdx] != null);
+ return blockOutputStreams[currentStreamIdx];
+ }
+
+ @Override
+ boolean isInitialized() {
+ return blockOutputStreams != null;
+ }
+
+ @Override
+ public long getLength() {
+ return length;
}
- public ECBlockOutputStream getOutputStream() {
- return out;
+ public int getCurrentStreamIdx() {
+ return currentStreamIdx;
+ }
+
+ public void useNextBlockStream() {
+ currentStreamIdx++;
+ }
+
+ public void forceToFirstParityBlock(){
+ currentStreamIdx = replicationConfig.getData();
+ }
+
+ public void resetToFirstEntry(){
+ currentStreamIdx = 0;
+ }
+
+ @Override
+ void incCurrentPosition() {
+ if (isWritingParity()) {
+ return;
+ }
+ super.incCurrentPosition();
+ }
+
+ @Override
+ void incCurrentPosition(long len) {
+ if (isWritingParity()){
+ return;
+ }
+ super.incCurrentPosition(len);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ if (!isInitialized()) {
+ return;
+ }
Review comment:
ah... spaces... though this way it requires wrapping... I was hesitant
where to wrap though to help readability... :)
##########
File path:
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
##########
@@ -17,56 +17,296 @@
*/
package org.apache.hadoop.ozone.client.io;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
import org.apache.hadoop.hdds.scm.storage.BufferPool;
import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.security.token.Token;
import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.ratis.util.Preconditions.assertInstanceOf;
/**
* Helper for {@link ECBlockOutputStream}.
*/
public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry{
- private final boolean isParityStreamEntry;
- private ECBlockOutputStream out;
+ private ECBlockOutputStream[] blockOutputStreams;
+ private final ECReplicationConfig replicationConfig;
+ private Map<DatanodeDetails, Integer> replicaIndicies = new HashMap<>();
+ private long length;
+
+ private int currentStreamIdx = 0;
@SuppressWarnings({"parameternumber", "squid:S00107"})
ECBlockOutputStreamEntry(BlockID blockID, String key,
XceiverClientFactory xceiverClientManager, Pipeline pipeline, long
length,
BufferPool bufferPool, Token<OzoneBlockTokenIdentifier> token,
- OzoneClientConfig config, boolean isParityStream) {
+ OzoneClientConfig config) {
super(blockID, key, xceiverClientManager, pipeline, length, bufferPool,
token, config);
- this.isParityStreamEntry = isParityStream;
+ assertInstanceOf(
+ pipeline.getReplicationConfig(), ECReplicationConfig.class);
+ this.replicationConfig =
+ (ECReplicationConfig) pipeline.getReplicationConfig();
+ this.length = replicationConfig.getData() * length;
+ }
+
+ @Override
+ void checkStream() throws IOException {
+ if (!isInitialized()) {
+ blockOutputStreams =
+ new ECBlockOutputStream[replicationConfig.getRequiredNodes()];
+ }
+ if (blockOutputStreams[currentStreamIdx] == null) {
+ createOutputStream();
+ }
}
@Override
void createOutputStream() throws IOException {
- this.out = new ECBlockOutputStream(getBlockID(), getXceiverClientManager(),
- getPipeline(), getBufferPool(), getConf(), getToken());
+ Pipeline ecPipeline = getPipeline();
+ List<DatanodeDetails> nodes = getPipeline().getNodes();
+ blockOutputStreams[currentStreamIdx] = new ECBlockOutputStream(
+ getBlockID(),
+ getXceiverClientManager(),
+ createSingleECBlockPipeline(
+ ecPipeline, nodes.get(currentStreamIdx), currentStreamIdx + 1),
+ getBufferPool(),
+ getConf(),
+ getToken());
+ }
+
+ @Override
+ public OutputStream getOutputStream() {
+ if (!isInitialized()) {
+ return null;
+ }
+ checkState(blockOutputStreams[currentStreamIdx] != null);
+ return blockOutputStreams[currentStreamIdx];
+ }
+
+ @Override
+ boolean isInitialized() {
+ return blockOutputStreams != null;
+ }
+
+ @Override
+ public long getLength() {
+ return length;
}
- public ECBlockOutputStream getOutputStream() {
- return out;
+ public int getCurrentStreamIdx() {
+ return currentStreamIdx;
+ }
+
+ public void useNextBlockStream() {
+ currentStreamIdx++;
+ }
+
+ public void forceToFirstParityBlock(){
+ currentStreamIdx = replicationConfig.getData();
+ }
+
+ public void resetToFirstEntry(){
+ currentStreamIdx = 0;
+ }
+
+ @Override
+ void incCurrentPosition() {
+ if (isWritingParity()) {
+ return;
+ }
+ super.incCurrentPosition();
+ }
+
+ @Override
+ void incCurrentPosition(long len) {
+ if (isWritingParity()){
+ return;
+ }
+ super.incCurrentPosition(len);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ if (!isInitialized()) {
+ return;
+ }
+ for(int i=0; i<=currentStreamIdx && i<blockOutputStreams.length; i++) {
+ if (blockOutputStreams[i] != null) {
+ blockOutputStreams[i].flush();
+ }
+ }
+ }
+
+ @Override
+ boolean isClosed() {
+ if (!isInitialized()) {
+ return false;
+ }
+ return blockStreams().allMatch(BlockOutputStream::isClosed);
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (!isInitialized()) {
+ return;
+ }
+ for (ECBlockOutputStream stream : blockOutputStreams) {
+ if (stream != null) {
+ stream.close();
+ }
+ }
+ updateBlockID(underlyingBlockID());
+ }
+
+ @Override
+ long getTotalAckDataLength() {
+ if (!isInitialized()) {
+ return 0;
+ }
+ // blockID is the same for EC blocks inside one block group managed by
+ // this entry.
+ updateBlockID(underlyingBlockID());
+ //TODO: A future implementation might require something like this, but
+ // currently as ECBlockOutputStream is inheriting from BlockOutputStream
+ // this method returns 0 all the time from the unrelying streams.
+ // After we have a confirmed ack mechanism, like there is in
+ // RatisBlockOutputStream, we should revisit this part, and decide if we
+ // want to filter out parity here for example.
Review comment:
I replaced the comment, and removed code from here... Also updated
javadoc of the class, and moved the comment about blockID into
underlyingBlockID()
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]