Author: ecn
Date: Thu Aug 9 20:21:32 2012
New Revision: 1371432
URL: http://svn.apache.org/viewvc?rev=1371432&view=rev
Log:
ACCUMULO-722 prototype proxy to bind a NameNode api to zookeeper/accumulo
Added:
accumulo/branches/ACCUMULO-722/distnn/README (with props)
accumulo/branches/ACCUMULO-722/distnn/pom.xml (with props)
accumulo/branches/ACCUMULO-722/distnn/src/
accumulo/branches/ACCUMULO-722/distnn/src/main/
accumulo/branches/ACCUMULO-722/distnn/src/main/java/
accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/
accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/
accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/
accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/
accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/DNNConstants.java
(with props)
accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/DNNFileSystem.java
(with props)
accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/
accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/datanode/
accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
(with props)
accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/datanode/HealthServer.java
(with props)
accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/
accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/DistributedNamenodeProxy.java
(with props)
accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/FakeNameNode.java
(with props)
accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/SwitchingNameNode.java
(with props)
accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/ZookeeperNameNode.java
(with props)
accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/protocol/
accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/protocol/HealthProtocol.java
(with props)
accumulo/branches/ACCUMULO-722/distnn/src/main/resources/
accumulo/branches/ACCUMULO-722/distnn/src/test/
accumulo/branches/ACCUMULO-722/distnn/src/test/java/
accumulo/branches/ACCUMULO-722/distnn/src/test/resources/
Modified:
accumulo/branches/ACCUMULO-722/distnn/ (props changed)
Propchange: accumulo/branches/ACCUMULO-722/distnn/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Thu Aug 9 20:21:32 2012
@@ -0,0 +1,4 @@
+.classpath
+.project
+target
+.settings
Added: accumulo/branches/ACCUMULO-722/distnn/README
URL:
http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-722/distnn/README?rev=1371432&view=auto
==============================================================================
--- accumulo/branches/ACCUMULO-722/distnn/README (added)
+++ accumulo/branches/ACCUMULO-722/distnn/README Thu Aug 9 20:21:32 2012
@@ -0,0 +1,79 @@
+This library will change hadoop 1.0.X to use the DistributedNameNode.
+
+Features
+========
+
+Create files, allocates blocks, deletes files/blocks, renames files.
+
+Non-features
+============
+Replication
+File Permissions
+Atomic deletion, rename, directory creation, file creation... really, anything.
+Balancing
+Decommissioning
+Client Locality (selecting a datanode for a replica close to the client)
+Deleting large directories may fail due to OOM
+
+Installation
+============
+
+Add this near the start of your hadoop-env.sh file:
+
+ACCUMULO=/my/accumulo/directory
+ZOOKEEPER=/my/zookeeper/directory
+export
HADOOP_CLASSPATH=/local/ecnewt2/workspace/distnn/target/classes:/local/ecnewt2/Installed/hbase/hbase-0.94.0.jar:zookeeper-3.4.3.jar
+export HADOOP_CLASSPATH="$HADOOP_CLASSPATH:$ACCUMULO/lib/*"
+export HADOOP_CLASSPATH="$HADOOP_CLASSPATH:$ACCUMULO/conf"
+export HADOOP_CLASSPATH="$HADOOP_CLASSPATH:$ZOOKEEPER/zookeeper-3.4.3.jar"
+export HADOOP_USER_CLASSPATH_FIRST=true
+
+Edit bin/start-dfs.sh to prevent the namenode and secondary namenode from
starting.
+
+Add the following to your hadoop-core.xml file:
+
+ <property>
+ <name>fs.default.name</name>
+
<value>dnn://localhost:1337/user:passwd@instance:zookeeper1,zookeeper2,zookeeper3</value>
+ </property>
+
+ <property>
+ <name>fs.dnn.impl</name>
+ <value>org.apache.hadoop.hdfs.DNNFileSystem</value>
+ </property>
+
+TODO: make this unnecessary
+Add the following to your hdfs-site.xml file:
+
+ <property>
+ <name>accumulo.zookeeper.instance</name>
+ <value>test</value>
+ </property>
+
+ <property>
+ <name>accumulo.zookeeper.keepers</name>
+ <value>rd6ul-14706v.tycho.ncsc.mil</value>
+ </property>
+
+Replace the classpath in your accumulo-site.xml:
+
+ <property>
+ <name>general.classpaths</name>
+ <value>
+ /my/local/distnn/target/classes,
+ $ACCUMULO_HOME/src/server/target/classes/,
+ $ACCUMULO_HOME/src/core/target/classes/,
+ $ACCUMULO_HOME/src/start/target/classes/,
+ $ACCUMULO_HOME/src/examples/target/classes/,
+ $ACCUMULO_HOME/lib/[^.].$ACCUMULO_VERSION.jar,
+ $ACCUMULO_HOME/lib/[^.].*.jar,
+ $ZOOKEEPER_HOME/zookeeper[^.].*.jar,
+ $HADOOP_HOME/conf,
+ $HADOOP_HOME/[^.].*.jar,
+ $HADOOP_HOME/lib/[^.].*.jar,
+ </value>
+ <description>Classpaths that accumulo checks for updates and class files.
+ When using the Security Manager, please remove the ".../target/classes/"
values.
+ </description>
+ </property>
+
Propchange: accumulo/branches/ACCUMULO-722/distnn/README
------------------------------------------------------------------------------
svn:eol-style = native
Added: accumulo/branches/ACCUMULO-722/distnn/pom.xml
URL:
http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-722/distnn/pom.xml?rev=1371432&view=auto
==============================================================================
--- accumulo/branches/ACCUMULO-722/distnn/pom.xml (added)
+++ accumulo/branches/ACCUMULO-722/distnn/pom.xml Thu Aug 9 20:21:32 2012
@@ -0,0 +1,34 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.apache.accumulo</groupId>
+ <artifactId>distnn</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ <name>distnn</name>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>1.0.3</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>2.0.0-alpha</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.accumulo</groupId>
+ <artifactId>accumulo-core</artifactId>
+ <version>1.4.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.accumulo</groupId>
+ <artifactId>accumulo-server</artifactId>
+ <version>1.4.1</version>
+ </dependency>
+ <dependency>
+ <groupId>com.netflix.curator</groupId>
+ <artifactId>curator-framework</artifactId>
+ <version>1.1.15</version>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
Propchange: accumulo/branches/ACCUMULO-722/distnn/pom.xml
------------------------------------------------------------------------------
svn:eol-style = native
Added:
accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/DNNConstants.java
URL:
http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/DNNConstants.java?rev=1371432&view=auto
==============================================================================
---
accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/DNNConstants.java
(added)
+++
accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/DNNConstants.java
Thu Aug 9 20:21:32 2012
@@ -0,0 +1,10 @@
+package org.apache.hadoop.hdfs;
+
+public class DNNConstants {
+
+ public static final String DNN = "/dnn";
+ public static final String DATANODES_PATH = "/datanodes";
+ public static final String NAMESPACE_PATH = "/namespace";
+ public static final String BLOCKS_PATH = "/blocks";
+
+}
Propchange:
accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/DNNConstants.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/DNNFileSystem.java
URL:
http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/DNNFileSystem.java?rev=1371432&view=auto
==============================================================================
---
accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/DNNFileSystem.java
(added)
+++
accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/DNNFileSystem.java
Thu Aug 9 20:21:32 2012
@@ -0,0 +1,609 @@
+package org.apache.hadoop.hdfs;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DistributedFileSystem.DiskStatus;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.FSConstants.UpgradeAction;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import
org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
+import org.apache.hadoop.hdfs.server.namenode.DistributedNamenodeProxy;
+import
org.apache.hadoop.hdfs.server.namenode.DistributedNamenodeProxy.ConnectInfo;
+import org.apache.hadoop.hdfs.server.namenode.FakeNameNode;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.SwitchingNameNode;
+import org.apache.hadoop.hdfs.server.namenode.ZookeeperNameNode;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.Progressable;
+import org.apache.log4j.Logger;
+
+import com.netflix.curator.framework.CuratorFramework;
+import com.netflix.curator.framework.CuratorFrameworkFactory;
+import com.netflix.curator.framework.CuratorFrameworkFactory.Builder;
+import com.netflix.curator.retry.RetryUntilElapsed;
+
+// Basically a copy of DistributedFileSystem providing a different NN client
implementation
+public class DNNFileSystem extends FileSystem {
+ private static Logger log = Logger.getLogger(DNNFileSystem.class);
+
+ private Path workingDir;
+ private URI uri;
+
+ DFSClient dfs;
+ private boolean verifyChecksum = true;
+
+ static{
+ Configuration.addDefaultResource("dnn-default.xml");
+ Configuration.addDefaultResource("dnn-site.xml");
+ }
+
+ public DNNFileSystem() {
+ }
+
+ /** @deprecated */
+ public String getName() { return uri.getAuthority(); }
+
+ public URI getUri() { return uri; }
+
+ public void initialize(URI uri, Configuration conf) throws IOException {
+ super.initialize(uri, conf);
+ setConf(conf);
+ ConnectInfo info = new ConnectInfo(uri);
+ FakeNameNode fakefakefake = null;
+ try {
+ Builder builder =
CuratorFrameworkFactory.builder().namespace(DNNConstants.DNN);
+ builder.connectString(info.zookeepers);
+ builder.retryPolicy(new RetryUntilElapsed(120*1000, 500));
+ //builder.aclProvider(aclProvider);
+ CuratorFramework client = builder.build();
+ client.start();
+ ZookeeperNameNode zoo = new ZookeeperNameNode(client);
+ fakefakefake = SwitchingNameNode.create(zoo, info);
+ } catch (Exception ex) {
+ throw new IOException(ex);
+ }
+ log.info("Creating DFSClient with fake name node " + fakefakefake);
+ this.dfs = new DFSClient(null, fakefakefake, conf, statistics);
+ this.uri = uri;
+ this.workingDir = getHomeDirectory();
+ }
+
+ public Path getWorkingDirectory() {
+ return workingDir;
+ }
+
+ public long getDefaultBlockSize() {
+ return dfs.getDefaultBlockSize();
+ }
+
+ public short getDefaultReplication() {
+ return dfs.getDefaultReplication();
+ }
+
+ private Path makeAbsolute(Path f) {
+ if (f.isAbsolute()) {
+ return f;
+ } else {
+ return new Path(workingDir, f);
+ }
+ }
+
+ public void setWorkingDirectory(Path dir) {
+ String result = makeAbsolute(dir).toUri().getPath();
+ if (!DFSUtil.isValidName(result)) {
+ throw new IllegalArgumentException("Invalid DFS directory name " +
+ result);
+ }
+ workingDir = makeAbsolute(dir);
+ }
+
+ /** {@inheritDoc} */
+ public Path getHomeDirectory() {
+ return new Path("/user/" + dfs.ugi.getShortUserName()).makeQualified(this);
+ }
+
+ private String getPathName(Path file) {
+ checkPath(file);
+ String result = makeAbsolute(file).toUri().getPath();
+ if (!DFSUtil.isValidName(result)) {
+ throw new IllegalArgumentException("Pathname " + result + " from " +
+ file+" is not a valid DFS filename.");
+ }
+ return result;
+ }
+
+
+ public BlockLocation[] getFileBlockLocations(FileStatus file, long start,
+ long len) throws IOException {
+ if (file == null) {
+ return null;
+ }
+ statistics.incrementReadOps(1);
+ return dfs.getBlockLocations(getPathName(file.getPath()), start, len);
+ }
+
+ public void setVerifyChecksum(boolean verifyChecksum) {
+ this.verifyChecksum = verifyChecksum;
+ }
+
+ public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+ statistics.incrementReadOps(1);
+ return new DFSClient.DFSDataInputStream(
+ dfs.open(getPathName(f), bufferSize, verifyChecksum, statistics));
+ }
+
+ /**
+ * Start the lease recovery of a file
+ *
+ * @param f a file
+ * @return true if the file is already closed
+ * @throws IOException if an error occurs
+ */
+ public boolean recoverLease(Path f) throws IOException {
+ return dfs.recoverLease(getPathName(f));
+ }
+
+ /** This optional operation is not yet supported. */
+ public FSDataOutputStream append(Path f, int bufferSize,
+ Progressable progress) throws IOException {
+ statistics.incrementWriteOps(1);
+ return dfs.append(getPathName(f), bufferSize, progress, statistics);
+ }
+
+ public FSDataOutputStream create(Path f, FsPermission permission,
+ boolean overwrite,
+ int bufferSize, short replication, long blockSize,
+ Progressable progress) throws IOException {
+
+ statistics.incrementWriteOps(1);
+ return new FSDataOutputStream
+ (dfs.create(getPathName(f), permission,
+ overwrite, true, replication, blockSize, progress,
bufferSize),
+ statistics);
+ }
+
+ /**
+ * Same as create(), except fails if parent directory doesn't already exist.
+ * @see #create(Path, FsPermission, boolean, int, short, long, Progressable)
+ */
+ @Override
+ public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
+ boolean overwrite,
+ int bufferSize, short replication, long blockSize,
+ Progressable progress) throws IOException {
+
+ return new FSDataOutputStream
+ (dfs.create(getPathName(f), permission,
+ overwrite, false, replication, blockSize, progress,
bufferSize),
+ statistics);
+ }
+
+ public boolean setReplication(Path src,
+ short replication
+ ) throws IOException {
+ statistics.incrementWriteOps(1);
+ return dfs.setReplication(getPathName(src), replication);
+ }
+
+ /**
+ * Rename files/dirs
+ */
+ public boolean rename(Path src, Path dst) throws IOException {
+ statistics.incrementWriteOps(1);
+ return dfs.rename(getPathName(src), getPathName(dst));
+ }
+
+ /**
+ * Get rid of Path f, whether a true file or dir.
+ */
+ @Deprecated
+ public boolean delete(Path f) throws IOException {
+ statistics.incrementWriteOps(1);
+ return dfs.delete(getPathName(f));
+ }
+
+ /**
+ * requires a boolean check to delete a non
+ * empty directory recursively.
+ */
+ public boolean delete(Path f, boolean recursive) throws IOException {
+ statistics.incrementWriteOps(1);
+ return dfs.delete(getPathName(f), recursive);
+ }
+
+ /** {@inheritDoc} */
+ public ContentSummary getContentSummary(Path f) throws IOException {
+ statistics.incrementReadOps(1);
+ return dfs.getContentSummary(getPathName(f));
+ }
+
+ /** Set a directory's quotas
+ * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#setQuota(String,
long, long)
+ */
+ public void setQuota(Path src, long namespaceQuota, long diskspaceQuota)
+ throws IOException {
+ dfs.setQuota(getPathName(src), namespaceQuota, diskspaceQuota);
+ }
+
+ private FileStatus makeQualified(HdfsFileStatus f, Path parent) {
+ return new FileStatus(f.getLen(), f.isDir(), f.getReplication(),
+ f.getBlockSize(), f.getModificationTime(),
+ f.getAccessTime(),
+ f.getPermission(), f.getOwner(), f.getGroup(),
+ f.getFullPath(parent).makeQualified(this)); // fully-qualify path
+ }
+
+ /**
+ * List all the entries of a directory
+ *
+ * Note that this operation is not atomic for a large directory.
+ * The entries of a directory may be fetched from NameNode multiple times.
+ * It only guarantees that each name occurs once if a directory
+ * undergoes changes between the calls.
+ */
+ @Override
+ public FileStatus[] listStatus(Path p) throws IOException {
+ String src = getPathName(p);
+
+ // fetch the first batch of entries in the directory
+ DirectoryListing thisListing = dfs.listPaths(
+ src, HdfsFileStatus.EMPTY_NAME);
+
+ if (thisListing == null) { // the directory does not exist
+ return null;
+ }
+
+ HdfsFileStatus[] partialListing = thisListing.getPartialListing();
+ if (!thisListing.hasMore()) { // got all entries of the directory
+ FileStatus[] stats = new FileStatus[partialListing.length];
+ for (int i = 0; i < partialListing.length; i++) {
+ stats[i] = makeQualified(partialListing[i], p);
+ }
+ statistics.incrementReadOps(1);
+ return stats;
+ }
+
+ // The directory size is too big that it needs to fetch more
+ // estimate the total number of entries in the directory
+ int totalNumEntries =
+ partialListing.length + thisListing.getRemainingEntries();
+ ArrayList<FileStatus> listing =
+ new ArrayList<FileStatus>(totalNumEntries);
+ // add the first batch of entries to the array list
+ for (HdfsFileStatus fileStatus : partialListing) {
+ listing.add(makeQualified(fileStatus, p));
+ }
+ statistics.incrementLargeReadOps(1);
+
+ // now fetch more entries
+ do {
+ thisListing = dfs.listPaths(src, thisListing.getLastName());
+
+ if (thisListing == null) {
+ return null; // the directory is deleted
+ }
+
+ partialListing = thisListing.getPartialListing();
+ for (HdfsFileStatus fileStatus : partialListing) {
+ listing.add(makeQualified(fileStatus, p));
+ }
+ statistics.incrementLargeReadOps(1);
+ } while (thisListing.hasMore());
+
+ return listing.toArray(new FileStatus[listing.size()]);
+ }
+
+ public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+ statistics.incrementWriteOps(1);
+ return dfs.mkdirs(getPathName(f), permission);
+ }
+
+ /** {@inheritDoc} */
+ public void close() throws IOException {
+ try {
+ super.processDeleteOnExit();
+ //dfs.close();
+ } finally {
+ super.close();
+ }
+ }
+
+ public String toString() {
+ return "DFS[" + dfs + "]";
+ }
+
+ public DFSClient getClient() {
+ return dfs;
+ }
+
+ /** Return the disk usage of the filesystem, including total capacity,
+ * used space, and remaining space */
+ public DiskStatus getDiskStatus() throws IOException {
+ return dfs.getDiskStatus();
+ }
+
+ /** Return the total raw capacity of the filesystem, disregarding
+ * replication .*/
+ public long getRawCapacity() throws IOException{
+ return dfs.totalRawCapacity();
+ }
+
+ /** Return the total raw used space in the filesystem, disregarding
+ * replication .*/
+ public long getRawUsed() throws IOException{
+ return dfs.totalRawUsed();
+ }
+
+ /**
+ * Returns count of blocks with no good replicas left. Normally should be
+ * zero.
+ *
+ * @throws IOException
+ */
+ public long getMissingBlocksCount() throws IOException {
+ return dfs.getMissingBlocksCount();
+ }
+
+ /**
+ * Returns count of blocks with one of more replica missing.
+ *
+ * @throws IOException
+ */
+ public long getUnderReplicatedBlocksCount() throws IOException {
+ return dfs.getUnderReplicatedBlocksCount();
+ }
+
+ /**
+ * Returns count of blocks with at least one replica marked corrupt.
+ *
+ * @throws IOException
+ */
+ public long getCorruptBlocksCount() throws IOException {
+ return dfs.getCorruptBlocksCount();
+ }
+
+ /** Return statistics for each datanode. */
+ public DatanodeInfo[] getDataNodeStats() throws IOException {
+ return dfs.datanodeReport(DatanodeReportType.ALL);
+ }
+
+ /**
+ * Enter, leave or get safe mode.
+ *
+ * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#setSafeMode(
+ * FSConstants.SafeModeAction)
+ */
+ public boolean setSafeMode(FSConstants.SafeModeAction action)
+ throws IOException {
+ return dfs.setSafeMode(action);
+ }
+
+ /**
+ * Save namespace image.
+ *
+ * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#saveNamespace()
+ */
+ public void saveNamespace() throws AccessControlException, IOException {
+ dfs.saveNamespace();
+ }
+
+ /**
+ * Refreshes the list of hosts and excluded hosts from the configured
+ * files.
+ */
+ public void refreshNodes() throws IOException {
+ dfs.refreshNodes();
+ }
+
+ /**
+ * Finalize previously upgraded files system state.
+ * @throws IOException
+ */
+ public void finalizeUpgrade() throws IOException {
+ dfs.finalizeUpgrade();
+ }
+
+ public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action
+ ) throws IOException {
+ return dfs.distributedUpgradeProgress(action);
+ }
+
+ /*
+ * Requests the namenode to dump data strcutures into specified
+ * file.
+ */
+ public void metaSave(String pathname) throws IOException {
+ dfs.metaSave(pathname);
+ }
+
+ /**
+ * We need to find the blocks that didn't match. Likely only one
+ * is corrupt but we will report both to the namenode. In the future,
+ * we can consider figuring out exactly which block is corrupt.
+ */
+ public boolean reportChecksumFailure(Path f,
+ FSDataInputStream in, long inPos,
+ FSDataInputStream sums, long sumsPos) {
+
+ LocatedBlock lblocks[] = new LocatedBlock[2];
+
+ // Find block in data stream.
+ DFSClient.DFSDataInputStream dfsIn = (DFSClient.DFSDataInputStream) in;
+ Block dataBlock = dfsIn.getCurrentBlock();
+ if (dataBlock == null) {
+ LOG.error("Error: Current block in data stream is null! ");
+ return false;
+ }
+ DatanodeInfo[] dataNode = {dfsIn.getCurrentDatanode()};
+ lblocks[0] = new LocatedBlock(dataBlock, dataNode);
+ LOG.info("Found checksum error in data stream at block="
+ + dataBlock + " on datanode="
+ + dataNode[0].getName());
+
+ // Find block in checksum stream
+ DFSClient.DFSDataInputStream dfsSums = (DFSClient.DFSDataInputStream) sums;
+ Block sumsBlock = dfsSums.getCurrentBlock();
+ if (sumsBlock == null) {
+ LOG.error("Error: Current block in checksum stream is null! ");
+ return false;
+ }
+ DatanodeInfo[] sumsNode = {dfsSums.getCurrentDatanode()};
+ lblocks[1] = new LocatedBlock(sumsBlock, sumsNode);
+ LOG.info("Found checksum error in checksum stream at block="
+ + sumsBlock + " on datanode="
+ + sumsNode[0].getName());
+
+ // Ask client to delete blocks.
+ dfs.reportChecksumFailure(f.toString(), lblocks);
+
+ return true;
+ }
+
+ /**
+ * Returns the stat information about the file.
+ * @throws FileNotFoundException if the file does not exist.
+ */
+ public FileStatus getFileStatus(Path f) throws IOException {
+ statistics.incrementReadOps(1);
+ HdfsFileStatus fi = dfs.getFileInfo(getPathName(f));
+ if (fi != null) {
+ return makeQualified(fi, f);
+ } else {
+ throw new FileNotFoundException("File does not exist: " + f);
+ }
+ }
+
+ /** {@inheritDoc} */
+ public MD5MD5CRC32FileChecksum getFileChecksum(Path f) throws IOException {
+ statistics.incrementReadOps(1);
+ return dfs.getFileChecksum(getPathName(f));
+ }
+
+ /** {@inheritDoc }*/
+ public void setPermission(Path p, FsPermission permission
+ ) throws IOException {
+ statistics.incrementWriteOps(1);
+ dfs.setPermission(getPathName(p), permission);
+ }
+
+ /** {@inheritDoc }*/
+ public void setOwner(Path p, String username, String groupname
+ ) throws IOException {
+ if (username == null && groupname == null) {
+ throw new IOException("username == null && groupname == null");
+ }
+ statistics.incrementWriteOps(1);
+ dfs.setOwner(getPathName(p), username, groupname);
+ }
+
+ /** {@inheritDoc }*/
+ public void setTimes(Path p, long mtime, long atime
+ ) throws IOException {
+ statistics.incrementWriteOps(1);
+ dfs.setTimes(getPathName(p), mtime, atime);
+ }
+
+ @Override
+ protected int getDefaultPort() {
+ return NameNode.DEFAULT_PORT;
+ }
+
+ @Override
+ public
+ Token<DelegationTokenIdentifier> getDelegationToken(String renewer
+ ) throws IOException {
+ Token<DelegationTokenIdentifier> result =
+ dfs.getDelegationToken(renewer == null ? null : new Text(renewer));
+ return result;
+ }
+
+ /**
+ * Delegation Token Operations
+ * These are DFS only operations.
+ */
+
+ /**
+ * Get a valid Delegation Token.
+ *
+ * @param renewer Name of the designated renewer for the token
+ * @return Token<DelegationTokenIdentifier>
+ * @throws IOException
+ * @Deprecated use {@link #getDelegationToken(String)}
+ */
+ @Deprecated
+ public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
+ throws IOException {
+ return getDelegationToken(renewer.toString());
+ }
+
+ /**
+ * Renew an existing delegation token.
+ *
+ * @param token delegation token obtained earlier
+ * @return the new expiration time
+ * @throws IOException
+ * @deprecated Use Token.renew instead.
+ */
+ public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
+ throws InvalidToken, IOException {
+ try {
+ return token.renew(getConf());
+ } catch (InterruptedException ie) {
+ throw new RuntimeException("Caught interrupted", ie);
+ }
+ }
+
+ /**
+ * Cancel an existing delegation token.
+ *
+ * @param token delegation token
+ * @throws IOException
+ * @deprecated Use Token.cancel instead.
+ */
+ public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
+ throws IOException {
+ try {
+ token.cancel(getConf());
+ } catch (InterruptedException ie) {
+ throw new RuntimeException("Caught interrupted", ie);
+ }
+ }
+
+ /**
+ * Requests the namenode to tell all datanodes to use a new, non-persistent
+ * bandwidth value for dfs.balance.bandwidthPerSec.
+ * The bandwidth parameter is the max number of bytes per second of network
+ * bandwidth to be used by a datanode during balancing.
+ *
+ * @param bandwidth Blanacer bandwidth in bytes per second for all datanodes.
+ * @throws IOException
+ */
+ public void setBalancerBandwidth(long bandwidth) throws IOException {
+ dfs.setBalancerBandwidth(bandwidth);
+ }
+
+
+}
Propchange:
accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/DNNFileSystem.java
------------------------------------------------------------------------------
svn:eol-style = native