Author: omalley
Date: Fri Mar 4 03:39:21 2011
New Revision: 1077089
URL: http://svn.apache.org/viewvc?rev=1077089&view=rev
Log:
commit 8a62eb768a727018aa78330da0bca3a3e989553b
Author: Jitendra Nath Pandey <[email protected]>
Date: Tue Dec 22 18:05:31 2009 -0800
HDFS-195 from
https://issues.apache.org/jira/secure/attachment/12428788/HDFS-195-0_20.1.patch
+++ b/YAHOO-CHANGES.txt
+ HDFS-195. Need to handle access token expiration when re-establishing the
+ pipeline for dfs write. (Jitendra Nath Pandey)
+
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=1077089&r1=1077088&r2=1077089&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
Fri Mar 4 03:39:21 2011
@@ -2687,12 +2687,11 @@ public class DFSClient implements FSCons
// If the block recovery generated a new generation stamp, use that
// from now on. Also, setup new pipeline
- //
- if (newBlock != null) {
- block = newBlock.getBlock();
- accessToken = newBlock.getAccessToken();
- nodes = newBlock.getLocations();
- }
+ // newBlock should never be null and it should contain a newly
+ // generated access token.
+ block = newBlock.getBlock();
+ accessToken = newBlock.getAccessToken();
+ nodes = newBlock.getLocations();
this.hasError = false;
lastException = null;
@@ -2787,6 +2786,7 @@ public class DFSClient implements FSCons
//
if (lastBlock != null) {
block = lastBlock.getBlock();
+ accessToken = lastBlock.getAccessToken();
long usedInLastBlock = stat.getLen() % blockSize;
int freeInLastBlock = (int)(blockSize - usedInLastBlock);
@@ -2911,6 +2911,7 @@ public class DFSClient implements FSCons
//
private boolean createBlockOutputStream(DatanodeInfo[] nodes, String
client,
boolean recoveryFlag) {
+ short pipelineStatus = (short)DataTransferProtocol.OP_STATUS_SUCCESS;
String firstBadLink = "";
if (LOG.isDebugEnabled()) {
for (int i = 0; i < nodes.length; i++) {
@@ -2958,9 +2959,17 @@ public class DFSClient implements FSCons
out.flush();
// receive ack for connect
+ pipelineStatus = blockReplyStream.readShort();
firstBadLink = Text.readString(blockReplyStream);
- if (firstBadLink.length() != 0) {
- throw new IOException("Bad connect ack with firstBadLink " +
firstBadLink);
+ if (pipelineStatus != DataTransferProtocol.OP_STATUS_SUCCESS) {
+ if (pipelineStatus ==
DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN) {
+ throw new InvalidAccessTokenException(
+ "Got access token error for connect ack with firstBadLink as "
+ + firstBadLink);
+ } else {
+ throw new IOException("Bad connect ack with firstBadLink as "
+ + firstBadLink);
+ }
}
blockStream = out;
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java?rev=1077089&r1=1077088&r2=1077089&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
Fri Mar 4 03:39:21 2011
@@ -29,17 +29,17 @@ public interface ClientDatanodeProtocol
public static final Log LOG =
LogFactory.getLog(ClientDatanodeProtocol.class);
/**
- * 3: add keepLength parameter.
+ * 4: never return null and always return a newly generated access token
*/
- public static final long versionID = 3L;
+ public static final long versionID = 4L;
/** Start generation-stamp recovery for specified block
* @param block the specified block
* @param keepLength keep the block length
* @param targets the list of possible locations of specified block
- * @return the new blockid if recovery successful and the generation stamp
- * got updated as part of the recovery, else returns null if the block id
- * not have any data and the block was deleted.
+ * @return either a new generation stamp, or the original generation stamp.
+ * Regardless of whether a new generation stamp is returned, a newly
+ * generated access token is returned as part of the return value.
* @throws IOException
*/
LocatedBlock recoverBlock(Block block, boolean keepLength,
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java?rev=1077089&r1=1077088&r2=1077089&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
Fri Mar 4 03:39:21 2011
@@ -31,11 +31,12 @@ public interface DataTransferProtocol {
* when protocol changes. It is not very obvious.
*/
/*
- * Version 15:
- * Added a new status OP_STATUS_ERROR_ACCESS_TOKEN
- * Access token is now required on all DN operations
+ * Version 16:
+ * Datanode now needs to send back a status code together
+ * with firstBadLink during pipeline setup for dfs write
+ * (only for DFSClients, not for other datanodes).
*/
- public static final int DATA_TRANSFER_VERSION = 15;
+ public static final int DATA_TRANSFER_VERSION = 16;
// Processed at datanode stream-handler
public static final byte OP_WRITE_BLOCK = (byte) 80;
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1077089&r1=1077088&r2=1077089&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
Fri Mar 4 03:39:21 2011
@@ -1537,8 +1537,9 @@ public class DataNode extends Configured
/** Recover a block */
private LocatedBlock recoverBlock(Block block, boolean keepLength,
- DatanodeID[] datanodeids, boolean closeFile) throws IOException {
+ DatanodeInfo[] targets, boolean closeFile) throws IOException {
+ DatanodeID[] datanodeids = (DatanodeID[])targets;
// If the block is already being recovered, then skip recovering it.
// This can happen if the namenode and client start recovering the same
// file at the same time.
@@ -1592,7 +1593,7 @@ public class DataNode extends Configured
if (!keepLength) {
block.setNumBytes(minlength);
}
- return syncBlock(block, syncList, closeFile);
+ return syncBlock(block, syncList, targets, closeFile);
} finally {
synchronized (ongoingRecovery) {
ongoingRecovery.remove(block);
@@ -1602,7 +1603,7 @@ public class DataNode extends Configured
/** Block synchronization */
private LocatedBlock syncBlock(Block block, List<BlockRecord> syncList,
- boolean closeFile) throws IOException {
+ DatanodeInfo[] targets, boolean closeFile) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("block=" + block + ", (length=" + block.getNumBytes()
+ "), syncList=" + syncList + ", closeFile=" + closeFile);
@@ -1613,7 +1614,13 @@ public class DataNode extends Configured
if (syncList.isEmpty()) {
namenode.commitBlockSynchronization(block, 0, 0, closeFile, true,
DatanodeID.EMPTY_ARRAY);
- return null;
+ //always return a new access token even if everything else stays the same
+ LocatedBlock b = new LocatedBlock(block, targets);
+ if (isAccessTokenEnabled) {
+ b.setAccessToken(accessTokenHandler.generateToken(null, b.getBlock()
+ .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
+ }
+ return b;
}
List<DatanodeID> successList = new ArrayList<DatanodeID>();
@@ -1641,7 +1648,14 @@ public class DataNode extends Configured
for (int i = 0; i < nlist.length; i++) {
info[i] = new DatanodeInfo(nlist[i]);
}
- return new LocatedBlock(newblock, info); // success
+ LocatedBlock b = new LocatedBlock(newblock, info); // success
+ // should have used client ID to generate access token, but since
+ // owner ID is not checked, we simply pass null for now.
+ if (isAccessTokenEnabled) {
+ b.setAccessToken(accessTokenHandler.generateToken(null, b.getBlock()
+ .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
+ }
+ return b;
}
//failed
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1077089&r1=1077088&r2=1077089&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
Fri Mar 4 03:39:21 2011
@@ -268,6 +268,7 @@ class DataXceiver implements Runnable, F
.getBlockId(), AccessTokenHandler.AccessMode.WRITE)) {
try {
if (client.length() != 0) {
+
replyOut.writeShort((short)DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN);
Text.writeString(replyOut, datanode.dnRegistration.getName());
replyOut.flush();
}
@@ -284,6 +285,7 @@ class DataXceiver implements Runnable, F
BlockReceiver blockReceiver = null; // responsible for data handling
String mirrorNode = null; // the name:port of next target
String firstBadLink = ""; // first datanode that failed in
connection setup
+ short mirrorInStatus = (short)DataTransferProtocol.OP_STATUS_SUCCESS;
try {
// open a block receiver and check if the block does not exist
blockReceiver = new BlockReceiver(block, in,
@@ -337,8 +339,9 @@ class DataXceiver implements Runnable, F
// read connect ack (only for clients, not for replication req)
if (client.length() != 0) {
+ mirrorInStatus = mirrorIn.readShort();
firstBadLink = Text.readString(mirrorIn);
- if (LOG.isDebugEnabled() || firstBadLink.length() > 0) {
+ if (LOG.isDebugEnabled() || mirrorInStatus !=
DataTransferProtocol.OP_STATUS_SUCCESS) {
LOG.info("Datanode " + targets.length +
" got response for connect ack " +
" from downstream datanode with firstbadlink as " +
@@ -348,6 +351,7 @@ class DataXceiver implements Runnable, F
} catch (IOException e) {
if (client.length() != 0) {
+ replyOut.writeShort((short)DataTransferProtocol.OP_STATUS_ERROR);
Text.writeString(replyOut, mirrorNode);
replyOut.flush();
}
@@ -370,11 +374,12 @@ class DataXceiver implements Runnable, F
// send connect ack back to source (only for clients)
if (client.length() != 0) {
- if (LOG.isDebugEnabled() || firstBadLink.length() > 0) {
+ if (LOG.isDebugEnabled() || mirrorInStatus !=
DataTransferProtocol.OP_STATUS_SUCCESS) {
LOG.info("Datanode " + targets.length +
" forwarding connect ack to upstream firstbadlink is " +
firstBadLink);
}
+ replyOut.writeShort(mirrorInStatus);
Text.writeString(replyOut, firstBadLink);
replyOut.flush();
}
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/TestDataTransferProtocol.java?rev=1077089&r1=1077088&r2=1077089&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
Fri Mar 4 03:39:21 2011
@@ -225,6 +225,7 @@ public class TestDataTransferProtocol ex
// bad data chunk length
sendOut.writeInt(-1-random.nextInt(oneMil));
+ recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_SUCCESS);
Text.writeString(recvOut, ""); // first bad node
recvOut.writeLong(100); // sequencenumber
recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_ERROR);
@@ -254,6 +255,7 @@ public class TestDataTransferProtocol ex
sendOut.writeInt(0); // chunk length
sendOut.writeInt(0); // zero checksum
//ok finally write a block with 0 len
+ recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_SUCCESS);
Text.writeString(recvOut, ""); // first bad node
recvOut.writeLong(100); // sequencenumber
recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_SUCCESS);