This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 9644f83c HDDS-6086. Compute MD5MD5CRC file checksum using chunk 
checksums from DataNodes (#2919)
9644f83c is described below

commit 9644f83c0ab86946e4f0f30ab6dadb222f000ebc
Author: Wei-Chiu Chuang <[email protected]>
AuthorDate: Mon Jan 10 09:02:57 2022 +0800

    HDDS-6086. Compute MD5MD5CRC file checksum using chunk checksums from 
DataNodes (#2919)
---
 hadoop-ozone/client/pom.xml                        |   4 +
 .../ozone/client/BaseFileChecksumHelper.java       | 198 ++++++++++++++++++
 .../ozone/client/ReplicatedFileChecksumHelper.java | 181 ++++++++++++++++
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  |   4 +
 .../client/TestReplicatedFileChecksumHelper.java   | 230 +++++++++++++++++++++
 .../client/src/test/resources/log4j.properties     |  23 ---
 6 files changed, 617 insertions(+), 23 deletions(-)

diff --git a/hadoop-ozone/client/pom.xml b/hadoop-ozone/client/pom.xml
index f7cbd55..b5ba8e7 100644
--- a/hadoop-ozone/client/pom.xml
+++ b/hadoop-ozone/client/pom.xml
@@ -51,6 +51,10 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd";>
     <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-log4j12</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
       <scope>test</scope>
     </dependency>
   </dependencies>
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/BaseFileChecksumHelper.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/BaseFileChecksumHelper.java
new file mode 100644
index 0000000..1afbc8a
--- /dev/null
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/BaseFileChecksumHelper.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FileChecksum;
+import org.apache.hadoop.fs.MD5MD5CRC32GzipFileChecksum;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
+import org.apache.hadoop.ozone.client.rpc.RpcClient;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * The base class to support file checksum.
+ */
+public abstract class BaseFileChecksumHelper {
+  static final Logger LOG =
+      LoggerFactory.getLogger(BaseFileChecksumHelper.class);
+
+  private OzoneVolume volume;
+  private OzoneBucket bucket;
+  private String keyName;
+  private final long length;
+  private ClientProtocol rpcClient;
+
+  private XceiverClientFactory xceiverClientFactory;
+  private final DataOutputBuffer blockChecksumBuf = new DataOutputBuffer();
+  private FileChecksum fileChecksum;
+  private List<OmKeyLocationInfo> keyLocationInfos;
+  private long remaining = 0L;
+  private int bytesPerCRC = -1;
+  private long crcPerBlock = 0;
+
+  // initialization
+  BaseFileChecksumHelper(
+      OzoneVolume volume, OzoneBucket bucket, String keyName,
+      long length, ClientProtocol rpcClient) throws IOException {
+
+    this.volume = volume;
+    this.bucket = bucket;
+    this.keyName = keyName;
+    this.length = length;
+    this.rpcClient = rpcClient;
+    this.xceiverClientFactory =
+        ((RpcClient)rpcClient).getXceiverClientManager();
+    if (this.length > 0) {
+      fetchBlocks();
+    }
+  }
+
+  protected String getSrc() {
+    return "Volume: " + volume.getName() + " Bucket: " + bucket.getName() + " "
+        + keyName;
+  }
+
+  protected long getLength() {
+    return length;
+  }
+
+  protected ClientProtocol getRpcClient() {
+    return rpcClient;
+  }
+
+  protected XceiverClientFactory getXceiverClientFactory() {
+    return xceiverClientFactory;
+  }
+
+  protected DataOutputBuffer getBlockChecksumBuf() {
+    return blockChecksumBuf;
+  }
+
+  protected List<OmKeyLocationInfo> getKeyLocationInfoList() {
+    return keyLocationInfos;
+  }
+
+  protected long getRemaining() {
+    return remaining;
+  }
+
+  protected void setRemaining(long remaining) {
+    this.remaining = remaining;
+  }
+
+  int getBytesPerCRC() {
+    return bytesPerCRC;
+  }
+
+  protected void setBytesPerCRC(int bytesPerCRC) {
+    this.bytesPerCRC = bytesPerCRC;
+  }
+
+  /**
+   * Request the blocks created in the most recent version from Ozone Manager.
+   *
+   * @throws IOException
+   */
+  private void fetchBlocks() throws IOException {
+    OzoneManagerProtocol ozoneManagerClient =
+        getRpcClient().getOzoneManagerClient();
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder()
+        .setVolumeName(volume.getName())
+        .setBucketName(bucket.getName())
+        .setKeyName(keyName)
+        .setRefreshPipeline(true)
+        .setSortDatanodesInPipeline(true)
+        .setLatestVersionLocation(true)
+        .build();
+    OmKeyInfo keyInfo = ozoneManagerClient.lookupKey(keyArgs);
+
+    // use OmKeyArgs to call Om.lookup() and get OmKeyInfo
+    keyLocationInfos = keyInfo
+        .getLatestVersionLocations().getBlocksLatestVersionOnly();
+  }
+
+  /**
+   * Compute file checksum given the list of chunk checksums requested earlier.
+   * @throws IOException
+   */
+  public void compute() throws IOException {
+    /**
+     * request length is 0 or the file is empty, return one with the
+     * magic entry that matches the md5 of a 32 byte zero-padded byte array.
+     */
+    if (keyLocationInfos == null || keyLocationInfos.isEmpty()) {
+      // Explicitly specified here in case the default DataOutputBuffer
+      // buffer length value is changed in future.
+      final int lenOfZeroBytes = 32;
+      byte[] emptyBlockMd5 = new byte[lenOfZeroBytes];
+      MD5Hash fileMD5 = MD5Hash.digest(emptyBlockMd5);
+      fileChecksum =  new MD5MD5CRC32GzipFileChecksum(0, 0, fileMD5);
+    } else {
+      checksumBlocks();
+      fileChecksum = makeFinalResult();
+    }
+  }
+
+  @VisibleForTesting
+  List<OmKeyLocationInfo> getKeyLocationInfos() {
+    return keyLocationInfos;
+  }
+
+
+  /**
+   * Compute block checksums block by block and append the raw bytes of the
+   * block checksums into getBlockChecksumBuf().
+   *
+   * @throws IOException
+   */
+  protected abstract void checksumBlocks() throws IOException;
+
+  /**
+   * Make final file checksum result given the per-block or per-block-group
+   * checksums collected into getBlockChecksumBuf().
+   */
+  private FileChecksum makeFinalResult() throws IOException {
+    // TODO: support composite CRC
+    return makeMd5CrcResult();
+  }
+
+  private FileChecksum makeMd5CrcResult() {
+    // TODO: support CRC32C
+    //compute file MD5
+    final MD5Hash fileMD5 = MD5Hash.digest(getBlockChecksumBuf().getData());
+    // assume CRC32 for now
+    return new MD5MD5CRC32GzipFileChecksum(getBytesPerCRC(),
+        crcPerBlock, fileMD5);
+  }
+
+  public FileChecksum getFileChecksum() {
+    return fileChecksum;
+  }
+
+}
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/ReplicatedFileChecksumHelper.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/ReplicatedFileChecksumHelper.java
new file mode 100644
index 0000000..168458a
--- /dev/null
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/ReplicatedFileChecksumHelper.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client;
+
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.ozone.client.rpc.RpcClient;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.security.token.Token;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * The helper class to compute file checksum for replicated files.
+ */
+public class ReplicatedFileChecksumHelper extends BaseFileChecksumHelper {
+  private int blockIdx;
+
+  ReplicatedFileChecksumHelper(
+      OzoneVolume volume, OzoneBucket bucket, String keyName, long length,
+      RpcClient rpcClient) throws IOException {
+    super(volume, bucket, keyName, length, rpcClient);
+  }
+
+  @Override
+  protected void checksumBlocks() throws IOException {
+    long currentLength = 0;
+    for (blockIdx = 0;
+         blockIdx < getKeyLocationInfoList().size() && getRemaining() >= 0;
+         blockIdx++) {
+      OmKeyLocationInfo keyLocationInfo =
+          getKeyLocationInfoList().get(blockIdx);
+      currentLength += keyLocationInfo.getLength();
+      if (currentLength > getLength()) {
+        return;
+      }
+
+      if (!checksumBlock(keyLocationInfo)) {
+        throw new PathIOException(
+            getSrc(), "Fail to get block MD5 for " + keyLocationInfo);
+      }
+    }
+  }
+
+  /**
+   * Return true when sounds good to continue or retry, false when severe
+   * condition or totally failed.
+   */
+  private boolean checksumBlock(OmKeyLocationInfo keyLocationInfo)
+      throws IOException {
+
+    long blockNumBytes = keyLocationInfo.getLength();
+
+    if (getRemaining() < blockNumBytes) {
+      blockNumBytes = getRemaining();
+    }
+    setRemaining(getRemaining() - blockNumBytes);
+    // for each block, send request
+    List<ContainerProtos.ChunkInfo> chunkInfos =
+        getChunkInfos(keyLocationInfo);
+    ContainerProtos.ChecksumData checksumData =
+        chunkInfos.get(0).getChecksumData();
+    int bytesPerChecksum = checksumData.getBytesPerChecksum();
+    setBytesPerCRC(bytesPerChecksum);
+
+    byte[] blockChecksum = getBlockChecksumFromChunkChecksums(
+        keyLocationInfo, chunkInfos);
+    String blockChecksumForDebug = populateBlockChecksumBuf(blockChecksum);
+
+    LOG.debug("got reply from pipeline {} for block {}: blockChecksum={}, " +
+            "blockChecksumType={}",
+        keyLocationInfo.getPipeline(), keyLocationInfo.getBlockID(),
+        blockChecksumForDebug, checksumData.getType());
+    return true;
+  }
+
+  // copied from BlockInputStream
+  /**
+   * Send RPC call to get the block info from the container.
+   * @return List of chunks in this block.
+   */
+  protected List<ContainerProtos.ChunkInfo> getChunkInfos(
+      OmKeyLocationInfo keyLocationInfo) throws IOException {
+    // irrespective of the container state, we will always read via Standalone
+    // protocol.
+    Token<OzoneBlockTokenIdentifier> token = keyLocationInfo.getToken();
+    Pipeline pipeline = keyLocationInfo.getPipeline();
+    BlockID blockID = keyLocationInfo.getBlockID();
+    if (pipeline.getType() != HddsProtos.ReplicationType.STAND_ALONE) {
+      pipeline = Pipeline.newBuilder(pipeline)
+          .setReplicationConfig(new StandaloneReplicationConfig(
+              ReplicationConfig
+                  .getLegacyFactor(pipeline.getReplicationConfig())))
+          .build();
+    }
+
+    boolean success = false;
+    List<ContainerProtos.ChunkInfo> chunks;
+    XceiverClientSpi xceiverClientSpi = null;
+    try {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Initializing BlockInputStream for get key to access {}",
+            blockID.getContainerID());
+      }
+      xceiverClientSpi =
+          getXceiverClientFactory().acquireClientForReadData(pipeline);
+
+      ContainerProtos.DatanodeBlockID datanodeBlockID = blockID
+          .getDatanodeBlockIDProtobuf();
+      ContainerProtos.GetBlockResponseProto response = ContainerProtocolCalls
+          .getBlock(xceiverClientSpi, datanodeBlockID, token);
+
+      chunks = response.getBlockData().getChunksList();
+      success = true;
+    } finally {
+      if (!success && xceiverClientSpi != null) {
+        getXceiverClientFactory().releaseClientForReadData(
+            xceiverClientSpi, false);
+      }
+    }
+
+    return chunks;
+  }
+
+  // TODO: copy BlockChecksumHelper here
+  byte[] getBlockChecksumFromChunkChecksums(OmKeyLocationInfo keyLocationInfo,
+      List<ContainerProtos.ChunkInfo> chunkInfoList)
+      throws IOException {
+    // TODO: support composite CRC
+    final int lenOfZeroBytes = 32;
+    byte[] emptyBlockMd5 = new byte[lenOfZeroBytes];
+    MD5Hash fileMD5 = MD5Hash.digest(emptyBlockMd5);
+    return fileMD5.getDigest();
+  }
+
+  /**
+   * Parses out the raw blockChecksum bytes from {@code checksumData}
+   * according to the blockChecksumType and populates the cumulative
+   * blockChecksumBuf with it.
+   *
+   * @return a debug-string representation of the parsed checksum if
+   *     debug is enabled, otherwise null.
+   */
+  String populateBlockChecksumBuf(byte[] checksumData)
+      throws IOException {
+    String blockChecksumForDebug = null;
+    //read md5
+    final MD5Hash md5 = new MD5Hash(checksumData);
+    md5.write(getBlockChecksumBuf());
+    if (LOG.isDebugEnabled()) {
+      blockChecksumForDebug = md5.toString();
+    }
+
+    return blockChecksumForDebug;
+  }
+}
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 9ca8693..4f2a494 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -280,6 +280,10 @@ public class RpcClient implements ClientProtocol {
         }).build();
   }
 
