Author: ecn
Date: Wed Jun 5 17:19:25 2013
New Revision: 1489969
URL: http://svn.apache.org/r1489969
Log:
ACCUMULO-118 support multiple namespaces for tables
Added:
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileRef.java
(with props)
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/FileUtil.java
(with props)
Removed:
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/CountRowKeys.java
Modified:
accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/util/TableDiskUsage.java
accumulo/branches/ACCUMULO-118/proxy/src/test/java/org/apache/accumulo/proxy/SimpleTest.java
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/ServerConstants.java
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystem.java
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystemImpl.java
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/Master.java
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/CreateTable.java
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/DeleteTable.java
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/ImportTable.java
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/MinorCompactor.java
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletIteratorEnvironment.java
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/AddFilesWithMissingEntries.java
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/CheckForMetadataProblems.java
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/Initialize.java
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/LocalityCheck.java
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/OfflineMetadataScanner.java
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/RemoveEntriesForMissingFiles.java
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/TabletOperations.java
accumulo/branches/ACCUMULO-118/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousStatsCollector.java
accumulo/branches/ACCUMULO-118/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryTest.java
accumulo/branches/ACCUMULO-118/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java
accumulo/branches/ACCUMULO-118/test/src/test/java/org/apache/accumulo/test/ShellServerTest.java
Modified:
accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/util/TableDiskUsage.java
URL:
http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/util/TableDiskUsage.java?rev=1489969&r1=1489968&r2=1489969&view=diff
==============================================================================
---
accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/util/TableDiskUsage.java
(original)
+++
accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/util/TableDiskUsage.java
Wed Jun 5 17:19:25 2013
@@ -81,7 +81,7 @@ public class TableDiskUsage {
}
Map<List<String>,Long> calculateUsage() {
-
+
Map<List<Integer>,Long> usage = new HashMap<List<Integer>,Long>();
for (Entry<String,Integer[]> entry : tableFiles.entrySet()) {
@@ -138,6 +138,8 @@ public class TableDiskUsage {
HashSet<String> tablesReferenced = new HashSet<String>(tableIds);
HashSet<String> emptyTableIds = new HashSet<String>();
+ final String TABLES = Constants.getTablesDir(acuConf);
+
for (String tableId : tableIds) {
Scanner mdScanner = null;
try {
@@ -152,20 +154,24 @@ public class TableDiskUsage {
emptyTableIds.add(tableId);
}
+ // TODO ACCUMULO-118 there are multiple table locations/filesystems
for (Entry<Key,Value> entry : mdScanner) {
String file = entry.getKey().getColumnQualifier().toString();
- if (file.startsWith("../")) {
+ if (file.contains(":")) {
+ file = file.substring(file.indexOf(TABLES) + TABLES.length());
+ } else if (file.startsWith("../")) {
file = file.substring(2);
tablesReferenced.add(file.split("\\/")[1]);
- } else
+ } else {
file = "/" + tableId + file;
+ }
tdu.linkFileAndTable(tableId, file);
}
}
for (String tableId : tablesReferenced) {
- FileStatus[] files = fs.globStatus(new
Path(Constants.getTablesDir(acuConf) + "/" + tableId + "/*/*"));
+ FileStatus[] files = fs.globStatus(new Path(TABLES + "/" + tableId +
"/*/*"));
for (FileStatus fileStatus : files) {
String dir = fileStatus.getPath().getParent().getName();
Modified:
accumulo/branches/ACCUMULO-118/proxy/src/test/java/org/apache/accumulo/proxy/SimpleTest.java
URL:
http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/proxy/src/test/java/org/apache/accumulo/proxy/SimpleTest.java?rev=1489969&r1=1489968&r2=1489969&view=diff
==============================================================================
---
accumulo/branches/ACCUMULO-118/proxy/src/test/java/org/apache/accumulo/proxy/SimpleTest.java
(original)
+++
accumulo/branches/ACCUMULO-118/proxy/src/test/java/org/apache/accumulo/proxy/SimpleTest.java
Wed Jun 5 17:19:25 2013
@@ -982,7 +982,7 @@ public class SimpleTest {
assertEquals("10", new String(more.getResults().get(0).getValue()));
try {
client.checkIteratorConflicts(creds, TABLE_TEST, setting,
EnumSet.allOf(IteratorScope.class));
- fail("checkIteratorConflicts did not throw and exception");
+ fail("checkIteratorConflicts did not throw an exception");
} catch (Exception ex) {}
client.deleteRows(creds, TABLE_TEST, null, null);
client.removeIterator(creds, TABLE_TEST, "test",
EnumSet.allOf(IteratorScope.class));
@@ -1141,6 +1141,6 @@ public class SimpleTest {
@AfterClass
public static void tearDownMiniCluster() throws Exception {
accumulo.stop();
- folder.delete();
+ //folder.delete();
}
}
Modified:
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/ServerConstants.java
URL:
http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/ServerConstants.java?rev=1489969&r1=1489968&r2=1489969&view=diff
==============================================================================
---
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/ServerConstants.java
(original)
+++
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/ServerConstants.java
Wed Jun 5 17:19:25 2013
@@ -24,32 +24,49 @@ import static org.apache.accumulo.core.C
public class ServerConstants {
// these are functions to delay loading the Accumulo configuration unless we
must
- public static String getBaseDir() {
- return
ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_DFS_DIR);
+ public static String[] getBaseDirs() {
+ String singleNamespace =
ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_DFS_DIR);
+ String ns =
ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_NAMESPACES);
+ if (ns == null) {
+ return new String[] { singleNamespace };
+ }
+ String namespaces[] = ns.split(",");
+ if (namespaces.length < 2) {
+ return new String[] { singleNamespace };
+ }
+ return prefix(namespaces, singleNamespace);
+ }
+
+ public static String[] prefix(String bases[], String suffix) {
+ String result[] = new String[bases.length];
+ for (int i = 0; i < bases.length; i++) {
+ result[i] = bases[i] + "/" + suffix;
+ }
+ return result;
}
- public static String getTablesDir() {
- return getBaseDir() + "/tables";
+ public static String[] getTablesDirs() {
+ return prefix(getBaseDirs(), "tables");
}
- public static String getRecoveryDir() {
- return getBaseDir() + "/recovery";
+ public static String[] getRecoveryDirs() {
+ return prefix(getBaseDirs(), "recovery");
}
public static Path getInstanceIdLocation() {
- return new Path(getBaseDir() + "/instance_id");
+ return new
Path(ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_DFS_DIR)
+ "/instance_id");
}
public static Path getDataVersionLocation() {
- return new Path(getBaseDir() + "/version");
+ return new
Path(ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_DFS_DIR)
+ "/version");
}
- public static String getMetadataTableDir() {
- return getTablesDir() + "/" + METADATA_TABLE_ID;
+ public static String[] getMetadataTableDirs() {
+ return prefix(getTablesDirs(), METADATA_TABLE_ID);
}
public static String getRootTabletDir() {
- return getMetadataTableDir() + ZROOT_TABLET;
+ return prefix(getMetadataTableDirs(), ZROOT_TABLET)[0];
}
}
Added:
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileRef.java
URL:
http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileRef.java?rev=1489969&view=auto
==============================================================================
---
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileRef.java
(added)
+++
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileRef.java
Wed Jun 5 17:19:25 2013
@@ -0,0 +1,63 @@
+package org.apache.accumulo.server.fs;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+
+
+/**
+ * This is a glue object, to convert short file references to long references.
+ * The !METADATA table may contain old relative file references. This class
keeps
+ * track of the short file reference, so it can be removed properly from the
!METADATA table.
+ */
+public class FileRef implements Comparable<FileRef> {
+ String metaReference; // something like ../2/d-00000/A00001.rf
+ Path fullReference; // something like
hdfs://nn:9001/accumulo/tables/2/d-00000/A00001.rf
+
+ public FileRef(FileSystem fs, Key key) {
+ metaReference = key.getColumnQualifier().toString();
+ fullReference = new Path(fs.getFullPath(key));
+ }
+
+ public FileRef(String metaReference, Path fullReference) {
+ this.metaReference = metaReference;
+ this.fullReference = fullReference;
+ }
+
+ public FileRef(String path) {
+ this.metaReference = path;
+ this.fullReference = new Path(path);
+ }
+
+ public String toString() {
+ return fullReference.toString();
+ }
+
+ public Path path() {
+ return fullReference;
+ }
+
+ public Text meta() {
+ return new Text(metaReference);
+ }
+
+ @Override
+ public int compareTo(FileRef o) {
+ return path().compareTo(o.path());
+ }
+
+ @Override
+ public int hashCode() {
+ return path().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof FileRef) {
+ return compareTo((FileRef)obj) == 0;
+ }
+ return false;
+ }
+
+
+}
Propchange:
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileRef.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystem.java
URL:
http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystem.java?rev=1489969&r1=1489968&r2=1489969&view=diff
==============================================================================
---
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystem.java
(original)
+++
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystem.java
Wed Jun 5 17:19:25 2013
@@ -1,64 +1,103 @@
package org.apache.accumulo.server.fs;
import java.io.IOException;
-import java.util.Collection;
+import java.util.Map;
import org.apache.accumulo.core.data.Key;
+import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
+/**
+ * A wrapper around multiple hadoop FileSystem objects, which are assumed to
be different namespaces.
+ */
public interface FileSystem {
+ // close the underlying FileSystems
void close() throws IOException;
+ // the mechanism by which the master ensures that tablet servers can no
longer write to a WAL
boolean closePossiblyOpenFile(Path path) throws IOException;
+ // forward to the appropriate FileSystem object
FSDataOutputStream create(Path dest) throws IOException;
+ // forward to the appropriate FileSystem object
FSDataOutputStream create(Path path, boolean b) throws IOException;
+ // forward to the appropriate FileSystem object
FSDataOutputStream create(Path path, boolean b, int int1, short int2, long
long1) throws IOException;
+ // create a file, but only if it doesn't exist
boolean createNewFile(Path writable) throws IOException;
+ // create a file which can be sync'd to disk
FSDataOutputStream createSyncable(Path logPath, int buffersize, short
replication, long blockSize) throws IOException;
+ // delete a file
boolean delete(Path path) throws IOException;
+ // delete a directory and anything under it
boolean deleteRecursively(Path path) throws IOException;
- boolean exists(Path newBulkDir) throws IOException;
+ // forward to the appropriate FileSystem object
+ boolean exists(Path path) throws IOException;
- FileStatus getFileStatus(Path errorPath) throws IOException;
+ // forward to the appropriate FileSystem object
+ FileStatus getFileStatus(Path path) throws IOException;
+ // find the appropriate FileSystem object given a path
org.apache.hadoop.fs.FileSystem getFileSystemByPath(Path path);
org.apache.hadoop.fs.FileSystem getFileSystemByPath(String path);
- Collection<org.apache.hadoop.fs.FileSystem> getFileSystems();
+ // get a mapping of namespace to FileSystem
+ Map<String, ? extends org.apache.hadoop.fs.FileSystem> getFileSystems();
+ // return the item in options that is in the same namespace as source
+ Path matchingFileSystem(Path source, String[] options);
+
+ // create a new path in the same namespace as the sourceDir
+ String newPathOnSameNamespace(String sourceDir, String suffix);
+
+ // forward to the appropriate FileSystem object
FileStatus[] listStatus(Path path) throws IOException;
+ // forward to the appropriate FileSystem object
boolean mkdirs(Path directory) throws IOException;
+ // forward to the appropriate FileSystem object
FSDataInputStream open(Path path) throws IOException;
+ // forward to the appropriate FileSystem object, throws an exception if the
paths are in different namespaces
boolean rename(Path path, Path newPath) throws IOException;
+ // forward to the appropriate FileSystem object
boolean moveToTrash(Path sourcePath) throws IOException;
+ // forward to the appropriate FileSystem object
short getDefaultReplication(Path logPath);
+ // forward to the appropriate FileSystem object
boolean isFile(Path path) throws IOException;
+ // all namespaces are ready to provide service (not in SafeMode, for example)
boolean isReady() throws IOException;
+ // ambiguous references to files go here
org.apache.hadoop.fs.FileSystem getDefaultNamespace();
+ // forward to the appropriate FileSystem object
FileStatus[] globStatus(Path path) throws IOException;
+ // Convert a file or directory !METADATA reference into a path
String getFullPath(Key key);
+ // Given a filename, figure out the qualified path given multiple namespaces
+ String getFullPath(String paths[], String fileName) throws IOException;
+
+ // forward to the appropriate FileSystem object
+ ContentSummary getContentSummary(String dir);
}
Modified:
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystemImpl.java
URL:
http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystemImpl.java?rev=1489969&r1=1489968&r2=1489969&view=diff
==============================================================================
---
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystemImpl.java
(original)
+++
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystemImpl.java
Wed Jun 5 17:19:25 2013
@@ -4,7 +4,7 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
-import java.util.Collection;
+import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
@@ -20,6 +20,7 @@ import org.apache.accumulo.server.client
import org.apache.accumulo.server.conf.ServerConfiguration;
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -186,7 +187,7 @@ public class FileSystemImpl implements o
}
private void ensureSyncIsEnabled() {
- for (FileSystem fs : getFileSystems()) {
+ for (FileSystem fs : getFileSystems().values()) {
if (fs instanceof DistributedFileSystem) {
if (!fs.getConf().getBoolean("dfs.durable.sync", false) &&
!fs.getConf().getBoolean("dfs.support.append", false)) {
String msg = "Must set dfs.durable.sync OR dfs.support.append to
true. Which one needs to be set depends on your version of HDFS. See
ACCUMULO-623. \n"
@@ -230,7 +231,15 @@ public class FileSystemImpl implements o
@Override
public FileSystem getFileSystemByPath(Path path) {
- log.info("Looking up namespace on " + path);
+ if (path.isAbsolute())
+ {
+ try {
+ return path.getFileSystem(CachedConfiguration.getInstance());
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
return namespaces.get(defaultNamespace);
}
@@ -240,8 +249,8 @@ public class FileSystemImpl implements o
}
@Override
- public Collection<FileSystem> getFileSystems() {
- return new ArrayList<FileSystem>(namespaces.values());
+ public Map<String, ? extends org.apache.hadoop.fs.FileSystem>
getFileSystems() {
+ return namespaces;
}
@Override
@@ -293,7 +302,7 @@ public class FileSystemImpl implements o
@Override
public boolean isReady() throws IOException {
- for (FileSystem fs : getFileSystems()) {
+ for (FileSystem fs : getFileSystems().values()) {
if (!(fs instanceof DistributedFileSystem))
continue;
DistributedFileSystem dfs = (DistributedFileSystem)fs;
@@ -341,11 +350,14 @@ public class FileSystemImpl implements o
public FileStatus[] globStatus(Path pathPattern) throws IOException {
return getFileSystemByPath(pathPattern).globStatus(pathPattern);
}
-
+
@Override
public String getFullPath(Key key) {
String relPath = key.getColumnQualifierData().toString();
+ if (relPath.contains(":"))
+ return relPath;
+
byte [] tableId = KeyExtent.tableOfMetadataRow(key.getRow());
if (relPath.startsWith("../"))
@@ -354,7 +366,52 @@ public class FileSystemImpl implements o
relPath = "/" + new String(tableId) + relPath;
String fullPath = Constants.getTablesDir(conf) + relPath;
FileSystem ns = getFileSystemByPath(fullPath);
- return ns.makeQualified(new Path(fullPath)).toString();
+ String result = ns.makeQualified(new Path(fullPath)).toString();
+ return result;
+ }
+
+ @Override
+ public Path matchingFileSystem(Path source, String[] options) {
+ for (String fs : getFileSystems().keySet()) {
+ for (String option : options) {
+ if (option.startsWith(fs))
+ return new Path(option);
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public String newPathOnSameNamespace(String sourceDir, String suffix) {
+ for (String fs : getFileSystems().keySet()) {
+ if (sourceDir.startsWith(fs)) {
+ return fs + "/" + suffix;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public String getFullPath(String[] paths, String fileName) throws
IOException {
+ if (fileName.contains(":"))
+ return fileName;
+ // old-style name, on one of many possible "root" paths:
+ if (fileName.startsWith("../"))
+ fileName = fileName.substring(2);
+ for (String path : paths) {
+ String fullPath = path + fileName;
+ FileSystem ns = getFileSystemByPath(fullPath);
+ Path exists = new Path(fullPath);
+ if (ns.exists(exists))
+ return ns.makeQualified(exists).toString();
+ }
+ throw new RuntimeException("Could not find file " + fileName + " in " +
Arrays.asList(paths));
+ }
+
+ @Override
+ public ContentSummary getContentSummary(String dir) {
+ // TODO Auto-generated method stub
+ return null;
}
}
Modified:
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java
URL:
http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java?rev=1489969&r1=1489968&r2=1489969&view=diff
==============================================================================
---
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java
(original)
+++
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java
Wed Jun 5 17:19:25 2013
@@ -358,21 +358,22 @@ public class SimpleGarbageCollector impl
// if dir exist and is empty, then empty list is returned...
// hadoop 1.0 will return null if the file doesn't exist
// hadoop 2.0 will throw an exception if the file does not exist
- FileStatus[] tabletDirs = null;
- try {
- tabletDirs = fs.listStatus(new Path(ServerConstants.getTablesDir() +
"/" + delTableId));
- } catch (FileNotFoundException ex) {
- // ignored
- }
-
- if (tabletDirs == null)
- continue;
-
- if (tabletDirs.length == 0) {
- Path p = new Path(ServerConstants.getTablesDir() + "/" + delTableId);
- if (!moveToTrash(p))
- fs.delete(p);
- }
+ for (String dir : ServerConstants.getTablesDirs()) {
+ FileStatus[] tabletDirs = null;
+ try {
+ tabletDirs = fs.listStatus(new Path(dir + "/" + delTableId));
+ } catch (FileNotFoundException ex) {
+ // ignored
+ }
+ if (tabletDirs == null)
+ continue;
+
+ if (tabletDirs.length == 0) {
+ Path p = new Path(dir + "/" + delTableId);
+ if (!moveToTrash(p))
+ fs.delete(p);
+ }
+ }
}
}
@@ -430,10 +431,12 @@ public class SimpleGarbageCollector impl
checkForBulkProcessingFiles = true;
try {
for (String validExtension : FileOperations.getValidExtensions()) {
- for (FileStatus stat : fs.globStatus(new
Path(ServerConstants.getTablesDir() + "/*/*/*." + validExtension))) {
- String cand = stat.getPath().toUri().getPath();
- if (!cand.contains(ServerConstants.getRootTabletDir())) {
-
candidates.add(cand.substring(ServerConstants.getTablesDir().length()));
+ for (String dir : ServerConstants.getTablesDirs()) {
+ for (FileStatus stat : fs.globStatus(new Path(dir + "/*/*/*." +
validExtension))) {
+ String cand = stat.getPath().toUri().getPath();
+ if (cand.contains(ServerConstants.getRootTabletDir()))
+ continue;
+ candidates.add(cand.substring(dir.length()));
log.debug("Offline candidate: " + cand);
}
}
@@ -658,9 +661,9 @@ public class SimpleGarbageCollector impl
public void run() {
boolean removeFlag;
- String fullPath = ServerConstants.getTablesDir() + delete;
- log.debug("Deleting " + fullPath);
try {
+ String fullPath = fs.getFullPath(ServerConstants.getTablesDirs(),
delete);
+ log.debug("Deleting " + fullPath);
Path p = new Path(fullPath);
Modified:
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/Master.java
URL:
http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/Master.java?rev=1489969&r1=1489968&r2=1489969&view=diff
==============================================================================
---
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/Master.java
(original)
+++
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/Master.java
Wed Jun 5 17:19:25 2013
@@ -96,6 +96,7 @@ import org.apache.accumulo.fate.zookeepe
import org.apache.accumulo.server.Accumulo;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.FileRef;
import org.apache.accumulo.server.fs.FileSystem;
import org.apache.accumulo.server.fs.FileSystemImpl;
import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
@@ -1571,11 +1572,11 @@ public class Master implements LiveTServ
Constants.METADATA_TIME_COLUMN.fetch(scanner);
scanner.fetchColumnFamily(Constants.METADATA_DATAFILE_COLUMN_FAMILY);
scanner.fetchColumnFamily(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY);
- Set<String> datafiles = new TreeSet<String>();
+ Set<FileRef> datafiles = new TreeSet<FileRef>();
for (Entry<Key,Value> entry : scanner) {
Key key = entry.getKey();
if
(key.compareColumnFamily(Constants.METADATA_DATAFILE_COLUMN_FAMILY) == 0) {
- datafiles.add(key.getColumnQualifier().toString());
+ datafiles.add(new FileRef(fs, key));
if (datafiles.size() > 1000) {
MetadataTable.addDeleteEntries(range, datafiles,
SecurityConstants.getSystemCredentials());
datafiles.clear();
@@ -1585,7 +1586,7 @@ public class Master implements LiveTServ
} else if
(key.compareColumnFamily(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY) ==
0) {
throw new IllegalStateException("Tablet " + key.getRow() + " is
assigned during a merge!");
} else if (Constants.METADATA_DIRECTORY_COLUMN.hasColumns(key)) {
- datafiles.add(entry.getValue().toString());
+ datafiles.add(new FileRef(fs, key));
if (datafiles.size() > 1000) {
MetadataTable.addDeleteEntries(range, datafiles,
SecurityConstants.getSystemCredentials());
datafiles.clear();
Modified:
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
URL:
http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java?rev=1489969&r1=1489968&r2=1489969&view=diff
==============================================================================
---
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
(original)
+++
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
Wed Jun 5 17:19:25 2013
@@ -35,8 +35,6 @@ import java.util.concurrent.ExecutorServ
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
-import org.apache.accumulo.trace.instrument.TraceExecutorService;
-import org.apache.accumulo.trace.instrument.Tracer;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
@@ -63,6 +61,7 @@ import org.apache.accumulo.fate.Repo;
import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.FileSystem;
import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
import org.apache.accumulo.server.master.Master;
import org.apache.accumulo.server.master.state.TServerInstance;
@@ -71,10 +70,11 @@ import org.apache.accumulo.server.tablet
import org.apache.accumulo.server.util.MetadataTable;
import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator;
+import org.apache.accumulo.trace.instrument.TraceExecutorService;
+import org.apache.accumulo.trace.instrument.Tracer;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
-import org.apache.accumulo.server.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;
@@ -180,7 +180,22 @@ public class BulkImport extends MasterRe
}
private Path createNewBulkDir(FileSystem fs, String tableId) throws
IOException {
- Path directory = new Path(ServerConstants.getTablesDir() + "/" + tableId);
+ String tableDir = null;
+ loop:
+ for (String dir : fs.getFileSystems().keySet()) {
+ if (this.sourceDir.startsWith(dir)) {
+ for (String path : ServerConstants.getTablesDirs()) {
+ if (path.startsWith(dir)) {
+ tableDir = path;
+ break loop;
+ }
+ }
+ break;
+ }
+ }
+ if (tableDir == null)
+ throw new IllegalStateException(sourceDir + " is not in a known
namespace");
+ Path directory = new Path(tableDir + "/" + tableId);
fs.mkdirs(directory);
// only one should be able to create the lock file
Modified:
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/CreateTable.java
URL:
http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/CreateTable.java?rev=1489969&r1=1489968&r2=1489969&view=diff
==============================================================================
---
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/CreateTable.java
(original)
+++
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/CreateTable.java
Wed Jun 5 17:19:25 2013
@@ -27,25 +27,21 @@ import org.apache.accumulo.core.client.i
import org.apache.accumulo.core.client.impl.thrift.TableOperation;
import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.core.file.FileUtil;
import org.apache.accumulo.core.master.state.tables.TableState;
import org.apache.accumulo.core.security.TablePermission;
-import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.fate.Repo;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.server.ServerConstants;
-import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.FileSystem;
import org.apache.accumulo.server.master.Master;
import org.apache.accumulo.server.master.state.tables.TableManager;
import org.apache.accumulo.server.security.AuditedSecurityOperation;
import org.apache.accumulo.server.security.SecurityConstants;
import org.apache.accumulo.server.security.SecurityOperation;
import org.apache.accumulo.server.tabletserver.TabletTime;
-import org.apache.accumulo.server.trace.TraceFileSystem;
import org.apache.accumulo.server.util.MetadataTable;
import org.apache.accumulo.server.util.TablePropUtil;
import org.apache.accumulo.server.util.TabletOperations;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;
@@ -148,18 +144,19 @@ class CreateDir extends MasterRepo {
}
@Override
- public Repo<Master> call(long tid, Master environment) throws Exception {
- FileSystem fs =
TraceFileSystem.wrap(FileUtil.getFileSystem(CachedConfiguration.getInstance(),
ServerConfiguration.getSiteConfiguration()));
- String dir = ServerConstants.getTablesDir() + "/" + tableInfo.tableId;
- TabletOperations.createTabletDirectory(fs, dir, null);
+ public Repo<Master> call(long tid, Master master) throws Exception {
+ FileSystem fs = master.getFileSystem();
+ TabletOperations.createTabletDirectory(fs, tableInfo.tableId, null);
return new PopulateMetadata(tableInfo);
}
@Override
- public void undo(long tid, Master environment) throws Exception {
- FileSystem fs =
TraceFileSystem.wrap(FileUtil.getFileSystem(CachedConfiguration.getInstance(),
ServerConfiguration.getSiteConfiguration()));
- String dir = ServerConstants.getTablesDir() + "/" + tableInfo.tableId;
- fs.delete(new Path(dir), true);
+ public void undo(long tid, Master master) throws Exception {
+ FileSystem fs = master.getFileSystem();
+ for(String dir : ServerConstants.getTablesDirs()) {
+ fs.deleteRecursively(new Path(dir + "/" + tableInfo.tableId));
+ }
+
}
}
Modified:
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/DeleteTable.java
URL:
http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/DeleteTable.java?rev=1489969&r1=1489968&r2=1489969&view=diff
==============================================================================
---
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/DeleteTable.java
(original)
+++
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/DeleteTable.java
Wed Jun 5 17:19:25 2013
@@ -34,9 +34,9 @@ import org.apache.accumulo.core.data.Ran
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.user.GrepIterator;
import org.apache.accumulo.core.master.state.tables.TableState;
-import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.fate.Repo;
import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.fs.FileSystem;
import org.apache.accumulo.server.master.Master;
import org.apache.accumulo.server.master.state.MetaDataTableScanner;
import org.apache.accumulo.server.master.state.TabletLocationState;
@@ -46,7 +46,6 @@ import org.apache.accumulo.server.proble
import org.apache.accumulo.server.security.AuditedSecurityOperation;
import org.apache.accumulo.server.security.SecurityConstants;
import org.apache.accumulo.server.util.MetadataTable;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;
@@ -168,8 +167,10 @@ class CleanUp extends MasterRepo {
if (refCount == 0) {
// delete the map files
try {
- FileSystem fs = FileSystem.get(CachedConfiguration.getInstance());
- fs.delete(new Path(ServerConstants.getTablesDir(), tableId), true);
+ FileSystem fs = master.getFileSystem();
+ for (String dir : ServerConstants.getTablesDirs()) {
+ fs.deleteRecursively(new Path(dir, tableId));
+ }
} catch (IOException e) {
log.error("Unable to remove deleted table directory", e);
}
Modified:
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/ImportTable.java
URL:
http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/ImportTable.java?rev=1489969&r1=1489968&r2=1489969&view=diff
==============================================================================
---
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/ImportTable.java
(original)
+++
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/ImportTable.java
Wed Jun 5 17:19:25 2013
@@ -380,11 +380,12 @@ class CreateImportDir extends MasterRepo
}
@Override
- public Repo<Master> call(long tid, Master environment) throws Exception {
+ public Repo<Master> call(long tid, Master master) throws Exception {
UniqueNameAllocator namer = UniqueNameAllocator.getInstance();
- Path directory = new Path(ServerConstants.getTablesDir() + "/" +
tableInfo.tableId);
+ Path base = master.getFileSystem().matchingFileSystem(new
Path(tableInfo.exportDir), ServerConstants.getTablesDirs());
+ Path directory = new Path(base, tableInfo.tableId);
Path newBulkDir = new Path(directory, Constants.BULK_PREFIX +
namer.getNextName());
Modified:
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java
URL:
http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java?rev=1489969&r1=1489968&r2=1489969&view=diff
==============================================================================
---
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java
(original)
+++
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java
Wed Jun 5 17:19:25 2013
@@ -28,8 +28,6 @@ import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.accumulo.trace.instrument.Span;
-import org.apache.accumulo.trace.instrument.Trace;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
@@ -55,15 +53,17 @@ import org.apache.accumulo.core.util.Loc
import
org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;
import org.apache.accumulo.core.util.MetadataTable.DataFileValue;
import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.server.fs.FileSystem;
import org.apache.accumulo.server.problems.ProblemReport;
import org.apache.accumulo.server.problems.ProblemReportingIterator;
import org.apache.accumulo.server.problems.ProblemReports;
import org.apache.accumulo.server.problems.ProblemType;
import org.apache.accumulo.server.tabletserver.Tablet.MajorCompactionReason;
import org.apache.accumulo.server.tabletserver.Tablet.MinorCompactionReason;
+import org.apache.accumulo.trace.instrument.Span;
+import org.apache.accumulo.trace.instrument.Trace;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
@@ -118,9 +118,9 @@ public class Compactor implements Callab
IteratorScope getIteratorScope();
}
- private Map<String,DataFileValue> filesToCompact;
+ private Map<FileRef,DataFileValue> filesToCompact;
private InMemoryMap imm;
- private String outputFile;
+ private FileRef outputFile;
private boolean propogateDeletes;
private TableConfiguration acuTableConf;
private CompactionEnv env;
@@ -217,9 +217,10 @@ public class Compactor implements Callab
iiList.add(new IterInfo(iterSetting.getPriority(),
iterSetting.getIteratorClass(), iterSetting.getName()));
iterOptions.put(iterSetting.getName(), iterSetting.getOptions());
}
-
- return new ActiveCompaction(compactor.extent.toThrift(),
System.currentTimeMillis() - compactor.startTime, new ArrayList<String>(
- compactor.filesToCompact.keySet()), compactor.outputFile, type,
reason, localityGroup, entriesRead, entriesWritten, iiList, iterOptions);
+ List<String> filesToCompact = new ArrayList<String>();
+ for (FileRef ref : compactor.filesToCompact.keySet())
+ filesToCompact.add(ref.toString());
+ return new ActiveCompaction(compactor.extent.toThrift(),
System.currentTimeMillis() - compactor.startTime, filesToCompact,
compactor.outputFile.toString(), type, reason, localityGroup, entriesRead,
entriesWritten, iiList, iterOptions);
}
}
@@ -235,7 +236,7 @@ public class Compactor implements Callab
return compactions;
}
- Compactor(Configuration conf, FileSystem fs, Map<String,DataFileValue>
files, InMemoryMap imm, String outputFile, boolean propogateDeletes,
+ Compactor(Configuration conf, FileSystem fs, Map<FileRef,DataFileValue>
files, InMemoryMap imm, FileRef outputFile, boolean propogateDeletes,
TableConfiguration acuTableConf, KeyExtent extent, CompactionEnv env,
List<IteratorSetting> iterators, MajorCompactionReason reason) {
this.extent = extent;
this.conf = conf;
@@ -252,7 +253,7 @@ public class Compactor implements Callab
startTime = System.currentTimeMillis();
}
- Compactor(Configuration conf, FileSystem fs, Map<String,DataFileValue>
files, InMemoryMap imm, String outputFile, boolean propogateDeletes,
+ Compactor(Configuration conf, FileSystem fs, Map<FileRef,DataFileValue>
files, InMemoryMap imm, FileRef outputFile, boolean propogateDeletes,
TableConfiguration acuTableConf, KeyExtent extent, CompactionEnv env) {
this(conf, fs, files, imm, outputFile, propogateDeletes, acuTableConf,
extent, env, new ArrayList<IteratorSetting>(), null);
}
@@ -266,7 +267,7 @@ public class Compactor implements Callab
}
String getOutputFile() {
- return outputFile;
+ return outputFile.toString();
}
@Override
@@ -282,7 +283,8 @@ public class Compactor implements Callab
try {
FileOperations fileFactory = FileOperations.getInstance();
- mfw = fileFactory.openWriter(outputFile, fs, conf, acuTableConf);
+ org.apache.hadoop.fs.FileSystem ns =
this.fs.getFileSystemByPath(outputFile.path().toString());
+ mfw = fileFactory.openWriter(outputFile.path().toString(), ns,
ns.getConf(), acuTableConf);
Map<String,Set<ByteSequence>> lGroups;
try {
@@ -314,7 +316,7 @@ public class Compactor implements Callab
// Verify the file, since hadoop 0.20.2 sometimes lies about the success
of close()
try {
- FileSKVIterator openReader = fileFactory.openReader(outputFile, false,
fs, conf, acuTableConf);
+ FileSKVIterator openReader =
fileFactory.openReader(outputFile.path().toString(), false, ns, ns.getConf(),
acuTableConf);
openReader.close();
} catch (IOException ex) {
log.error("Verification of successful compaction fails!!! " + extent +
" " + outputFile, ex);
@@ -324,7 +326,7 @@ public class Compactor implements Callab
log.debug(String.format("Compaction %s %,d read | %,d written | %,6d
entries/sec | %6.3f secs", extent, majCStats.getEntriesRead(),
majCStats.getEntriesWritten(), (int) (majCStats.getEntriesRead() /
((t2 - t1) / 1000.0)), (t2 - t1) / 1000.0));
- majCStats.setFileSize(fileFactory.getFileSize(outputFile, fs, conf,
acuTableConf));
+
majCStats.setFileSize(fileFactory.getFileSize(outputFile.path().toString(), ns,
ns.getConf(), acuTableConf));
return majCStats;
} catch (IOException e) {
log.error(e, e);
@@ -343,9 +345,8 @@ public class Compactor implements Callab
try {
mfw.close();
} finally {
- Path path = new Path(outputFile);
- if (!fs.delete(path, true))
- if (fs.exists(path))
+ if (!fs.deleteRecursively(outputFile.path()))
+ if (fs.exists(outputFile.path()))
log.error("Unable to delete " + outputFile);
}
}
@@ -359,18 +360,18 @@ public class Compactor implements Callab
List<SortedKeyValueIterator<Key,Value>> iters = new
ArrayList<SortedKeyValueIterator<Key,Value>>(filesToCompact.size());
- for (String mapFile : filesToCompact.keySet()) {
+ for (FileRef mapFile : filesToCompact.keySet()) {
try {
FileOperations fileFactory = FileOperations.getInstance();
-
+ org.apache.hadoop.fs.FileSystem fs =
this.fs.getFileSystemByPath(mapFile.path().toString());
FileSKVIterator reader;
- reader = fileFactory.openReader(mapFile, false, fs, conf,
acuTableConf);
+ reader = fileFactory.openReader(mapFile.path().toString(), false, fs,
conf, acuTableConf);
readers.add(reader);
- SortedKeyValueIterator<Key,Value> iter = new
ProblemReportingIterator(extent.getTableId().toString(), mapFile, false,
reader);
+ SortedKeyValueIterator<Key,Value> iter = new
ProblemReportingIterator(extent.getTableId().toString(),
mapFile.path().toString(), false, reader);
if (filesToCompact.get(mapFile).isTimeSet()) {
iter = new TimeSettingIterator(iter,
filesToCompact.get(mapFile).getTime());
@@ -380,7 +381,7 @@ public class Compactor implements Callab
} catch (Throwable e) {
- ProblemReports.getInstance().report(new
ProblemReport(extent.getTableId().toString(), ProblemType.FILE_READ, mapFile,
e));
+ ProblemReports.getInstance().report(new
ProblemReport(extent.getTableId().toString(), ProblemType.FILE_READ,
mapFile.path().toString(), e));
log.warn("Some problem opening map file " + mapFile + " " +
e.getMessage(), e);
// failed to open some map file... close the ones that were opened
@@ -462,7 +463,7 @@ public class Compactor implements Callab
} catch (IOException e) {
log.error(e, e);
}
- fs.delete(new Path(outputFile), true);
+ fs.deleteRecursively(outputFile.path());
} catch (Exception e) {
log.warn("Failed to delete Canceled compaction output file " +
outputFile, e);
}
Modified:
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java
URL:
http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java?rev=1489969&r1=1489968&r2=1489969&view=diff
==============================================================================
---
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java
(original)
+++
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java
Wed Jun 5 17:19:25 2013
@@ -42,7 +42,9 @@ import org.apache.accumulo.core.iterator
import
org.apache.accumulo.core.iterators.system.SourceSwitchingIterator.DataSource;
import org.apache.accumulo.core.iterators.system.TimeSettingIterator;
import org.apache.accumulo.core.util.MetadataTable.DataFileValue;
+import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.FileRef;
import org.apache.accumulo.server.fs.FileSystem;
import org.apache.accumulo.server.problems.ProblemReport;
import org.apache.accumulo.server.problems.ProblemReportingIterator;
@@ -239,8 +241,7 @@ public class FileManager {
}
private List<String> takeOpenFiles(Collection<String> files,
List<FileSKVIterator> reservedFiles, Map<FileSKVIterator,String>
readersReserved) {
- List<String> filesToOpen;
- filesToOpen = new LinkedList<String>(files);
+ List<String> filesToOpen = new LinkedList<String>(files);
for (Iterator<String> iterator = filesToOpen.iterator();
iterator.hasNext();) {
String file = iterator.next();
@@ -305,7 +306,8 @@ public class FileManager {
for (String file : filesToOpen) {
try {
// log.debug("Opening "+file);
- org.apache.hadoop.fs.FileSystem ns = fs.getFileSystemByPath(file);
+ String path = fs.getFullPath(ServerConstants.getTablesDirs(), file);
+ org.apache.hadoop.fs.FileSystem ns = fs.getFileSystemByPath(path);
FileSKVIterator reader = FileOperations.getInstance().openReader(file,
false, ns, ns.getConf(), conf.getTableConfiguration(table.toString()),
dataCache, indexCache);
reservedFiles.add(reader);
@@ -454,6 +456,13 @@ public class FileManager {
}
}
+ private List<FileSKVIterator> openFileRefs(Collection<FileRef> files)
throws TooManyFilesException, IOException {
+ List<String> strings = new ArrayList<String>(files.size());
+ for (FileRef ref : files)
+ strings.add(ref.path().toString());
+ return openFiles(strings);
+ }
+
private List<FileSKVIterator> openFiles(Collection<String> files) throws
TooManyFilesException, IOException {
// one tablet can not open more than maxOpen files, otherwise it could
get stuck
// forever waiting on itself to release files
@@ -469,9 +478,9 @@ public class FileManager {
return newlyReservedReaders;
}
- synchronized List<InterruptibleIterator>
openFiles(Map<String,DataFileValue> files, boolean detachable) throws
IOException {
+ synchronized List<InterruptibleIterator>
openFiles(Map<FileRef,DataFileValue> files, boolean detachable) throws
IOException {
- List<FileSKVIterator> newlyReservedReaders = openFiles(files.keySet());
+ List<FileSKVIterator> newlyReservedReaders =
openFileRefs(files.keySet());
ArrayList<InterruptibleIterator> iters = new
ArrayList<InterruptibleIterator>();
@@ -486,9 +495,9 @@ public class FileManager {
} else {
iter = new ProblemReportingIterator(tablet.getTableId().toString(),
filename, continueOnFailure, reader);
}
-
- if (files.get(filename).isTimeSet()) {
- iter = new TimeSettingIterator(iter, files.get(filename).getTime());
+ DataFileValue value = files.get(new FileRef(filename));
+ if (value.isTimeSet()) {
+ iter = new TimeSettingIterator(iter, value.getTime());
}
iters.add(iter);
Modified:
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/MinorCompactor.java
URL:
http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/MinorCompactor.java?rev=1489969&r1=1489968&r2=1489969&view=diff
==============================================================================
---
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/MinorCompactor.java
(original)
+++
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/MinorCompactor.java
Wed Jun 5 17:19:25 2013
@@ -35,7 +35,8 @@ import org.apache.accumulo.server.proble
import org.apache.accumulo.server.problems.ProblemType;
import org.apache.accumulo.server.tabletserver.Tablet.MinorCompactionReason;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.server.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
@@ -43,16 +44,16 @@ public class MinorCompactor extends Comp
private static final Logger log = Logger.getLogger(MinorCompactor.class);
- private static final Map<String,DataFileValue> EMPTY_MAP =
Collections.emptyMap();
+ private static final Map<FileRef,DataFileValue> EMPTY_MAP =
Collections.emptyMap();
- private static Map<String,DataFileValue> toFileMap(String mergeFile,
DataFileValue dfv) {
+ private static Map<FileRef,DataFileValue> toFileMap(FileRef mergeFile,
DataFileValue dfv) {
if (mergeFile == null)
return EMPTY_MAP;
return Collections.singletonMap(mergeFile, dfv);
}
- MinorCompactor(Configuration conf, FileSystem fs, InMemoryMap imm, String
mergeFile, DataFileValue dfv, String outputFile, TableConfiguration
acuTableConf,
+ MinorCompactor(Configuration conf, FileSystem fs, InMemoryMap imm, FileRef
mergeFile, DataFileValue dfv, FileRef outputFile, TableConfiguration
acuTableConf,
KeyExtent extent, MinorCompactionReason mincReason) {
super(conf, fs, toFileMap(mergeFile, dfv), imm, outputFile, true,
acuTableConf, extent, new CompactionEnv() {
@@ -127,7 +128,7 @@ public class MinorCompactor extends Comp
// clean up
try {
if (getFileSystem().exists(new Path(getOutputFile()))) {
- getFileSystem().delete(new Path(getOutputFile()), true);
+ getFileSystem().deleteRecursively(new Path(getOutputFile()));
}
} catch (IOException e) {
log.warn("Failed to delete failed MinC file " + getOutputFile() + "
" + e.getMessage());