[
https://issues.apache.org/jira/browse/HDFS-6867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14131017#comment-14131017
]
Colin Patrick McCabe commented on HDFS-6867:
--------------------------------------------
bq. I figured out that {{stage}} is set to the correct value if the {{recoveryFlag}} is passed to {{createBlockOutputStream}}. This new patch makes that change. Colin Patrick McCabe, I think the basic structure in this patch reflects all we have discussed; please take a look when you have time. Thanks!
Yeah, I think that's right. I don't think you should need to modify the
pipeline states in this patch.
I don't think {{enum BackgroundRecoveryState}} belongs in
{{HdfsConstants.java}}. This enum is really only relevant to the
{{DFSOutputStream}}-- it should be declared there (if we need it at all... see
later comments).
Instead of atomic variables and ad-hoc synchronization, how about using futures
here?
You could have something like this: when {{setupPipelineForAppendOrRecovery}}
fails to bring the pipeline up to full strength, it creates a
{{Future<DatanodeInfo[]>}}. Then, before sending each
{{o.a.h.h.DFSOutputStream.Packet}}, you check to see if the future has a new
datanode for you via {{future.get(0, TimeUnit.SECONDS)}} or similar.
A future is basically a handle to a task that runs on another thread, produces a
value, and then exits. You create it like this:
{code}
ExecutorService executorService =
    Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
        .setDaemon(true)
        .setNameFormat(...)
        .setUncaughtExceptionHandler(UncaughtExceptionHandlers.systemExit())
        .build());

Future<DatanodeInfo[]> pipelineRecoveryFuture =
    executorService.submit(new Callable<DatanodeInfo[]>() {
      @Override
      public DatanodeInfo[] call() {
        DatanodeInfo[] nodes = null;
        while (nodes == null && !shutdown) {
          nodes = searchForDatanodes();  // placeholder: however we find nodes
        }
        return nodes;  // null if we were shut down
      }
    });
{code}
You'd probably want to throw a {{close}} function in there that sets
{{shutdown}}, but you get the idea. This is a lot simpler than trying to deal
with atomics all over, and {{pipelineRecoveryFuture}} only ever needs to be
accessed from the streamer, so it doesn't need any locking.
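On the streamer side, the per-packet check could look something like this (just a
sketch; how the new datanode actually gets spliced into the pipeline is elided):
{code}
try {
  // Poll without blocking: get(0, ...) returns the value immediately if
  // the future is done, and throws TimeoutException otherwise.
  DatanodeInfo[] newNodes = pipelineRecoveryFuture.get(0, TimeUnit.SECONDS);
  if (newNodes != null) {
    // splice the new datanode(s) into the pipeline before sending
  }
} catch (TimeoutException e) {
  // recovery thread hasn't found a node yet; keep writing as-is
} catch (ExecutionException e) {
  // recovery thread failed; decide whether to retry or give up
} catch (InterruptedException e) {
  Thread.currentThread().interrupt();
}
{code}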
Keep in mind that when you have a shutdown function that sets a boolean, the
boolean needs to be volatile or atomic if another thread is reading it (this is
missing from the current patch).
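For example (a minimal sketch):
{code}
// Written by close() on the application thread, read by the recovery
// thread in its loop; volatile guarantees the write is visible.
private volatile boolean shutdown = false;

public void close() {
  shutdown = true;
}
{code}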
Config key: I like the naming of the new key. The new configuration key needs
to be explained in {{hdfs-default.xml}}. In particular, we should document how it
interacts with the best-effort configuration. I thought about this a little
more, and I think the simplest thing to do is:
{code}
best effort = false, bg recovery = false ==> current default behavior

best effort = true,  bg recovery = false ==> current best-effort behavior
                                             (never throw an exception)

best effort = false, bg recovery = true  ==> when nodes are lost, we begin the
                                             background search for new ones;
                                             when the number of remaining nodes
                                             drops too low, we throw an
                                             exception just like today

best effort = true,  bg recovery = true  ==> when nodes are lost, we begin the
                                             background search for new ones; as
                                             long as there is at least one
                                             node, no exception is thrown
{code}
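As a sketch of the documentation, something like the following in
{{hdfs-default.xml}} (the property name here is a placeholder, not the actual
key from the patch):
{code}
<property>
  <name>dfs.client.block.write.background-recovery.enabled</name>
  <value>false</value>
  <description>
    If true, the DFSClient searches for replacement datanodes in the
    background when a write pipeline loses nodes, instead of blocking
    the writer.  See
    dfs.client.block.write.replace-datanode-on-failure.best-effort for
    how the two keys interact.
  </description>
</property>
{code}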
{code}
+    DatanodeInfo[] liveNodes =
+        dfsClient.datanodeReport(HdfsConstants.DatanodeReportType.LIVE);
{code}
I don't think this is what we want. The cluster could have thousands of
datanodes-- do we really need to get information about all of them here?
Also, it's not our job (as the client) to decide what nodes the file will be
written to. The NameNode must decide that.
I think to do this right, we'll need to modify an existing NameNode RPC like
{{updatePipeline}}, or perhaps add a new RPC, that just gives us a new DataNode
suitable for our pipeline, but does *not* change the state of the block on the
NN. Basically, this means breaking updatePipeline into two separate calls...
one to find a good node (that RecoveryWorker will call), and another to
actually update the block to be in RBR (that DataStreamer will call).
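To make the split concrete, the two calls might look roughly like this (a
sketch only: the first method and its parameters are made up for illustration,
and the {{updatePipeline}} signature is abbreviated):
{code}
// Called by the background recovery worker: ask the NameNode for a
// replacement node for this block's pipeline *without* changing any
// block state on the NameNode.  (hypothetical method)
DatanodeInfo getReplacementDatanode(ExtendedBlock block,
    DatanodeInfo[] existingNodes, DatanodeInfo[] excludedNodes)
    throws IOException;

// Called by the DataStreamer once the new pipeline is actually set up;
// this is the existing call that updates block state on the NameNode.
void updatePipeline(String clientName, ExtendedBlock oldBlock,
    ExtendedBlock newBlock, DatanodeID[] newNodes) throws IOException;
{code}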
Also, you'll want to pass the NameNode the {{excludedNodes}} cache, found here:
{code}
private final LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes =
    CacheBuilder.newBuilder()
        .expireAfterWrite(
            dfsClient.getConf().excludedNodesCacheExpiry,
            ...
{code}
just like we do with {{addBlock}}.
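That is, when the recovery worker asks for a replacement node, pass the cache
contents along, something like (a sketch, using the hypothetical RPC from
above):
{code}
DatanodeInfo[] excluded =
    excludedNodes.asMap().keySet().toArray(new DatanodeInfo[0]);
DatanodeInfo replacement =
    namenode.getReplacementDatanode(block, existingNodes, excluded);
{code}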
> For DFSOutputStream, do pipeline recovery for a single block in the background
> ------------------------------------------------------------------------------
>
> Key: HDFS-6867
> URL: https://issues.apache.org/jira/browse/HDFS-6867
> Project: Hadoop HDFS
> Issue Type: Bug
> Components: hdfs-client
> Reporter: Colin Patrick McCabe
> Assignee: Zhe Zhang
> Attachments: HDFS-6867-20140827-2.patch, HDFS-6867-20140827-3.patch,
> HDFS-6867-20140827.patch, HDFS-6867-20140828-1.patch,
> HDFS-6867-20140828-2.patch, HDFS-6867-20140910.patch,
> HDFS-6867-20140911.patch, HDFS-6867-design-20140820.pdf,
> HDFS-6867-design-20140821.pdf, HDFS-6867-design-20140822.pdf,
> HDFS-6867-design-20140827.pdf, HDFS-6867-design-20140910.pdf
>
>
> For DFSOutputStream, we should be able to do pipeline recovery in the
> background, while the user is continuing to write to the file. This is
> especially useful for long-lived clients that write to an HDFS file slowly.