HDFS-9342. Erasure coding: client should update and commit block based on acknowledged size. Contributed by SammiChen.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a9a3d219 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a9a3d219 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a9a3d219 Branch: refs/heads/HDFS-7240 Commit: a9a3d219fed2dd9d7bb84c228f6b8d97eadbe1f6 Parents: 8065129 Author: Andrew Wang <[email protected]> Authored: Sun May 7 14:45:26 2017 -0700 Committer: Andrew Wang <[email protected]> Committed: Sun May 7 14:45:26 2017 -0700 ---------------------------------------------------------------------- .../hadoop/hdfs/DFSStripedOutputStream.java | 28 ++++++++++++++++++++ 1 file changed, 28 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9a3d219/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java index 3dd07f7..0fdae8c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java @@ -772,9 +772,37 @@ public class DFSStripedOutputStream extends DFSOutputStream { newStorageIDs[i] = ""; } } + + // should update the block group length based on the acked length + final long sentBytes = currentBlockGroup.getNumBytes(); + final long ackedBytes = getNumAckedStripes() * cellSize * numDataBlocks; + Preconditions.checkState(ackedBytes <= sentBytes); + currentBlockGroup.setNumBytes(ackedBytes); + newBG.setNumBytes(ackedBytes); dfsClient.namenode.updatePipeline(dfsClient.clientName, currentBlockGroup, newBG, newNodes, newStorageIDs); currentBlockGroup = newBG; + currentBlockGroup.setNumBytes(sentBytes); + } + + /** + * Get the number of acked stripes. An acked stripe means at least data block + * number size cells of the stripe were acked. + */ + private long getNumAckedStripes() { + int minStripeNum = Integer.MAX_VALUE; + for (int i = 0; i < numAllBlocks; i++) { + final StripedDataStreamer streamer = getStripedDataStreamer(i); + if (streamer.isHealthy()) { + int curStripeNum = 0; + if (streamer.getBlock() != null) { + curStripeNum = (int) (streamer.getBlock().getNumBytes() / cellSize); + } + minStripeNum = Math.min(curStripeNum, minStripeNum); + } + } + assert minStripeNum != Integer.MAX_VALUE; + return minStripeNum; } private int stripeDataSize() { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
