Repository: hadoop
Updated Branches:
  refs/heads/trunk 085f10e75 -> 64a4b6b08


HDDS-284. Checksum for ChunksData.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/64a4b6b0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/64a4b6b0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/64a4b6b0

Branch: refs/heads/trunk
Commit: 64a4b6b08baf68c47cb17079bad2190f86166f50
Parents: 085f10e
Author: Hanisha Koneru <hanishakon...@apache.org>
Authored: Wed Nov 28 10:53:12 2018 -0800
Committer: Hanisha Koneru <hanishakon...@apache.org>
Committed: Wed Nov 28 10:53:12 2018 -0800

----------------------------------------------------------------------
 .../hdds/scm/storage/ChunkInputStream.java      |  11 +
 .../hdds/scm/storage/ChunkOutputStream.java     |  21 +-
 .../apache/hadoop/ozone/OzoneConfigKeys.java    |  14 +-
 .../apache/hadoop/ozone/common/Checksum.java    | 239 +++++++++++++++++++
 .../hadoop/ozone/common/ChecksumData.java       | 190 +++++++++++++++
 .../ozone/common/OzoneChecksumException.java    |  66 +++++
 .../container/common/helpers/ChunkInfo.java     |  28 +--
 .../main/proto/DatanodeContainerProtocol.proto  |  18 +-
 .../hadoop/ozone/common/TestChecksum.java       | 101 ++++++++
 .../container/keyvalue/helpers/ChunkUtils.java  |  43 +---
 .../keyvalue/impl/ChunkManagerImpl.java         |   4 -
 .../keyvalue/TestChunkManagerImpl.java          |  16 --
 .../ozone/client/io/ChunkGroupOutputStream.java |  30 ++-
 .../hadoop/ozone/client/rpc/RpcClient.java      |  25 ++
 .../ozone/client/rpc/TestOzoneRpcClient.java    |  98 +++++++-
 .../ozone/container/ContainerTestHelper.java    |  17 +-
 .../common/TestBlockDeletingService.java        |   3 +-
 .../common/impl/TestContainerPersistence.java   |  32 ++-
 .../web/storage/DistributedStorageHandler.java  |  25 ++
 19 files changed, 862 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/64a4b6b0/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
index 7b243d8..2e24aca 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
@@ -18,6 +18,10 @@
 
 package org.apache.hadoop.hdds.scm.storage;
 
+import org.apache.hadoop.hdds.scm.container.common.helpers
+    .StorageContainerException;
+import org.apache.hadoop.ozone.common.Checksum;
+import org.apache.hadoop.ozone.common.ChecksumData;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
@@ -206,6 +210,9 @@ public class ChunkInputStream extends InputStream implements Seekable {
       readChunkResponse = ContainerProtocolCalls
           .readChunk(xceiverClient, chunkInfo, blockID, traceID);
     } catch (IOException e) {
+      if (e instanceof StorageContainerException) {
+        throw e;
+      }
       throw new IOException("Unexpected OzoneException: " + e.toString(), e);
     }
     ByteString byteString = readChunkResponse.getData();
@@ -215,6 +222,10 @@ public class ChunkInputStream extends InputStream implements Seekable {
           .format("Inconsistent read for chunk=%s len=%d bytesRead=%d",
              chunkInfo.getChunkName(), chunkInfo.getLen(), byteString.size()));
     }
+    ChecksumData checksumData = ChecksumData.getFromProtoBuf(
+        chunkInfo.getChecksumData());
+    Checksum.verifyChecksum(byteString, checksumData);
+
     buffers = byteString.asReadOnlyByteBufferList();
     bufferIndex = 0;
   }
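
With this change, corrupted chunk data surfaces as an exception at read
time instead of being returned silently. A minimal caller-side sketch,
assuming an already-constructed ChunkInputStream `in` (the helper method
and buffer size are illustrative, not part of this patch):

    // Sketch: OzoneChecksumException is raised by the
    // Checksum.verifyChecksum() call added above when the bytes read
    // from the datanode do not match the ChecksumData in ChunkInfo.
    static boolean readAndCheck(java.io.InputStream in)
        throws java.io.IOException {
      byte[] buf = new byte[1024];
      try {
        in.read(buf);
        return true;
      } catch (org.apache.hadoop.ozone.common.OzoneChecksumException e) {
        return false;  // data corrupted on disk or in transit
      }
    }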

http://git-wip-us.apache.org/repos/asf/hadoop/blob/64a4b6b0/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java
index bdc6a83..85f8646 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java
@@ -21,6 +21,9 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.XceiverClientAsyncReply;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.common.Checksum;
+import org.apache.hadoop.ozone.common.ChecksumData;
+import org.apache.hadoop.ozone.common.OzoneChecksumException;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
@@ -74,6 +77,7 @@ public class ChunkOutputStream extends OutputStream {
   private final BlockData.Builder containerBlockData;
   private XceiverClientManager xceiverClientManager;
   private XceiverClientSpi xceiverClient;
+  private final Checksum checksum;
   private final String streamId;
   private int chunkIndex;
   private int chunkSize;
@@ -113,7 +117,8 @@ public class ChunkOutputStream extends OutputStream {
   public ChunkOutputStream(BlockID blockID, String key,
      XceiverClientManager xceiverClientManager, XceiverClientSpi xceiverClient,
       String traceID, int chunkSize, long streamBufferFlushSize,
-      long streamBufferMaxSize, long watchTimeout, ByteBuffer buffer) {
+      long streamBufferMaxSize, long watchTimeout, ByteBuffer buffer,
+      Checksum checksum) {
     this.blockID = blockID;
     this.key = key;
     this.traceID = traceID;
@@ -132,6 +137,7 @@ public class ChunkOutputStream extends OutputStream {
     this.watchTimeout = watchTimeout;
     this.buffer = buffer;
     this.ioException = null;
+    this.checksum = checksum;
 
     // A single thread executor handle the responses of async requests
     responseExecutor = Executors.newSingleThreadExecutor();
@@ -474,13 +480,20 @@ public class ChunkOutputStream extends OutputStream {
    * information to be used later in putKey call.
    *
    * @throws IOException if there is an I/O error while performing the call
+   * @throws OzoneChecksumException if there is an error while computing
+   * checksum
    */
   private void writeChunkToContainer(ByteBuffer chunk) throws IOException {
     int effectiveChunkSize = chunk.remaining();
     ByteString data = ByteString.copyFrom(chunk);
-    ChunkInfo chunkInfo = ChunkInfo.newBuilder().setChunkName(
-        DigestUtils.md5Hex(key) + "_stream_" + streamId + "_chunk_"
-            + ++chunkIndex).setOffset(0).setLen(effectiveChunkSize).build();
+    ChecksumData checksumData = checksum.computeChecksum(data);
+    ChunkInfo chunkInfo = ChunkInfo.newBuilder()
+        .setChunkName(DigestUtils.md5Hex(key) + "_stream_" + streamId +
+            "_chunk_" + ++chunkIndex)
+        .setOffset(0)
+        .setLen(effectiveChunkSize)
+        .setChecksumData(checksumData.getProtoBufMessage())
+        .build();
     // generate a unique requestId
     String requestId =
         traceID + ContainerProtos.Type.WriteChunk + chunkIndex + chunkInfo

http://git-wip-us.apache.org/repos/asf/hadoop/blob/64a4b6b0/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 8d5c180..879f773 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -1,4 +1,4 @@
-/**
+ /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -323,6 +323,18 @@ public final class OzoneConfigKeys {
       "hdds.datanode.replication.work.dir";
 
   /**
+   * Config properties to set client side checksum properties.
+   */
+  public static final String OZONE_CLIENT_CHECKSUM_TYPE =
+      "ozone.client.checksum.type";
+  public static final String OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT = "SHA256";
+  public static final String OZONE_CLIENT_BYTES_PER_CHECKSUM =
+      "ozone.client.bytes.per.checksum";
+  public static final int OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT =
+      1024 * 1024; // 1 MB
+  public static final int OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE =
+      256 * 1024;
+
+  /**
    * There is no need to instantiate this class.
    */
   private OzoneConfigKeys() {
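
As a usage sketch (not part of this diff), a client tunes these keys
through its configuration; CRC32 and 512 KB are arbitrary illustrative
values, and the type string must name a ChecksumType enum constant from
the proto change below:

    OzoneConfiguration conf = new OzoneConfiguration();
    // One of NONE, CRC32, CRC32C, SHA256 (the default) or MD5.
    conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "CRC32");
    // Values below OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE (256 KB)
    // are reset to that minimum by RpcClient (see its diff below).
    conf.setInt(OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM, 512 * 1024);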

http://git-wip-us.apache.org/repos/asf/hadoop/blob/64a4b6b0/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java
new file mode 100644
index 0000000..83293e5
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.common;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.primitives.Longs;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ChecksumType;
+import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.util.PureJavaCrc32;
+import org.apache.hadoop.util.PureJavaCrc32C;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class to compute and verify checksums for chunks.
+ */
+public class Checksum {
+
+  public static final Logger LOG = LoggerFactory.getLogger(Checksum.class);
+
+  private final ChecksumType checksumType;
+  private final int bytesPerChecksum;
+
+  private PureJavaCrc32 crc32Checksum;
+  private PureJavaCrc32C crc32cChecksum;
+  private MessageDigest sha;
+
+  /**
+   * Constructs a Checksum object.
+   * @param type type of Checksum
+   * @param bytesPerChecksum number of bytes of data per checksum
+   */
+  public Checksum(ChecksumType type, int bytesPerChecksum) {
+    this.checksumType = type;
+    this.bytesPerChecksum = bytesPerChecksum;
+  }
+
+  /**
+   * Constructs a Checksum object with default ChecksumType and default
+   * BytesPerChecksum.
+   */
+  @VisibleForTesting
+  public Checksum() {
+    this.checksumType = ChecksumType.valueOf(
+        OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT);
+    this.bytesPerChecksum = OzoneConfigKeys
+        .OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT;
+  }
+
+  /**
+   * Computes the checksum for the given data.
+   * @param byteString input data in the form of ByteString.
+   * @return ChecksumData computed for input data.
+   */
+  public ChecksumData computeChecksum(ByteString byteString)
+      throws OzoneChecksumException {
+    return computeChecksum(byteString.toByteArray());
+  }
+
+  /**
+   * Computes the checksum for the given data.
+   * @param data input data in the form of byte array.
+   * @return ChecksumData computed for input data.
+   */
+  public ChecksumData computeChecksum(byte[] data)
+      throws OzoneChecksumException {
+    ChecksumData checksumData =
+        new ChecksumData(this.checksumType, this.bytesPerChecksum);
+    if (checksumType == ChecksumType.NONE) {
+      // Since type is set to NONE, we do not need to compute the checksums
+      return checksumData;
+    }
+
+    switch (checksumType) {
+    case CRC32:
+      crc32Checksum = new PureJavaCrc32();
+      break;
+    case CRC32C:
+      crc32cChecksum = new PureJavaCrc32C();
+      break;
+    case SHA256:
+      try {
+        sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
+      } catch (NoSuchAlgorithmException e) {
+        throw new OzoneChecksumException(OzoneConsts.FILE_HASH, e);
+      }
+      break;
+    case MD5:
+      break;
+    default:
+      throw new OzoneChecksumException(checksumType);
+    }
+
+    // Compute the number of checksums needed for the given data length,
+    // based on bytesPerChecksum.
+    int dataSize = data.length;
+    int numChecksums = (dataSize + bytesPerChecksum - 1) / bytesPerChecksum;
+
+    // Checksum is computed for each bytesPerChecksum number of bytes of data
+    // starting at offset 0. The last checksum might be computed for the
+    // remaining data with length less than bytesPerChecksum.
+    List<ByteString> checksumList = new ArrayList<>(numChecksums);
+    for (int index = 0; index < numChecksums; index++) {
+      checksumList.add(computeChecksumAtIndex(data, index));
+    }
+    checksumData.setChecksums(checksumList);
+
+    return checksumData;
+  }
+
+  /**
+   * Computes checksum based on checksumType for a data block at given index
+   * and a max length of bytesPerChecksum.
+   * @param data input data
+   * @param index index to compute the offset from where data must be read
+   * @return computed checksum ByteString
+   * @throws OzoneChecksumException thrown when ChecksumType is not recognized
+   */
+  private ByteString computeChecksumAtIndex(byte[] data, int index)
+      throws OzoneChecksumException {
+    int offset = index * bytesPerChecksum;
+    int len = bytesPerChecksum;
+    if ((offset + len) > data.length) {
+      len = data.length - offset;
+    }
+    byte[] checksumBytes = null;
+    switch (checksumType) {
+    case CRC32:
+      checksumBytes = computeCRC32Checksum(data, offset, len);
+      break;
+    case CRC32C:
+      checksumBytes = computeCRC32CChecksum(data, offset, len);
+      break;
+    case SHA256:
+      checksumBytes = computeSHA256Checksum(data, offset, len);
+      break;
+    case MD5:
+      checksumBytes = computeMD5Checksum(data, offset, len);
+      break;
+    default:
+      throw new OzoneChecksumException(checksumType);
+    }
+
+    return ByteString.copyFrom(checksumBytes);
+  }
+
+  /**
+   * Computes CRC32 checksum.
+   */
+  private byte[] computeCRC32Checksum(byte[] data, int offset, int len) {
+    crc32Checksum.reset();
+    crc32Checksum.update(data, offset, len);
+    return Longs.toByteArray(crc32Checksum.getValue());
+  }
+
+  /**
+   * Computes CRC32C checksum.
+   */
+  private byte[] computeCRC32CChecksum(byte[] data, int offset, int len) {
+    crc32cChecksum.reset();
+    crc32cChecksum.update(data, offset, len);
+    return Longs.toByteArray(crc32cChecksum.getValue());
+  }
+
+  /**
+   * Computes SHA-256 checksum.
+   */
+  private byte[] computeSHA256Checksum(byte[] data, int offset, int len) {
+    sha.reset();
+    sha.update(data, offset, len);
+    return sha.digest();
+  }
+
+  /**
+   * Computes MD5 checksum.
+   */
+  private byte[] computeMD5Checksum(byte[] data, int offset, int len) {
+    MD5Hash md5out = MD5Hash.digest(data, offset, len);
+    return md5out.getDigest();
+  }
+
+  /**
+   * Computes the ChecksumData for the input data and verifies that it
+   * matches the input checksumData.
+   * @param byteString input data
+   * @param checksumData checksumData to match with
+   * @throws OzoneChecksumException is thrown if checksums do not match
+   */
+  public static boolean verifyChecksum(
+      ByteString byteString, ChecksumData checksumData)
+      throws OzoneChecksumException {
+    return verifyChecksum(byteString.toByteArray(), checksumData);
+  }
+
+  /**
+   * Computes the ChecksumData for the input data and verifies that it
+   * matches the input checksumData.
+   * @param data input data
+   * @param checksumData checksumData to match with
+   * @throws OzoneChecksumException is thrown if checksums do not match
+   */
+  public static boolean verifyChecksum(byte[] data, ChecksumData checksumData)
+      throws OzoneChecksumException {
+    ChecksumType checksumType = checksumData.getChecksumType();
+    if (checksumType == ChecksumType.NONE) {
+      // Checksum is set to NONE. No further verification is required.
+      return true;
+    }
+
+    int bytesPerChecksum = checksumData.getBytesPerChecksum();
+    Checksum checksum = new Checksum(checksumType, bytesPerChecksum);
+    ChecksumData computedChecksumData = checksum.computeChecksum(data);
+
+    return checksumData.verifyChecksumDataMatches(computedChecksumData);
+  }
+}
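
For orientation, a self-contained sketch of the new API (the input data
and parameter values are illustrative):

    import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
        .ChecksumType;
    import org.apache.hadoop.ozone.common.Checksum;
    import org.apache.hadoop.ozone.common.ChecksumData;
    import org.apache.hadoop.ozone.common.OzoneChecksumException;

    public class ChecksumSketch {
      public static void main(String[] args) throws OzoneChecksumException {
        byte[] data = "hello ozone".getBytes();

        // One checksum is computed per bytesPerChecksum bytes of data
        // (256 KB here, so this small input yields a single checksum).
        Checksum checksum = new Checksum(ChecksumType.SHA256, 256 * 1024);
        ChecksumData checksumData = checksum.computeChecksum(data);

        // Recomputes checksums over `data` and compares them index by
        // index; throws OzoneChecksumException on the first mismatch.
        Checksum.verifyChecksum(data, checksumData);
      }
    }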

http://git-wip-us.apache.org/repos/asf/hadoop/blob/64a4b6b0/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChecksumData.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChecksumData.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChecksumData.java
new file mode 100644
index 0000000..dafa0e3
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChecksumData.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.common;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import java.util.List;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ChecksumType;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+
+/**
+ * Java class that represents the Checksum ProtoBuf class. This helper class
+ * allows us to convert between the protobuf representation and plain Java.
+ */
+public class ChecksumData {
+
+  private ChecksumType type;
+  // Checksum will be computed for every bytesPerChecksum number of bytes and
+  // stored sequentially in checksumList
+  private int bytesPerChecksum;
+  private List<ByteString> checksums;
+
+  public ChecksumData(ChecksumType checksumType, int bytesPerChecksum) {
+    this.type = checksumType;
+    this.bytesPerChecksum = bytesPerChecksum;
+    this.checksums = Lists.newArrayList();
+  }
+
+  /**
+   * Getter method for checksumType.
+   */
+  public ChecksumType getChecksumType() {
+    return this.type;
+  }
+
+  /**
+   * Getter method for bytesPerChecksum.
+   */
+  public int getBytesPerChecksum() {
+    return this.bytesPerChecksum;
+  }
+
+  /**
+   * Getter method for checksums.
+   */
+  @VisibleForTesting
+  public List<ByteString> getChecksums() {
+    return this.checksums;
+  }
+
+  /**
+   * Setter method for checksums.
+   * @param checksumList list of checksums
+   */
+  public void setChecksums(List<ByteString> checksumList) {
+    this.checksums.clear();
+    this.checksums.addAll(checksumList);
+  }
+
+  /**
+   * Construct the Checksum ProtoBuf message.
+   * @return Checksum ProtoBuf message
+   */
+  public ContainerProtos.ChecksumData getProtoBufMessage() {
+    ContainerProtos.ChecksumData.Builder checksumProtoBuilder =
+        ContainerProtos.ChecksumData.newBuilder()
+            .setType(this.type)
+            .setBytesPerChecksum(this.bytesPerChecksum);
+
+    checksumProtoBuilder.addAllChecksums(checksums);
+
+    return checksumProtoBuilder.build();
+  }
+
+  /**
+   * Constructs a ChecksumData object from the Checksum ProtoBuf message.
+   * @param checksumDataProto Checksum ProtoBuf message
+   * @return ChecksumData object representing the proto
+   */
+  public static ChecksumData getFromProtoBuf(
+      ContainerProtos.ChecksumData checksumDataProto) {
+    Preconditions.checkNotNull(checksumDataProto);
+
+    ChecksumData checksumData = new ChecksumData(
+        checksumDataProto.getType(), checksumDataProto.getBytesPerChecksum());
+
+    if (checksumDataProto.getChecksumsCount() != 0) {
+      checksumData.setChecksums(checksumDataProto.getChecksumsList());
+    }
+
+    return checksumData;
+  }
+
+  /**
+   * Verify that this ChecksumData matches with the input ChecksumData.
+   * @param that the ChecksumData to match with
+   * @return true if checksums match
+   * @throws OzoneChecksumException
+   */
+  public boolean verifyChecksumDataMatches(ChecksumData that) throws
+      OzoneChecksumException {
+
+    // pre checks
+    if (this.checksums.size() == 0) {
+      throw new OzoneChecksumException("Original checksumData has no " +
+          "checksums");
+    }
+
+    if (that.checksums.size() == 0) {
+      throw new OzoneChecksumException("Computed checksumData has no " +
+          "checksums");
+    }
+
+    if (this.checksums.size() != that.checksums.size()) {
+      throw new OzoneChecksumException("Original and Computed checksumData's " 
+
+          "has different number of checksums");
+    }
+
+    // Verify that checksum matches at each index
+    for (int index = 0; index < this.checksums.size(); index++) {
+      if (!matchChecksumAtIndex(this.checksums.get(index),
+          that.checksums.get(index))) {
+        // checksum mismatch. throw exception.
+        throw new OzoneChecksumException(index);
+      }
+    }
+    return true;
+  }
+
+  private static boolean matchChecksumAtIndex(
+      ByteString expectedChecksumAtIndex, ByteString computedChecksumAtIndex) {
+    return expectedChecksumAtIndex.equals(computedChecksumAtIndex);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof ChecksumData)) {
+      return false;
+    }
+
+    ChecksumData that = (ChecksumData) obj;
+
+    if (!this.type.equals(that.getChecksumType())) {
+      return false;
+    }
+    if (this.bytesPerChecksum != that.getBytesPerChecksum()) {
+      return false;
+    }
+    if (this.checksums.size() != that.checksums.size()) {
+      return false;
+    }
+
+    // Match checksum at each index
+    for (int index = 0; index < this.checksums.size(); index++) {
+      if (!matchChecksumAtIndex(this.checksums.get(index),
+          that.checksums.get(index))) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    HashCodeBuilder hc = new HashCodeBuilder();
+    hc.append(type);
+    hc.append(bytesPerChecksum);
+    hc.append(checksums.toArray());
+    return hc.toHashCode();
+  }
+}
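
And the proto round trip this class exists for, as a sketch continuing
the previous sketch's checksumData variable (ContainerProtos as imported
above):

    // To the wire format carried inside ChunkInfo...
    ContainerProtos.ChecksumData proto = checksumData.getProtoBufMessage();
    // ...and back. equals() compares type, bytesPerChecksum and every
    // checksum ByteString, so the round trip preserves equality.
    ChecksumData restored = ChecksumData.getFromProtoBuf(proto);
    assert checksumData.equals(restored);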

http://git-wip-us.apache.org/repos/asf/hadoop/blob/64a4b6b0/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/OzoneChecksumException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/OzoneChecksumException.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/OzoneChecksumException.java
new file mode 100644
index 0000000..20e40af
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/OzoneChecksumException.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.common;
+
+import java.io.IOException;
+import java.security.NoSuchAlgorithmException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+
+/** Thrown for checksum errors. */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class OzoneChecksumException extends IOException {
+
+  /**
+   * OzoneChecksumException to throw when checksum verification fails.
+   * @param index checksum list index at which checksum match failed
+   */
+  public OzoneChecksumException(int index) {
+    super(String.format("Checksum mismatch at index %d", index));
+  }
+
+  /**
+   * OzoneChecksumException to throw when unrecognized checksumType is given.
+   * @param unrecognizedChecksumType the unrecognized checksum type
+   */
+  public OzoneChecksumException(
+      ContainerProtos.ChecksumType unrecognizedChecksumType) {
+    super(String.format("Unrecognized ChecksumType: %s",
+        unrecognizedChecksumType));
+  }
+
+  /**
+   * OzoneChecksumException to wrap around NoSuchAlgorithmException.
+   * @param algorithm name of algorithm
+   * @param ex original exception thrown
+   */
+  public OzoneChecksumException(
+      String algorithm, NoSuchAlgorithmException ex) {
+    super(String.format("NoSuchAlgorithmException thrown while computing " +
+        "SHA-256 checksum using algorithm %s", algorithm), ex);
+  }
+
+  /**
+   * OzoneChecksumException to throw with custom message.
+   */
+  public OzoneChecksumException(String message) {
+    super(message);
+  }
+}
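
Because OzoneChecksumException extends IOException, pre-existing catch
blocks keep working unchanged. A sketch of both handling styles, reusing
data and checksumData from the earlier sketch:

    try {
      Checksum.verifyChecksum(data, checksumData);
    } catch (OzoneChecksumException e) {
      // Fine-grained handling: the message carries the failing index,
      // e.g. "Checksum mismatch at index 3".
      throw e;
    } catch (java.io.IOException e) {
      // Generic I/O handling also catches checksum failures, since
      // OzoneChecksumException is an IOException.
      throw e;
    }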

http://git-wip-us.apache.org/repos/asf/hadoop/blob/64a4b6b0/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkInfo.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkInfo.java
index 21916b5..d75f10f 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkInfo.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkInfo.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import java.io.IOException;
 import java.util.Map;
 import java.util.TreeMap;
+import org.apache.hadoop.ozone.common.ChecksumData;
 
 /**
  * Java class that represents ChunkInfo ProtoBuf class. This helper class allows
@@ -33,7 +34,7 @@ public class ChunkInfo {
   private final String chunkName;
   private final long offset;
   private final long len;
-  private String checksum;
+  private ChecksumData checksumData;
   private final Map<String, String> metadata;
 
 
@@ -86,10 +87,9 @@ public class ChunkInfo {
           info.getMetadata(x).getValue());
     }
 
+    chunkInfo.setChecksumData(
+        ChecksumData.getFromProtoBuf(info.getChecksumData()));
 
-    if (info.hasChecksum()) {
-      chunkInfo.setChecksum(info.getChecksum());
-    }
     return chunkInfo;
   }
 
@@ -105,9 +105,7 @@ public class ChunkInfo {
     builder.setChunkName(this.getChunkName());
     builder.setOffset(this.getOffset());
     builder.setLen(this.getLen());
-    if (this.getChecksum() != null && !this.getChecksum().isEmpty()) {
-      builder.setChecksum(this.getChecksum());
-    }
+    builder.setChecksumData(this.checksumData.getProtoBufMessage());
 
     for (Map.Entry<String, String> entry : metadata.entrySet()) {
       ContainerProtos.KeyValue.Builder keyValBuilder =
@@ -147,21 +145,17 @@ public class ChunkInfo {
   }
 
   /**
-   * Returns the SHA256 value of this chunk.
-   *
-   * @return - Hash String
+   * Returns the checksumData of this chunk.
    */
-  public String getChecksum() {
-    return checksum;
+  public ChecksumData getChecksumData() {
+    return checksumData;
   }
 
   /**
-   * Sets the Hash value of this chunk.
-   *
-   * @param checksum - Hash String.
+   * Sets the checksums of this chunk.
    */
-  public void setChecksum(String checksum) {
-    this.checksum = checksum;
+  public void setChecksumData(ChecksumData cData) {
+    this.checksumData = cData;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/64a4b6b0/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
index 9e94dd1..3695b6b 100644
--- a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
+++ b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
@@ -355,8 +355,22 @@ message ChunkInfo {
   required string chunkName = 1;
   required uint64 offset = 2;
   required uint64 len = 3;
-  optional string checksum = 4;
-  repeated KeyValue metadata = 5;
+  repeated KeyValue metadata = 4;
+  required ChecksumData checksumData = 5;
+}
+
+message ChecksumData {
+  required ChecksumType type = 1;
+  required uint32 bytesPerChecksum = 2;
+  repeated bytes checksums = 3;
+}
+
+enum ChecksumType {
+  NONE = 1;
+  CRC32 = 2;
+  CRC32C = 3;
+  SHA256 = 4;
+  MD5 = 5;
 }
 
 enum Stage {
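
With the protobuf-java builders generated from this definition (standard
codegen, already shown in use in ChecksumData.java above; the values here
are illustrative), the new message is constructed like any other:

    ContainerProtos.ChecksumData checksumProto =
        ContainerProtos.ChecksumData.newBuilder()
            .setType(ContainerProtos.ChecksumType.CRC32C)
            .setBytesPerChecksum(1024 * 1024)
            // One entry per bytesPerChecksum-sized slice of the chunk.
            .addChecksums(ByteString.copyFrom(new byte[8]))
            .build();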

http://git-wip-us.apache.org/repos/asf/hadoop/blob/64a4b6b0/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChecksum.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChecksum.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChecksum.java
new file mode 100644
index 0000000..819c29f
--- /dev/null
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChecksum.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.common;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for {@link Checksum} class.
+ */
+public class TestChecksum {
+
+  private static final int BYTES_PER_CHECKSUM = 10;
+  private static final ContainerProtos.ChecksumType CHECKSUM_TYPE_DEFAULT =
+      ContainerProtos.ChecksumType.SHA256;
+
+  private Checksum getChecksum(ContainerProtos.ChecksumType type) {
+    if (type == null) {
+      type = CHECKSUM_TYPE_DEFAULT;
+    }
+    return new Checksum(type, BYTES_PER_CHECKSUM);
+  }
+
+  /**
+   * Tests {@link Checksum#verifyChecksum(byte[], ChecksumData)}.
+   */
+  @Test
+  public void testVerifyChecksum() throws Exception {
+    Checksum checksum = getChecksum(null);
+    int dataLen = 55;
+    byte[] data = RandomStringUtils.randomAlphabetic(dataLen).getBytes();
+
+    ChecksumData checksumData = checksum.computeChecksum(data);
+
+    // A checksum is calculated for each bytesPerChecksum number of bytes in
+    // the data. Since that value is 10 here and the data length is 55, we
+    // should have 6 checksums in checksumData.
+    Assert.assertEquals(6, checksumData.getChecksums().size());
+
+    // Checksum verification should pass
+    Assert.assertTrue("Checksum mismatch",
+        Checksum.verifyChecksum(data, checksumData));
+  }
+
+  /**
+   * Tests that if data is modified, then the checksums should not match.
+   */
+  @Test
+  public void testIncorrectChecksum() throws Exception {
+    Checksum checksum = getChecksum(null);
+    byte[] data = RandomStringUtils.randomAlphabetic(55).getBytes();
+    ChecksumData originalChecksumData = checksum.computeChecksum(data);
+
+    // Change the data and check if new checksum matches the original checksum.
+    // Modifying one byte of data should be enough for the checksum data to
+    // mismatch
+    data[50] = (byte) (data[50]+1);
+    ChecksumData newChecksumData = checksum.computeChecksum(data);
+    Assert.assertNotEquals("Checksums should not match for different data",
+        originalChecksumData, newChecksumData);
+  }
+
+  /**
+   * Tests that checksum calculated using two different checksumTypes should
+   * not match.
+   */
+  @Test
+  public void testChecksumMismatchForDifferentChecksumTypes()
+      throws Exception {
+    byte[] data = RandomStringUtils.randomAlphabetic(55).getBytes();
+
+    // Checksum1 of type SHA-256
+    Checksum checksum1 = getChecksum(null);
+    ChecksumData checksumData1 = checksum1.computeChecksum(data);
+
+    // Checksum2 of type CRC32
+    Checksum checksum2 = getChecksum(ContainerProtos.ChecksumType.CRC32);
+    ChecksumData checksumData2 = checksum2.computeChecksum(data);
+
+    // The two checksums should not match as they have different types
+    Assert.assertNotEquals(
+        "Checksums should not match for different checksum types",
+        checksumData1, checksumData2);
+  }
+}
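
The count of 6 asserted in testVerifyChecksum follows from the ceiling
division in Checksum#computeChecksum; as a quick check:

    int dataSize = 55, bytesPerChecksum = 10;
    // Same formula as computeChecksum(): five full 10-byte slices plus
    // one 5-byte tail, i.e. ceil(55 / 10) = 6 checksums.
    int numChecksums = (dataSize + bytesPerChecksum - 1) / bytesPerChecksum;
    assert numChecksums == 6;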

http://git-wip-us.apache.org/repos/asf/hadoop/blob/64a4b6b0/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
index 2ee82cb..7239843 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
@@ -19,8 +19,6 @@
 package org.apache.hadoop.ozone.container.keyvalue.helpers;
 
 import com.google.common.base.Preconditions;
-import org.apache.commons.codec.binary.Hex;
-import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
@@ -47,7 +45,6 @@ import java.nio.ByteBuffer;
 import java.nio.channels.AsynchronousFileChannel;
 import java.nio.channels.FileLock;
 import java.nio.file.StandardOpenOption;
-import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.concurrent.ExecutionException;
 
@@ -90,11 +87,6 @@ public final class ChunkUtils {
     FileLock lock = null;
 
     try {
-      if (chunkInfo.getChecksum() != null &&
-          !chunkInfo.getChecksum().isEmpty()) {
-        verifyChecksum(chunkInfo, data, log);
-      }
-
       long writeTimeStart = Time.monotonicNow();
       file =
           AsynchronousFileChannel.open(chunkFile.toPath(),
@@ -154,10 +146,8 @@ public final class ChunkUtils {
    * @throws InterruptedException
    */
   public static ByteBuffer readData(File chunkFile, ChunkInfo data,
-                                    VolumeIOStats volumeIOStats)
-      throws
-      StorageContainerException, ExecutionException, InterruptedException,
-      NoSuchAlgorithmException {
+      VolumeIOStats volumeIOStats) throws StorageContainerException,
+      ExecutionException, InterruptedException {
     Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class);
 
     if (!chunkFile.exists()) {
@@ -184,10 +174,7 @@ public final class ChunkUtils {
       volumeIOStats.incReadTime(Time.monotonicNow() - readStartTime);
       volumeIOStats.incReadOpCount();
       volumeIOStats.incReadBytes(data.getLen());
-      if (data.getChecksum() != null && !data.getChecksum().isEmpty()) {
-        buf.rewind();
-        verifyChecksum(data, buf, log);
-      }
+
       return buf;
     } catch (IOException e) {
       throw new StorageContainerException(e, IO_EXCEPTION);
@@ -206,30 +193,6 @@ public final class ChunkUtils {
   }
 
   /**
-   * Verifies the checksum of a chunk against the data buffer.
-   *
-   * @param chunkInfo - Chunk Info.
-   * @param data - data buffer
-   * @param log - log
-   * @throws NoSuchAlgorithmException
-   * @throws StorageContainerException
-   */
-  private static void verifyChecksum(ChunkInfo chunkInfo, ByteBuffer data,
-      Logger log) throws NoSuchAlgorithmException, StorageContainerException {
-    MessageDigest sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
-    sha.update(data);
-    data.rewind();
-    if (!Hex.encodeHexString(sha.digest()).equals(
-        chunkInfo.getChecksum())) {
-      log.error("Checksum mismatch. Provided: {} , computed: {}",
-          chunkInfo.getChecksum(), DigestUtils.sha256Hex(sha.digest()));
-      throw new StorageContainerException("Checksum mismatch. Provided: " +
-          chunkInfo.getChecksum() + " , computed: " +
-          DigestUtils.sha256Hex(sha.digest()), CHECKSUM_MISMATCH);
-    }
-  }
-
-  /**
    * Validates chunk data and returns a file object to Chunk File that we are
    * expected to write data to.
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/64a4b6b0/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java
index cdd19df..a1c3d01 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java
@@ -200,10 +200,6 @@ public class ChunkManagerImpl implements ChunkManager {
         containerData.incrReadBytes(length);
         return data.array();
       }
-    } catch(NoSuchAlgorithmException ex) {
-      LOG.error("read data failed. error: {}", ex);
-      throw new StorageContainerException("Internal error: ",
-          ex, NO_SUCH_ALGORITHM);
     } catch (ExecutionException ex) {
       LOG.error("read data failed. error: {}", ex);
       throw new StorageContainerException("Internal error: ",

http://git-wip-us.apache.org/repos/asf/hadoop/blob/64a4b6b0/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java
index abb9059..717dd05 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java
@@ -222,22 +222,6 @@ public class TestChunkManagerImpl {
   }
 
   @Test
-  public void testWriteChunkChecksumMismatch() throws Exception {
-    try {
-      chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID
-          .getLocalID(), 0), 0, data.length);
-      //Setting checksum to some value.
-      chunkInfo.setChecksum("some garbage");
-      chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
-          ByteBuffer.wrap(data), ContainerProtos.Stage.COMBINED);
-      fail("testWriteChunkChecksumMismatch failed");
-    } catch (StorageContainerException ex) {
-      GenericTestUtils.assertExceptionContains("Checksum mismatch.", ex);
-      assertEquals(ContainerProtos.Result.CHECKSUM_MISMATCH, ex.getResult());
-    }
-  }
-
-  @Test
   public void testReadChunkFileNotExists() throws Exception {
     try {
       // trying to read a chunk, where chunk file does not exist

http://git-wip-us.apache.org/repos/asf/hadoop/blob/64a4b6b0/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
index 79b0fe7..4d76395 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.ozone.common.Checksum;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
@@ -83,6 +84,7 @@ public class ChunkGroupOutputStream extends OutputStream {
   private final long watchTimeout;
   private final long blockSize;
   private ByteBuffer buffer;
+  private final Checksum checksum;
   /**
    * A constructor for testing purpose only.
    */
@@ -102,6 +104,7 @@ public class ChunkGroupOutputStream extends OutputStream {
     buffer = ByteBuffer.allocate(1);
     watchTimeout = 0;
     blockSize = 0;
+    this.checksum = new Checksum();
   }
 
   /**
@@ -113,7 +116,8 @@ public class ChunkGroupOutputStream extends OutputStream {
    */
   @VisibleForTesting
   public void addStream(OutputStream outputStream, long length) {
-    streamEntries.add(new ChunkOutputStreamEntry(outputStream, length));
+    streamEntries.add(
+        new ChunkOutputStreamEntry(outputStream, length, checksum));
   }
 
   @VisibleForTesting
@@ -145,7 +149,8 @@ public class ChunkGroupOutputStream extends OutputStream {
       StorageContainerLocationProtocolClientSideTranslatorPB scmClient,
       OzoneManagerProtocolClientSideTranslatorPB omClient, int chunkSize,
       String requestId, ReplicationFactor factor, ReplicationType type,
-      long bufferFlushSize, long bufferMaxSize, long size, long watchTimeout) {
+      long bufferFlushSize, long bufferMaxSize, long size, long watchTimeout,
+      Checksum checksum) {
     this.streamEntries = new ArrayList<>();
     this.currentStreamIndex = 0;
     this.omClient = omClient;
@@ -163,6 +168,7 @@ public class ChunkGroupOutputStream extends OutputStream {
     this.streamBufferMaxSize = bufferMaxSize;
     this.blockSize = size;
     this.watchTimeout = watchTimeout;
+    this.checksum = checksum;
 
     Preconditions.checkState(chunkSize > 0);
     Preconditions.checkState(streamBufferFlushSize > 0);
@@ -216,7 +222,7 @@ public class ChunkGroupOutputStream extends OutputStream {
     streamEntries.add(new ChunkOutputStreamEntry(subKeyInfo.getBlockID(),
         keyArgs.getKeyName(), xceiverClientManager, xceiverClient, requestID,
         chunkSize, subKeyInfo.getLength(), streamBufferFlushSize,
-        streamBufferMaxSize, watchTimeout, buffer));
+        streamBufferMaxSize, watchTimeout, buffer, checksum));
   }
 
   @VisibleForTesting
@@ -534,6 +540,7 @@ public class ChunkGroupOutputStream extends OutputStream {
     private long streamBufferMaxSize;
     private long blockSize;
     private long watchTimeout;
+    private Checksum checksum;
 
     public Builder setHandler(OpenKeySession handler) {
       this.openHandler = handler;
@@ -597,10 +604,15 @@ public class ChunkGroupOutputStream extends OutputStream {
       return this;
     }
 
+    public Builder setChecksum(Checksum checksumObj) {
+      this.checksum = checksumObj;
+      return this;
+    }
+
     public ChunkGroupOutputStream build() throws IOException {
       return new ChunkGroupOutputStream(openHandler, xceiverManager, scmClient,
           omClient, chunkSize, requestID, factor, type, streamBufferFlushSize,
-          streamBufferMaxSize, blockSize, watchTimeout);
+          streamBufferMaxSize, blockSize, watchTimeout, checksum);
     }
   }
 
@@ -610,6 +622,7 @@ public class ChunkGroupOutputStream extends OutputStream {
     private final String key;
     private final XceiverClientManager xceiverClientManager;
     private final XceiverClientSpi xceiverClient;
+    private final Checksum checksum;
     private final String requestId;
     private final int chunkSize;
     // total number of bytes that should be written to this stream
@@ -626,7 +639,7 @@ public class ChunkGroupOutputStream extends OutputStream {
         XceiverClientManager xceiverClientManager,
         XceiverClientSpi xceiverClient, String requestId, int chunkSize,
         long length, long streamBufferFlushSize, long streamBufferMaxSize,
-        long watchTimeout, ByteBuffer buffer) {
+        long watchTimeout, ByteBuffer buffer, Checksum checksum) {
       this.outputStream = null;
       this.blockID = blockID;
       this.key = key;
@@ -641,6 +654,7 @@ public class ChunkGroupOutputStream extends OutputStream {
       this.streamBufferMaxSize = streamBufferMaxSize;
       this.watchTimeout = watchTimeout;
       this.buffer = buffer;
+      this.checksum = checksum;
     }
 
     /**
@@ -648,7 +662,8 @@ public class ChunkGroupOutputStream extends OutputStream {
     * @param  outputStream an existing writable output stream
      * @param  length the length of data to write to the stream
      */
-    ChunkOutputStreamEntry(OutputStream outputStream, long length) {
+    ChunkOutputStreamEntry(OutputStream outputStream, long length,
+        Checksum checksum) {
       this.outputStream = outputStream;
       this.blockID = null;
       this.key = null;
@@ -663,6 +678,7 @@ public class ChunkGroupOutputStream extends OutputStream {
       streamBufferMaxSize = 0;
       buffer = null;
       watchTimeout = 0;
+      this.checksum = checksum;
     }
 
     long getLength() {
@@ -678,7 +694,7 @@ public class ChunkGroupOutputStream extends OutputStream {
         this.outputStream =
             new ChunkOutputStream(blockID, key, xceiverClientManager,
                 xceiverClient, requestId, chunkSize, streamBufferFlushSize,
-                streamBufferMaxSize, watchTimeout, buffer);
+                streamBufferMaxSize, watchTimeout, buffer, checksum);
       }
     }
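
The Checksum instance created once per client thus flows from RpcClient
through this builder into every ChunkOutputStream. The essential new
wiring, extracted from this diff (a fragment: the pre-existing builder
setters are elided, so this is not a complete invocation):

    ChunkGroupOutputStream groupOutputStream =
        new ChunkGroupOutputStream.Builder()
            // ... existing setters (handler, clients, sizes) ...
            .setChecksum(checksum)  // new in this commit
            .build();
    // Each ChunkOutputStreamEntry then hands the same Checksum to its
    // ChunkOutputStream, which computes a ChecksumData per chunk in
    // writeChunkToContainer().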
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/64a4b6b0/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 3a0f475..65adbfa 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -23,6 +23,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.StorageType;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ChecksumType;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.Client;
@@ -42,6 +44,7 @@ import org.apache.hadoop.ozone.client.io.LengthInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
+import org.apache.hadoop.ozone.common.Checksum;
 import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
@@ -92,6 +95,7 @@ public class RpcClient implements ClientProtocol {
       ozoneManagerClient;
   private final XceiverClientManager xceiverClientManager;
   private final int chunkSize;
+  private final Checksum checksum;
   private final UserGroupInformation ugi;
   private final OzoneAcl.OzoneACLRights userRights;
   private final OzoneAcl.OzoneACLRights groupRights;
@@ -166,6 +170,26 @@ public class RpcClient implements ClientProtocol {
         conf.getTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT,
             OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT_DEFAULT,
             TimeUnit.MILLISECONDS);
+
+    int configuredChecksumSize = conf.getInt(
+        OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM,
+        OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT);
+    int checksumSize;
+    if (configuredChecksumSize <
+        OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE) {
+      LOG.warn("The checksum size ({}) is not allowed to be less than the " +
+              "minimum size ({}), resetting to the minimum size.",
+          configuredChecksumSize,
+          OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE);
+      checksumSize = OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE;
+    } else {
+      checksumSize = configuredChecksumSize;
+    }
+    String checksumTypeStr = conf.get(
+        OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE,
+        OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT);
+    ChecksumType checksumType = ChecksumType.valueOf(checksumTypeStr);
+    this.checksum = new Checksum(checksumType, checksumSize);
   }
 
   private InetSocketAddress getScmAddressForClient() throws IOException {
@@ -489,6 +513,7 @@ public class RpcClient implements ClientProtocol {
             .setStreamBufferMaxSize(streamBufferMaxSize)
             .setWatchTimeout(watchTimeout)
             .setBlockSize(blockSize)
+            .setChecksum(checksum)
             .build();
     groupOutputStream.addPreallocateBlocks(
         openKey.getKeyInfo().getLatestVersionLocations(),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/64a4b6b0/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
index 1ed5f67..addd8ad 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.ozone.client.rpc;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.RandomUtils;
 import org.apache.hadoop.hdds.protocol.StorageType;
@@ -39,9 +40,13 @@ import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.common.OzoneChecksumException;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueBlockIterator;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.keyvalue.helpers
+    .KeyValueContainerLocationUtil;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
@@ -87,6 +92,8 @@ public class TestOzoneRpcClient {
   private static StorageContainerLocationProtocolClientSideTranslatorPB
       storageContainerLocationClient;
 
+  private static final String SCM_ID = UUID.randomUUID().toString();
+
   /**
    * Create a MiniOzoneCluster for testing.
    * <p>
@@ -98,7 +105,10 @@ public class TestOzoneRpcClient {
   public static void init() throws Exception {
     OzoneConfiguration conf = new OzoneConfiguration();
     conf.setInt(ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE, 1);
-    cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(10).build();
+    cluster = MiniOzoneCluster.newBuilder(conf)
+        .setNumDatanodes(10)
+        .setScmId(SCM_ID)
+        .build();
     cluster.waitForClusterToBeReady();
     ozClient = OzoneClientFactory.getRpcClient(conf);
     store = ozClient.getObjectStore();
@@ -821,6 +831,92 @@ public class TestOzoneRpcClient {
     }
   }
 
+  /**
+   * Tests that reading a corrupted chunk file throws a checksum exception.
+   * @throws IOException
+   */
+  @Test
+  public void testReadKeyWithCorruptedData() throws IOException {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+
+    String value = "sample value";
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+    String keyName = UUID.randomUUID().toString();
+
+    // Write data into a key
+    OzoneOutputStream out = bucket.createKey(keyName,
+        value.getBytes().length, ReplicationType.STAND_ALONE,
+        ReplicationFactor.ONE);
+    out.write(value.getBytes());
+    out.close();
+
+    // We need to find the location of the chunk file corresponding to the
+    // data we just wrote.
+    OzoneKey key = bucket.getKey(keyName);
+    long containerID = ((OzoneKeyDetails) key).getOzoneKeyLocations().get(0)
+        .getContainerID();
+    long localID = ((OzoneKeyDetails) key).getOzoneKeyLocations().get(0)
+        .getLocalID();
+
+    // Get the container by traversing the datanodes. At least one of the
+    // datanodes must have this container.
+    Container container = null;
+    for (HddsDatanodeService hddsDatanode : cluster.getHddsDatanodes()) {
+      container = hddsDatanode.getDatanodeStateMachine().getContainer()
+          .getContainerSet().getContainer(containerID);
+      if (container != null) {
+        break;
+      }
+    }
+    Assert.assertNotNull("Container not found", container);
+
+    // From the containerData, get the block iterator for all the blocks in
+    // the container.
+    KeyValueContainerData containerData =
+        (KeyValueContainerData) container.getContainerData();
+    String containerPath = new File(containerData.getMetadataPath())
+        .getParent();
+    KeyValueBlockIterator keyValueBlockIterator = new KeyValueBlockIterator(
+        containerID, new File(containerPath));
+
+    // Find the block corresponding to the key we put. We use the localID of
+    // the BlockData to identify our key.
+    BlockData blockData = null;
+    while (keyValueBlockIterator.hasNext()) {
+      blockData = keyValueBlockIterator.nextBlock();
+      if (blockData.getBlockID().getLocalID() == localID) {
+        break;
+      }
+    }
+    Assert.assertNotNull("Block not found", blockData);
+
+    // Get the location of the chunk file
+    String chunkName = blockData.getChunks().get(0).getChunkName();
+    String containerBaseDir = container.getContainerData().getVolume()
+        .getHddsRootDir().getPath();
+    File chunksLocationPath = KeyValueContainerLocationUtil
+        .getChunksLocationPath(containerBaseDir, SCM_ID, containerID);
+    File chunkFile = new File(chunksLocationPath, chunkName);
+
+    // Corrupt the contents of the chunk file
+    String newData = "corrupted data";
+    FileUtils.writeByteArrayToFile(chunkFile, newData.getBytes());
+
+    // Try reading the key. Since the chunk file is corrupted, it should
+    // throw a checksum mismatch exception.
+    try {
+      OzoneInputStream is = bucket.readKey(keyName);
+      is.read(new byte[100]);
+      Assert.fail("Reading corrupted data should fail.");
+    } catch (OzoneChecksumException e) {
+      GenericTestUtils.assertExceptionContains("Checksum mismatch", e);
+    }
+  }
+
   @Test
   public void testDeleteKey()
       throws IOException, OzoneException {

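The heart of the new test is the corrupt-then-read round trip: overwrite the
chunk file on disk, then read the key back and let the client-side verification
reject the tampered data. Condensed to its essentials (a sketch reusing the
names from the hunk above; the try-with-resources and StandardCharsets usage
are editorial, not part of the patch):

    // Tamper with the on-disk chunk, then expect the read path to detect it.
    FileUtils.writeByteArrayToFile(chunkFile,
        "corrupted data".getBytes(StandardCharsets.UTF_8));
    try (OzoneInputStream is = bucket.readKey(keyName)) {
      is.read(new byte[100]);
      Assert.fail("Reading corrupted data should fail.");
    } catch (OzoneChecksumException e) {
      GenericTestUtils.assertExceptionContains("Checksum mismatch", e);
    }
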
http://git-wip-us.apache.org/repos/asf/hadoop/blob/64a4b6b0/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
index 7e9bab5..82c3ab8 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.ozone.container;
 
 import com.google.common.base.Preconditions;
+import java.security.MessageDigest;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.client.ReplicationType;
@@ -26,13 +27,15 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.ozone.HddsDatanodeService;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.common.Checksum;
+import org.apache.hadoop.ozone.common.OzoneChecksumException;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
-import org.apache.commons.codec.binary.Hex;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@@ -42,7 +45,6 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
-import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -52,7 +54,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.ServerSocket;
-import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.*;
 
@@ -180,10 +181,9 @@ public final class ContainerTestHelper {
    * @throws NoSuchAlgorithmException
    */
   public static void setDataChecksum(ChunkInfo info, byte[] data)
-      throws NoSuchAlgorithmException {
-    MessageDigest sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
-    sha.update(data);
-    info.setChecksum(Hex.encodeHexString(sha.digest()));
+      throws OzoneChecksumException {
+    Checksum checksum = new Checksum();
+    info.setChecksumData(checksum.computeChecksum(data));
   }
 
   /**
@@ -197,8 +197,7 @@ public final class ContainerTestHelper {
    * @throws NoSuchAlgorithmException
    */
   public static ContainerCommandRequestProto getWriteChunkRequest(
-      Pipeline pipeline, BlockID blockID, int datalen)
-      throws IOException, NoSuchAlgorithmException {
+      Pipeline pipeline, BlockID blockID, int datalen) throws IOException {
     LOG.trace("writeChunk {} (blockID={}) to pipeline=",
         datalen, blockID, pipeline);
     ContainerProtos.WriteChunkRequestProto.Builder writeRequest =

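The setDataChecksum() change above is representative of the whole patch: a
per-chunk hex digest gives way to the structured Checksum/ChecksumData pair.
Side by side, with the old lines taken from the removed code in the hunk (the
no-argument Checksum() constructor uses the patch's default checksum type and
bytes-per-checksum):

    // Old: hash the whole chunk and store a hex string on the ChunkInfo.
    MessageDigest sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
    sha.update(data);
    info.setChecksum(Hex.encodeHexString(sha.digest()));

    // New: compute a structured ChecksumData value instead.
    Checksum checksum = new Checksum();
    info.setChecksumData(checksum.computeChecksum(data));
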
http://git-wip-us.apache.org/repos/asf/hadoop/blob/64a4b6b0/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
index c2fb2ea..0db047f 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
@@ -140,7 +140,8 @@ public class TestBlockDeletingService {
                   .setChunkName(chunk.getAbsolutePath())
                   .setLen(0)
                   .setOffset(0)
-                  .setChecksum("")
+                  .setChecksumData(
+                      ContainerProtos.ChecksumData.getDefaultInstance())
                   .build();
           chunks.add(info);
         }

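Tests that assemble ChunkInfo protos by hand follow the same migration: where
an empty checksum string was passed before, an empty ChecksumData message now
stands in. A minimal sketch of the new builder call (builder type assumed from
the surrounding hunk):

    // Zero-length test chunk carrying a placeholder (empty) ChecksumData.
    ContainerProtos.ChunkInfo info = ContainerProtos.ChunkInfo.newBuilder()
        .setChunkName(chunk.getAbsolutePath())
        .setLen(0)
        .setOffset(0)
        .setChecksumData(ContainerProtos.ChecksumData.getDefaultInstance())
        .build();
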
http://git-wip-us.apache.org/repos/asf/hadoop/blob/64a4b6b0/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
index a48d637..fb7b0c5 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.common.Checksum;
+import org.apache.hadoop.ozone.common.ChecksumData;
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
@@ -320,8 +322,7 @@ public class TestContainerPersistence {
     Assert.assertTrue(testMap.isEmpty());
   }
 
-  private ChunkInfo writeChunkHelper(BlockID blockID)
-      throws IOException, NoSuchAlgorithmException {
+  private ChunkInfo writeChunkHelper(BlockID blockID) throws IOException {
     final int datalen = 1024;
     long testContainerID = blockID.getContainerID();
     Container container = containerSet.getContainer(testContainerID);
@@ -360,8 +361,7 @@ public class TestContainerPersistence {
    * @throws NoSuchAlgorithmException
    */
   @Test
-  public void testWritReadManyChunks() throws IOException,
-      NoSuchAlgorithmException {
+  public void testWriteReadManyChunks() throws IOException {
     final int datalen = 1024;
     final int chunkCount = 1024;
 
@@ -386,32 +386,29 @@ public class TestContainerPersistence {
     Path dataDir = Paths.get(cNewData.getChunksPath());
 
     String globFormat = String.format("%s.data.*", blockID.getLocalID());
-    MessageDigest sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
 
     // Read chunk via file system and verify.
     int count = 0;
     try (DirectoryStream<Path> stream =
              Files.newDirectoryStream(dataDir, globFormat)) {
+      Checksum checksum = new Checksum();
+
       for (Path fname : stream) {
-        sha.update(FileUtils.readFileToByteArray(fname.toFile()));
-        String val = Hex.encodeHexString(sha.digest());
+        ChecksumData checksumData = checksum
+            .computeChecksum(FileUtils.readFileToByteArray(fname.toFile()));
         Assert.assertEquals(fileHashMap.get(fname.getFileName().toString())
-            .getChecksum(), val);
+            .getChecksumData(), checksumData);
         count++;
-        sha.reset();
       }
       Assert.assertEquals(chunkCount, count);
 
       // Read chunk via ReadChunk call.
-      sha.reset();
       for (int x = 0; x < chunkCount; x++) {
         String fileName = String.format("%s.data.%d", blockID.getLocalID(), x);
         ChunkInfo info = fileHashMap.get(fileName);
         byte[] data = chunkManager.readChunk(container, blockID, info);
-        sha.update(data);
-        Assert.assertEquals(Hex.encodeHexString(sha.digest()),
-            info.getChecksum());
-        sha.reset();
+        ChecksumData checksumData = checksum.computeChecksum(data);
+        Assert.assertEquals(info.getChecksumData(), checksumData);
       }
     }
   }
@@ -571,7 +568,7 @@ public class TestContainerPersistence {
         getBlock(container, blockData.getBlockID());
     ChunkInfo readChunk =
         ChunkInfo.getFromProtoBuf(readBlockData.getChunks().get(0));
-    Assert.assertEquals(info.getChecksum(), readChunk.getChecksum());
+    Assert.assertEquals(info.getChecksumData(), readChunk.getChecksumData());
   }
 
   /**
@@ -629,7 +626,7 @@ public class TestContainerPersistence {
         getBlock(container, blockData.getBlockID());
     ChunkInfo readChunk =
         ChunkInfo.getFromProtoBuf(readBlockData.getChunks().get(0));
-    Assert.assertEquals(info.getChecksum(), readChunk.getChecksum());
+    Assert.assertEquals(info.getChecksumData(), readChunk.getChecksumData());
   }
 
   /**
@@ -684,7 +681,8 @@ public class TestContainerPersistence {
     ChunkInfo readChunk =
         ChunkInfo.getFromProtoBuf(readBlockData.getChunks().get(readBlockData
             .getChunks().size() - 1));
-    Assert.assertEquals(lastChunk.getChecksum(), readChunk.getChecksum());
+    Assert.assertEquals(
+        lastChunk.getChecksumData(), readChunk.getChecksumData());
   }
 
   /**

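All of the rewritten assertions use the same recompute-and-compare idiom: read
the chunk back, recompute its checksum, and compare ChecksumData values
directly (value equality on ChecksumData is implied by the assertEquals calls
in the hunks above). A condensed sketch:

    // Verify a chunk by recomputing its checksum over the bytes read back.
    Checksum checksum = new Checksum();
    byte[] data = chunkManager.readChunk(container, blockID, info);
    Assert.assertEquals(info.getChecksumData(), checksum.computeChecksum(data));
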
http://git-wip-us.apache.org/repos/asf/hadoop/blob/64a4b6b0/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
index fd10055..26a4ac1 100644
--- a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
+++ b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
@@ -24,7 +24,9 @@ import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.ozone.client.io.LengthInputStream;
+import org.apache.hadoop.ozone.common.Checksum;
 import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
@@ -84,6 +86,7 @@ public final class DistributedStorageHandler implements StorageHandler {
   private final long streamBufferMaxSize;
   private final long watchTimeout;
   private final long blockSize;
+  private final Checksum checksum;
 
   /**
    * Creates a new DistributedStorageHandler.
@@ -128,6 +131,27 @@ public final class DistributedStorageHandler implements StorageHandler {
         conf.getTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT,
             OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT_DEFAULT,
             TimeUnit.MILLISECONDS);
+
+    int configuredChecksumSize = conf.getInt(
+        OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM,
+        OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT);
+    int checksumSize;
+    if (configuredChecksumSize <
+        OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE) {
+      LOG.warn("The checksum size ({}) is not allowed to be less than the " +
+              "minimum size ({}), resetting to the minimum size.",
+          configuredChecksumSize,
+          OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE);
+      checksumSize = OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE;
+    } else {
+      checksumSize = configuredChecksumSize;
+    }
+    String checksumTypeStr = conf.get(
+        OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE,
+        OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT);
+    ContainerProtos.ChecksumType checksumType = ContainerProtos.ChecksumType
+        .valueOf(checksumTypeStr);
+    this.checksum = new Checksum(checksumType, checksumSize);
   }
 
   @Override
@@ -426,6 +450,7 @@ public final class DistributedStorageHandler implements StorageHandler {
             .setStreamBufferMaxSize(streamBufferMaxSize)
             .setBlockSize(blockSize)
             .setWatchTimeout(watchTimeout)
+            .setChecksum(checksum)
             .build();
     groupOutputStream.addPreallocateBlocks(
         openKey.getKeyInfo().getLatestVersionLocations(),

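Both new client settings read above are ordinary configuration knobs, and
values below OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE are clamped up with a
warning rather than rejected. A hypothetical tuning sketch (the literal key
strings and the SHA256 enum value are assumptions inferred from the constant
names, not confirmed by this patch):

    // Assumed keys: "ozone.client.checksum.type" and
    // "ozone.client.bytes.per.checksum"; SHA256 is assumed to be a valid
    // ContainerProtos.ChecksumType name.
    OzoneConfiguration conf = new OzoneConfiguration();
    conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "SHA256");
    conf.setInt(OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM, 1024 * 1024);
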
