szetszwo commented on a change in pull request #2860:
URL: https://github.com/apache/ozone/pull/2860#discussion_r833177203
##########
File path:
hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
##########
@@ -566,6 +567,13 @@ public static String getFixedLengthString(String string,
int length) {
return String.format("%1$" + length + "s", string);
}
+ public static byte[] getFixedLengthBytes(int length) {
+ byte[] bytes = new byte[length];
+ Random random = new Random();
+ random.nextBytes(bytes);
+ return bytes;
+ }
+
Review comment:
Use ThreadLocalRandom and support non-random data as below:
```
public static byte[] generateData(int length, boolean random) {
final byte[] data = new byte[length];
if (random) {
ThreadLocalRandom.current().nextBytes(data);
} else {
for (int i = 0; i < length; i++) {
data[i] = (byte) i;
}
}
return data;
}
```
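The suggested helper is self-contained enough to check in isolation; a minimal sketch (the class name here is illustrative, not from the PR) showing the deterministic branch producing the wrapping 0, 1, 2, ... pattern:

```java
import java.util.concurrent.ThreadLocalRandom;

public class GenerateDataDemo {
  // Same shape as the suggested helper: random fill via ThreadLocalRandom,
  // or a deterministic 0, 1, 2, ... byte pattern when random == false.
  public static byte[] generateData(int length, boolean random) {
    final byte[] data = new byte[length];
    if (random) {
      ThreadLocalRandom.current().nextBytes(data);
    } else {
      for (int i = 0; i < length; i++) {
        data[i] = (byte) i;
      }
    }
    return data;
  }

  public static void main(String[] args) {
    final byte[] fixed = generateData(300, false);
    // The (byte) cast wraps at 256, so the pattern repeats.
    System.out.println(fixed[0] + " " + fixed[255] + " " + fixed[256]);
  }
}
```

Using ThreadLocalRandom avoids allocating a new Random per call, and the deterministic branch makes test failures reproducible.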
##########
File path:
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamRoutingTable.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.storage;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.RoutingTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This class is used to get the RoutingTable for streaming.
+ */
+public final class StreamRoutingTable {
+
+ public static final Logger LOG =
+ LoggerFactory.getLogger(StreamRoutingTable.class);
+
+ private StreamRoutingTable() {
+ }
+
+ public static RoutingTable getRoutingTable(Pipeline pipeline) {
Review comment:
Move it to RatisHelper.
##########
File path:
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
##########
@@ -549,18 +551,44 @@ private ContainerCommandResponseProto runCommand(
@Override
public CompletableFuture<?> link(DataStream stream, LogEntryProto entry) {
- return CompletableFuture.supplyAsync(() -> {
- if (stream == null) {
- return JavaUtils.completeExceptionally(
- new IllegalStateException("DataStream is null"));
- }
- if (stream.getDataChannel().isOpen()) {
- return JavaUtils.completeExceptionally(
- new IllegalStateException(
- "DataStream: " + stream + " is not closed properly"));
- } else {
- return CompletableFuture.completedFuture(null);
+ if (stream == null) {
+ return JavaUtils.completeExceptionally(new IllegalStateException(
+ "DataStream is null"));
+ }
+ final DataChannel dataChannel = stream.getDataChannel();
+ if (dataChannel.isOpen()) {
+ return JavaUtils.completeExceptionally(new IllegalStateException(
+ "DataStream: " + stream + " is not closed properly"));
+ }
+
+ final CompletableFuture<ContainerCommandResponseProto> f;
+ if (dataChannel instanceof SmallFileStreamDataChannel) {
+ f = link(entry, (SmallFileStreamDataChannel) dataChannel);
+ } else if (dataChannel instanceof KeyValueStreamDataChannel) {
+ return CompletableFuture.completedFuture(null);
+ } else {
+ return JavaUtils.completeExceptionally(new IllegalStateException(
+ "Unexpected DataChannel " + dataChannel.getClass()));
+ }
+ return f.whenComplete((res, e) -> {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("PutBlock {} Term: {} Index: {}",
+ res.getResult(), entry.getTerm(), entry.getIndex());
}
+ });
+ }
+
+ private CompletableFuture<ContainerCommandResponseProto> link(
+ LogEntryProto entry, SmallFileStreamDataChannel smallFileChannel) {
+ return CompletableFuture.supplyAsync(() -> {
+ final DispatcherContext context = new DispatcherContext.Builder()
+ .setTerm(entry.getTerm())
+ .setLogIndex(entry.getIndex())
+ .setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA)
+ .setContainer2BCSIDMap(container2BCSIDMap)
+ .build();
+
+ return runCommand(smallFileChannel.getPutBlockRequest(), context);
}, executor);
Review comment:
Pass ContainerCommandRequestProto instead and rename it to runCommandAsync(..)
```
private CompletableFuture<ContainerCommandResponseProto> runCommandAsync(
ContainerCommandRequestProto requestProto, LogEntryProto entry) {
return CompletableFuture.supplyAsync(() -> {
final DispatcherContext context = new DispatcherContext.Builder()
.setTerm(entry.getTerm())
.setLogIndex(entry.getIndex())
.setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA)
.setContainer2BCSIDMap(container2BCSIDMap)
.build();
return runCommand(requestProto, context);
}, executor);
}
```
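The shape of the suggested runCommandAsync can be exercised in isolation (the String request/response below is an illustrative stand-in for ContainerCommandRequestProto/ResponseProto, which are not available here):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class RunCommandAsyncDemo {
  // Stand-in for the synchronous runCommand(request, context) call.
  public static String runCommand(String request) {
    return request + ":ok";
  }

  // Mirrors the suggestion: the blocking runCommand call is wrapped once
  // in supplyAsync on the given executor. The lambda returns a plain value,
  // not a future, which avoids the nested
  // CompletableFuture<CompletableFuture<...>> shape of the old link() code.
  public static CompletableFuture<String> runCommandAsync(
      String request, ExecutorService executor) {
    return CompletableFuture.supplyAsync(() -> runCommand(request), executor);
  }

  public static void main(String[] args) {
    final ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      System.out.println(runCommandAsync("PutBlock", executor).join());
    } finally {
      executor.shutdown();
    }
  }
}
```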
##########
File path:
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/SmallFileDataStreamOutput.java
##########
@@ -0,0 +1,590 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.client.io;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.XceiverClientRatis;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
+import org.apache.hadoop.hdds.scm.storage.StreamBuffer;
+import org.apache.hadoop.hdds.scm.storage.StreamRoutingTable;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.ozone.common.Checksum;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+import org.apache.hadoop.security.token.Token;
+import org.apache.ratis.client.api.DataStreamOutput;
+import org.apache.ratis.io.StandardWriteOption;
+import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
+import org.apache.ratis.protocol.exceptions.RaftRetryFailureException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * SmallFileDataStreamOutput, only used to write requests smaller than ChunkSize
+ * <p>
+ * TODO : currently not support multi-thread access.
+ */
+public class SmallFileDataStreamOutput implements ByteBufferStreamOutput {
Review comment:
It seems that the code in this class is copied from
BlockDataStreamOutputEntryPool and KeyDataStreamOutput. We should reuse the
code rather than copy it; otherwise, it is very hard to maintain.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]