[ 
https://issues.apache.org/jira/browse/HDFS-6867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14131017#comment-14131017
 ] 

Colin Patrick McCabe commented on HDFS-6867:
--------------------------------------------

bq. I figured out that stage is set to the correct value if the recoveryFlag 
flag is passed to the createBlockOutputStream. This new patch made this change. 
Colin Patrick McCabe I think the basic structure in this patch reflects all we 
have discussed; please take a look when you have time. Thanks!

Yeah, I think that's right. I don't think you should need to modify the 
pipeline states in this patch.

I don't think {{enum BackgroundRecoveryState}} belongs in 
{{HdfsConstants.java}}.  This enum is really only relevant to the 
{{DFSOutputStream}}-- it should be declared there (if we need it at all... see 
later comments).

Instead of atomic variables and ad-hoc synchronization, how about using futures 
here?

You could have something like this: when {{setupPipelineForAppendOrRecovery}} 
fails to bring the pipeline up to full strength, it creates a 
{{Future<DatanodeInfo[]>}}.  Then, before sending each 
{{o.a.h.h.DFSOutputStream.Packet}}, you check to see if the future has a new 
datanode for you via {{future.get(0, TimeUnit.SECONDS)}} or similar.
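
As a minimal sketch of that non-blocking check (the {{FuturePoll}} class and {{pollNow}} method are hypothetical names, not part of the patch or of HDFS), a zero timeout makes {{Future.get}} return immediately: either the value is ready, or a {{TimeoutException}} says it is not.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper, for illustration only. */
final class FuturePoll {
  /**
   * Returns the future's value if it is already available, or null if the
   * computation has not finished yet. Does not wait for the computation.
   * Note that a future which completes with null looks the same as "not ready".
   */
  static <T> T pollNow(Future<T> future) {
    try {
      return future.get(0, TimeUnit.SECONDS);
    } catch (TimeoutException e) {
      return null;  // not ready yet; keep using the current pipeline
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();  // preserve the interrupt for the caller
      return null;
    } catch (ExecutionException e) {
      // The background task itself failed; surface its cause unchecked.
      throw new IllegalStateException("background search failed", e.getCause());
    }
  }
}
```

The streamer would call something like {{pollNow}} before each packet and only rebuild the pipeline when it returns a non-null result.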

A future is basically a handle to a task that runs on another thread, produces 
a value, and then exits.  You create it like this:
{code}
   ExecutorService executorService =
       Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
           .setDaemon(true)
           .setNameFormat(...)
           .setUncaughtExceptionHandler(UncaughtExceptionHandlers.systemExit())
           .build());
   // The Callable's type parameter must match the Future's: DatanodeInfo[]
   Future<DatanodeInfo[]> pipelineRecoveryFuture =
       executorService.submit(new Callable<DatanodeInfo[]>() {
         @Override
         public DatanodeInfo[] call() {
           while (I didn't find any nodes and shutdown == false) {
             search for more datanodes
           }
           return datanodes (or null if shutdown)
         }
       });
{code}

You'd probably want to throw a {{close}} function in there that sets 
{{shutdown}}, but you get the idea.  This is a lot simpler than trying to deal 
with atomics all over, and {{pipelineRecoveryFuture}} only ever needs to be 
accessed from the streamer, so it doesn't need any locking.

Keep in mind, when you have a shutdown function that sets a boolean, that 
boolean needs to be volatile or atomic if another thread is reading it (that is 
missing from the current patch).
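
To make the last two points concrete, here is a rough, self-contained sketch using only the JDK. {{BackgroundNodeSearch}}, {{searchOnce}} and {{retryMillis}} are hypothetical names, and node names are plain strings standing in for {{DatanodeInfo}}; this is an illustration under those assumptions, not the patch's code.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Supplier;

/** Hypothetical helper; names and the Supplier-based search are illustrative. */
final class BackgroundNodeSearch implements AutoCloseable {
  // Written by close() on the caller's thread, read by the worker thread,
  // so it must be volatile (or an AtomicBoolean).
  private volatile boolean shutdown = false;

  private final ExecutorService executor =
      Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "pipeline-recovery");
        t.setDaemon(true);  // do not keep the JVM alive for this worker
        return t;
      });

  /** searchOnce returns a non-empty array when it finds nodes, else null. */
  Future<String[]> start(Supplier<String[]> searchOnce, long retryMillis) {
    return executor.submit(() -> {
      while (!shutdown) {
        String[] found = searchOnce.get();
        if (found != null && found.length > 0) {
          return found;
        }
        Thread.sleep(retryMillis);  // interrupted by close() via shutdownNow()
      }
      return null;  // shut down before any node was found
    });
  }

  @Override
  public void close() {
    shutdown = true;
    executor.shutdownNow();  // wakes the worker if it is sleeping
  }
}
```

If {{close()}} interrupts the worker while it sleeps, the future completes exceptionally rather than with null, so a caller should treat both outcomes as "no new nodes".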

Config key: I like the naming of the new key.  The new configuration key needs 
to be explained in {{hdfs-default.xml}}.  In particular, we should document how 
it interacts with the best-effort configuration.  I thought about this a little 
more, and I think the simplest thing to do is:

{code}
best effort = false, bg recovery = false ==> current default behavior
best effort = true,  bg recovery = false ==> current best effort behavior
    (never throw an exception)
best effort = false, bg recovery = true  ==> When nodes are lost, we begin the
    background search for new ones.  When the number of remaining nodes drops
    too low, we throw an exception just like today.
best effort = true,  bg recovery = true  ==> When nodes are lost, we begin the
    background search for new ones.  As long as there is at least one node, no
    exception is thrown.
{code}
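
One way to read the four cases above is that the two settings are independent: background recovery only decides whether a search is started, and best effort only decides when to throw. A small sketch of that reading follows. The class, method and parameter names are hypothetical, {{minNodes}} stands in for whatever "too low" means today, and treating zero remaining nodes as always fatal is an assumption.

```java
/** Hypothetical decision helper; an interpretation of the matrix, not the patch. */
final class RecoveryPolicy {
  /** True if the writer should start a background search for replacement nodes. */
  static boolean shouldSearchInBackground(boolean bgRecovery, int liveNodes, int targetNodes) {
    return bgRecovery && liveNodes < targetNodes;
  }

  /** True if the loss of nodes should fail the write with an exception. */
  static boolean shouldThrow(boolean bestEffort, int liveNodes, int minNodes) {
    if (liveNodes < 1) {
      return true;  // assumption: with no nodes left there is nothing to write to
    }
    // Without best effort, dropping below the threshold fails just like today.
    return !bestEffort && liveNodes < minNodes;
  }
}
```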

{code}
+            DatanodeInfo[] liveNodes =
+                dfsClient.datanodeReport(HdfsConstants.DatanodeReportType.LIVE);
{code}

I don't think this is what we want.  The cluster could have thousands of 
datanodes-- do we really need to get information about all of them here?   
Also, it's not our job (as the client) to decide what nodes the file will be 
written to.  The NameNode must decide that.

I think to do this right, we'll need to modify an existing NameNode RPC like 
{{updatePipeline}}, or perhaps add a new RPC, that just gives us a new DataNode 
suitable for our pipeline, but does *not* change the state of the block on the 
NN.  Basically, this means breaking updatePipeline into two separate calls... 
one to find a good node (that RecoveryWorker will call), and another to 
actually update the block to be in RBR (that DataStreamer will call).

Also, you'll want to pass the NameNode the {{excludedNodes}} cache, found here:
{code}
    private final LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes =
        CacheBuilder.newBuilder()
        .expireAfterWrite(
            dfsClient.getConf().excludedNodesCacheExpiry,
...
{code}

just like we do with {{addBlock}}.
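
To illustrate the proposed split and the use of the excluded set, here is a toy in-memory sketch. {{FakeNameNode}}, {{findReplacementNode}} and {{commitPipeline}} are hypothetical names and not existing HDFS RPCs; node names are plain strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Hypothetical in-memory stand-in for the NameNode side; not a real HDFS API. */
final class FakeNameNode {
  private final List<String> liveNodes;
  private List<String> committedPipeline = new ArrayList<>();

  FakeNameNode(List<String> liveNodes) {
    this.liveNodes = new ArrayList<>(liveNodes);
  }

  /**
   * Call 1 (read-only): picks a node that is neither in the current pipeline
   * nor in the client's excluded set. Does not change any block state.
   */
  String findReplacementNode(List<String> pipeline, Set<String> excluded) {
    for (String node : liveNodes) {
      if (!pipeline.contains(node) && !excluded.contains(node)) {
        return node;
      }
    }
    return null;  // no suitable node right now
  }

  /** Call 2: records the new pipeline; the only call that changes state. */
  void commitPipeline(List<String> newPipeline) {
    committedPipeline = new ArrayList<>(newPipeline);
  }

  List<String> committedPipeline() {
    return committedPipeline;
  }
}
```

In this shape the background worker would only ever issue the first call, and the streamer would issue the second once it has actually added the node to its pipeline.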

> For DFSOutputStream, do pipeline recovery for a single block in the background
> ------------------------------------------------------------------------------
>
>                 Key: HDFS-6867
>                 URL: https://issues.apache.org/jira/browse/HDFS-6867
>             Project: Hadoop HDFS
>          Issue Type: Bug
>          Components: hdfs-client
>            Reporter: Colin Patrick McCabe
>            Assignee: Zhe Zhang
>         Attachments: HDFS-6867-20140827-2.patch, HDFS-6867-20140827-3.patch, 
> HDFS-6867-20140827.patch, HDFS-6867-20140828-1.patch, 
> HDFS-6867-20140828-2.patch, HDFS-6867-20140910.patch, 
> HDFS-6867-20140911.patch, HDFS-6867-design-20140820.pdf, 
> HDFS-6867-design-20140821.pdf, HDFS-6867-design-20140822.pdf, 
> HDFS-6867-design-20140827.pdf, HDFS-6867-design-20140910.pdf
>
>
> For DFSOutputStream, we should be able to do pipeline recovery in the 
> background, while the user is continuing to write to the file.  This is 
> especially useful for long-lived clients that write to an HDFS file slowly. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
