Author: dhruba
Date: Fri Apr 25 14:05:01 2008
New Revision: 651699
URL: http://svn.apache.org/viewvc?rev=651699&view=rev
Log:
HADOOP-3283. The Datanode has an RPC server. It currently supports
two RPCs: the first RPC retrieves the metadata about a block and the
second RPC sets the generation stamp of an existing block.
(Tsz Wo (Nicholas), SZE via dhruba)
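
For orientation, a minimal sketch of how a caller might exercise the two new
RPCs through the proxy added in DataNode.java below. The demo class and its
arguments are hypothetical, and the dfs types involved are package-private,
so such code would have to live in org.apache.hadoop.dfs:

    package org.apache.hadoop.dfs;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;

    // Hypothetical demo class, not part of this commit.
    class InterDatanodeRpcSketch {
      static void demo(DatanodeID datanodeid, Block block, Configuration conf)
          throws IOException {
        InterDatanodeProtocol idp =
            DataNode.createInterDataNodeProtocolProxy(datanodeid, conf);
        // RPC 1: retrieve the metadata of a block.
        BlockMetaDataInfo meta = idp.getBlockMetaDataInfo(block);
        // RPC 2: set the generation stamp of an existing block
        // (a stub in this revision; it always returns false).
        boolean updated =
            idp.updateGenerationStamp(block, new GenerationStamp(456789L));
        InterDatanodeProtocol.LOG.info("meta=" + meta + ", updated=" + updated);
      }
    }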
Added:
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockMetaDataInfo.java (with props)
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/InterDatanodeProtocol.java (with props)
hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestInterDatanodeProtocol.java (with props)
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/conf/hadoop-default.xml
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataBlockScanner.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeID.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeInfo.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeRegistration.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSImage.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
hadoop/core/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java
hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestHost2NodesMap.java
hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestReplicationPolicy.java
hadoop/core/trunk/src/test/org/apache/hadoop/net/TestNetworkTopology.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=651699&r1=651698&r2=651699&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Apr 25 14:05:01 2008
@@ -12,6 +12,11 @@
HADOOP-2865. FsShell.ls() printout format changed to print file names
in the end of the line. (Edward J. Yoon via shv)
+ HADOOP-3283. The Datanode has an RPC server. It currently supports
+ two RPCs: the first RPC retrieves the metadata about a block and the
+ second RPC sets the generation stamp of an existing block.
+ (Tsz Wo (Nicholas), SZE via dhruba)
+
NEW FEATURES
HADOOP-3074. Provides a UrlStreamHandler for DFS and other FS,
Modified: hadoop/core/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/conf/hadoop-default.xml?rev=651699&r1=651698&r2=651699&view=diff
==============================================================================
--- hadoop/core/trunk/conf/hadoop-default.xml (original)
+++ hadoop/core/trunk/conf/hadoop-default.xml Fri Apr 25 14:05:01 2008
@@ -262,6 +262,21 @@
</property>
<property>
+ <name>dfs.datanode.ipc.address</name>
+ <value>0.0.0.0:50020</value>
+ <description>
+ The datanode ipc server address and port.
+ If the port is 0 then the server will start on a free port.
+ </description>
+</property>
+
+<property>
+ <name>dfs.datanode.handler.count</name>
+ <value>3</value>
+ <description>The number of server threads for the datanode.</description>
+</property>
+
+<property>
<name>dfs.http.address</name>
<value>0.0.0.0:50070</value>
<description>
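
As an aside, the two new keys can also be set programmatically through the
public Configuration API; a minimal sketch (class name hypothetical, values
mirroring hadoop-default.xml above):

    import org.apache.hadoop.conf.Configuration;

    public class DatanodeIpcConfSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Port 0 lets the IPC server pick a free port;
        // MiniDFSCluster below relies on this for tests.
        conf.set("dfs.datanode.ipc.address", "0.0.0.0:0");
        // RPC handler threads for the datanode; the shipped default is 3.
        conf.setInt("dfs.datanode.handler.count", 3);
      }
    }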
Added: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockMetaDataInfo.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockMetaDataInfo.java?rev=651699&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockMetaDataInfo.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockMetaDataInfo.java Fri Apr 25 14:05:01 2008
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.io.*;
+
+import org.apache.hadoop.io.*;
+
+/**
+ * Meta data information for a block
+ */
+class BlockMetaDataInfo extends Block {
+ static final WritableFactory FACTORY = new WritableFactory() {
+ public Writable newInstance() { return new BlockMetaDataInfo(); }
+ };
+ static { // register a ctor
+ WritableFactories.setFactory(BlockMetaDataInfo.class, FACTORY);
+ }
+
+ /** get BlockMetaDataInfo from the data set and the block scanner */
+ static BlockMetaDataInfo getBlockMetaDataInfo(Block b,
+ FSDatasetInterface dataset, DataBlockScanner blockscanner
+ ) throws IOException {
+ BlockMetaDataInfo info = new BlockMetaDataInfo();
+ info.blkid = b.getBlockId();
+ info.lastScanTime = blockscanner.getLastScanTime(b);
+ info.len = dataset.getLength(b);
+ //TODO: get generation stamp here
+ return info;
+ }
+
+ //TODO: remove generationStamp if it is defined in Block
+ private long generationStamp;
+ private long lastScanTime;
+
+ public BlockMetaDataInfo() {}
+
+ long getLastScanTime() {return lastScanTime;}
+
+ long getGenerationStamp() {return generationStamp;}
+
+ /** {@inheritDoc} */
+ public void write(DataOutput out) throws IOException {
+ super.write(out);
+ out.writeLong(generationStamp);
+ out.writeLong(lastScanTime);
+ }
+
+ /** {@inheritDoc} */
+ public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
+ generationStamp = in.readLong();
+ lastScanTime = in.readLong();
+ }
+}
\ No newline at end of file
Propchange: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockMetaDataInfo.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockMetaDataInfo.java
------------------------------------------------------------------------------
svn:keywords = Id Revision HeadURL
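
Since BlockMetaDataInfo is a Writable, a quick serialization round-trip
sketch (the helper class is hypothetical, and must sit in
org.apache.hadoop.dfs because the type is package-private):

    package org.apache.hadoop.dfs;

    import java.io.*;

    class BlockMetaDataInfoWireSketch {
      static BlockMetaDataInfo roundTrip(BlockMetaDataInfo info)
          throws IOException {
        // write() emits the Block fields, then the generation stamp,
        // then the last scan time; readFields() reads them back in order.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        info.write(new DataOutputStream(buf));

        BlockMetaDataInfo copy = new BlockMetaDataInfo();
        copy.readFields(new DataInputStream(
            new ByteArrayInputStream(buf.toByteArray())));
        return copy;
      }
    }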
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataBlockScanner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataBlockScanner.java?rev=651699&r1=651698&r2=651699&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataBlockScanner.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataBlockScanner.java Fri Apr 25 14:05:01 2008
@@ -252,7 +252,13 @@
delBlockInfo(info);
}
}
-
+
+ /** @return the last scan time */
+ synchronized long getLastScanTime(Block block) {
+ BlockScanInfo info = blockMap.get(block);
+ return info == null? 0: info.lastScanTime;
+ }
+
/** Deletes blocks from internal structures */
void deleteBlocks(Block[] blocks) {
for ( Block b : blocks ) {
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?rev=651699&r1=651698&r2=651699&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Fri Apr 25 14:05:01 2008
@@ -79,7 +79,7 @@
* information to clients or other DataNodes that might be interested.
*
**********************************************************/
-public class DataNode implements FSConstants, Runnable {
+public class DataNode implements InterDatanodeProtocol, FSConstants, Runnable {
public static final Log LOG =
LogFactory.getLog("org.apache.hadoop.dfs.DataNode");
/**
@@ -131,8 +131,8 @@
private int socketWriteTimeout = 0;
private boolean transferToAllowed = true;
- private DataBlockScanner blockScanner;
- private Daemon blockScannerThread;
+ DataBlockScanner blockScanner;
+ Daemon blockScannerThread;
private static final Random R = new Random();
@@ -152,6 +152,9 @@
long balanceBandwidth;
private Throttler balancingThrottler;
+ // For InterDataNodeProtocol
+ Server ipcServer;
+
// Record all sockets opend for data transfer
Map<Socket, Socket> childSockets = Collections.synchronizedMap(
new HashMap<Socket, Socket>());
@@ -281,7 +284,7 @@
selfAddr = new InetSocketAddress(ss.getInetAddress().getHostAddress(),
tmpPort);
this.dnRegistration.setName(machineName + ":" + tmpPort);
- LOG.info("Opened server at " + tmpPort);
+ LOG.info("Opened info server at " + tmpPort);
this.threadGroup = new ThreadGroup("dataXceiveServer");
this.dataXceiveServer = new Daemon(threadGroup, new DataXceiveServer(ss));
@@ -347,6 +350,16 @@
// adjust info port
this.dnRegistration.setInfoPort(this.infoServer.getPort());
myMetrics = new DataNodeMetrics(conf, dnRegistration.getStorageID());
+
+ //init ipc server
+ InetSocketAddress ipcAddr = NetUtils.createSocketAddr(
+ conf.get("dfs.datanode.ipc.address"));
+ ipcServer = RPC.getServer(this, ipcAddr.getHostName(), ipcAddr.getPort(),
+ conf.getInt("dfs.datanode.handler.count", 3), false, conf);
+ ipcServer.start();
+ dnRegistration.setIpcPort(ipcServer.getListenerAddress().getPort());
+
+ LOG.info("dnRegistration = " + dnRegistration);
}
/**
@@ -397,6 +410,17 @@
return datanodeObject;
}
+ static InterDatanodeProtocol createInterDataNodeProtocolProxy(
+ DatanodeID datanodeid, Configuration conf) throws IOException {
+ InetSocketAddress addr = NetUtils.createSocketAddr(
+ datanodeid.getHost() + ":" + datanodeid.getIpcPort());
+ if (InterDatanodeProtocol.LOG.isDebugEnabled()) {
+ InterDatanodeProtocol.LOG.debug("InterDatanodeProtocol addr=" + addr);
+ }
+ return (InterDatanodeProtocol)RPC.waitForProxy(InterDatanodeProtocol.class,
+ InterDatanodeProtocol.versionID, addr, conf);
+ }
+
public InetSocketAddress getNameNodeAddr() {
return nameNodeAddr;
}
@@ -504,6 +528,9 @@
} catch (Exception e) {
}
}
+ if (ipcServer != null) {
+ ipcServer.stop();
+ }
this.shouldRun = false;
if (dataXceiveServer != null) {
((DataXceiveServer) this.dataXceiveServer.getRunnable()).kill();
@@ -2924,6 +2951,35 @@
} catch (Throwable e) {
LOG.error(StringUtils.stringifyException(e));
System.exit(-1);
+ }
+ }
+
+ // InterDataNodeProtocol implementation
+ /** {@inheritDoc} */
+ public BlockMetaDataInfo getBlockMetaDataInfo(Block block
+ ) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("block=" + block);
+ }
+ return BlockMetaDataInfo.getBlockMetaDataInfo(block, data, blockScanner);
+ }
+
+ /** {@inheritDoc} */
+ public boolean updateGenerationStamp(Block block, GenerationStamp generationstamp) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("block=" + block + ", generationstamp=" + generationstamp);
+ }
+ //TODO: update generation stamp here
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ public long getProtocolVersion(String protocol, long clientVersion
+ ) throws IOException {
+ if (protocol.equals(InterDatanodeProtocol.class.getName())) {
+ return InterDatanodeProtocol.versionID;
+ } else {
+ throw new IOException("Unknown protocol to name node: " + protocol);
}
}
}
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeID.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeID.java?rev=651699&r1=651698&r2=651699&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeID.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeID.java Fri Apr 25 14:05:01 2008
@@ -36,13 +36,13 @@
protected String name; /// hostname:portNumber
protected String storageID; /// unique per cluster storageID
protected int infoPort; /// the port where the infoserver is running
+ protected int ipcPort; /// the port where the ipc server is running
- /**
- * DatanodeID default constructor
- */
- public DatanodeID() {
- this("", "", -1);
- }
+ /** Equivalent to DatanodeID(""). */
+ public DatanodeID() {this("");}
+
+ /** Equivalent to DatanodeID(nodeName, "", -1, -1). */
+ public DatanodeID(String nodeName) {this(nodeName, "", -1, -1);}
/**
* DatanodeID copy constructor
@@ -50,19 +50,25 @@
* @param from
*/
public DatanodeID(DatanodeID from) {
- this(from.getName(), from.getStorageID(), from.getInfoPort());
+ this(from.getName(),
+ from.getStorageID(),
+ from.getInfoPort(),
+ from.getIpcPort());
}
/**
* Create DatanodeID
- *
* @param nodeName (hostname:portNumber)
* @param storageID data storage ID
+ * @param infoPort info server port
+ * @param ipcPort ipc server port
*/
- public DatanodeID(String nodeName, String storageID, int infoPort) {
+ public DatanodeID(String nodeName, String storageID,
+ int infoPort, int ipcPort) {
this.name = nodeName;
this.storageID = storageID;
this.infoPort = infoPort;
+ this.ipcPort = ipcPort;
}
/**
@@ -87,6 +93,13 @@
}
/**
+ * @return ipcPort (the port to which the IPC server is bound)
+ */
+ public int getIpcPort() {
+ return ipcPort;
+ }
+
+ /**
* @sets data storage ID.
*/
void setStorageID(String storageID) {
@@ -154,16 +167,14 @@
/////////////////////////////////////////////////
// Writable
/////////////////////////////////////////////////
- /**
- */
+ /** {@inheritDoc} */
public void write(DataOutput out) throws IOException {
UTF8.writeString(out, name);
UTF8.writeString(out, storageID);
out.writeShort(infoPort);
}
- /**
- */
+ /** {@inheritDoc} */
public void readFields(DataInput in) throws IOException {
name = UTF8.readString(in);
storageID = UTF8.readString(in);
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeInfo.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeInfo.java?rev=651699&r1=651698&r2=651699&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeInfo.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeInfo.java Fri Apr 25 14:05:01 2008
@@ -281,10 +281,13 @@
});
}
- /**
- */
+ /** {@inheritDoc} */
public void write(DataOutput out) throws IOException {
super.write(out);
+
+ //TODO: move it to DatanodeID once HADOOP-2797 has been committed
+ out.writeShort(ipcPort);
+
out.writeLong(capacity);
out.writeLong(dfsUsed);
out.writeLong(remaining);
@@ -299,10 +302,13 @@
WritableUtils.writeEnum(out, getAdminState());
}
- /**
- */
+ /** {@inheritDoc} */
public void readFields(DataInput in) throws IOException {
super.readFields(in);
+
+ //TODO: move it to DatanodeID once HADOOP-2797 has been committed
+ this.ipcPort = in.readShort() & 0x0000ffff;
+
this.capacity = in.readLong();
this.dfsUsed = in.readLong();
this.remaining = in.readLong();
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeRegistration.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeRegistration.java?rev=651699&r1=651698&r2=651699&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeRegistration.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeRegistration.java Fri Apr 25 14:05:01 2008
@@ -47,15 +47,14 @@
* Default constructor.
*/
public DatanodeRegistration() {
- super(null, null, -1);
- this.storageInfo = new StorageInfo();
+ this("");
}
/**
* Create DatanodeRegistration
*/
public DatanodeRegistration(String nodeName) {
- super(nodeName, "", -1);
+ super(nodeName);
this.storageInfo = new StorageInfo();
}
@@ -63,6 +62,10 @@
this.infoPort = infoPort;
}
+ void setIpcPort(int ipcPort) {
+ this.ipcPort = ipcPort;
+ }
+
void setStorageInfo(DataStorage storage) {
this.storageInfo = new StorageInfo(storage);
this.storageID = storage.getStorageID();
@@ -84,22 +87,36 @@
return Storage.getRegistrationID(storageInfo);
}
+ public String toString() {
+ return getClass().getSimpleName()
+ + "(" + name
+ + ", storageID=" + storageID
+ + ", infoPort=" + infoPort
+ + ", ipcPort=" + ipcPort
+ + ")";
+ }
/////////////////////////////////////////////////
// Writable
/////////////////////////////////////////////////
- /**
- */
+ /** {@inheritDoc} */
public void write(DataOutput out) throws IOException {
super.write(out);
+
+ //TODO: move it to DatanodeID once HADOOP-2797 has been committed
+ out.writeShort(ipcPort);
+
out.writeInt(storageInfo.getLayoutVersion());
out.writeInt(storageInfo.getNamespaceID());
out.writeLong(storageInfo.getCTime());
}
- /**
- */
+ /** {@inheritDoc} */
public void readFields(DataInput in) throws IOException {
super.readFields(in);
+
+ //TODO: move it to DatanodeID once HADOOP-2797 has been committed
+ this.ipcPort = in.readShort() & 0x0000ffff;
+
storageInfo.layoutVersion = in.readInt();
storageInfo.namespaceID = in.readInt();
storageInfo.cTime = in.readLong();
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSImage.java?rev=651699&r1=651698&r2=651699&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSImage.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSImage.java Fri Apr 25 14:05:01 2008
@@ -1253,9 +1253,7 @@
* Datanode to be stored in the fsImage.
*/
public void write(DataOutput out) throws IOException {
- DatanodeID id = new DatanodeID(node.getName(), node.getStorageID(),
- node.getInfoPort());
- id.write(out);
+ new DatanodeID(node).write(out);
out.writeLong(node.getCapacity());
out.writeLong(node.getRemaining());
out.writeLong(node.getLastUpdate());
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?rev=651699&r1=651698&r2=651699&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Fri Apr 25 14:05:01 2008
@@ -2035,7 +2035,8 @@
// update the datanode's name with ip:port
DatanodeID dnReg = new DatanodeID(dnAddress + ":" + nodeReg.getPort(),
nodeReg.getStorageID(),
- nodeReg.getInfoPort());
+ nodeReg.getInfoPort(),
+ nodeReg.getIpcPort());
nodeReg.updateRegInfo(dnReg);
NameNode.stateChangeLog.info(
@@ -3252,7 +3253,7 @@
if (listDeadNodes) {
for (Iterator<String> it = mustList.keySet().iterator(); it.hasNext();) {
DatanodeDescriptor dn =
- new DatanodeDescriptor(new DatanodeID(it.next(), "", 0));
+ new DatanodeDescriptor(new DatanodeID(it.next()));
dn.setLastUpdate(0);
nodes.add(dn);
}
Added: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/InterDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/InterDatanodeProtocol.java?rev=651699&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/InterDatanodeProtocol.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/InterDatanodeProtocol.java Fri Apr 25 14:05:01 2008
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.dfs;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.ipc.VersionedProtocol;
+
+/** An inter-datanode protocol for updating generation stamp
+ */
+interface InterDatanodeProtocol extends VersionedProtocol {
+ public static final Log LOG = LogFactory.getLog(InterDatanodeProtocol.class);
+
+ /**
+ * 1: added getBlockMetaDataInfo and updateGenerationStamp
+ */
+ public static final long versionID = 1L;
+
+ /** @return the BlockMetaDataInfo of a block */
+ BlockMetaDataInfo getBlockMetaDataInfo(Block block) throws IOException;
+
+ /**
+ * Update the GenerationStamp of a block
+ * @return true iff update was required and done successfully
+ */
+ boolean updateGenerationStamp(Block block, GenerationStamp generationstamp
+ ) throws IOException;
+}
\ No newline at end of file
Propchange: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/InterDatanodeProtocol.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/InterDatanodeProtocol.java
------------------------------------------------------------------------------
svn:keywords = Id Revision HeadURL
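
Because InterDatanodeProtocol extends VersionedProtocol, a client can verify
versionID before issuing calls. A minimal sketch (class name hypothetical;
RPC.waitForProxy performs a comparable check while building the proxy):

    package org.apache.hadoop.dfs;

    import java.io.IOException;

    class VersionCheckSketch {
      static void check(InterDatanodeProtocol idp) throws IOException {
        // Ask the remote side which version of the protocol it speaks.
        long remote = idp.getProtocolVersion(
            InterDatanodeProtocol.class.getName(),
            InterDatanodeProtocol.versionID);
        if (remote != InterDatanodeProtocol.versionID) {
          throw new IOException("version mismatch: expected "
              + InterDatanodeProtocol.versionID + ", got " + remote);
        }
      }
    }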
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java?rev=651699&r1=651698&r2=651699&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java Fri Apr 25 14:05:01 2008
@@ -18,11 +18,7 @@
package org.apache.hadoop.dfs;
import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FileNotFoundException;
import java.io.IOException;
-import java.io.BufferedOutputStream;
-import java.io.PipedOutputStream;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
@@ -355,6 +351,7 @@
// Set up the right ports for the datanodes
conf.set("dfs.datanode.address", "127.0.0.1:0");
conf.set("dfs.datanode.http.address", "127.0.0.1:0");
+ conf.set("dfs.datanode.ipc.address", "0.0.0.0:0");
String[] args = (operation == null ||
operation == StartupOption.FORMAT ||
@@ -505,6 +502,16 @@
return list;
}
+ /** @return the datanode whose ipc server listens on the given port */
+ DataNode getDataNode(int ipcPort) {
+ for(DataNode dn : getDataNodes()) {
+ if (dn.ipcServer.getListenerAddress().getPort() == ipcPort) {
+ return dn;
+ }
+ }
+ return null;
+ }
+
/**
* Gets the rpc port used by the NameNode, because the caller
* supplied port is not necessarily the actual port used.
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestHost2NodesMap.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestHost2NodesMap.java?rev=651699&r1=651698&r2=651699&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestHost2NodesMap.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestHost2NodesMap.java Fri Apr 25 14:05:01 2008
@@ -23,14 +23,14 @@
public class TestHost2NodesMap extends TestCase {
static private Host2NodesMap map = new Host2NodesMap();
private final static DatanodeDescriptor dataNodes[] = new DatanodeDescriptor[] {
- new DatanodeDescriptor(new DatanodeID("h1:5020", "0", -1), "/d1/r1"),
- new DatanodeDescriptor(new DatanodeID("h2:5020", "0", -1), "/d1/r1"),
- new DatanodeDescriptor(new DatanodeID("h3:5020", "0", -1), "/d1/r2"),
- new DatanodeDescriptor(new DatanodeID("h3:5030", "0", -1), "/d1/r2"),
+ new DatanodeDescriptor(new DatanodeID("h1:5020"), "/d1/r1"),
+ new DatanodeDescriptor(new DatanodeID("h2:5020"), "/d1/r1"),
+ new DatanodeDescriptor(new DatanodeID("h3:5020"), "/d1/r2"),
+ new DatanodeDescriptor(new DatanodeID("h3:5030"), "/d1/r2"),
};
private final static DatanodeDescriptor NULL_NODE = null;
private final static DatanodeDescriptor NODE =
- new DatanodeDescriptor(new DatanodeID("h3:5040", "0", -1), "/d1/r4");
+ new DatanodeDescriptor(new DatanodeID("h3:5040"), "/d1/r4");
static {
for(DatanodeDescriptor node:dataNodes) {
Added: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestInterDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestInterDatanodeProtocol.java?rev=651699&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestInterDatanodeProtocol.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestInterDatanodeProtocol.java Fri Apr 25 14:05:01 2008
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * This tests InterDatanodeProtocol for block handling.
+ */
+public class TestInterDatanodeProtocol extends junit.framework.TestCase {
+ public void testGetBlockMetaDataInfo() throws IOException {
+ Configuration conf = new Configuration();
+ MiniDFSCluster cluster = null;
+
+ try {
+ cluster = new MiniDFSCluster(conf, 3, true, null);
+ cluster.waitActive();
+
+ //create a file
+ DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
+ String filepath = "/foo";
+ DFSTestUtil.createFile(dfs, new Path(filepath), 1024L, (short)3, 0L);
+ assertTrue(dfs.dfs.exists(filepath));
+
+ //get block info
+ ClientProtocol namenode = dfs.dfs.namenode;
+ LocatedBlocks locations = namenode.getBlockLocations(
+ filepath, 0, Long.MAX_VALUE);
+ List<LocatedBlock> blocks = locations.getLocatedBlocks();
+ assertTrue(blocks.size() > 0);
+
+ LocatedBlock locatedblock = blocks.get(0);
+ DatanodeInfo[] datanodeinfo = locatedblock.getLocations();
+ assertTrue(datanodeinfo.length > 0);
+
+ //connect to a data node
+ InterDatanodeProtocol idp = DataNode.createInterDataNodeProtocolProxy(
+ datanodeinfo[0], conf);
+ DataNode datanode = cluster.getDataNode(datanodeinfo[0].getIpcPort());
+ assertTrue(datanode != null);
+
+ //stop block scanner, so we could compare lastScanTime
+ datanode.blockScannerThread.interrupt();
+
+ //verify BlockMetaDataInfo
+ Block b = locatedblock.getBlock();
+ InterDatanodeProtocol.LOG.info("b=" + b + ", " + b.getClass());
+ BlockMetaDataInfo metainfo = idp.getBlockMetaDataInfo(b);
+ assertEquals(b.getBlockId(), metainfo.getBlockId());
+ assertEquals(b.getNumBytes(), metainfo.getNumBytes());
+ assertEquals(datanode.blockScanner.getLastScanTime(b),
+ metainfo.getLastScanTime());
+
+ //TODO: verify GenerationStamp
+ InterDatanodeProtocol.LOG.info("idp.updateGenerationStamp="
+ + idp.updateGenerationStamp(b, new GenerationStamp(456789L)));
+ }
+ finally {
+ if (cluster != null) {cluster.shutdown();}
+ }
+ }
+}
\ No newline at end of file
Propchange: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestInterDatanodeProtocol.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestInterDatanodeProtocol.java
------------------------------------------------------------------------------
svn:keywords = Id Revision HeadURL
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestReplicationPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestReplicationPolicy.java?rev=651699&r1=651698&r2=651699&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestReplicationPolicy.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestReplicationPolicy.java Fri Apr 25 14:05:01 2008
@@ -38,16 +38,16 @@
private static ReplicationTargetChooser replicator;
private static DatanodeDescriptor dataNodes[] =
new DatanodeDescriptor[] {
- new DatanodeDescriptor(new DatanodeID("h1:5020", "0", -1), "/d1/r1"),
- new DatanodeDescriptor(new DatanodeID("h2:5020", "0", -1), "/d1/r1"),
- new DatanodeDescriptor(new DatanodeID("h3:5020", "0", -1), "/d1/r2"),
- new DatanodeDescriptor(new DatanodeID("h4:5020", "0", -1), "/d1/r2"),
- new DatanodeDescriptor(new DatanodeID("h5:5020", "0", -1), "/d2/r3"),
- new DatanodeDescriptor(new DatanodeID("h6:5020", "0", -1), "/d2/r3")
+ new DatanodeDescriptor(new DatanodeID("h1:5020"), "/d1/r1"),
+ new DatanodeDescriptor(new DatanodeID("h2:5020"), "/d1/r1"),
+ new DatanodeDescriptor(new DatanodeID("h3:5020"), "/d1/r2"),
+ new DatanodeDescriptor(new DatanodeID("h4:5020"), "/d1/r2"),
+ new DatanodeDescriptor(new DatanodeID("h5:5020"), "/d2/r3"),
+ new DatanodeDescriptor(new DatanodeID("h6:5020"), "/d2/r3")
};
private final static DatanodeDescriptor NODE =
- new DatanodeDescriptor(new DatanodeID("h7:5020", "0", -1), "/d2/r4");
+ new DatanodeDescriptor(new DatanodeID("h7:5020"), "/d2/r4");
static {
try {
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/net/TestNetworkTopology.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/net/TestNetworkTopology.java?rev=651699&r1=651698&r2=651699&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/net/TestNetworkTopology.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/net/TestNetworkTopology.java Fri Apr 25 14:05:01 2008
@@ -30,16 +30,16 @@
public class TestNetworkTopology extends TestCase {
private final static NetworkTopology cluster = new NetworkTopology();
private final static DatanodeDescriptor dataNodes[] = new DatanodeDescriptor[] {
- new DatanodeDescriptor(new DatanodeID("h1:5020", "0", -1), "/d1/r1"),
- new DatanodeDescriptor(new DatanodeID("h2:5020", "0", -1), "/d1/r1"),
- new DatanodeDescriptor(new DatanodeID("h3:5020", "0", -1), "/d1/r2"),
- new DatanodeDescriptor(new DatanodeID("h4:5020", "0", -1), "/d1/r2"),
- new DatanodeDescriptor(new DatanodeID("h5:5020", "0", -1), "/d1/r2"),
- new DatanodeDescriptor(new DatanodeID("h6:5020", "0", -1), "/d2/r3"),
- new DatanodeDescriptor(new DatanodeID("h7:5020", "0", -1), "/d2/r3")
+ new DatanodeDescriptor(new DatanodeID("h1:5020"), "/d1/r1"),
+ new DatanodeDescriptor(new DatanodeID("h2:5020"), "/d1/r1"),
+ new DatanodeDescriptor(new DatanodeID("h3:5020"), "/d1/r2"),
+ new DatanodeDescriptor(new DatanodeID("h4:5020"), "/d1/r2"),
+ new DatanodeDescriptor(new DatanodeID("h5:5020"), "/d1/r2"),
+ new DatanodeDescriptor(new DatanodeID("h6:5020"), "/d2/r3"),
+ new DatanodeDescriptor(new DatanodeID("h7:5020"), "/d2/r3")
};
private final static DatanodeDescriptor NODE =
- new DatanodeDescriptor(new DatanodeID("h8:5020", "0", -1), "/d2/r4");
+ new DatanodeDescriptor(new DatanodeID("h8:5020"), "/d2/r4");
static {
for(int i=0; i<dataNodes.length; i++) {