This is an automated email from the ASF dual-hosted git repository.

arp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9569015  HDDS-1511. Space tracking for Open Containers in HDDS Volumes. Contributed by Supratim Deka (#812)
9569015 is described below

commit 9569015802e695f1c242c74d5ac9df27e180374c
Author: supratimdeka <46919641+supratimd...@users.noreply.github.com>
AuthorDate: Wed May 15 22:56:01 2019 +0530

    HDDS-1511. Space tracking for Open Containers in HDDS Volumes. Contributed by Supratim Deka (#812)
---
 .../ozone/container/common/impl/ContainerData.java | 63 ++++++++++++++++++++++
 .../ozone/container/common/impl/ContainerSet.java  |  2 +
 .../ozone/container/common/volume/HddsVolume.java  | 21 ++++++++
 .../common/impl/TestContainerPersistence.java      | 21 ++++++++
 4 files changed, 107 insertions(+)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
index d2fa2c8..ec70dbd 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
@@ -71,6 +71,8 @@ public abstract class ContainerData {
 
   private final long maxSize;
 
+  private boolean committedSpace;
+
   //ID of the pipeline where this container is created
   private String originPipelineId;
   //ID of the datanode where this container is created
@@ -184,7 +186,23 @@ public abstract class ContainerData {
    * @param state
    */
   public synchronized void setState(ContainerDataProto.State state) {
+    ContainerDataProto.State oldState = this.state;
     this.state = state;
+
+    if ((oldState == ContainerDataProto.State.OPEN) &&
+        (state != oldState)) {
+      releaseCommitSpace();
+    }
+
+    /**
+     * commit space when container transitions (back) to Open.
+     * when? perhaps closing a container threw an exception
+     */
+    if ((state == ContainerDataProto.State.OPEN) &&
+        (state != oldState)) {
+      Preconditions.checkState(getMaxSize() > 0);
+      commitSpace();
+    }
   }
 
   /**
@@ -280,6 +298,41 @@ public abstract class ContainerData {
     setState(ContainerDataProto.State.CLOSED);
   }
 
+  private void releaseCommitSpace() {
+    long unused = getMaxSize() - getBytesUsed();
+
+    // only if container size < max size
+    if (unused > 0 && committedSpace) {
+      getVolume().incCommittedBytes(0 - unused);
+    }
+    committedSpace = false;
+  }
+
+  /**
+   * add available space in the container to the committed space in the volume.
+   * available space is the number of bytes remaining till max capacity.
+   */
+  public void commitSpace() {
+    long unused = getMaxSize() - getBytesUsed();
+    ContainerDataProto.State myState = getState();
+    HddsVolume cVol;
+
+    //we don't expect duplicate calls
+    Preconditions.checkState(!committedSpace);
+
+    // Only Open Containers have Committed Space
+    if (myState != ContainerDataProto.State.OPEN) {
+      return;
+    }
+
+    // junit tests do not always set up volume
+    cVol = getVolume();
+    if (unused > 0 && (cVol != null)) {
+      cVol.incCommittedBytes(unused);
+      committedSpace = true;
+    }
+  }
+
   /**
    * Get the number of bytes read from the container.
    * @return the number of bytes read from the container.
@@ -321,10 +374,20 @@ public abstract class ContainerData {
 
   /**
    * Increase the number of bytes write into the container.
+   * Also decrement committed bytes against the bytes written.
    * @param bytes the number of bytes write into the container.
    */
   public void incrWriteBytes(long bytes) {
+    long unused = getMaxSize() - getBytesUsed();
+
     this.writeBytes.addAndGet(bytes);
+
+    // only if container size < max size
+    if (committedSpace && unused > 0) {
+      //with this write, container size might breach max size
+      long decrement = Math.min(bytes, unused);
+      this.getVolume().incCommittedBytes(0 - decrement);
+    }
   }
 
   /**
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
index 4a7a950..7dbcbef 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
@@ -68,6 +68,8 @@ public class ContainerSet {
     if(containerMap.putIfAbsent(containerId, container) == null) {
       LOG.debug("Container with container Id {} is added to containerMap",
           containerId);
+      // wish we could have done this from ContainerData.setState
+      container.getContainerData().commitSpace();
       return true;
     } else {
       LOG.warn("Container already exists with container Id {}", containerId);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
index ab18273..3a20345 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
@@ -44,6 +44,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Properties;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * HddsVolume represents volume in a datanode. {@link VolumeSet} maintains a
@@ -85,6 +86,7 @@ public class HddsVolume
   private String datanodeUuid;    // id of the DataNode
   private long cTime;             // creation time of the file system state
   private int layoutVersion;      // layout version of the storage data
+  private final AtomicLong committedBytes; // till Open containers become full
 
   /**
    * Run a check on the current volume to determine if it is healthy.
@@ -168,6 +170,7 @@ public class HddsVolume
               .storageType(b.storageType)
               .configuredCapacity(b.configuredCapacity);
       this.volumeInfo = volumeBuilder.build();
+      this.committedBytes = new AtomicLong(0);
 
      LOG.info("Creating Volume: " + this.hddsRootDir + " of  storage type : " +
          b.storageType + " and capacity : " + volumeInfo.getCapacity());
@@ -181,6 +184,7 @@ public class HddsVolume
       volumeInfo = null;
       storageID = UUID.randomUUID().toString();
       state = VolumeState.FAILED;
+      committedBytes = null;
     }
   }
 
@@ -422,6 +426,23 @@ public class HddsVolume
   }
 
   /**
+   * add "delta" bytes to committed space in the volume.
+   * @param delta bytes to add to committed space counter
+   * @return bytes of committed space
+   */
+  public long incCommittedBytes(long delta) {
+    return committedBytes.addAndGet(delta);
+  }
+
+  /**
+   * return the committed space in the volume.
+   * @return bytes of committed space
+   */
+  public long getCommittedBytes() {
+    return committedBytes.get();
+  }
+
+  /**
    * Only for testing. Do not use otherwise.
    */
   @VisibleForTesting
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
index 838dd9e..2fd169c 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
@@ -162,6 +162,9 @@ public class TestContainerPersistence {
 
   private Container addContainer(ContainerSet cSet, long cID)
       throws IOException {
+    long commitBytesBefore = 0;
+    long commitBytesAfter = 0;
+    long commitIncrement = 0;
     KeyValueContainerData data = new KeyValueContainerData(cID,
         ContainerTestHelper.CONTAINER_MAX_SIZE, UUID.randomUUID().toString(),
         UUID.randomUUID().toString());
@@ -169,7 +172,15 @@ public class TestContainerPersistence {
     data.addMetadata("owner)", "bilbo");
     KeyValueContainer container = new KeyValueContainer(data, conf);
     container.create(volumeSet, volumeChoosingPolicy, SCM_ID);
+    commitBytesBefore = container.getContainerData()
+        .getVolume().getCommittedBytes();
     cSet.addContainer(container);
+    commitBytesAfter = container.getContainerData()
+        .getVolume().getCommittedBytes();
+    commitIncrement = commitBytesAfter - commitBytesBefore;
+    // did we commit space for the new container?
+    Assert.assertTrue(commitIncrement ==
+        ContainerTestHelper.CONTAINER_MAX_SIZE);
     return container;
   }
 
@@ -328,6 +339,9 @@ public class TestContainerPersistence {
 
   private ChunkInfo writeChunkHelper(BlockID blockID) throws IOException {
     final int datalen = 1024;
+    long commitBytesBefore = 0;
+    long commitBytesAfter = 0;
+    long commitDecrement = 0;
     long testContainerID = blockID.getContainerID();
     Container container = containerSet.getContainer(testContainerID);
     if (container == null) {
@@ -337,8 +351,15 @@ public class TestContainerPersistence {
         blockID.getLocalID(), 0, 0, datalen);
     byte[] data = getData(datalen);
     setDataChecksum(info, data);
+    commitBytesBefore = container.getContainerData()
+        .getVolume().getCommittedBytes();
     chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
         getDispatcherContext());
+    commitBytesAfter = container.getContainerData()
+        .getVolume().getCommittedBytes();
+    commitDecrement = commitBytesBefore - commitBytesAfter;
+    // did we decrement commit bytes by the amount of data we wrote?
+    Assert.assertTrue(commitDecrement == info.getLen());
     return info;
 
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to