Repository: accumulo Updated Branches: refs/heads/ACCUMULO-2061 [created] 6f0528e5a
ACCUMULO-2061 Re-sync up the changes from upstream. Start fixing things where it is fit. Avoiding re-creating the old getFileSystem methods to force us to evaluate each usage to be aware of whether or not the base path matters. Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/6f0528e5 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/6f0528e5 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/6f0528e5 Branch: refs/heads/ACCUMULO-2061 Commit: 6f0528e5a63851d8de6b014d780a381ab9ed6d62 Parents: e9aafc2 Author: Josh Elser <[email protected]> Authored: Thu Feb 27 00:51:29 2014 -0500 Committer: Josh Elser <[email protected]> Committed: Thu Feb 27 00:51:29 2014 -0500 ---------------------------------------------------------------------- .../core/client/admin/TableOperationsImpl.java | 2 +- .../core/client/impl/OfflineScanner.java | 2 +- .../org/apache/accumulo/core/conf/Property.java | 2 + .../accumulo/core/file/VolumeConfiguration.java | 112 --------------- .../accumulo/core/file/rfile/PrintInfo.java | 4 +- .../core/file/rfile/bcfile/PrintInfo.java | 4 +- .../apache/accumulo/core/util/shell/Shell.java | 2 +- .../org/apache/accumulo/core/volume/Volume.java | 39 +++++ .../core/volume/VolumeConfiguration.java | 141 +++++++++++++++++++ .../apache/accumulo/core/volume/VolumeImpl.java | 57 ++++++++ .../apache/accumulo/core/zookeeper/ZooUtil.java | 4 +- .../apache/accumulo/server/ServerConstants.java | 3 +- .../accumulo/server/fs/VolumeManagerImpl.java | 52 ++++--- .../apache/accumulo/server/fs/VolumeUtil.java | 9 +- .../apache/accumulo/server/init/Initialize.java | 2 +- .../monitor/servlets/DefaultServlet.java | 2 +- .../tserver/BulkFailedCopyProcessor.java | 4 +- .../accumulo/test/functional/BulkFileIT.java | 2 +- 18 files changed, 293 insertions(+), 150 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6f0528e5/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java index 0b2f10e..d89ad99 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java @@ -82,7 +82,6 @@ import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.KeyExtent; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.file.VolumeConfiguration; import org.apache.accumulo.core.iterators.IteratorUtil; import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; @@ -109,6 +108,7 @@ import org.apache.accumulo.core.util.StringUtil; import org.apache.accumulo.core.util.TextUtil; import org.apache.accumulo.core.util.ThriftUtil; import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.core.volume.VolumeConfiguration; import org.apache.accumulo.trace.instrument.Tracer; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; http://git-wip-us.apache.org/repos/asf/accumulo/blob/6f0528e5/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java b/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java index c90d380..a0cf006 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java +++ 
b/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java @@ -43,7 +43,6 @@ import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.file.FileOperations; import org.apache.accumulo.core.file.FileSKVIterator; -import org.apache.accumulo.core.file.VolumeConfiguration; import org.apache.accumulo.core.iterators.IteratorEnvironment; import org.apache.accumulo.core.iterators.IteratorUtil; import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; @@ -65,6 +64,7 @@ import org.apache.accumulo.core.util.CachedConfiguration; import org.apache.accumulo.core.util.LocalityGroupUtil; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.core.volume.VolumeConfiguration; import org.apache.commons.lang.NotImplementedException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; http://git-wip-us.apache.org/repos/asf/accumulo/blob/6f0528e5/core/src/main/java/org/apache/accumulo/core/conf/Property.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index abad4ad..fa49fa0 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -102,6 +102,7 @@ public enum Property { INSTANCE_ZK_HOST("instance.zookeeper.host", "localhost:2181", PropertyType.HOSTLIST, "Comma separated list of zookeeper servers"), INSTANCE_ZK_TIMEOUT("instance.zookeeper.timeout", "30s", PropertyType.TIMEDURATION, "Zookeeper session timeout; max value when represented as milliseconds should be no larger than " + Integer.MAX_VALUE), + @Deprecated INSTANCE_DFS_URI( "instance.dfs.uri", "", @@ -110,6 +111,7 @@ public enum Property { + "will only be used when creating new files if 
instance.volumes is empty. After an upgrade to 1.6.0 Accumulo will start using absolute paths to " + "reference files. Files created before a 1.6.0 upgrade are referenced via relative paths. Relative paths will always be resolved using this config " + "(if empty using the hadoop config)."), + @Deprecated INSTANCE_DFS_DIR("instance.dfs.dir", "/accumulo", PropertyType.ABSOLUTEPATH, "HDFS directory in which accumulo instance will run. Do not change after accumulo is initialized."), @Sensitive http://git-wip-us.apache.org/repos/asf/accumulo/blob/6f0528e5/core/src/main/java/org/apache/accumulo/core/file/VolumeConfiguration.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/VolumeConfiguration.java b/core/src/main/java/org/apache/accumulo/core/file/VolumeConfiguration.java deleted file mode 100644 index fb8c6c8..0000000 --- a/core/src/main/java/org/apache/accumulo/core/file/VolumeConfiguration.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.accumulo.core.file; - -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; - -import org.apache.accumulo.core.conf.AccumuloConfiguration; -import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.util.CachedConfiguration; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; - -public class VolumeConfiguration { - - public static FileSystem getFileSystem(String path, Configuration conf, AccumuloConfiguration acuconf) throws IOException { - if (path.contains(":")) - return new Path(path).getFileSystem(conf); - else - return getDefaultFilesystem(conf, acuconf); - } - - public static FileSystem getDefaultFilesystem(Configuration conf, AccumuloConfiguration acuconf) throws IOException { - String uri = acuconf.get(Property.INSTANCE_DFS_URI); - if ("".equals(uri)) - return FileSystem.get(conf); - else - try { - return FileSystem.get(new URI(uri), conf); - } catch (URISyntaxException e) { - throw new IOException(e); - } - } - - public static String getConfiguredBaseDir(AccumuloConfiguration conf) { - String singleNamespace = conf.get(Property.INSTANCE_DFS_DIR); - String dfsUri = conf.get(Property.INSTANCE_DFS_URI); - String baseDir; - - if (dfsUri == null || dfsUri.isEmpty()) { - Configuration hadoopConfig = CachedConfiguration.getInstance(); - try { - baseDir = FileSystem.get(hadoopConfig).getUri().toString() + singleNamespace; - } catch (IOException e) { - throw new RuntimeException(e); - } - } else { - if (!dfsUri.contains(":")) - throw new IllegalArgumentException("Expected fully qualified URI for " + Property.INSTANCE_DFS_URI.getKey() + " got " + dfsUri); - baseDir = dfsUri + singleNamespace; - } - return baseDir; - } - - public static String[] getConfiguredBaseDirs(AccumuloConfiguration conf) { - String singleNamespace = conf.get(Property.INSTANCE_DFS_DIR); - String ns = conf.get(Property.INSTANCE_VOLUMES); - - String 
configuredBaseDirs[]; - - if (ns == null || ns.isEmpty()) { - configuredBaseDirs = new String[] {getConfiguredBaseDir(conf)}; - } else { - String namespaces[] = ns.split(","); - String unescapedNamespaces[] = new String[namespaces.length]; - int i = 0; - for (String namespace : namespaces) { - if (!namespace.contains(":")) { - throw new IllegalArgumentException("Expected fully qualified URI for " + Property.INSTANCE_VOLUMES.getKey() + " got " + namespace); - } - - try { - // pass through URI to unescape hex encoded chars (e.g. convert %2C to "," char) - unescapedNamespaces[i++] = new Path(new URI(namespace)).toString(); - } catch (URISyntaxException e) { - throw new IllegalArgumentException(Property.INSTANCE_VOLUMES.getKey() + " contains " + namespace + " which has a syntax error", e); - } - } - - configuredBaseDirs = prefix(unescapedNamespaces, singleNamespace); - } - - return configuredBaseDirs; - } - - public static String[] prefix(String bases[], String suffix) { - if (suffix.startsWith("/")) - suffix = suffix.substring(1); - String result[] = new String[bases.length]; - for (int i = 0; i < bases.length; i++) { - result[i] = bases[i] + "/" + suffix; - } - return result; - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/6f0528e5/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java index 4cfefad..4e39fc7 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java @@ -25,9 +25,9 @@ import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.file.VolumeConfiguration; 
import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile; import org.apache.accumulo.core.file.rfile.RFile.Reader; +import org.apache.accumulo.core.volume.VolumeConfiguration; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -50,7 +50,7 @@ public class PrintInfo { @SuppressWarnings("deprecation") AccumuloConfiguration aconf = AccumuloConfiguration.getSiteConfiguration(); - FileSystem hadoopFs = VolumeConfiguration.getDefaultFilesystem(conf, aconf); + FileSystem hadoopFs = VolumeConfiguration.getDefaultVolume(conf, aconf).getFileSystem(); FileSystem localFs = FileSystem.getLocal(conf); Opts opts = new Opts(); opts.parseArgs(PrintInfo.class.getName(), args); http://git-wip-us.apache.org/repos/asf/accumulo/blob/6f0528e5/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/PrintInfo.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/PrintInfo.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/PrintInfo.java index f21190e..e2c8a6d 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/PrintInfo.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/PrintInfo.java @@ -22,8 +22,8 @@ import java.util.Map.Entry; import java.util.Set; import org.apache.accumulo.core.conf.AccumuloConfiguration; -import org.apache.accumulo.core.file.VolumeConfiguration; import org.apache.accumulo.core.file.rfile.bcfile.BCFile.MetaIndexEntry; +import org.apache.accumulo.core.volume.VolumeConfiguration; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; @@ -57,7 +57,7 @@ public class PrintInfo { Configuration conf = new Configuration(); @SuppressWarnings("deprecation") AccumuloConfiguration siteConf = AccumuloConfiguration.getSiteConfiguration(); - FileSystem hadoopFs = 
VolumeConfiguration.getDefaultFilesystem(conf, siteConf); + FileSystem hadoopFs = VolumeConfiguration.getDefaultVolume(conf, siteConf).getFileSystem(); FileSystem localFs = FileSystem.getLocal(conf); Path path = new Path(args[0]); FileSystem fs; http://git-wip-us.apache.org/repos/asf/accumulo/blob/6f0528e5/core/src/main/java/org/apache/accumulo/core/util/shell/Shell.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/util/shell/Shell.java b/core/src/main/java/org/apache/accumulo/core/util/shell/Shell.java index 850816c..930d5ac 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/shell/Shell.java +++ b/core/src/main/java/org/apache/accumulo/core/util/shell/Shell.java @@ -63,7 +63,6 @@ import org.apache.accumulo.core.conf.SiteConfiguration; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.data.thrift.TConstraintViolationSummary; -import org.apache.accumulo.core.file.VolumeConfiguration; import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException; import org.apache.accumulo.core.trace.DistributedTrace; import org.apache.accumulo.core.util.BadArgumentException; @@ -156,6 +155,7 @@ import org.apache.accumulo.core.util.shell.commands.UserCommand; import org.apache.accumulo.core.util.shell.commands.UserPermissionsCommand; import org.apache.accumulo.core.util.shell.commands.UsersCommand; import org.apache.accumulo.core.util.shell.commands.WhoAmICommand; +import org.apache.accumulo.core.volume.VolumeConfiguration; import org.apache.accumulo.core.zookeeper.ZooUtil; import org.apache.accumulo.fate.zookeeper.ZooReader; import org.apache.commons.cli.BasicParser; http://git-wip-us.apache.org/repos/asf/accumulo/blob/6f0528e5/core/src/main/java/org/apache/accumulo/core/volume/Volume.java ---------------------------------------------------------------------- diff --git 
a/core/src/main/java/org/apache/accumulo/core/volume/Volume.java b/core/src/main/java/org/apache/accumulo/core/volume/Volume.java new file mode 100644 index 0000000..2f97c83 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/volume/Volume.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.volume; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +/** + * Encapsulates a {@link FileSystem} and a base {@link Path} within that filesystem. This + * also avoids the necessity to pass around a Configuration. 
+ */ +public interface Volume { + + /** + * A {@link FileSystem} that Accumulo will use + * @return + */ + public FileSystem getFileSystem(); + + /** + * The base {@link Path} which Accumulo will use within the given {@link FileSystem} + * @return + */ + public Path getBasePath(); +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/6f0528e5/core/src/main/java/org/apache/accumulo/core/volume/VolumeConfiguration.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/volume/VolumeConfiguration.java b/core/src/main/java/org/apache/accumulo/core/volume/VolumeConfiguration.java new file mode 100644 index 0000000..b8663df --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/volume/VolumeConfiguration.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.core.volume; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; + +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.util.CachedConfiguration; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import com.google.common.base.Preconditions; + +public class VolumeConfiguration { + + public static Volume getVolume(String path, Configuration conf, AccumuloConfiguration acuconf) throws IOException { + if (path.contains(":")) { + Path basePath = new Path(path); + return create(basePath.getFileSystem(conf), basePath); + } else { + return getDefaultVolume(conf, acuconf); + } + } + + public static Volume getDefaultVolume(Configuration conf, AccumuloConfiguration acuconf) throws IOException { + @SuppressWarnings("deprecation") + String uri = acuconf.get(Property.INSTANCE_DFS_URI); + if ("".equals(uri)) + return create(FileSystem.get(conf), acuconf); + else + try { + return create(FileSystem.get(new URI(uri), conf), acuconf); + } catch (URISyntaxException e) { + throw new IOException(e); + } + } + + public static String getConfiguredBaseDir(AccumuloConfiguration conf) { + String singleNamespace = conf.get(Property.INSTANCE_DFS_DIR); + String dfsUri = conf.get(Property.INSTANCE_DFS_URI); + String baseDir; + + if (dfsUri == null || dfsUri.isEmpty()) { + Configuration hadoopConfig = CachedConfiguration.getInstance(); + try { + baseDir = FileSystem.get(hadoopConfig).getUri().toString() + singleNamespace; + } catch (IOException e) { + throw new RuntimeException(e); + } + } else { + if (!dfsUri.contains(":")) + throw new IllegalArgumentException("Expected fully qualified URI for " + Property.INSTANCE_DFS_URI.getKey() + " got " + dfsUri); + baseDir = dfsUri + singleNamespace; + } + return baseDir; + } + + public static String[] 
getConfiguredBaseDirs(AccumuloConfiguration conf) { + String singleNamespace = conf.get(Property.INSTANCE_DFS_DIR); + String ns = conf.get(Property.INSTANCE_VOLUMES); + + String configuredBaseDirs[]; + + if (ns == null || ns.isEmpty()) { + configuredBaseDirs = new String[] {getConfiguredBaseDir(conf)}; + } else { + String namespaces[] = ns.split(","); + String unescapedNamespaces[] = new String[namespaces.length]; + int i = 0; + for (String namespace : namespaces) { + if (!namespace.contains(":")) { + throw new IllegalArgumentException("Expected fully qualified URI for " + Property.INSTANCE_VOLUMES.getKey() + " got " + namespace); + } + + try { + // pass through URI to unescape hex encoded chars (e.g. convert %2C to "," char) + unescapedNamespaces[i++] = new Path(new URI(namespace)).toString(); + } catch (URISyntaxException e) { + throw new IllegalArgumentException(Property.INSTANCE_VOLUMES.getKey() + " contains " + namespace + " which has a syntax error", e); + } + } + + configuredBaseDirs = prefix(unescapedNamespaces, singleNamespace); + } + + return configuredBaseDirs; + } + + public static String[] prefix(String bases[], String suffix) { + if (suffix.startsWith("/")) + suffix = suffix.substring(1); + String result[] = new String[bases.length]; + for (int i = 0; i < bases.length; i++) { + result[i] = bases[i] + "/" + suffix; + } + return result; + } + + /** + * Create a Volume with the given FileSystem that writes to the default path + * @param fs A FileSystem to write to + * @return A Volume instance writing to the given FileSystem in the default path + */ + @SuppressWarnings("deprecation") + public static <T extends FileSystem> Volume create(T fs, AccumuloConfiguration acuconf) { + return new VolumeImpl(fs, new Path(acuconf.get(Property.INSTANCE_DFS_DIR))); + } + + public static <T extends FileSystem> Volume create(T fs, Path basePath) { + return new VolumeImpl(fs, basePath); + } + + public static Volume create(String path, Configuration conf) throws 
IOException { + Preconditions.checkNotNull(path); + return create(new Path(path), conf); + } + + public static Volume create(Path path, Configuration conf) throws IOException { + Preconditions.checkNotNull(conf); + return new VolumeImpl(path.getFileSystem(conf), path); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/6f0528e5/core/src/main/java/org/apache/accumulo/core/volume/VolumeImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/volume/VolumeImpl.java b/core/src/main/java/org/apache/accumulo/core/volume/VolumeImpl.java new file mode 100644 index 0000000..6821955 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/volume/VolumeImpl.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.core.volume; + +import static com.google.common.base.Preconditions.checkNotNull; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + + +/** + * + */ +public class VolumeImpl implements Volume { + + protected FileSystem fs; + protected Path basePath; + + public VolumeImpl(FileSystem fs, Path basePath) { + checkNotNull(fs); + checkNotNull(basePath); + + this.fs = fs; + this.basePath = basePath; + } + + /* (non-javadoc) + * @see org.apache.accumulo.core.volume.Volume#getFileSystem() + */ + @Override + public FileSystem getFileSystem() { + return fs; + } + + /* (non-javadoc) + * @see org.apache.accumulo.core.volume.Volume#getBasePath() + */ + @Override + public Path getBasePath() { + return basePath; + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/6f0528e5/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooUtil.java b/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooUtil.java index de1b432..d536f42 100644 --- a/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooUtil.java @@ -23,8 +23,8 @@ import java.net.UnknownHostException; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.conf.AccumuloConfiguration; -import org.apache.accumulo.core.file.VolumeConfiguration; import org.apache.accumulo.core.util.CachedConfiguration; +import org.apache.accumulo.core.volume.VolumeConfiguration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -49,7 +49,7 @@ public class ZooUtil extends org.apache.accumulo.fate.zookeeper.ZooUtil { public static String getInstanceIDFromHdfs(Path instanceDirectory, AccumuloConfiguration conf) { try { - 
FileSystem fs = VolumeConfiguration.getFileSystem(instanceDirectory.toString(), CachedConfiguration.getInstance(), conf); + FileSystem fs = VolumeConfiguration.getVolume(instanceDirectory.toString(), CachedConfiguration.getInstance(), conf).getFileSystem(); FileStatus[] files = null; try { files = fs.listStatus(instanceDirectory); http://git-wip-us.apache.org/repos/asf/accumulo/blob/6f0528e5/server/base/src/main/java/org/apache/accumulo/server/ServerConstants.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/ServerConstants.java b/server/base/src/main/java/org/apache/accumulo/server/ServerConstants.java index 8983d08..b1e79cb 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/ServerConstants.java +++ b/server/base/src/main/java/org/apache/accumulo/server/ServerConstants.java @@ -24,10 +24,10 @@ import java.util.HashSet; import java.util.List; import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.file.VolumeConfiguration; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.util.CachedConfiguration; import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.core.volume.VolumeConfiguration; import org.apache.accumulo.core.zookeeper.ZooUtil; import org.apache.accumulo.server.conf.ServerConfiguration; import org.apache.accumulo.server.fs.VolumeUtil; @@ -56,6 +56,7 @@ public class ServerConstants { private static List<Pair<Path,Path>> replacementsList = null; + @SuppressWarnings("deprecation") public static synchronized String getDefaultBaseDir() { if (defaultBaseDir == null) { defaultBaseDir = new Path(VolumeConfiguration.getConfiguredBaseDir(ServerConfiguration.getSiteConfiguration())).toString(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/6f0528e5/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java 
---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java index 80301ef..9920159 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java @@ -34,8 +34,9 @@ import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.KeyExtent; -import org.apache.accumulo.core.file.VolumeConfiguration; import org.apache.accumulo.core.util.CachedConfiguration; +import org.apache.accumulo.core.volume.Volume; +import org.apache.accumulo.core.volume.VolumeConfiguration; import org.apache.accumulo.server.ServerConstants; import org.apache.accumulo.server.client.HdfsZooInstance; import org.apache.accumulo.server.conf.ServerConfiguration; @@ -58,12 +59,12 @@ public class VolumeManagerImpl implements VolumeManager { private static final Logger log = Logger.getLogger(VolumeManagerImpl.class); - Map<String,? extends FileSystem> volumes; - FileSystem defaultVolume; + Map<String,Volume> volumes; + Volume defaultVolume; AccumuloConfiguration conf; VolumeChooser chooser; - protected VolumeManagerImpl(Map<String,? 
extends FileSystem> volumes, String defaultVolume, AccumuloConfiguration conf) { + protected VolumeManagerImpl(Map<String,Volume> volumes, String defaultVolume, AccumuloConfiguration conf) { this.volumes = volumes; this.defaultVolume = volumes.get(defaultVolume); this.conf = conf; @@ -72,16 +73,18 @@ public class VolumeManagerImpl implements VolumeManager { } public static org.apache.accumulo.server.fs.VolumeManager getLocal() throws IOException { - return new VolumeManagerImpl(Collections.singletonMap("", FileSystem.getLocal(CachedConfiguration.getInstance())), "", - DefaultConfiguration.getDefaultConfiguration()); + AccumuloConfiguration accConf = DefaultConfiguration.getDefaultConfiguration(); + Volume defaultLocalVolume = VolumeConfiguration.create(FileSystem.getLocal(CachedConfiguration.getInstance()), accConf); + + return new VolumeManagerImpl(Collections.singletonMap("", defaultLocalVolume), "", accConf); } @Override public void close() throws IOException { IOException ex = null; - for (FileSystem fs : volumes.values()) { + for (Volume volume : volumes.values()) { try { - fs.close(); + volume.getFileSystem().close(); } catch (IOException e) { ex = e; } @@ -183,9 +186,9 @@ public class VolumeManagerImpl implements VolumeManager { } protected void ensureSyncIsEnabled() { - for (Entry<String,? extends FileSystem> entry : getFileSystems().entrySet()) { + for (Entry<String,Volume> entry : getFileSystems().entrySet()) { final String volumeName = entry.getKey(); - FileSystem fs = entry.getValue(); + FileSystem fs = entry.getValue().getFileSystem(); if (ViewFSUtils.isViewFS(fs)) { try { @@ -273,10 +276,10 @@ public class VolumeManagerImpl implements VolumeManager { } } - return defaultVolume; + return defaultVolume.getFileSystem(); } - private Map<String,? 
extends FileSystem> getFileSystems() { + private Map<String,Volume> getFileSystems() { return volumes; } @@ -347,26 +350,31 @@ public class VolumeManagerImpl implements VolumeManager { static private final String DEFAULT = ""; public static VolumeManager get(AccumuloConfiguration conf) throws IOException { - Map<String,FileSystem> fileSystems = new HashMap<String,FileSystem>(); - Configuration hadoopConf = CachedConfiguration.getInstance(); - fileSystems.put(DEFAULT, VolumeConfiguration.getDefaultFilesystem(hadoopConf, conf)); - for (String space : VolumeConfiguration.getConfiguredBaseDirs(conf)) { - if (space.equals(DEFAULT)) + final Map<String,Volume> volumes = new HashMap<String,Volume>(); + final Configuration hadoopConf = CachedConfiguration.getInstance(); + + // The "default" Volume for Accumulo (in case no volumes are specified) + volumes.put(DEFAULT, VolumeConfiguration.getDefaultVolume(hadoopConf, conf)); + for (String volumeUriOrDir : VolumeConfiguration.getConfiguredBaseDirs(conf)) { + if (volumeUriOrDir.equals(DEFAULT)) + // Cannot re-define the default volume throw new IllegalArgumentException(); - if (space.contains(":")) { - fileSystems.put(space, new Path(space).getFileSystem(hadoopConf)); + // We require a URI here, fail if it doesn't look like one + if (volumeUriOrDir.contains(":")) { + volumes.put(volumeUriOrDir, VolumeConfiguration.create(new Path(volumeUriOrDir), hadoopConf)); } else { - throw new IllegalArgumentException("Expected fully qualified URI for " + Property.INSTANCE_VOLUMES.getKey() + " got " + space); + throw new IllegalArgumentException("Expected fully qualified URI for " + Property.INSTANCE_VOLUMES.getKey() + " got " + volumeUriOrDir); } } - return new VolumeManagerImpl(fileSystems, DEFAULT, conf); + return new VolumeManagerImpl(volumes, DEFAULT, conf); } @Override public boolean isReady() throws IOException { - for (FileSystem fs : getFileSystems().values()) { + for (Volume volume : getFileSystems().values()) { + FileSystem fs = 
volume.getFileSystem(); if (ViewFSUtils.isViewFS(fs)) { try { http://git-wip-us.apache.org/repos/asf/accumulo/blob/6f0528e5/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java index da3baa6..42ea912 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java @@ -26,17 +26,21 @@ import java.util.Map.Entry; import java.util.SortedMap; import java.util.TreeMap; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.KeyExtent; import org.apache.accumulo.core.metadata.schema.DataFileValue; import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.accumulo.core.util.CachedConfiguration; import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.core.volume.Volume; import org.apache.accumulo.server.ServerConstants; import org.apache.accumulo.server.fs.VolumeManager.FileType; import org.apache.accumulo.server.security.SystemCredentials; import org.apache.accumulo.server.util.MetadataTableUtil; import org.apache.accumulo.server.zookeeper.ZooLock; import org.apache.commons.codec.digest.DigestUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -45,6 +49,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.log4j.Logger; +import com.google.common.base.Preconditions; + /** * Utility methods for managing absolute URIs contained in Accumulo metadata. 
*/ @@ -52,7 +58,7 @@ import org.apache.log4j.Logger; public class VolumeUtil { private static final Logger log = Logger.getLogger(VolumeUtil.class); - + private static boolean isActiveVolume(Path dir) { // consider relative path as active and take no action @@ -335,4 +341,5 @@ public class VolumeUtil { SecureRandom rand = new SecureRandom(); return new Path(path.getParent(), path.getName() + "_" + System.currentTimeMillis() + "_" + Math.abs(rand.nextInt()) + ".bak"); } + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6f0528e5/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java index 925f602..3dc3665 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java +++ b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java @@ -40,7 +40,6 @@ import org.apache.accumulo.core.data.KeyExtent; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.file.FileOperations; import org.apache.accumulo.core.file.FileSKVWriter; -import org.apache.accumulo.core.file.VolumeConfiguration; import org.apache.accumulo.core.iterators.user.VersioningIterator; import org.apache.accumulo.core.master.state.tables.TableState; import org.apache.accumulo.core.master.thrift.MasterGoalState; @@ -52,6 +51,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Da import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily; import org.apache.accumulo.core.security.SecurityUtil; import org.apache.accumulo.core.util.CachedConfiguration; +import org.apache.accumulo.core.volume.VolumeConfiguration; import org.apache.accumulo.core.zookeeper.ZooUtil; import org.apache.accumulo.fate.zookeeper.IZooReaderWriter; import 
org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; http://git-wip-us.apache.org/repos/asf/accumulo/blob/6f0528e5/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/DefaultServlet.java ---------------------------------------------------------------------- diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/DefaultServlet.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/DefaultServlet.java index 942f866..369949f 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/DefaultServlet.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/DefaultServlet.java @@ -33,11 +33,11 @@ import javax.servlet.http.HttpServletResponse; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.conf.DefaultConfiguration; -import org.apache.accumulo.core.file.VolumeConfiguration; import org.apache.accumulo.core.master.thrift.MasterMonitorInfo; import org.apache.accumulo.core.util.Duration; import org.apache.accumulo.core.util.NumUtil; import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.core.volume.VolumeConfiguration; import org.apache.accumulo.monitor.Monitor; import org.apache.accumulo.monitor.ZooKeeperStatus; import org.apache.accumulo.monitor.ZooKeeperStatus.ZooKeeperState; http://git-wip-us.apache.org/repos/asf/accumulo/blob/6f0528e5/server/tserver/src/main/java/org/apache/accumulo/tserver/BulkFailedCopyProcessor.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/BulkFailedCopyProcessor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/BulkFailedCopyProcessor.java index e9f1083..5c64970 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/BulkFailedCopyProcessor.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/BulkFailedCopyProcessor.java @@ -50,7 +50,7 @@ public class 
BulkFailedCopyProcessor implements Processor { Path tmp = new Path(dest.getParent(), dest.getName() + ".tmp"); try { - FileSystem fs = TraceFileSystem.wrap(org.apache.accumulo.core.file.VolumeConfiguration.getDefaultFilesystem(CachedConfiguration.getInstance(), + FileSystem fs = TraceFileSystem.wrap(org.apache.accumulo.core.volume.VolumeConfiguration.getDefaultFilesystem(CachedConfiguration.getInstance(), ServerConfiguration.getSiteConfiguration())); FileUtil.copy(fs, orig, fs, tmp, false, true, CachedConfiguration.getInstance()); @@ -58,7 +58,7 @@ public class BulkFailedCopyProcessor implements Processor { log.debug("copied " + orig + " to " + dest); } catch (IOException ex) { try { - FileSystem fs = TraceFileSystem.wrap(org.apache.accumulo.core.file.VolumeConfiguration.getDefaultFilesystem(CachedConfiguration.getInstance(), + FileSystem fs = TraceFileSystem.wrap(org.apache.accumulo.core.volume.VolumeConfiguration.getDefaultFilesystem(CachedConfiguration.getInstance(), ServerConfiguration.getSiteConfiguration())); fs.create(dest).close(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/6f0528e5/test/src/test/java/org/apache/accumulo/test/functional/BulkFileIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/functional/BulkFileIT.java b/test/src/test/java/org/apache/accumulo/test/functional/BulkFileIT.java index c8023c0..5df7b68 100644 --- a/test/src/test/java/org/apache/accumulo/test/functional/BulkFileIT.java +++ b/test/src/test/java/org/apache/accumulo/test/functional/BulkFileIT.java @@ -29,9 +29,9 @@ import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.file.FileOperations; import org.apache.accumulo.core.file.FileSKVWriter; -import org.apache.accumulo.core.file.VolumeConfiguration; import org.apache.accumulo.core.file.rfile.RFile; import org.apache.accumulo.core.security.Authorizations; +import 
org.apache.accumulo.core.volume.VolumeConfiguration; import org.apache.accumulo.server.conf.ServerConfiguration; import org.apache.accumulo.server.trace.TraceFileSystem; import org.apache.hadoop.conf.Configuration;