+  public XceiverClientFactory getXceiverClientManager() {
+    return xceiverClientManager;
+  }
+
   static boolean validateOmVersion(String expectedVersion,
                                    List<ServiceInfo> serviceInfoList) {
     if (expectedVersion == null || expectedVersion.isEmpty()) {
diff --git 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestReplicatedFileChecksumHelper.java
 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestReplicatedFileChecksumHelper.java
new file mode 100644
index 0000000..dfb2ddc
--- /dev/null
+++ 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestReplicatedFileChecksumHelper.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client;
+
+import org.apache.hadoop.fs.FileChecksum;
+import org.apache.hadoop.fs.MD5MD5CRC32GzipFileChecksum;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
+import org.apache.hadoop.hdds.scm.XceiverClientReply;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.ozone.client.rpc.RpcClient;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.Time;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+
+import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType.CRC32;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for ReplicatedFileChecksumHelper class.
+ */
+public class TestReplicatedFileChecksumHelper {
+
+  @Test
+  public void testEmptyBlock() throws IOException {
+    // test the file checksum of a file with an empty block.
+    RpcClient rpcClient = Mockito.mock(RpcClient.class);
+
+    OzoneManagerProtocol om = Mockito.mock(OzoneManagerProtocol.class);
+    when(rpcClient.getOzoneManagerClient()).thenReturn(om);
+
+    OmKeyInfo omKeyInfo = new OmKeyInfo.Builder()
+        .setVolumeName(null)
+        .setBucketName(null)
+        .setKeyName(null)
+        .setOmKeyLocationInfos(Collections.singletonList(
+            new OmKeyLocationInfoGroup(0, new ArrayList<>())))
+        .setCreationTime(Time.now())
+        .setModificationTime(Time.now())
+        .setDataSize(0)
+        .setReplicationConfig(new RatisReplicationConfig(
+            HddsProtos.ReplicationFactor.ONE))
+        .setFileEncryptionInfo(null)
+        .setAcls(null)
+        .build();
+
+    when(om.lookupKey(any())).thenReturn(omKeyInfo);
+
+    OzoneVolume volume = Mockito.mock(OzoneVolume.class);
+    when(volume.getName()).thenReturn("vol1");
+    OzoneBucket bucket = Mockito.mock(OzoneBucket.class);
+    when(bucket.getName()).thenReturn("bucket1");
+
+    ReplicatedFileChecksumHelper helper = new ReplicatedFileChecksumHelper(
+        volume, bucket, "dummy", 10, rpcClient);
+    helper.compute();
+    FileChecksum fileChecksum = helper.getFileChecksum();
+    assertTrue(fileChecksum instanceof MD5MD5CRC32GzipFileChecksum);
+    assertEquals(DataChecksum.Type.CRC32,
+        ((MD5MD5CRC32GzipFileChecksum)fileChecksum).getCrcType());
+
+    // test negative length
+    helper = new ReplicatedFileChecksumHelper(
+        volume, bucket, "dummy", -1, rpcClient);
+    helper.compute();
+    assertNull(helper.getKeyLocationInfoList());
+  }
+
+  @Test
+  public void testOneBlock() throws IOException {
+    // test the file checksum of a file with one block.
+    OzoneConfiguration conf = new OzoneConfiguration();
+
+    RpcClient rpcClient = Mockito.mock(RpcClient.class);
+
+    List<DatanodeDetails> dns = Arrays.asList(
+        DatanodeDetails.newBuilder().setUuid(UUID.randomUUID()).build());
+    Pipeline pipeline;
+    pipeline = Pipeline.newBuilder()
+        .setId(PipelineID.randomId())
+        .setReplicationConfig(
+            new RatisReplicationConfig(HddsProtos.ReplicationFactor.THREE))
+        .setState(Pipeline.PipelineState.CLOSED)
+        .setNodes(dns)
+        .build();
+
+    XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf) {
+      @Override
+      public XceiverClientReply sendCommandAsync(
+          ContainerProtos.ContainerCommandRequestProto request,
+          DatanodeDetails dn) {
+        return buildValidResponse();
+      }
+    };
+    XceiverClientFactory factory = Mockito.mock(XceiverClientFactory.class);
+    when(factory.acquireClientForReadData(any())).thenReturn(client);
+
+    when(rpcClient.getXceiverClientManager()).thenReturn(factory);
+
+    OzoneManagerProtocol om = Mockito.mock(OzoneManagerProtocol.class);
+    when(rpcClient.getOzoneManagerClient()).thenReturn(om);
+
+    BlockID blockID = new BlockID(1, 1);
+    OmKeyLocationInfo omKeyLocationInfo =
+        new OmKeyLocationInfo.Builder().setPipeline(pipeline)
+            .setBlockID(blockID)
+            .build();
+
+    List<OmKeyLocationInfo> omKeyLocationInfoList =
+        Arrays.asList(omKeyLocationInfo);
+
+    OmKeyInfo omKeyInfo = new OmKeyInfo.Builder()
+        .setVolumeName(null)
+        .setBucketName(null)
+        .setKeyName(null)
+        .setOmKeyLocationInfos(Collections.singletonList(
+            new OmKeyLocationInfoGroup(0, omKeyLocationInfoList)))
+        .setCreationTime(Time.now())
+        .setModificationTime(Time.now())
+        .setDataSize(0)
+        .setReplicationConfig(new RatisReplicationConfig(
+            HddsProtos.ReplicationFactor.ONE))
+        .setFileEncryptionInfo(null)
+        .setAcls(null)
+        .build();
+
+    when(om.lookupKey(any())).thenReturn(omKeyInfo);
+
+    OzoneVolume volume = Mockito.mock(OzoneVolume.class);
+    when(volume.getName()).thenReturn("vol1");
+    OzoneBucket bucket = Mockito.mock(OzoneBucket.class);
+    when(bucket.getName()).thenReturn("bucket1");
+
+    ReplicatedFileChecksumHelper helper = new ReplicatedFileChecksumHelper(
+        volume, bucket, "dummy", 10, rpcClient);
+
+    helper.compute();
+    FileChecksum fileChecksum = helper.getFileChecksum();
+    assertTrue(fileChecksum instanceof MD5MD5CRC32GzipFileChecksum);
+    assertEquals(1, helper.getKeyLocationInfos().size());
+  }
+
+  private XceiverClientReply buildValidResponse() {
+    // return a GetBlockResponse message of a block and its chunk checksums.
+    ContainerProtos.DatanodeBlockID blockID =
+        ContainerProtos.DatanodeBlockID.newBuilder()
+        .setContainerID(1)
+        .setLocalID(1)
+        .setBlockCommitSequenceId(1).build();
+
+    byte[] byteArray = new byte[10];
+    ByteString byteString = ByteString.copyFrom(byteArray);
+
+    ContainerProtos.ChecksumData checksumData =
+        ContainerProtos.ChecksumData.newBuilder()
+        .setType(CRC32)
+        .setBytesPerChecksum(1024)
+        .addChecksums(byteString)
+        .build();
+
+    ContainerProtos.ChunkInfo chunkInfo =
+        ContainerProtos.ChunkInfo.newBuilder()
+        .setChunkName("dummy_chunk")
+        .setOffset(1)
+        .setLen(10)
+        .setChecksumData(checksumData)
+        .build();
+
+    ContainerProtos.BlockData blockData =
+        ContainerProtos.BlockData.newBuilder()
+            .setBlockID(blockID)
+            .addChunks(chunkInfo)
+            .build();
+    ContainerProtos.GetBlockResponseProto getBlockResponseProto
+        = ContainerProtos.GetBlockResponseProto.newBuilder()
+        .setBlockData(blockData)
+        .build();
+
+    ContainerProtos.ContainerCommandResponseProto resp =
+        ContainerProtos.ContainerCommandResponseProto.newBuilder()
+            .setCmdType(ContainerProtos.Type.GetBlock)
+            .setResult(ContainerProtos.Result.SUCCESS)
+            .setGetBlock(getBlockResponseProto)
+            .build();
+    final CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
+        replyFuture = new CompletableFuture<>();
+    replyFuture.complete(resp);
+    return new XceiverClientReply(replyFuture);
+  }
+}
\ No newline at end of file
diff --git a/hadoop-ozone/client/src/test/resources/log4j.properties 
b/hadoop-ozone/client/src/test/resources/log4j.properties
deleted file mode 100644
index 3987866..0000000
--- a/hadoop-ozone/client/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,23 +0,0 @@
-#
-#   Licensed to the Apache Software Foundation (ASF) under one or more
-#   contributor license agreements.  See the NOTICE file distributed with
-#   this work for additional information regarding copyright ownership.
-#   The ASF licenses this file to You under the Apache License, Version 2.0
-#   (the "License"); you may not use this file except in compliance with
-#   the License.  You may obtain a copy of the License at
-#
-#       http://www.apache.org/licenses/LICENSE-2.0
-#
-#   Unless required by applicable law or agreed to in writing, software
-#   distributed under the License is distributed on an "AS IS" BASIS,
-#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#   See the License for the specific language governing permissions and
-#   limitations under the License.
-#
-# log4j configuration used during build and unit tests
-
-log4j.rootLogger=INFO,stdout
-log4j.threshold=ALL
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} 
(%F:%M(%L)) - %m%n

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to