[
https://issues.apache.org/jira/browse/HDFS-7678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14529685#comment-14529685
]
Jing Zhao commented on HDFS-7678:
---------------------------------
Thanks for updating the patch, Zhe! Here are some further comments:
# The recovery + MaxPortion logic may have an issue, since pread requires us
to provide a precise reading range to the BlockReader. For example, for a block
with length cellSize * 8, when reading from offset cellSize * 5 to the end,
the maxPortion will be 0~524288. If the 6th DN fails, the recovery code will
try to read 0~524288 from DNs whose internal blocks are only 262144 long. The
BlockReader will then get exceptions from those DNs. I've written a unit test
that generates this kind of failure (you can add it to TestDFSStripedInputStream):
{code}
public void test() throws IOException {
  final int length = cellSize * (dataBlocks + 2);
  Path testPath = new Path("/foo");
  final byte[] bytes = generateBytes(length);
  DFSTestUtil.writeFile(fs, testPath, new String(bytes));
  // shut down the DN that holds the last internal data block
  BlockLocation[] locs = fs.getFileBlockLocations(testPath, cellSize * 5,
      cellSize);
  String name = (locs[0].getNames())[0];
  for (DataNode dn : cluster.getDataNodes()) {
    int port = dn.getXferPort();
    if (name.contains(Integer.toString(port))) {
      dn.shutdown();
      break;
    }
  }
  // pread starting from offset cellSize * 5
  try (FSDataInputStream fsdis = fs.open(testPath)) {
    byte[] buf = new byte[length];
    int readLen = fsdis.read(cellSize * 5, buf, 0, buf.length);
    Assert.assertEquals("Should read all the remaining bytes of the file",
        cellSize * 3, readLen);
    for (int i = cellSize * 5; i < length; i++) {
      // the pread fills buf starting from index 0, so shift the index
      Assert.assertEquals("Byte at " + i + " should be the same", getByte(i),
          buf[i - cellSize * 5]);
    }
  }
}
{code}
Currently I'm thinking it may be easier to use the cell and stripe as the basic
reading and recovery units, to avoid this complexity.
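For reference, the cell/stripe-aligned range math could look roughly like the following. This is only a standalone sketch with made-up names ({{StripeRangeUtil}} and its methods are hypothetical); {{cellSize}} and {{dataBlkNum}} stand for the schema constants. The point is that a recovery read aligned to stripe boundaries, combined with the real internal block length, never asks a DN for a range past the end of its internal block.

{code}
/**
 * Sketch: align a file-level pread range to stripe boundaries so that the
 * range requested from each internal block never exceeds that block's
 * actual length. All names here are hypothetical, for illustration only.
 */
public class StripeRangeUtil {
  /** Round startOffset down to the beginning of its stripe. */
  static long alignToStripeStart(long startOffset, int cellSize, int dataBlkNum) {
    long stripeSize = (long) cellSize * dataBlkNum;
    return (startOffset / stripeSize) * stripeSize;
  }

  /** Round endOffset (exclusive) up to the end of its stripe, capped by dataLen. */
  static long alignToStripeEnd(long endOffset, long dataLen, int cellSize,
      int dataBlkNum) {
    long stripeSize = (long) cellSize * dataBlkNum;
    long aligned = ((endOffset + stripeSize - 1) / stripeSize) * stripeSize;
    return Math.min(aligned, dataLen);
  }

  /** Actual length of internal block blkIndex for a group with dataLen bytes. */
  static long internalBlockLength(long dataLen, int cellSize, int dataBlkNum,
      int blkIndex) {
    long stripeSize = (long) cellSize * dataBlkNum;
    long fullStripes = dataLen / stripeSize;
    long remaining = dataLen - fullStripes * stripeSize;
    // the partial last stripe gives this block between 0 and cellSize bytes
    long lastCell = Math.min(Math.max(remaining - (long) blkIndex * cellSize, 0),
        cellSize);
    return fullStripes * cellSize + lastCell;
  }
}
{code}

With these, a recovery read for a failed internal block would be capped by {{internalBlockLength}} rather than by the original maxPortion.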
# In the following code, suppose we already have one failed and one successful
read task, and we just received a new result. Since several tasks are still
pending, it looks like we will hit the final "else if" branch, because
missingBlkIndices is not empty at that point. Although the later check on
{{extraBlksToFetch}} will prevent recovery from actually being scheduled, it
would still be better to avoid the unnecessary work here.
{code}
if (fetchedBlkIndices.size() == dataBlkNum) {
  ...
} else if (missingBlkIndices.size() == parityBlkNum) {
  ...
} else if (!missingBlkIndices.isEmpty()) {
{code}
Also, the {{extraBlksToFetch}} calculation may have other issues, but please
see whether you still want to keep this logic while fixing the issue above.
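One possible restructuring is sketched below. All names are hypothetical stand-ins for the patch's bookkeeping; the idea is to enter the recovery branch only when the result just processed is a failure, instead of whenever missingBlkIndices happens to be non-empty (this also uses ">" rather than "==" for the failure bound, per the comment further down):

{code}
/**
 * Sketch of the discussed restructuring; names are made up for illustration.
 */
public class ReadStateCheck {
  enum Action { DONE, FAIL, SCHEDULE_RECOVERY, WAIT }

  static Action decide(boolean newResultFailed, int fetched, int missing,
      int dataBlkNum, int parityBlkNum) {
    if (fetched == dataBlkNum) {
      return Action.DONE;                // enough internal blocks fetched
    } else if (missing > parityBlkNum) {
      return Action.FAIL;                // more failures than parity can cover
    } else if (newResultFailed) {
      // Only the result we just processed can create new recovery work;
      // earlier failures were already handled when they arrived
      return Action.SCHEDULE_RECOVERY;
    }
    return Action.WAIT;                  // a success arrived; keep waiting
  }
}
{code}

This way, a successful result with pending tasks outstanding falls through to waiting, and the recovery path is only taken once per failure.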
# For testing, it will be great to have HDFS-8201 and HDFS-8202 as end-to-end
tests. But I strongly suggest opening a separate jira to test all the internal
logic inside DFSStripedInputStream and fix possible bugs. It would be even
better to include some tests covering basic scenarios in this jira (maybe just
extend the above test to cover different failure points, file sizes, and read
offsets).
Some minor comments:
# How about adding a {{toString}} method to {{StripedReadResult}} to simplify
the following code?
{code}
if (DFSClient.LOG.isDebugEnabled()) {
  DFSClient.LOG.debug("Read task from block " + r.index + " returned. " +
      "Status: " + r.state + ", start offset in block: " +
      actualReadPortions[r.index].startOffsetInBlock);
}
</DFSClient.LOG.debug>
{code}
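Something along these lines would do (a sketch only; I'm guessing at the fields, and assuming the start offset can be carried on the result itself rather than looked up in actualReadPortions):

{code}
/** Sketch of the suggested toString; fields inferred from the snippet above. */
public class StripedReadResult {
  static final int SUCCESSFUL = 1;

  final int index;
  final int state;
  final long startOffsetInBlock;

  StripedReadResult(int index, int state, long startOffsetInBlock) {
    this.index = index;
    this.state = state;
    this.startOffsetInBlock = startOffsetInBlock;
  }

  @Override
  public String toString() {
    // one place to keep the log wording consistent
    return "Read task from block " + index + " returned. Status: " + state
        + ", start offset in block: " + startOffsetInBlock;
  }
}
{code}

The log site then shrinks to a single {{DFSClient.LOG.debug("" + r)}} guarded by {{isDebugEnabled()}}.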
# The following code needs reformatting. It could also be moved into a separate
method to shorten {{processReadTaskEvents}}. Besides, shouldn't
{{checkArgument}} be {{checkState}}?
{code}
if (r.state == StripedReadResult.SUCCESSFUL) {
  // Successfully fetched requested portion; it could be normal I/O
  // or for recovery
  Preconditions.checkArgument(r.index >= 0);
  if (actualReadPortions[r.index].containsReadPortion(maxPortion)) {
    fetchedBlkIndices.add(r.index);
    if (DFSClient.LOG.isDebugEnabled()) {
      DFSClient.LOG.debug("Fully fetched block " + r.index);
    }
  }
} else {
  // Issue new reads to recover failed tasks
  Preconditions.checkArgument(r.index >= 0);
  missingBlkIndices.add(r.index);
}
{code}
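Extracted into a helper, it could look roughly like this (a sketch only; the real version would live in DFSStripedInputStream and use Guava's {{Preconditions.checkState}} instead of the local helper, and the parameter names are made up):

{code}
import java.util.HashSet;
import java.util.Set;

/** Sketch of the extracted result-handling helper; names are hypothetical. */
public class ReadResultHandler {
  static final int SUCCESSFUL = 1;

  final Set<Integer> fetchedBlkIndices = new HashSet<>();
  final Set<Integer> missingBlkIndices = new HashSet<>();

  /** Record one finished read task; returns true if the block is fully fetched. */
  boolean recordResult(int index, int state, boolean coversMaxPortion) {
    // checkState rather than checkArgument: a negative index here means our
    // own bookkeeping is broken, not that a caller passed a bad argument
    checkState(index >= 0, "Invalid block index: " + index);
    if (state == SUCCESSFUL) {
      if (coversMaxPortion) {
        fetchedBlkIndices.add(index);
        return true;
      }
      return false;
    }
    missingBlkIndices.add(index);
    return false;
  }

  // stand-in for Guava's Preconditions.checkState
  private static void checkState(boolean ok, String msg) {
    if (!ok) {
      throw new IllegalStateException(msg);
    }
  }
}
{code}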
# We can tolerate {{parityBlkNum}} failures, so the "==" should be ">"? We can
add a unit test for this failure case in this jira.
{code}
} else if (missingBlkIndices.size() == parityBlkNum) {
  // Too many read tasks have failed
  clearFutures(futures.keySet());
  throw new IOException("Too many blocks are missing: " +
      missingBlkIndices);
}
{code}
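I.e. something like the following (a trivial sketch, just to pin down the boundary):

{code}
/** Sketch: decoding works with up to parityBlkNum missing internal blocks. */
public class FailureCheck {
  static boolean tooManyMissing(int missingCount, int parityBlkNum) {
    // fail only once the failures exceed what parity can reconstruct
    return missingCount > parityBlkNum;
  }
}
{code}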
# Maybe we do not need this logic in this jira.
{code}
if (threshold > 0) {
  future = readService.poll(threshold, TimeUnit.MILLISECONDS);
} else {
  future = readService.take();
}
{code}
> Erasure coding: DFSInputStream with decode functionality (pread)
> ----------------------------------------------------------------
>
> Key: HDFS-7678
> URL: https://issues.apache.org/jira/browse/HDFS-7678
> Project: Hadoop HDFS
> Issue Type: Sub-task
> Affects Versions: HDFS-7285
> Reporter: Li Bo
> Assignee: Zhe Zhang
> Attachments: BlockGroupReader.patch, HDFS-7678-HDFS-7285.002.patch,
> HDFS-7678-HDFS-7285.003.patch, HDFS-7678-HDFS-7285.004.patch,
> HDFS-7678-HDFS-7285.005.patch, HDFS-7678-HDFS-7285.006.patch,
> HDFS-7678-HDFS-7285.007.patch, HDFS-7678-HDFS-7285.008.patch,
> HDFS-7678-HDFS-7285.009.patch, HDFS-7678-HDFS-7285.010.patch,
> HDFS-7678.000.patch, HDFS-7678.001.patch
>
>
> A block group reader will read data from a BlockGroup whether it is in
> striping layout or contiguous layout. Corrupt blocks may be known before
> reading (reported by the namenode) or discovered during reading. The block
> group reader needs to do decoding work when some blocks are found to be
> corrupt.