WIP
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/63037dbb Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/63037dbb Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/63037dbb Branch: refs/heads/ACCUMULO-2061 Commit: 63037dbbe3bd9a796d016f31fd27b256d4a2bdbe Parents: e8a978c Author: Josh Elser <[email protected]> Authored: Thu Mar 6 11:08:48 2014 -0500 Committer: Josh Elser <[email protected]> Committed: Fri Mar 7 12:26:20 2014 -0500 ---------------------------------------------------------------------- .../core/client/impl/OfflineScanner.java | 2 +- .../org/apache/accumulo/core/volume/Volume.java | 5 ++- .../core/volume/VolumeConfiguration.java | 27 +++++++------- .../apache/accumulo/core/volume/VolumeImpl.java | 39 ++++++++++++++++---- .../apache/accumulo/core/zookeeper/ZooUtil.java | 2 +- .../apache/accumulo/server/ServerConstants.java | 10 ----- .../accumulo/server/fs/VolumeManager.java | 2 +- .../accumulo/server/fs/VolumeManagerImpl.java | 23 +++++++++--- 8 files changed, 69 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/63037dbb/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java b/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java index a0cf006..20228d2 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java @@ -306,7 +306,7 @@ class OfflineIterator implements Iterator<Entry<Key,Value>> { // TODO need to close files - ACCUMULO-1303 for (String file : absFiles) { - FileSystem fs = VolumeConfiguration.getFileSystem(file, conf, config); + FileSystem fs = VolumeConfiguration.getVolume(file, conf, config).getFileSystem(); FileSKVIterator reader = FileOperations.getInstance().openReader(file, false, fs, conf, acuTableConf, null, null); readers.add(reader); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/63037dbb/core/src/main/java/org/apache/accumulo/core/volume/Volume.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/volume/Volume.java b/core/src/main/java/org/apache/accumulo/core/volume/Volume.java index 2f97c83..3726518 100644 --- a/core/src/main/java/org/apache/accumulo/core/volume/Volume.java +++ b/core/src/main/java/org/apache/accumulo/core/volume/Volume.java @@ -32,8 +32,9 @@ public interface Volume { public FileSystem getFileSystem(); /** - * The base {@link Path} which Accumulo will use within the given {@link FileSystem} + * The base path which Accumulo will use within the given {@link FileSystem} * @return */ - public Path getBasePath(); + public String getBasePath(); + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/63037dbb/core/src/main/java/org/apache/accumulo/core/volume/VolumeConfiguration.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/volume/VolumeConfiguration.java b/core/src/main/java/org/apache/accumulo/core/volume/VolumeConfiguration.java index b8663df..59f3da7 100644 --- a/core/src/main/java/org/apache/accumulo/core/volume/VolumeConfiguration.java +++ b/core/src/main/java/org/apache/accumulo/core/volume/VolumeConfiguration.java @@ -30,12 +30,15 @@ import org.apache.hadoop.fs.Path; import com.google.common.base.Preconditions; public class VolumeConfiguration { - + public static Volume getVolume(String path, Configuration conf, AccumuloConfiguration acuconf) throws IOException { + Preconditions.checkNotNull(path); + if (path.contains(":")) { - Path basePath = new Path(path); - return create(basePath.getFileSystem(conf), basePath); + // An absolute path + return create(new Path(path), conf); } else { + // A relative path return getDefaultVolume(conf, acuconf); } } @@ -52,7 +55,8 @@ public class VolumeConfiguration { throw new IOException(e); } } - + + @Deprecated public static String getConfiguredBaseDir(AccumuloConfiguration conf) { String singleNamespace = conf.get(Property.INSTANCE_DFS_DIR); String dfsUri = conf.get(Property.INSTANCE_DFS_URI); @@ -74,16 +78,16 @@ public class VolumeConfiguration { } public static String[] getConfiguredBaseDirs(AccumuloConfiguration conf) { - String singleNamespace = conf.get(Property.INSTANCE_DFS_DIR); String ns = conf.get(Property.INSTANCE_VOLUMES); String configuredBaseDirs[]; if (ns == null || ns.isEmpty()) { + // Fall back to using the old config values configuredBaseDirs = new String[] {getConfiguredBaseDir(conf)}; } else { String namespaces[] = ns.split(","); - String unescapedNamespaces[] = new String[namespaces.length]; + configuredBaseDirs = new String[namespaces.length]; int i = 0; for (String namespace : namespaces) { if (!namespace.contains(":")) { @@ -92,13 +96,11 @@ public class VolumeConfiguration { try { // pass through URI to unescape hex encoded chars (e.g. convert %2C to "," char) - unescapedNamespaces[i++] = new Path(new URI(namespace)).toString(); + configuredBaseDirs[i++] = new Path(new URI(namespace)).toString(); } catch (URISyntaxException e) { throw new IllegalArgumentException(Property.INSTANCE_VOLUMES.getKey() + " contains " + namespace + " which has a syntax error", e); } } - - configuredBaseDirs = prefix(unescapedNamespaces, singleNamespace); } return configuredBaseDirs; @@ -121,10 +123,10 @@ public class VolumeConfiguration { */ @SuppressWarnings("deprecation") public static <T extends FileSystem> Volume create(T fs, AccumuloConfiguration acuconf) { - return new VolumeImpl(fs, new Path(acuconf.get(Property.INSTANCE_DFS_DIR))); + return new VolumeImpl(fs, acuconf.get(Property.INSTANCE_DFS_DIR)); } - public static <T extends FileSystem> Volume create(T fs, Path basePath) { + public static <T extends FileSystem> Volume create(T fs, String basePath) { return new VolumeImpl(fs, basePath); } @@ -134,8 +136,7 @@ public class VolumeConfiguration { } public static Volume create(Path path, Configuration conf) throws IOException { - Preconditions.checkNotNull(conf); - return new VolumeImpl(path.getFileSystem(conf), path); + return new VolumeImpl(path, conf); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/63037dbb/core/src/main/java/org/apache/accumulo/core/volume/VolumeImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/volume/VolumeImpl.java b/core/src/main/java/org/apache/accumulo/core/volume/VolumeImpl.java index 6821955..bbdc166 100644 --- a/core/src/main/java/org/apache/accumulo/core/volume/VolumeImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/volume/VolumeImpl.java @@ -18,6 +18,9 @@ package org.apache.accumulo.core.volume; import static com.google.common.base.Preconditions.checkNotNull; +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -26,11 +29,18 @@ import org.apache.hadoop.fs.Path; * */ public class VolumeImpl implements Volume { - - protected FileSystem fs; - protected Path basePath; + protected final FileSystem fs; + protected final String basePath; + + public VolumeImpl(Path path, Configuration conf) throws IOException { + checkNotNull(path); + checkNotNull(conf); + + this.fs = path.getFileSystem(conf); + this.basePath = path.toUri().getPath(); + } - public VolumeImpl(FileSystem fs, Path basePath) { + public VolumeImpl(FileSystem fs, String basePath) { checkNotNull(fs); checkNotNull(basePath); @@ -39,7 +49,7 @@ public class VolumeImpl implements Volume { } /* (non-javadoc) - * @see org.apache.accumulo.server.fs.Volume#getFileSystem() + * @see org.apache.accumulo.core.volume.Volume#getFileSystem() */ @Override public FileSystem getFileSystem() { @@ -47,11 +57,26 @@ public class VolumeImpl implements Volume { } /* (non-javadoc) - * @see org.apache.accumulo.server.fs.Volume#getBasePath() + * @see org.apache.accumulo.core.volume.Volume#getBasePath() */ @Override - public Path getBasePath() { + public String getBasePath() { return basePath; } + + @Override + public boolean equals(Object o) { + if (o instanceof VolumeImpl) { + VolumeImpl other = (VolumeImpl) o; + return getFileSystem().equals(other.getFileSystem()) && getBasePath().equals(other.getBasePath()); + } + + return false; + } + + @Override + public String toString() { + return getFileSystem() + " " + basePath; + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/63037dbb/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooUtil.java b/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooUtil.java index d536f42..97518c5 100644 --- a/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooUtil.java @@ -49,7 +49,7 @@ public class ZooUtil extends org.apache.accumulo.fate.zookeeper.ZooUtil { public static String getInstanceIDFromHdfs(Path instanceDirectory, AccumuloConfiguration conf) { try { - FileSystem fs = VolumeConfiguration.getVolume(instanceDirectory.toString(), CachedConfiguration.getInstance(), conf).getFileSystem(); + FileSystem fs = VolumeConfiguration.getFileSystem(instanceDirectory.toString(), CachedConfiguration.getInstance(), conf); FileStatus[] files = null; try { files = fs.listStatus(instanceDirectory); http://git-wip-us.apache.org/repos/asf/accumulo/blob/63037dbb/server/base/src/main/java/org/apache/accumulo/server/ServerConstants.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/ServerConstants.java b/server/base/src/main/java/org/apache/accumulo/server/ServerConstants.java index b1e79cb..ac1ca7e 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/ServerConstants.java +++ b/server/base/src/main/java/org/apache/accumulo/server/ServerConstants.java @@ -52,19 +52,9 @@ public class ServerConstants { public static final int PREV_DATA_VERSION = 5; private static String[] baseDirs = null; - private static String defaultBaseDir = null; private static List<Pair<Path,Path>> replacementsList = null; - @SuppressWarnings("deprecation") - public static synchronized String getDefaultBaseDir() { - if (defaultBaseDir == null) { - defaultBaseDir = new Path(VolumeConfiguration.getConfiguredBaseDir(ServerConfiguration.getSiteConfiguration())).toString(); - } - - return defaultBaseDir; - } - // these are functions to delay loading the Accumulo configuration unless we must public static synchronized String[] getBaseDirs() { if (baseDirs == null) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/63037dbb/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java index f0c7083..04632b9 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java @@ -110,7 +110,7 @@ public interface VolumeManager { FileStatus getFileStatus(Path path) throws IOException; // find the appropriate FileSystem object given a path - FileSystem getFileSystemByPath(Path path); + Volume getVolumeByPath(Path path); // return the item in options that is in the same volume as source Path matchingFileSystem(Path source, String[] options); http://git-wip-us.apache.org/repos/asf/accumulo/blob/63037dbb/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java index 9920159..e1f6aa3 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java @@ -55,23 +55,34 @@ import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.util.Progressable; import org.apache.log4j.Logger; +import com.google.common.collect.Maps; + public class VolumeManagerImpl implements VolumeManager { private static final Logger log = Logger.getLogger(VolumeManagerImpl.class); - Map<String,Volume> volumes; + Map<String,Volume> volumesByName; + Map<FileSystem,Volume> volumesByFileSystem; Volume defaultVolume; AccumuloConfiguration conf; VolumeChooser chooser; protected VolumeManagerImpl(Map<String,Volume> volumes, String defaultVolume, AccumuloConfiguration conf) { - this.volumes = volumes; + this.volumesByName = volumes; this.defaultVolume = volumes.get(defaultVolume); + this.volumesByFileSystem = Maps.newHashMapWithExpectedSize(volumesByName.size()); + invertVolumesByFileSystem(volumesByName, volumesByFileSystem); this.conf = conf; ensureSyncIsEnabled(); chooser = Property.createInstanceFromPropertyName(conf, Property.GENERAL_VOLUME_CHOOSER, VolumeChooser.class, new RandomVolumeChooser()); } + private void invertVolumesByFileSystem(Map<String,Volume> forward, Map<FileSystem,Volume> inverted) { + for (Volume volume : forward.values()) { + inverted.put(volume.getFileSystem(), volume); + } + } + public static org.apache.accumulo.server.fs.VolumeManager getLocal() throws IOException { AccumuloConfiguration accConf = DefaultConfiguration.getDefaultConfiguration(); Volume defaultLocalVolume = VolumeConfiguration.create(FileSystem.getLocal(CachedConfiguration.getInstance()), accConf); @@ -82,7 +93,7 @@ public class VolumeManagerImpl implements VolumeManager { @Override public void close() throws IOException { IOException ex = null; - for (Volume volume : volumes.values()) { + for (Volume volume : volumesByName.values()) { try { volume.getFileSystem().close(); } catch (IOException e) { @@ -267,7 +278,7 @@ public class VolumeManagerImpl implements VolumeManager { } @Override - public FileSystem getFileSystemByPath(Path path) { + public Volume getVolumeByPath(Path path) { if (path.toString().contains(":")) { try { return path.getFileSystem(CachedConfiguration.getInstance()); @@ -276,11 +287,11 @@ public class VolumeManagerImpl implements VolumeManager { } } - return defaultVolume.getFileSystem(); + return defaultVolume; } private Map<String,Volume> getFileSystems() { - return volumes; + return volumesByName; } @Override
