Author: dhruba Date: Thu Oct 11 14:27:29 2007 New Revision: 583958 URL: http://svn.apache.org/viewvc?rev=583958&view=rev Log: HADOOP-2018. The source datanode of a data transfer waits for a response from the target datanode before closing the data stream. (Hairong Kuang via dhruba)
Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=583958&r1=583957&r2=583958&view=diff ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Thu Oct 11 14:27:29 2007 @@ -276,6 +276,10 @@ properly i.e. the map TIP is incorrectly left marked as 'complete' and it is never rescheduled elsewhere, leading to hung reduces. (Devaraj Das via acmurthy) + + HADOOP-2018. The source datanode of a data transfer waits for + a response from the target datanode before closing the data stream. + (Hairong Kuang via dhruba) IMPROVEMENTS Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?rev=583958&r1=583957&r2=583958&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Thu Oct 11 14:27:29 2007 @@ -692,7 +692,48 @@ } } } - + + /* utility function for receiving a response */ + private static void receiveResponse(Socket s) throws IOException { + // check the response + DataInputStream reply = new DataInputStream(new BufferedInputStream( + s.getInputStream(), BUFFER_SIZE)); + try { + short opStatus = reply.readShort(); + if(opStatus != OP_STATUS_SUCCESS) { + throw new IOException("operation failed at "+ + s.getInetAddress()); + } + } finally { + IOUtils.closeStream(reply); + } + } + + /* utility function for sending a response */ + private static void sendResponse(Socket s, short opStatus) throws IOException { + DataOutputStream reply = new DataOutputStream(s.getOutputStream()); + try { + reply.writeShort(opStatus); 
+ reply.flush(); + } finally { + IOUtils.closeStream(reply); + } + } + + /* + * Informing the name node could take a long long time! Should we wait + * till namenode is informed before responding with success to the + * client? For now we don't. + */ + private void notifyNamenodeReceivedBlock(Block block) { + synchronized (receivedBlockList) { + receivedBlockList.add(block); + receivedBlockList.notifyAll(); + } + } + + + /** * Server used for receiving/sending a block of data. * This is created to listen for requests from clients or @@ -854,10 +895,11 @@ DataOutputStream mirrorOut = null; // stream to next target Socket mirrorSock = null; // socket to next target BlockReceiver blockReceiver = null; // responsible for data handling + String mirrorNode = null; // the name:port of next target try { // open a block receiver and check if the block does not exist blockReceiver = new BlockReceiver(block, in, - s.getRemoteSocketAddress().toString()); + s.getInetAddress().toString()); // // Open network conn to backup machine, if @@ -865,7 +907,6 @@ // if (targets.length > 0) { InetSocketAddress mirrorTarget = null; - String mirrorNode = null; // Connect to backup machine mirrorNode = targets[0].getName(); mirrorTarget = createSocketAddr(mirrorNode); @@ -892,40 +933,25 @@ } } - String mirrorAddr = (mirrorSock == null) ? null : - mirrorSock.getRemoteSocketAddress().toString(); + // receive the block and mirror to the next target + String mirrorAddr = (mirrorSock == null) ? null : mirrorNode; blockReceiver.receiveBlock(mirrorOut, mirrorAddr, null); - /* - * Informing the name node could take a long long time! Should we wait - * till namenode is informed before responding with success to the - * client? For now we don't. 
- */ - synchronized (receivedBlockList) { - receivedBlockList.add(block); - receivedBlockList.notifyAll(); - } + // notify name node + notifyNamenodeReceivedBlock(block); String msg = "Received block " + block + " from " + - s.getRemoteSocketAddress(); + s.getInetAddress(); /* read response from next target in the pipeline. * ignore the response for now. Will fix it in HADOOP-1927 */ if( mirrorSock != null ) { - short result = OP_STATUS_ERROR; - DataInputStream mirrorIn = null; try { - mirrorIn = new DataInputStream( mirrorSock.getInputStream() ); - result = mirrorIn.readShort(); + receiveResponse(mirrorSock); } catch (IOException ignored) { - } finally { - IOUtils.closeStream(mirrorIn); - } - - msg += " and " + (( result != OP_STATUS_SUCCESS ) ? - "failed to mirror to " : " mirrored to ") + - mirrorAddr; + msg += " and " + ignored.getMessage(); + } } LOG.info(msg); @@ -934,17 +960,14 @@ throw ioe; } finally { // send back reply - DataOutputStream reply = new DataOutputStream(s.getOutputStream()); try { - reply.writeShort(opStatus); - reply.flush(); + sendResponse(s, opStatus); } catch (IOException ioe) { - LOG.warn("Error writing reply back to " + s.getRemoteSocketAddress() - + "for writing block " + block ); - LOG.warn(StringUtils.stringifyException(ioe)); + LOG.warn("Error writing reply back to " + s.getInetAddress() + + " for writing block " + block +"\n" + + StringUtils.stringifyException(ioe)); } // close all opened streams - IOUtils.closeStream(reply); IOUtils.closeStream(mirrorOut); IOUtils.closeSocket(mirrorSock); IOUtils.closeStream(blockReceiver); @@ -1435,6 +1458,10 @@ } // send data & checksum blockSender.sendBlock(out, null); + + // check the response + receiveResponse(sock); + LOG.info("Transmitted block " + b + " to " + curTarget); } catch (IOException ie) { LOG.warn("Failed to transfer " + b + " to " + targets[0].getName()