This is an automated email from the ASF dual-hosted git repository.
arp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9569015 HDDS-1511. Space tracking for Open Containers in HDDS
Volumes. Contributed by Supratim Deka (#812)
9569015 is described below
commit 9569015802e695f1c242c74d5ac9df27e180374c
Author: supratimdeka <[email protected]>
AuthorDate: Wed May 15 22:56:01 2019 +0530
HDDS-1511. Space tracking for Open Containers in HDDS Volumes. Contributed
by Supratim Deka (#812)
---
.../ozone/container/common/impl/ContainerData.java | 63 ++++++++++++++++++++++
.../ozone/container/common/impl/ContainerSet.java | 2 +
.../ozone/container/common/volume/HddsVolume.java | 21 ++++++++
.../common/impl/TestContainerPersistence.java | 21 ++++++++
4 files changed, 107 insertions(+)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
index d2fa2c8..ec70dbd 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
@@ -71,6 +71,8 @@ public abstract class ContainerData {
private final long maxSize;
+ private boolean committedSpace;
+
//ID of the pipeline where this container is created
private String originPipelineId;
//ID of the datanode where this container is created
@@ -184,7 +186,23 @@ public abstract class ContainerData {
* @param state
*/
public synchronized void setState(ContainerDataProto.State state) {
+ ContainerDataProto.State oldState = this.state; // kept to detect transitions
this.state = state;
+
+ if ((oldState == ContainerDataProto.State.OPEN) &&
+ (state != oldState)) {
+ releaseCommitSpace(); // container is leaving OPEN
+ }
+
+ /*
+ * Commit space when the container transitions (back) to OPEN.
+ * When can that happen? Perhaps closing the container threw an exception.
+ */
+ if ((state == ContainerDataProto.State.OPEN) &&
+ (state != oldState)) {
+ Preconditions.checkState(getMaxSize() > 0);
+ commitSpace(); // commitSpace() re-checks getState() itself
+ }
}
/**
@@ -280,6 +298,41 @@ public abstract class ContainerData {
setState(ContainerDataProto.State.CLOSED);
}
+ private void releaseCommitSpace() { // drop remaining committed bytes
+ long unused = getMaxSize() - getBytesUsed(); // bytes left until max size
+
+ // only if container size < max size, and only if space was committed
+ if (unused > 0 && committedSpace) {
+ getVolume().incCommittedBytes(0 - unused); // negative delta: subtract
+ }
+ committedSpace = false; // cleared unconditionally
+ }
+
+ /**
+ * Add the container's available space (max size minus bytes used) to the
+ * volume's committed space. No-op unless OPEN, with a volume, and space left.
+ */
+ public void commitSpace() { // NOTE(review): not synchronized, unlike setState
+ long unused = getMaxSize() - getBytesUsed();
+ ContainerDataProto.State myState = getState();
+ HddsVolume cVol;
+
+ // we don't expect duplicate calls; checked even when the state is not OPEN
+ Preconditions.checkState(!committedSpace);
+
+ // Only Open Containers have Committed Space
+ if (myState != ContainerDataProto.State.OPEN) {
+ return;
+ }
+
+ // junit tests do not always set up volume
+ cVol = getVolume();
+ if (unused > 0 && (cVol != null)) {
+ cVol.incCommittedBytes(unused);
+ committedSpace = true; // flag stays false when nothing was committed
+ }
+ }
+
/**
* Get the number of bytes read from the container.
* @return the number of bytes read from the container.
@@ -321,10 +374,20 @@ public abstract class ContainerData {
/**
* Increase the number of bytes written into the container.
+ * Also decrement committed bytes against the bytes written.
* @param bytes the number of bytes written into the container.
*/
public void incrWriteBytes(long bytes) {
+ long unused = getMaxSize() - getBytesUsed(); // sampled before the update
+
this.writeBytes.addAndGet(bytes);
+
+ // only if container size < max size
+ if (committedSpace && unused > 0) {
+ // with this write, container size might breach max size, so cap at unused
+ long decrement = Math.min(bytes, unused);
+ this.getVolume().incCommittedBytes(0 - decrement);
+ }
}
/**
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
index 4a7a950..7dbcbef 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
@@ -68,6 +68,8 @@ public class ContainerSet {
if(containerMap.putIfAbsent(containerId, container) == null) {
LOG.debug("Container with container Id {} is added to containerMap",
containerId);
+ // wish we could have done this from ContainerData.setState
+ container.getContainerData().commitSpace();
return true;
} else {
LOG.warn("Container already exists with container Id {}", containerId);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
index ab18273..3a20345 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
@@ -44,6 +44,7 @@ import java.io.File;
import java.io.IOException;
import java.util.Properties;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
/**
* HddsVolume represents volume in a datanode. {@link VolumeSet} maintains a
@@ -85,6 +86,7 @@ public class HddsVolume
private String datanodeUuid; // id of the DataNode
private long cTime; // creation time of the file system state
private int layoutVersion; // layout version of the storage data
+ private final AtomicLong committedBytes; // till Open containers become full
/**
* Run a check on the current volume to determine if it is healthy.
@@ -168,6 +170,7 @@ public class HddsVolume
.storageType(b.storageType)
.configuredCapacity(b.configuredCapacity);
this.volumeInfo = volumeBuilder.build();
+ this.committedBytes = new AtomicLong(0);
LOG.info("Creating Volume: " + this.hddsRootDir + " of storage type : "
+
b.storageType + " and capacity : " + volumeInfo.getCapacity());
@@ -181,6 +184,7 @@ public class HddsVolume
volumeInfo = null;
storageID = UUID.randomUUID().toString();
state = VolumeState.FAILED;
+ committedBytes = null;
}
}
@@ -422,6 +426,23 @@ public class HddsVolume
}
/**
+ * Add "delta" bytes (may be negative) to this volume's committed space.
+ * @param delta bytes to add to committed space counter
+ * @return committed bytes after the update; 0 on a failed volume (no counter)
+ */
+ public long incCommittedBytes(long delta) {
+ return (committedBytes == null) ? 0 : committedBytes.addAndGet(delta);
+ }
+
+ /**
+ * Return the committed space in the volume.
+ * @return bytes of committed space; 0 on a failed volume (no counter)
+ */
+ public long getCommittedBytes() {
+ return (committedBytes == null) ? 0 : committedBytes.get();
+ }
+
+ /**
* Only for testing. Do not use otherwise.
*/
@VisibleForTesting
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
index 838dd9e..2fd169c 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
@@ -162,6 +162,9 @@ public class TestContainerPersistence {
private Container addContainer(ContainerSet cSet, long cID)
throws IOException {
+ long commitBytesBefore = 0;
+ long commitBytesAfter = 0;
+ long commitIncrement = 0;
KeyValueContainerData data = new KeyValueContainerData(cID,
ContainerTestHelper.CONTAINER_MAX_SIZE, UUID.randomUUID().toString(),
UUID.randomUUID().toString());
@@ -169,7 +172,15 @@ public class TestContainerPersistence {
data.addMetadata("owner)", "bilbo");
KeyValueContainer container = new KeyValueContainer(data, conf);
container.create(volumeSet, volumeChoosingPolicy, SCM_ID);
+ commitBytesBefore = container.getContainerData()
+ .getVolume().getCommittedBytes();
cSet.addContainer(container);
+ commitBytesAfter = container.getContainerData()
+ .getVolume().getCommittedBytes();
+ commitIncrement = commitBytesAfter - commitBytesBefore;
+ // did we commit space for the new container?
+ Assert.assertTrue(commitIncrement ==
+ ContainerTestHelper.CONTAINER_MAX_SIZE);
return container;
}
@@ -328,6 +339,9 @@ public class TestContainerPersistence {
private ChunkInfo writeChunkHelper(BlockID blockID) throws IOException {
final int datalen = 1024;
+ long commitBytesBefore = 0;
+ long commitBytesAfter = 0;
+ long commitDecrement = 0;
long testContainerID = blockID.getContainerID();
Container container = containerSet.getContainer(testContainerID);
if (container == null) {
@@ -337,8 +351,15 @@ public class TestContainerPersistence {
blockID.getLocalID(), 0, 0, datalen);
byte[] data = getData(datalen);
setDataChecksum(info, data);
+ commitBytesBefore = container.getContainerData()
+ .getVolume().getCommittedBytes();
chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
getDispatcherContext());
+ commitBytesAfter = container.getContainerData()
+ .getVolume().getCommittedBytes();
+ commitDecrement = commitBytesBefore - commitBytesAfter;
+ // did we decrement commit bytes by the amount of data we wrote?
+ Assert.assertTrue(commitDecrement == info.getLen());
return info;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]