This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
     new 1d3423d  HDDS-5477. EC: commit key should consolidate and create one keyLocationInfo per blockGrp (#2648)
1d3423d is described below

commit 1d3423de766b4ced8115f83438613e8d7dcda144
Author: Uma Maheswara Rao G <[email protected]>
AuthorDate: Thu Sep 16 05:59:31 2021 -0700

    HDDS-5477. EC: commit key should consolidate and create one keyLocationInfo per blockGrp (#2648)
---
 .../client/io/ECBlockOutputStreamEntryPool.java    | 67 ++++++++++++++++++++++
 .../hadoop/ozone/client/MockOmTransport.java       | 21 ++++++-
 .../hadoop/ozone/client/TestOzoneECClient.java     | 23 ++++++++
 3 files changed, 109 insertions(+), 2 deletions(-)

diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
index 3fd6bc6..2e59650 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.ozone.client.io;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -31,6 +32,7 @@ import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -113,6 +115,71 @@ public class ECBlockOutputStreamEntryPool extends BlockOutputStreamEntryPool {
     return totalLength;
   }
 
+  @Override
+  List<OmKeyLocationInfo> getOmKeyLocationInfos(
+      List<BlockOutputStreamEntry> streams) {
+    List<OmKeyLocationInfo> locationInfoList = new ArrayList<>();
+    Map<BlockID, ArrayList<ECBlockOutputStreamEntry>> blkIdVsStream =
+        new HashMap<>();
+
+    for (BlockOutputStreamEntry streamEntry : streams) {
+      BlockID blkID = streamEntry.getBlockID();
+      final ArrayList<ECBlockOutputStreamEntry> stream =
+          blkIdVsStream.getOrDefault(blkID, new ArrayList<>());
+      stream.add((ECBlockOutputStreamEntry) streamEntry);
+      blkIdVsStream.put(blkID, stream);
+    }
+
+    final Iterator<Map.Entry<BlockID, ArrayList<ECBlockOutputStreamEntry>>>
+        iterator = blkIdVsStream.entrySet().iterator();
+
+    while (iterator.hasNext()) {
+      final Map.Entry<BlockID, ArrayList<ECBlockOutputStreamEntry>>
+          blkGrpIDVsStreams = iterator.next();
+      final ArrayList<ECBlockOutputStreamEntry> blkGrpStreams =
+          blkGrpIDVsStreams.getValue();
+      List<DatanodeDetails> nodeStatus = new ArrayList<>();
+      Map<DatanodeDetails, Integer> nodeVsIdx = new HashMap<>();
+
+      // Assumption: Irrespective of failures, stream entries must have updated
+      // the lengths.
+      long blkGRpLen = 0;
+      for (ECBlockOutputStreamEntry internalBlkStream : blkGrpStreams) {
+        blkGRpLen += !(internalBlkStream).isParityStreamEntry() ?
+            internalBlkStream.getCurrentPosition() :
+            0;
+        // In EC, only one node per internal block stream.
+        final DatanodeDetails nodeDetails =
+            internalBlkStream.getPipeline().getNodeSet().iterator().next();
+        nodeStatus.add(nodeDetails);
+        nodeVsIdx.put(nodeDetails,
+            internalBlkStream.getPipeline().getReplicaIndex(nodeDetails));
+      }
+      nodeStatus.sort((o1, o2) -> nodeVsIdx.get(o1) - nodeVsIdx.get(o2));
+      final BlockOutputStreamEntry firstStreamInBlockGrp = blkGrpStreams.get(0);
+      Pipeline blockGrpPipeline = Pipeline.newBuilder()
+          .setId(firstStreamInBlockGrp.getPipeline().getId())
+          .setReplicationConfig(
+              firstStreamInBlockGrp.getPipeline().getReplicationConfig())
+          .setState(firstStreamInBlockGrp.getPipeline().getPipelineState())
+          .setNodes(nodeStatus).setReplicaIndexes(nodeVsIdx).build();
+      // Commit only those blocks to OzoneManager which are not empty
+      if (blkGRpLen != 0) {
+        OmKeyLocationInfo info = new OmKeyLocationInfo.Builder()
+            .setBlockID(blkGrpIDVsStreams.getKey()).setLength(blkGRpLen)
+            .setOffset(0).setToken(firstStreamInBlockGrp.getToken())
+            .setPipeline(blockGrpPipeline).build();
+        locationInfoList.add(info);
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("block written " + firstStreamInBlockGrp
+            .getBlockID() + ", length " + blkGRpLen + " bcsID "
+            + firstStreamInBlockGrp.getBlockID().getBlockCommitSequenceId());
+      }
+    }
+    return locationInfoList;
+  }
+
   public void endECBlock() throws IOException {
     List<BlockOutputStreamEntry> entries = getStreamEntries();
     if (entries.size() > 0) {
diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockOmTransport.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockOmTransport.java
index 1da0c2d..5904d14 100644
--- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockOmTransport.java
+++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockOmTransport.java
@@ -162,11 +162,28 @@ public class MockOmTransport implements OmTransport {
 
   private CommitKeyResponse commitKey(CommitKeyRequest commitKeyRequest) {
     final KeyArgs keyArgs = commitKeyRequest.getKeyArgs();
-    final KeyInfo remove =
+    final KeyInfo openKey =
         openKeys.get(keyArgs.getVolumeName()).get(keyArgs.getBucketName())
             .remove(keyArgs.getKeyName());
+    final KeyInfo.Builder committedKeyInfoWithLocations =
+        KeyInfo.newBuilder().setVolumeName(keyArgs.getVolumeName())
+            .setBucketName(keyArgs.getBucketName())
+            .setKeyName(keyArgs.getKeyName())
+            .setCreationTime(openKey.getCreationTime())
+            .setModificationTime(openKey.getModificationTime())
+            .setDataSize(keyArgs.getDataSize()).setLatestVersion(0L)
+            .addKeyLocationList(KeyLocationList.newBuilder()
+                .addAllKeyLocations(keyArgs.getKeyLocationsList()));
+    // Just inherit replication config details from open Key
+    if (openKey.hasEcReplicationConfig()) {
+      committedKeyInfoWithLocations
+          .setEcReplicationConfig(openKey.getEcReplicationConfig());
+    } else if (openKey.hasFactor()) {
+      committedKeyInfoWithLocations.setFactor(openKey.getFactor());
+    }
+    committedKeyInfoWithLocations.setType(openKey.getType());
     keys.get(keyArgs.getVolumeName()).get(keyArgs.getBucketName())
-        .put(keyArgs.getKeyName(), remove);
+        .put(keyArgs.getKeyName(), committedKeyInfoWithLocations.build());
     return CommitKeyResponse.newBuilder()
         .build();
   }
diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
index 9591e27..fed29b4 100644
--- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
+++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
@@ -228,6 +228,29 @@ public class TestOzoneECClient {
   }
 
   @Test
+  public void testCommitKeyInfo()
+      throws IOException {
+    final OzoneBucket bucket = writeIntoECKey(inputChunks, keyName,
+        new DefaultReplicationConfig(ReplicationType.EC,
+            new ECReplicationConfig(dataBlocks, parityBlocks)));
+
+    // create key without mentioning replication config. Since we set EC
+    // replication in bucket, key should be EC key.
+    try (OzoneOutputStream out = bucket.createKey("mykey", 2000)) {
+      Assert.assertTrue(out.getOutputStream() instanceof ECKeyOutputStream);
+      for (int i = 0; i < inputChunks.length; i++) {
+        out.write(inputChunks[i]);
+      }
+    }
+    Assert.assertEquals(1,
+        transportStub.getKeys().get(volumeName).get(bucketName).get(keyName)
+            .getKeyLocationListCount());
+    Assert.assertEquals(inputChunks[0].length * 3,
+        transportStub.getKeys().get(volumeName).get(bucketName).get(keyName)
+            .getDataSize());
+  }
+
+  @Test
   public void testPartialStripeWithSingleChunkAndPadding() throws IOException {
     store.createVolume(volumeName);
     OzoneVolume volume = store.getVolume(volumeName);

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to