This is an automated email from the ASF dual-hosted git repository.
hanishakoneru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new e4b4fec  HDDS-1224. Restructure code to validate the response from server in the Read path (#806)
e4b4fec is described below
commit e4b4fec66fa4d16ce661cd990d488ad2a6f6b5cf
Author: bshashikant <[email protected]>
AuthorDate: Tue Jun 4 23:07:02 2019 +0530
HDDS-1224. Restructure code to validate the response from server in the Read path (#806)
---
.../apache/hadoop/hdds/scm/XceiverClientGrpc.java | 88 +++++++++++----------
.../hadoop/hdds/scm/storage/BlockInputStream.java | 89 +++++++++-------------
.../hdds/scm/storage/TestBlockInputStream.java | 3 +-
.../apache/hadoop/hdds/scm/XceiverClientSpi.java | 17 +++--
.../hadoop/hdds/scm/storage/CheckedBiFunction.java | 31 ++++++++
.../hdds/scm/storage/ContainerProtocolCalls.java | 87 ++++++++++-----------
.../client/rpc/TestOzoneRpcClientAbstract.java | 6 +-
.../hadoop/ozone/client/rpc/TestReadRetries.java | 5 +-
8 files changed, 168 insertions(+), 158 deletions(-)
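
Taken together, the patch replaces the read path's exclude-datanode plumbing with response validators: callers hand sendCommand a list of CheckedBiFunction callbacks, each one is applied to the server's reply, and a thrown IOException makes the client retry on the next replica. A minimal standalone sketch of that shape, using toy String request/response values rather than the Ozone protos:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    public class ValidatorSketch {
      // Same shape as the CheckedBiFunction interface added by this patch.
      @FunctionalInterface
      interface CheckedBiFunction<L, R, E extends IOException> {
        void apply(L left, R right) throws E;
      }

      // Run every validator against the response; any IOException tells
      // the caller to retry the command on another replica.
      static void validate(String request, String response,
          List<CheckedBiFunction<String, String, IOException>> validators)
          throws IOException {
        for (CheckedBiFunction<String, String, IOException> v : validators) {
          v.apply(request, response);
        }
      }

      public static void main(String[] args) throws IOException {
        List<CheckedBiFunction<String, String, IOException>> validators =
            new ArrayList<>();
        validators.add((req, resp) -> {
          if (resp.isEmpty()) {
            throw new IOException("empty response for " + req);
          }
        });
        validate("ReadChunk", "chunk-bytes", validators); // passes
      }
    }
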
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
index a535c9f..13d3eed 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
@@ -23,7 +23,6 @@ import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc;
@@ -31,6 +30,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServi
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.CheckedBiFunction;
import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.hdds.tracing.GrpcClientInterceptor;
@@ -62,7 +62,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
/**
* A Client for the storageContainer protocol.
@@ -83,7 +82,7 @@ public class XceiverClientGrpc extends XceiverClientSpi {
* data nodes.
*
* @param pipeline - Pipeline that defines the machines.
- * @param config -- Ozone Config
+ * @param config -- Ozone Config
*/
public XceiverClientGrpc(Pipeline pipeline, Configuration config) {
super();
@@ -91,7 +90,7 @@ public class XceiverClientGrpc extends XceiverClientSpi {
Preconditions.checkNotNull(config);
this.pipeline = pipeline;
this.config = config;
- this.secConfig = new SecurityConfig(config);
+ this.secConfig = new SecurityConfig(config);
this.semaphore =
new Semaphore(HddsClientUtils.getMaxOutstandingRequests(config));
this.metrics = XceiverClientManager.getXceiverClientMetrics();
@@ -101,7 +100,7 @@ public class XceiverClientGrpc extends XceiverClientSpi {
/**
* To be used when grpc token is not enabled.
- * */
+ */
@Override
public void connect() throws Exception {
// leader by default is the 1st datanode in the datanode list of pipleline
@@ -112,7 +111,7 @@ public class XceiverClientGrpc extends XceiverClientSpi {
/**
* Passed encoded token to GRPC header when security is enabled.
- * */
+ */
@Override
public void connect(String encodedToken) throws Exception {
// leader by default is the 1st datanode in the datanode list of pipleline
@@ -132,11 +131,10 @@ public class XceiverClientGrpc extends XceiverClientSpi {
}
// Add credential context to the client call
- String userName = UserGroupInformation.getCurrentUser()
- .getShortUserName();
+ String userName = UserGroupInformation.getCurrentUser().getShortUserName();
LOG.debug("Connecting to server Port : " + dn.getIpAddress());
- NettyChannelBuilder channelBuilder = NettyChannelBuilder.forAddress(dn
- .getIpAddress(), port).usePlaintext()
+ NettyChannelBuilder channelBuilder =
+ NettyChannelBuilder.forAddress(dn.getIpAddress(), port).usePlaintext()
.maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE)
.intercept(new ClientCredentialInterceptor(userName, encodedToken),
new GrpcClientInterceptor());
@@ -149,8 +147,8 @@ public class XceiverClientGrpc extends XceiverClientSpi {
if (trustCertCollectionFile != null) {
sslContextBuilder.trustManager(trustCertCollectionFile);
}
- if (secConfig.isGrpcMutualTlsRequired() && clientCertChainFile != null &&
- privateKeyFile != null) {
+ if (secConfig.isGrpcMutualTlsRequired() && clientCertChainFile != null
+ && privateKeyFile != null) {
sslContextBuilder.keyManager(clientCertChainFile, privateKeyFile);
}
@@ -216,49 +214,45 @@ public class XceiverClientGrpc extends XceiverClientSpi {
}
@Override
- public XceiverClientReply sendCommand(
- ContainerCommandRequestProto request, List<DatanodeDetails> excludeDns)
+ public ContainerCommandResponseProto sendCommand(
+ ContainerCommandRequestProto request, List<CheckedBiFunction> validators)
throws IOException {
- Preconditions.checkState(HddsUtils.isReadOnly(request));
- return sendCommandWithTraceIDAndRetry(request, excludeDns);
+ try {
+ XceiverClientReply reply;
+ reply = sendCommandWithTraceIDAndRetry(request, validators);
+ ContainerCommandResponseProto responseProto = reply.getResponse().get();
+ return responseProto;
+ } catch (ExecutionException | InterruptedException e) {
+ throw new IOException("Failed to execute command " + request, e);
+ }
}
private XceiverClientReply sendCommandWithTraceIDAndRetry(
- ContainerCommandRequestProto request, List<DatanodeDetails> excludeDns)
+ ContainerCommandRequestProto request, List<CheckedBiFunction> validators)
throws IOException {
try (Scope scope = GlobalTracer.get()
.buildSpan("XceiverClientGrpc." + request.getCmdType().name())
.startActive(true)) {
ContainerCommandRequestProto finalPayload =
ContainerCommandRequestProto.newBuilder(request)
- .setTraceID(TracingUtil.exportCurrentSpan())
- .build();
- return sendCommandWithRetry(finalPayload, excludeDns);
+ .setTraceID(TracingUtil.exportCurrentSpan()).build();
+ return sendCommandWithRetry(finalPayload, validators);
}
}
private XceiverClientReply sendCommandWithRetry(
- ContainerCommandRequestProto request, List<DatanodeDetails> excludeDns)
+ ContainerCommandRequestProto request, List<CheckedBiFunction> validators)
throws IOException {
ContainerCommandResponseProto responseProto = null;
+ IOException ioException = null;
// In case of an exception or an error, we will try to read from the
// datanodes in the pipeline in a round robin fashion.
// TODO: cache the correct leader info in here, so that any subsequent calls
// should first go to leader
- List<DatanodeDetails> dns = pipeline.getNodes();
- List<DatanodeDetails> healthyDns =
- excludeDns != null ? dns.stream().filter(dnId -> {
- for (DatanodeDetails excludeId : excludeDns) {
- if (dnId.equals(excludeId)) {
- return false;
- }
- }
- return true;
- }).collect(Collectors.toList()) : dns;
XceiverClientReply reply = new XceiverClientReply(null);
- for (DatanodeDetails dn : healthyDns) {
+ for (DatanodeDetails dn : pipeline.getNodes()) {
try {
LOG.debug("Executing command " + request + " on datanode " + dn);
// In case the command gets retried on a 2nd datanode,
@@ -266,17 +260,26 @@ public class XceiverClientGrpc extends XceiverClientSpi {
// in case these don't exist for the specific datanode.
reply.addDatanode(dn);
responseProto = sendCommandAsync(request, dn).getResponse().get();
- if (responseProto.getResult() == ContainerProtos.Result.SUCCESS) {
- break;
+ if (validators != null && !validators.isEmpty()) {
+ for (CheckedBiFunction validator : validators) {
+ validator.apply(request, responseProto);
+ }
}
- } catch (ExecutionException | InterruptedException e) {
+ break;
+ } catch (ExecutionException | InterruptedException | IOException e) {
LOG.debug("Failed to execute command " + request + " on datanode " + dn
.getUuidString(), e);
- if (Status.fromThrowable(e.getCause()).getCode()
- == Status.UNAUTHENTICATED.getCode()) {
- throw new SCMSecurityException("Failed to authenticate with "
- + "GRPC XceiverServer with Ozone block token.");
+ if (!(e instanceof IOException)) {
+ if (Status.fromThrowable(e.getCause()).getCode()
+ == Status.UNAUTHENTICATED.getCode()) {
+ throw new SCMSecurityException("Failed to authenticate with "
+ + "GRPC XceiverServer with Ozone block token.");
+ }
+ ioException = new IOException(e);
+ } else {
+ ioException = (IOException) e;
}
+ responseProto = null;
}
}
@@ -284,9 +287,10 @@ public class XceiverClientGrpc extends XceiverClientSpi {
reply.setResponse(CompletableFuture.completedFuture(responseProto));
return reply;
} else {
- throw new IOException(
- "Failed to execute command " + request + " on the pipeline "
- + pipeline.getId());
+ Preconditions.checkNotNull(ioException);
+ LOG.error("Failed to execute command " + request + " on the pipeline "
+ + pipeline.getId());
+ throw ioException;
}
}
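
The rewritten sendCommandWithRetry above reduces to: try each datanode in pipeline order, run every validator on the reply, remember the last IOException, and rethrow it only once all replicas have failed. A hedged sketch of just that control flow, with generic Node/Req/Resp type parameters standing in for the gRPC client internals:

    import java.io.IOException;
    import java.util.List;
    import java.util.function.BiFunction;

    public class RetryLoopSketch {
      interface Validator<Req, Resp> {
        void apply(Req req, Resp resp) throws IOException;
      }

      static <Req, Resp, Node> Resp sendWithRetry(Req request,
          List<Node> nodes, BiFunction<Node, Req, Resp> send,
          List<Validator<Req, Resp>> validators) throws IOException {
        IOException lastFailure = null;
        for (Node dn : nodes) {
          try {
            Resp response = send.apply(dn, request);
            for (Validator<Req, Resp> v : validators) {
              v.apply(request, response); // a throw moves on to the next replica
            }
            return response;              // first validated reply wins
          } catch (IOException e) {
            lastFailure = e;              // remember it, try the next datanode
          }
        }
        throw lastFailure != null ? lastFailure
            : new IOException("no datanodes in the pipeline");
      }
    }
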
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
index bb4a5b0..82fb106 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
@@ -21,12 +21,11 @@ package org.apache.hadoop.hdds.scm.storage;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.scm.XceiverClientReply;
import org.apache.hadoop.hdds.scm.container.common.helpers
.StorageContainerException;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.common.ChecksumData;
+import org.apache.hadoop.ozone.common.OzoneChecksumException;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
@@ -34,16 +33,17 @@ import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ReadChunkResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
+ ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
+ ContainerCommandRequestProto;
import org.apache.hadoop.hdds.client.BlockID;
-
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import java.util.concurrent.ExecutionException;
/**
* An {@link InputStream} used by the REST service in combination with the
@@ -74,7 +74,7 @@ public class BlockInputStream extends InputStream implements Seekable {
private List<ByteBuffer> buffers;
private int bufferIndex;
private long bufferPosition;
- private final boolean verifyChecksum;
+ private boolean verifyChecksum;
/**
* Creates a new BlockInputStream.
@@ -323,41 +323,8 @@ public class BlockInputStream extends InputStream implements Seekable {
private synchronized void readChunkFromContainer() throws IOException {
// Read the chunk at chunkIndex
final ChunkInfo chunkInfo = chunks.get(chunkIndex);
- List<DatanodeDetails> excludeDns = null;
ByteString byteString;
- List<DatanodeDetails> dnList = getDatanodeList();
- while (true) {
- List<DatanodeDetails> dnListFromReadChunkCall = new ArrayList<>();
- byteString = readChunk(chunkInfo, excludeDns, dnListFromReadChunkCall);
- try {
- if (byteString.size() != chunkInfo.getLen()) {
- // Bytes read from chunk should be equal to chunk size.
- throw new IOException(String
- .format("Inconsistent read for chunk=%s len=%d bytesRead=%d",
- chunkInfo.getChunkName(), chunkInfo.getLen(),
- byteString.size()));
- }
- ChecksumData checksumData =
- ChecksumData.getFromProtoBuf(chunkInfo.getChecksumData());
- if (verifyChecksum) {
- Checksum.verifyChecksum(byteString, checksumData);
- }
- break;
- } catch (IOException ioe) {
- // we will end up in this situation only if the checksum mismatch
- // happens or the length of the chunk mismatches.
- // In this case, read should be retried on a different replica.
- // TODO: Inform SCM of a possible corrupt container replica here
- if (excludeDns == null) {
- excludeDns = new ArrayList<>();
- }
- excludeDns.addAll(dnListFromReadChunkCall);
- if (excludeDns.size() == dnList.size()) {
- throw ioe;
- }
- }
- }
-
+ byteString = readChunk(chunkInfo);
buffers = byteString.asReadOnlyByteBufferList();
bufferIndex = 0;
chunkIndexOfCurrentBuffer = chunkIndex;
@@ -372,28 +339,20 @@ public class BlockInputStream extends InputStream implements Seekable {
* Send RPC call to get the chunk from the container.
*/
@VisibleForTesting
- protected ByteString readChunk(final ChunkInfo chunkInfo,
- List<DatanodeDetails> excludeDns, List<DatanodeDetails> dnListFromReply)
+ protected ByteString readChunk(final ChunkInfo chunkInfo)
throws IOException {
- XceiverClientReply reply;
- ReadChunkResponseProto readChunkResponse = null;
+ ReadChunkResponseProto readChunkResponse;
try {
- reply = ContainerProtocolCalls
- .readChunk(xceiverClient, chunkInfo, blockID, traceID, excludeDns);
- ContainerProtos.ContainerCommandResponseProto response;
- response = reply.getResponse().get();
- ContainerProtocolCalls.validateContainerResponse(response);
- readChunkResponse = response.getReadChunk();
- dnListFromReply.addAll(reply.getDatanodes());
+ List<CheckedBiFunction> validators =
+ ContainerProtocolCalls.getValidatorList();
+ validators.add(validator);
+ readChunkResponse = ContainerProtocolCalls
+ .readChunk(xceiverClient, chunkInfo, blockID, traceID, validators);
} catch (IOException e) {
if (e instanceof StorageContainerException) {
throw e;
}
throw new IOException("Unexpected OzoneException: " + e.toString(), e);
- } catch (ExecutionException | InterruptedException e) {
- throw new IOException(
- "Failed to execute ReadChunk command for chunk " + chunkInfo
- .getChunkName(), e);
}
return readChunkResponse.getData();
}
@@ -403,6 +362,26 @@ public class BlockInputStream extends InputStream implements Seekable {
return xceiverClient.getPipeline().getNodes();
}
+ private CheckedBiFunction<ContainerCommandRequestProto,
+ ContainerCommandResponseProto, IOException> validator =
+ (request, response) -> {
+ ReadChunkResponseProto readChunkResponse = response.getReadChunk();
+ final ChunkInfo chunkInfo = readChunkResponse.getChunkData();
+ ByteString byteString = readChunkResponse.getData();
+ if (byteString.size() != chunkInfo.getLen()) {
+ // Bytes read from chunk should be equal to chunk size.
+ throw new OzoneChecksumException(String
+ .format("Inconsistent read for chunk=%s len=%d bytesRead=%d",
+ chunkInfo.getChunkName(), chunkInfo.getLen(),
+ byteString.size()));
+ }
+ ChecksumData checksumData =
+ ChecksumData.getFromProtoBuf(chunkInfo.getChecksumData());
+ if (verifyChecksum) {
+ Checksum.verifyChecksum(byteString, checksumData);
+ }
+ };
+
@Override
public synchronized void seek(long pos) throws IOException {
if (pos < 0 || (chunks.size() == 0 && pos > 0)
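
The validator field above folds the old inline length and checksum checks into a callback that the client can now run once per replica attempt. A standalone sketch of the length half of that check, using a plain byte[] instead of the proto ByteString:

    import java.io.IOException;

    public class ChunkLengthCheckSketch {
      // Mirrors the "Inconsistent read" condition that now triggers a retry
      // on the next replica instead of failing the read outright.
      static void checkLength(String chunkName, long expectedLen, byte[] data)
          throws IOException {
        if (data.length != expectedLen) {
          throw new IOException(String.format(
              "Inconsistent read for chunk=%s len=%d bytesRead=%d",
              chunkName, expectedLen, data.length));
        }
      }
    }
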
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
index 35c1022..b6ceb2b 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
@@ -114,8 +114,7 @@ public class TestBlockInputStream {
}
@Override
- protected ByteString readChunk(final ChunkInfo chunkInfo,
- List<DatanodeDetails> excludeDns, List<DatanodeDetails> dnListFromReply)
+ protected ByteString readChunk(final ChunkInfo chunkInfo)
throws IOException {
return getByteString(chunkInfo.getChunkName(), (int) chunkInfo.getLen());
}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
index 1a18366..5631bad 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
@@ -25,13 +25,13 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.scm.storage.CheckedBiFunction;
/**
* A Client for the storageContainer protocol.
@@ -118,18 +118,21 @@ public abstract class XceiverClientSpi implements Closeable {
* Sends a given command to server and gets the reply back along with
* the server associated info.
* @param request Request
- * @param excludeDns list of servers on which the command won't be sent to.
+ * @param validators functions to validate the response
* @return Response to the command
* @throws IOException
*/
- public XceiverClientReply sendCommand(
- ContainerCommandRequestProto request, List<DatanodeDetails> excludeDns)
+ public ContainerCommandResponseProto sendCommand(
+ ContainerCommandRequestProto request, List<CheckedBiFunction> validators)
throws IOException {
try {
XceiverClientReply reply;
reply = sendCommandAsync(request);
- reply.getResponse().get();
- return reply;
+ ContainerCommandResponseProto responseProto = reply.getResponse().get();
+ for (CheckedBiFunction function : validators) {
+ function.apply(request, responseProto);
+ }
+ return responseProto;
} catch (ExecutionException | InterruptedException e) {
throw new IOException("Failed to command " + request, e);
}
@@ -156,7 +159,7 @@ public abstract class XceiverClientSpi implements Closeable {
/**
* Check if an specfic commitIndex is replicated to majority/all servers.
* @param index index to watch for
- * @param timeout timeout provided for the watch ipeartion to complete
+ * @param timeout timeout provided for the watch operation to complete
* @return reply containing the min commit index replicated to all or majority
* servers in case of a failure
* @throws InterruptedException
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/CheckedBiFunction.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/CheckedBiFunction.java
new file mode 100644
index 0000000..df84859
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/CheckedBiFunction.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.storage;
+
+
+import java.io.IOException;
+
+/**
+ * Defines a functional interface having two inputs which throws IOException.
+ */
+@FunctionalInterface
+public interface CheckedBiFunction<LEFT, RIGHT,
+ THROWABLE extends IOException> {
+ void apply(LEFT left, RIGHT right) throws THROWABLE;
+}
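
Being a @FunctionalInterface, CheckedBiFunction is intended to be written as a lambda. A toy usage sketch, assuming this new interface is on the classpath (the String values are placeholders, not protocol types):

    import java.io.IOException;
    import org.apache.hadoop.hdds.scm.storage.CheckedBiFunction;

    public class CheckedBiFunctionDemo {
      public static void main(String[] args) throws IOException {
        // A validator that fails when the (toy) response does not echo
        // the request.
        CheckedBiFunction<String, String, IOException> echoCheck =
            (request, response) -> {
              if (!request.equals(response)) {
                throw new IOException("response does not match request");
              }
            };
        echoCheck.apply("ping", "ping"); // passes; ("ping", "pong") would throw
      }
    }
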
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
index 5a1a75e..08f5d87 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hdds.scm.storage;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.XceiverClientReply;
import org.apache.hadoop.hdds.scm.container.common.helpers
.BlockNotCommittedException;
@@ -72,6 +71,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
import org.apache.hadoop.hdds.client.BlockID;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
@@ -116,9 +116,8 @@ public final class ContainerProtocolCalls {
}
ContainerCommandRequestProto request = builder.build();
- ContainerCommandResponseProto response = xceiverClient.sendCommand(request);
- validateContainerResponse(response);
-
+ ContainerCommandResponseProto response =
+ xceiverClient.sendCommand(request, getValidatorList());
return response.getGetBlock();
}
@@ -153,8 +152,8 @@ public final class ContainerProtocolCalls {
builder.setEncodedToken(encodedToken);
}
ContainerCommandRequestProto request = builder.build();
- ContainerCommandResponseProto response = xceiverClient.sendCommand(request);
- validateContainerResponse(response);
+ ContainerCommandResponseProto response =
+ xceiverClient.sendCommand(request, getValidatorList());
return response.getGetCommittedBlockLength();
}
@@ -184,8 +183,8 @@ public final class ContainerProtocolCalls {
builder.setEncodedToken(encodedToken);
}
ContainerCommandRequestProto request = builder.build();
- ContainerCommandResponseProto response = xceiverClient.sendCommand(request);
- validateContainerResponse(response);
+ ContainerCommandResponseProto response =
+ xceiverClient.sendCommand(request, getValidatorList());
return response.getPutBlock();
}
@@ -228,35 +227,31 @@ public final class ContainerProtocolCalls {
* @param chunk information about chunk to read
* @param blockID ID of the block
* @param traceID container protocol call args
- * @param excludeDns datamode to exclude while executing the command
+ * @param validators functions to validate the response
* @return container protocol read chunk response
* @throws IOException if there is an I/O error while performing the call
*/
- public static XceiverClientReply readChunk(XceiverClientSpi xceiverClient,
- ChunkInfo chunk, BlockID blockID, String traceID,
- List<DatanodeDetails> excludeDns)
- throws IOException {
- ReadChunkRequestProto.Builder readChunkRequest = ReadChunkRequestProto
- .newBuilder()
- .setBlockID(blockID.getDatanodeBlockIDProtobuf())
- .setChunkData(chunk);
+ public static ContainerProtos.ReadChunkResponseProto readChunk(
+ XceiverClientSpi xceiverClient, ChunkInfo chunk, BlockID blockID,
+ String traceID, List<CheckedBiFunction> validators) throws IOException {
+ ReadChunkRequestProto.Builder readChunkRequest =
+ ReadChunkRequestProto.newBuilder()
+ .setBlockID(blockID.getDatanodeBlockIDProtobuf())
+ .setChunkData(chunk);
String id = xceiverClient.getPipeline().getFirstNode().getUuidString();
- ContainerCommandRequestProto.Builder builder = ContainerCommandRequestProto
- .newBuilder()
- .setCmdType(Type.ReadChunk)
- .setContainerID(blockID.getContainerID())
- .setTraceID(traceID)
- .setDatanodeUuid(id)
- .setReadChunk(readChunkRequest);
+ ContainerCommandRequestProto.Builder builder =
+ ContainerCommandRequestProto.newBuilder().setCmdType(Type.ReadChunk)
+ .setContainerID(blockID.getContainerID()).setTraceID(traceID)
+ .setDatanodeUuid(id).setReadChunk(readChunkRequest);
String encodedToken = getEncodedBlockToken(new Text(blockID.
getContainerBlockID().toString()));
if (encodedToken != null) {
builder.setEncodedToken(encodedToken);
}
ContainerCommandRequestProto request = builder.build();
- XceiverClientReply reply =
- xceiverClient.sendCommand(request, excludeDns);
- return reply;
+ ContainerCommandResponseProto reply =
+ xceiverClient.sendCommand(request, validators);
+ return reply.getReadChunk();
}
/**
@@ -291,8 +286,7 @@ public final class ContainerProtocolCalls {
builder.setEncodedToken(encodedToken);
}
ContainerCommandRequestProto request = builder.build();
- ContainerCommandResponseProto response = xceiverClient.sendCommand(request);
- validateContainerResponse(response);
+ xceiverClient.sendCommand(request, getValidatorList());
}
/**
@@ -384,8 +378,8 @@ public final class ContainerProtocolCalls {
builder.setEncodedToken(encodedToken);
}
ContainerCommandRequestProto request = builder.build();
- ContainerCommandResponseProto response = client.sendCommand(request);
- validateContainerResponse(response);
+ ContainerCommandResponseProto response =
+ client.sendCommand(request, getValidatorList());
return response.getPutSmallFile();
}
@@ -416,9 +410,7 @@ public final class ContainerProtocolCalls {
request.setCreateContainer(createRequest.build());
request.setDatanodeUuid(id);
request.setTraceID(traceID);
- ContainerCommandResponseProto response = client.sendCommand(
- request.build());
- validateContainerResponse(response);
+ client.sendCommand(request.build(), getValidatorList());
}
/**
@@ -444,12 +436,10 @@ public final class ContainerProtocolCalls {
request.setDeleteContainer(deleteRequest);
request.setTraceID(traceID);
request.setDatanodeUuid(id);
- if(encodedToken != null) {
+ if (encodedToken != null) {
request.setEncodedToken(encodedToken);
}
- ContainerCommandResponseProto response =
- client.sendCommand(request.build());
- validateContainerResponse(response);
+ client.sendCommand(request.build(), getValidatorList());
}
/**
@@ -476,9 +466,7 @@ public final class ContainerProtocolCalls {
if(encodedToken != null) {
request.setEncodedToken(encodedToken);
}
- ContainerCommandResponseProto response =
- client.sendCommand(request.build());
- validateContainerResponse(response);
+ client.sendCommand(request.build(), getValidatorList());
}
/**
@@ -505,8 +493,7 @@ public final class ContainerProtocolCalls {
request.setEncodedToken(encodedToken);
}
ContainerCommandResponseProto response =
- client.sendCommand(request.build());
- validateContainerResponse(response);
+ client.sendCommand(request.build(), getValidatorList());
return response.getReadContainer();
}
@@ -544,9 +531,8 @@ public final class ContainerProtocolCalls {
builder.setEncodedToken(encodedToken);
}
ContainerCommandRequestProto request = builder.build();
- ContainerCommandResponseProto response = client.sendCommand(request);
- validateContainerResponse(response);
-
+ ContainerCommandResponseProto response =
+ client.sendCommand(request, getValidatorList());
return response.getGetSmallFile();
}
@@ -598,4 +584,13 @@ public final class ContainerProtocolCalls {
.append(blockId.getLocalID())
.toString());
}
+
+ public static List<CheckedBiFunction> getValidatorList() {
+ List<CheckedBiFunction> validators = new ArrayList<>(1);
+ CheckedBiFunction<ContainerProtos.ContainerCommandRequestProto,
+ ContainerProtos.ContainerCommandResponseProto, IOException>
+ validator = (request, response) -> validateContainerResponse(response);
+ validators.add(validator);
+ return validators;
+ }
}
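
Callers that need more than the stock result-code check extend this list, as BlockInputStream does earlier in the patch. A hedged sketch of that composition, assuming the classes from this patch are on the classpath and a connected client is available (the lengthCheck validator here is illustrative, not part of the patch):

    import java.io.IOException;
    import java.util.List;
    import org.apache.hadoop.hdds.client.BlockID;
    import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
    import org.apache.hadoop.hdds.scm.XceiverClientSpi;
    import org.apache.hadoop.hdds.scm.storage.CheckedBiFunction;
    import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;

    public class ComposedValidatorSketch {
      static ContainerProtos.ReadChunkResponseProto readWithExtraCheck(
          XceiverClientSpi xceiverClient, ContainerProtos.ChunkInfo chunkInfo,
          BlockID blockID, String traceID) throws IOException {
        // Start from the default validateContainerResponse check...
        List<CheckedBiFunction> validators =
            ContainerProtocolCalls.getValidatorList();
        // ...and stack a caller-specific check on top of it.
        CheckedBiFunction<ContainerProtos.ContainerCommandRequestProto,
            ContainerProtos.ContainerCommandResponseProto, IOException>
            lengthCheck = (request, response) -> {
              if (response.getReadChunk().getData().size()
                  != response.getReadChunk().getChunkData().getLen()) {
                throw new IOException("short read");
              }
            };
        validators.add(lengthCheck);
        return ContainerProtocolCalls.readChunk(
            xceiverClient, chunkInfo, blockID, traceID, validators);
      }
    }
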
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
index e6224ab..3810270 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
@@ -871,7 +871,7 @@ public abstract class TestOzoneRpcClientAbstract {
fail("Reading corrupted data should fail, as verify checksum is " +
"enabled");
}
- } catch (OzoneChecksumException e) {
+ } catch (IOException e) {
if (!verifyChecksum) {
fail("Reading corrupted data should not fail, as verify checksum is " +
"disabled");
@@ -1022,7 +1022,7 @@ public abstract class TestOzoneRpcClientAbstract {
OzoneInputStream is = bucket.readKey(keyName);
is.read(new byte[100]);
fail("Reading corrupted data should fail.");
- } catch (OzoneChecksumException e) {
+ } catch (IOException e) {
GenericTestUtils.assertExceptionContains("Checksum mismatch", e);
}
}
@@ -1103,7 +1103,7 @@ public abstract class TestOzoneRpcClientAbstract {
byte[] b = new byte[data.length];
is.read(b);
fail("Reading corrupted data should fail.");
- } catch (OzoneChecksumException e) {
+ } catch (IOException e) {
GenericTestUtils.assertExceptionContains("Checksum mismatch", e);
}
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestReadRetries.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestReadRetries.java
index 0145c6d..1343a03 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestReadRetries.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestReadRetries.java
@@ -204,9 +204,8 @@ public class TestReadRetries {
readKey(bucket, keyName, value);
fail("Expected exception not thrown");
} catch (IOException e) {
- Assert.assertTrue(e.getMessage().contains("Failed to execute command"));
- Assert.assertTrue(
- e.getMessage().contains("on the pipeline " + pipeline.getId()));
+ // it should throw an ioException as none of the servers
+ // are available
}
manager.releaseClient(clientSpi, false);
}