ACCUMULO-3177 Create a per table volume chooser Signed-off-by: Christopher Tubbs <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/3f44f8c1 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/3f44f8c1 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/3f44f8c1 Branch: refs/heads/master Commit: 3f44f8c191941ecb57656eaa5ddd4177c71cbfe0 Parents: 8031dcd Author: Jenna Huston <[email protected]> Authored: Mon Oct 27 14:06:32 2014 -0400 Committer: Christopher Tubbs <[email protected]> Committed: Fri Dec 5 20:04:22 2014 -0500 ---------------------------------------------------------------------- .../org/apache/accumulo/core/conf/Property.java | 4 +- .../server/fs/PerTableVolumeChooser.java | 42 ++++++++++++++++++++ .../accumulo/server/fs/RandomVolumeChooser.java | 7 ++-- .../accumulo/server/fs/VolumeChooser.java | 3 +- .../server/fs/VolumeChooserEnvironment.java | 37 +++++++++++++++++ .../accumulo/server/fs/VolumeManager.java | 4 +- .../accumulo/server/fs/VolumeManagerImpl.java | 5 ++- .../apache/accumulo/server/fs/VolumeUtil.java | 4 +- .../apache/accumulo/server/init/Initialize.java | 37 ++++++++--------- .../apache/accumulo/server/util/FileUtil.java | 6 ++- .../accumulo/server/util/MetadataTableUtil.java | 6 ++- .../accumulo/server/util/RandomizeVolumes.java | 4 +- .../accumulo/server/util/TabletOperations.java | 4 +- .../java/org/apache/accumulo/master/Master.java | 3 +- .../accumulo/master/TabletGroupWatcher.java | 3 +- .../accumulo/master/tableOps/CreateTable.java | 4 +- .../accumulo/master/tableOps/ImportTable.java | 4 +- .../master/tableOps/ImportTableTest.java | 4 +- .../apache/accumulo/tserver/log/DfsLogger.java | 4 +- .../tserver/TabletServerSyncCheckTest.java | 3 +- .../apache/accumulo/test/FairVolumeChooser.java | 3 +- 21 files changed, 145 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/3f44f8c1/core/src/main/java/org/apache/accumulo/core/conf/Property.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index cc7d548..4c2d0b4 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -165,7 +165,7 @@ public enum Property { GENERAL_SIMPLETIMER_THREADPOOL_SIZE("general.server.simpletimer.threadpool.size", "1", PropertyType.COUNT, "The number of threads to use for " + "server-internal scheduled tasks"), @Experimental - GENERAL_VOLUME_CHOOSER("general.volume.chooser", "org.apache.accumulo.server.fs.RandomVolumeChooser", PropertyType.CLASSNAME, + GENERAL_VOLUME_CHOOSER("general.volume.chooser", "org.apache.accumulo.server.fs.PerTableVolumeChooser", PropertyType.CLASSNAME, "The class that will be used to select which volume will be used to create new files."), GENERAL_SECURITY_CREDENTIAL_PROVIDER_PATHS("general.security.credential.provider.paths", "", PropertyType.STRING, "Comma-separated list of paths to CredentialProviders"), GENERAL_LEGACY_METRICS("general.legacy.metrics", "false", PropertyType.BOOLEAN, @@ -467,6 +467,8 @@ public enum Property { TABLE_REPLICATION_TARGET("table.replication.target.", null, PropertyType.PREFIX, "Enumerate a mapping of other systems which this table should " + "replicate their data to. The key suffix is the identifying cluster name and the value is an identifier for a location on the target system, " + "e.g. the ID of the table on the target to replicate to"), + TABLE_VOLUME_CHOOSER("table.volume.chooser", "org.apache.accumulo.server.fs.RandomVolumeChooser", PropertyType.CLASSNAME, + "The class that will be used to select which volume will be used to create new files for this table."), // VFS ClassLoader properties VFS_CLASSLOADER_SYSTEM_CLASSPATH_PROPERTY(AccumuloVFSClassLoader.VFS_CLASSLOADER_SYSTEM_CLASSPATH_PROPERTY, "", PropertyType.STRING, http://git-wip-us.apache.org/repos/asf/accumulo/blob/3f44f8c1/server/base/src/main/java/org/apache/accumulo/server/fs/PerTableVolumeChooser.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/PerTableVolumeChooser.java b/server/base/src/main/java/org/apache/accumulo/server/fs/PerTableVolumeChooser.java new file mode 100644 index 0000000..7a825c7 --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/PerTableVolumeChooser.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.fs; + +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.server.client.HdfsZooInstance; +import org.apache.accumulo.server.conf.ServerConfigurationFactory; +import org.apache.accumulo.server.conf.TableConfiguration; + +public class PerTableVolumeChooser implements VolumeChooser { + + private static final VolumeChooser fallbackVolumeChooser = new RandomVolumeChooser(); + + public PerTableVolumeChooser() {} + + @Override + public String choose(VolumeChooserEnvironment env, String[] options) { + VolumeChooser chooser; + if (env.hasTableId()) { + TableConfiguration conf = new ServerConfigurationFactory(HdfsZooInstance.getInstance()).getTableConfiguration(env.getTableId()); + chooser = Property.createTableInstanceFromPropertyName(conf, Property.TABLE_VOLUME_CHOOSER, VolumeChooser.class, fallbackVolumeChooser); + } else { + chooser = fallbackVolumeChooser; + } + + return chooser.choose(env, options); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/3f44f8c1/server/base/src/main/java/org/apache/accumulo/server/fs/RandomVolumeChooser.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/RandomVolumeChooser.java b/server/base/src/main/java/org/apache/accumulo/server/fs/RandomVolumeChooser.java index 2760b07..85d4e2b 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/fs/RandomVolumeChooser.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/RandomVolumeChooser.java @@ -19,11 +19,10 @@ package org.apache.accumulo.server.fs; import java.util.Random; public class RandomVolumeChooser implements VolumeChooser { - Random random = new Random(); - + private static Random random = new Random(); + @Override - public String choose(String[] options) { + public String choose(VolumeChooserEnvironment env, String[] options) { return options[random.nextInt(options.length)]; } - } http://git-wip-us.apache.org/repos/asf/accumulo/blob/3f44f8c1/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeChooser.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeChooser.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeChooser.java index 8713c97..f523057 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeChooser.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeChooser.java @@ -16,7 +16,6 @@ */ package org.apache.accumulo.server.fs; - public interface VolumeChooser { - String choose(String[] options); + String choose(VolumeChooserEnvironment env, String[] options); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/3f44f8c1/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeChooserEnvironment.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeChooserEnvironment.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeChooserEnvironment.java new file mode 100644 index 0000000..b6d27cb --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeChooserEnvironment.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.fs; + +import com.google.common.base.Optional; + +public class VolumeChooserEnvironment { + + private final Optional<String> tableId; + + public VolumeChooserEnvironment(Optional<String> tableId) { + this.tableId = tableId; + } + + public boolean hasTableId() { + return tableId.isPresent(); + } + + public String getTableId() { + return tableId.get(); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/3f44f8c1/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java index cbfdb5e..890651e 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java @@ -28,6 +28,8 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import com.google.common.base.Optional; + /** * A wrapper around multiple hadoop FileSystem objects, which are assumed to be different volumes. This also concentrates a bunch of meta-operations like * waiting for SAFE_MODE, and closing WALs. @@ -156,7 +158,7 @@ public interface VolumeManager { ContentSummary getContentSummary(Path dir) throws IOException; // decide on which of the given locations to create a new file - String choose(String[] options); + String choose(Optional<String> tableId, String[] options); /** * Fetch the default Volume http://git-wip-us.apache.org/repos/asf/accumulo/blob/3f44f8c1/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java index 37d5088..dc1be73 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java @@ -60,6 +60,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.util.Progressable; import org.apache.log4j.Logger; +import com.google.common.base.Optional; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; @@ -572,8 +573,8 @@ public class VolumeManagerImpl implements VolumeManager { } @Override - public String choose(String[] options) { - return chooser.choose(options); + public String choose(Optional<String> tableId, String[] options) { + return chooser.choose(new VolumeChooserEnvironment(tableId), options); } @Override http://git-wip-us.apache.org/repos/asf/accumulo/blob/3f44f8c1/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java index 6ebbe1e..877d01c 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java @@ -49,6 +49,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.log4j.Logger; +import com.google.common.base.Optional; + /** * Utility methods for managing absolute URIs contained in Accumulo metadata. */ @@ -263,7 +265,7 @@ public class VolumeUtil { throw new IllegalArgumentException("Unexpected table dir " + dir); } - Path newDir = new Path(vm.choose(ServerConstants.getBaseUris()) + Path.SEPARATOR + ServerConstants.TABLE_DIR + Path.SEPARATOR + dir.getParent().getName() + Path newDir = new Path(vm.choose(Optional.of(extent.getTableId().toString()), ServerConstants.getBaseUris()) + Path.SEPARATOR + ServerConstants.TABLE_DIR + Path.SEPARATOR + dir.getParent().getName() + Path.SEPARATOR + dir.getName()); log.info("Updating directory for " + extent + " from " + dir + " to " + newDir); http://git-wip-us.apache.org/repos/asf/accumulo/blob/3f44f8c1/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java index 670c541..28bd63b 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java +++ b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java @@ -106,6 +106,7 @@ import org.apache.zookeeper.ZooDefs.Ids; import com.beust.jcommander.Parameter; import com.google.common.base.Joiner; +import com.google.common.base.Optional; /** * This class is used to setup the directory structure and the root tablet to get an instance started @@ -282,13 +283,13 @@ public class Initialize { return initialize(opts, instanceNamePath, fs); } - public static boolean initialize(Opts opts, String instanceNamePath, VolumeManager fs) { + private static boolean initialize(Opts opts, String instanceNamePath, VolumeManager fs) { UUID uuid = UUID.randomUUID(); // the actual disk locations of the root table and tablets String[] configuredVolumes = VolumeConfiguration.getVolumeUris(SiteConfiguration.getInstance()); - final String rootTabletDir = new Path(fs.choose(configuredVolumes) + Path.SEPARATOR + ServerConstants.TABLE_DIR + Path.SEPARATOR + RootTable.ID - + RootTable.ROOT_TABLET_LOCATION).toString(); + final String rootTabletDir = new Path(fs.choose(Optional.<String> absent(), configuredVolumes) + Path.SEPARATOR + ServerConstants.TABLE_DIR + + Path.SEPARATOR + RootTable.ID + RootTable.ROOT_TABLET_LOCATION).toString(); try { initZooKeeper(opts, uuid.toString(), instanceNamePath, rootTabletDir); @@ -319,7 +320,8 @@ public class Initialize { } try { - initSecurity(opts, uuid.toString()); + AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(HdfsZooInstance.getInstance())); + initSecurity(context, opts, uuid.toString()); } catch (Exception e) { log.fatal("Failed to initialize security", e); return false; @@ -356,13 +358,12 @@ public class Initialize { // initialize initial system tables config in zookeeper initSystemTablesConfig(); - String tableMetadataTabletDir = fs.choose(ServerConstants.getBaseUris()) + Constants.HDFS_TABLES_DIR + Path.SEPARATOR + MetadataTable.ID - + TABLE_TABLETS_TABLET_DIR; - String replicationTableDefaultTabletDir = fs.choose(ServerConstants.getBaseUris()) + Constants.HDFS_TABLES_DIR + Path.SEPARATOR + ReplicationTable.ID - + Constants.DEFAULT_TABLET_LOCATION; - - String defaultMetadataTabletDir = fs.choose(ServerConstants.getBaseUris()) + Constants.HDFS_TABLES_DIR + Path.SEPARATOR + MetadataTable.ID - + Constants.DEFAULT_TABLET_LOCATION; + String tableMetadataTabletDir = fs.choose(Optional.<String> absent(), ServerConstants.getBaseUris()) + Constants.HDFS_TABLES_DIR + Path.SEPARATOR + + MetadataTable.ID + TABLE_TABLETS_TABLET_DIR; + String replicationTableDefaultTabletDir = fs.choose(Optional.<String> absent(), ServerConstants.getBaseUris()) + Constants.HDFS_TABLES_DIR + Path.SEPARATOR + + ReplicationTable.ID + Constants.DEFAULT_TABLET_LOCATION; + String defaultMetadataTabletDir = fs.choose(Optional.<String> absent(), ServerConstants.getBaseUris()) + Constants.HDFS_TABLES_DIR + Path.SEPARATOR + + MetadataTable.ID + Constants.DEFAULT_TABLET_LOCATION; // create table and default tablets directories createDirectories(fs, rootTabletDir, tableMetadataTabletDir, defaultMetadataTabletDir, replicationTableDefaultTabletDir); @@ -546,15 +547,8 @@ public class Initialize { return rootpass.getBytes(UTF_8); } - private static void initSecurity(Opts opts, String iid) throws AccumuloSecurityException, ThriftSecurityException { - AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(HdfsZooInstance.getInstance()) { - @Override - public synchronized AccumuloConfiguration getConfiguration() { - return getSiteConfiguration(); - } - }); - AuditedSecurityOperation.getInstance(context, true).initializeSecurity(context.rpcCreds(), DEFAULT_ROOT_USER, - opts.rootpass); + private static void initSecurity(AccumuloServerContext context, Opts opts, String iid) throws AccumuloSecurityException, ThriftSecurityException { + AuditedSecurityOperation.getInstance(context, true).initializeSecurity(context.rpcCreds(), DEFAULT_ROOT_USER, opts.rootpass); } public static void initSystemTablesConfig() throws IOException { @@ -659,9 +653,10 @@ public class Initialize { VolumeManager fs = VolumeManagerImpl.get(acuConf); if (opts.resetSecurity) { + AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(HdfsZooInstance.getInstance())); if (isInitialized(fs)) { opts.rootpass = getRootPassword(opts); - initSecurity(opts, HdfsZooInstance.getInstance().getInstanceID()); + initSecurity(context, opts, HdfsZooInstance.getInstance().getInstanceID()); } else { log.fatal("Attempted to reset security on accumulo before it was initialized"); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/3f44f8c1/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java index aa37e35..103ba05 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java @@ -56,6 +56,8 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.log4j.Logger; +import com.google.common.base.Optional; + public class FileUtil { public static class FileInfo { @@ -79,8 +81,8 @@ public class FileUtil { private static final Logger log = Logger.getLogger(FileUtil.class); private static Path createTmpDir(AccumuloConfiguration acuConf, VolumeManager fs) throws IOException { - String accumuloDir = fs.choose(ServerConstants.getBaseUris()); - + String accumuloDir = fs.choose(Optional.<String>absent(), ServerConstants.getBaseUris()); + Path result = null; while (result == null) { result = new Path(accumuloDir + Path.SEPARATOR + "tmp/idxReduce_" + String.format("%09d", new Random().nextInt(Integer.MAX_VALUE))); http://git-wip-us.apache.org/repos/asf/accumulo/blob/3f44f8c1/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java index dd3355a..524abb0 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java @@ -95,6 +95,8 @@ import org.apache.hadoop.io.Text; import org.apache.log4j.Logger; import org.apache.zookeeper.KeeperException; +import com.google.common.base.Optional; + /** * provides a reference to the metadata table for updates by tablet servers */ @@ -889,7 +891,7 @@ public class MetadataTableUtil { Key k = entry.getKey(); Mutation m = new Mutation(k.getRow()); m.putDelete(k.getColumnFamily(), k.getColumnQualifier()); - String dir = volumeManager.choose(ServerConstants.getBaseUris()) + Constants.HDFS_TABLES_DIR + Path.SEPARATOR + tableId + Path.SEPARATOR + String dir = volumeManager.choose(Optional.of(tableId), ServerConstants.getBaseUris()) + Constants.HDFS_TABLES_DIR + Path.SEPARATOR + tableId + Path.SEPARATOR + new String(FastFormat.toZeroPaddedString(dirCount++, 8, 16, Constants.CLONE_PREFIX_BYTES)); TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.put(m, new Value(dir.getBytes(UTF_8))); @@ -981,7 +983,7 @@ public class MetadataTableUtil { * During an upgrade from 1.6 to 1.7, we need to add the replication table */ public static void createReplicationTable(ClientContext context) throws IOException { - String dir = VolumeManagerImpl.get().choose(ServerConstants.getBaseUris()) + Constants.HDFS_TABLES_DIR + Path.SEPARATOR + ReplicationTable.ID + String dir = VolumeManagerImpl.get().choose(Optional.of(ReplicationTable.ID), ServerConstants.getBaseUris()) + Constants.HDFS_TABLES_DIR + Path.SEPARATOR + ReplicationTable.ID + Constants.DEFAULT_TABLET_LOCATION; Mutation m = new Mutation(new Text(KeyExtent.getMetadataEntry(new Text(ReplicationTable.ID), null))); http://git-wip-us.apache.org/repos/asf/accumulo/blob/3f44f8c1/server/base/src/main/java/org/apache/accumulo/server/util/RandomizeVolumes.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/RandomizeVolumes.java b/server/base/src/main/java/org/apache/accumulo/server/util/RandomizeVolumes.java index 82cc855..de360fe 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/RandomizeVolumes.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/RandomizeVolumes.java @@ -47,6 +47,8 @@ import org.apache.accumulo.server.tables.TableManager; import org.apache.hadoop.fs.Path; import org.apache.log4j.Logger; +import com.google.common.base.Optional; + public class RandomizeVolumes { private static final Logger log = Logger.getLogger(RandomizeVolumes.class); @@ -110,7 +112,7 @@ public class RandomizeVolumes { Key key = entry.getKey(); Mutation m = new Mutation(key.getRow()); - final String newLocation = vm.choose(ServerConstants.getBaseUris()) + Path.SEPARATOR + ServerConstants.TABLE_DIR + Path.SEPARATOR + tableId + Path.SEPARATOR + directory; + final String newLocation = vm.choose(Optional.of(tableId), ServerConstants.getBaseUris()) + Path.SEPARATOR + ServerConstants.TABLE_DIR + Path.SEPARATOR + tableId + Path.SEPARATOR + directory; m.put(key.getColumnFamily(), key.getColumnQualifier(), new Value(newLocation.getBytes(UTF_8))); if (log.isTraceEnabled()) { log.trace("Replacing " + oldLocation + " with " + newLocation); http://git-wip-us.apache.org/repos/asf/accumulo/blob/3f44f8c1/server/base/src/main/java/org/apache/accumulo/server/util/TabletOperations.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/TabletOperations.java b/server/base/src/main/java/org/apache/accumulo/server/util/TabletOperations.java index 2c9fe9c..c0e1a9b 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/TabletOperations.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/TabletOperations.java @@ -30,6 +30,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.log4j.Logger; +import com.google.common.base.Optional; + public class TabletOperations { private static final Logger log = Logger.getLogger(TabletOperations.class); @@ -38,7 +40,7 @@ public class TabletOperations { String lowDirectory; UniqueNameAllocator namer = UniqueNameAllocator.getInstance(); - String volume = fs.choose(ServerConstants.getBaseUris()) + Constants.HDFS_TABLES_DIR + Path.SEPARATOR; + String volume = fs.choose(Optional.of(tableId), ServerConstants.getBaseUris()) + Constants.HDFS_TABLES_DIR + Path.SEPARATOR; while (true) { try { http://git-wip-us.apache.org/repos/asf/accumulo/blob/3f44f8c1/server/master/src/main/java/org/apache/accumulo/master/Master.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java index 6e81354..de00041 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/Master.java +++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java @@ -147,6 +147,7 @@ import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.data.Stat; +import com.google.common.base.Optional; import com.google.common.collect.Iterables; /** @@ -254,7 +255,7 @@ public class Master extends AccumuloServerContext implements LiveTServerSet.List if (!zoo.exists(dirZPath)) { Path oldPath = fs.getFullPath(FileType.TABLE, "/" + MetadataTable.ID + "/root_tablet"); if (fs.exists(oldPath)) { - String newPath = fs.choose(ServerConstants.getBaseUris()) + Constants.HDFS_TABLES_DIR + Path.SEPARATOR + RootTable.ID; + String newPath = fs.choose(Optional.of(RootTable.ID), ServerConstants.getBaseUris()) + Constants.HDFS_TABLES_DIR + Path.SEPARATOR + RootTable.ID; fs.mkdirs(new Path(newPath)); if (!fs.rename(oldPath, new Path(newPath))) { throw new IOException("Failed to move root tablet from " + oldPath + " to " + newPath); http://git-wip-us.apache.org/repos/asf/accumulo/blob/3f44f8c1/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java index 3d39891..93ed423 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java +++ b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java @@ -83,6 +83,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.thrift.TException; +import com.google.common.base.Optional; import com.google.common.collect.Iterators; class TabletGroupWatcher extends Daemon { @@ -549,7 +550,7 @@ class TabletGroupWatcher extends Daemon { } else { // Recreate the default tablet to hold the end of the table Master.log.debug("Recreating the last tablet to point to " + extent.getPrevEndRow()); - String tdir = master.getFileSystem().choose(ServerConstants.getBaseUris()) + Constants.HDFS_TABLES_DIR + Path.SEPARATOR + extent.getTableId() + String tdir = master.getFileSystem().choose(Optional.of(extent.getTableId().toString()), ServerConstants.getBaseUris()) + Constants.HDFS_TABLES_DIR + Path.SEPARATOR + extent.getTableId() + Constants.DEFAULT_TABLET_LOCATION; MetadataTableUtil.addTablet(new KeyExtent(extent.getTableId(), null, extent.getPrevEndRow()), tdir, master, timeType, this.master.masterLock); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/3f44f8c1/server/master/src/main/java/org/apache/accumulo/master/tableOps/CreateTable.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CreateTable.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CreateTable.java index 247645b..95c9f79 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CreateTable.java +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CreateTable.java @@ -44,6 +44,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.log4j.Logger; +import com.google.common.base.Optional; + class TableInfo implements Serializable { private static final long serialVersionUID = 1L; @@ -175,7 +177,7 @@ class ChooseDir extends MasterRepo { @Override public Repo<Master> call(long tid, Master master) throws Exception { // Constants.DEFAULT_TABLET_LOCATION has a leading slash prepended to it so we don't need to add one here - tableInfo.dir = master.getFileSystem().choose(ServerConstants.getBaseUris()) + Constants.HDFS_TABLES_DIR + Path.SEPARATOR + tableInfo.tableId + tableInfo.dir = master.getFileSystem().choose(Optional.of(tableInfo.tableId), ServerConstants.getBaseUris()) + Constants.HDFS_TABLES_DIR + Path.SEPARATOR + tableInfo.tableId + Constants.DEFAULT_TABLET_LOCATION; return new CreateDir(tableInfo); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/3f44f8c1/server/master/src/main/java/org/apache/accumulo/master/tableOps/ImportTable.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/ImportTable.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/ImportTable.java index 26a6928..979954b 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/ImportTable.java +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/ImportTable.java @@ -72,6 +72,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.log4j.Logger; +import com.google.common.base.Optional; + /** * */ @@ -325,7 +327,7 @@ class PopulateMetadataTable extends MasterRepo { */ protected String getClonedTabletDir(Master master, String[] tableDirs, String tabletDir) { // We can try to spread out the tablet dirs across all volumes - String tableDir = master.getFileSystem().choose(tableDirs); + String tableDir = master.getFileSystem().choose(Optional.of(tableInfo.tableId), tableDirs); // Build up a full hdfs://localhost:8020/accumulo/tables/$id/c-XXXXXXX return tableDir + "/" + tableInfo.tableId + "/" + tabletDir; http://git-wip-us.apache.org/repos/asf/accumulo/blob/3f44f8c1/server/master/src/test/java/org/apache/accumulo/master/tableOps/ImportTableTest.java ---------------------------------------------------------------------- diff --git a/server/master/src/test/java/org/apache/accumulo/master/tableOps/ImportTableTest.java b/server/master/src/test/java/org/apache/accumulo/master/tableOps/ImportTableTest.java index 31f6bde..080e0af 100644 --- a/server/master/src/test/java/org/apache/accumulo/master/tableOps/ImportTableTest.java +++ b/server/master/src/test/java/org/apache/accumulo/master/tableOps/ImportTableTest.java @@ -22,6 +22,8 @@ import org.easymock.EasyMock; import org.junit.Assert; import org.junit.Test; +import com.google.common.base.Optional; + /** * */ @@ -41,7 +43,7 @@ public class ImportTableTest { EasyMock.expect(master.getFileSystem()).andReturn(volumeManager); // Choose the 2nd element - EasyMock.expect(volumeManager.choose(tableDirs)).andReturn(tableDirs[1]); + EasyMock.expect(volumeManager.choose(Optional.of(iti.tableId), tableDirs)).andReturn(tableDirs[1]); EasyMock.replay(master, volumeManager); http://git-wip-us.apache.org/repos/asf/accumulo/blob/3f44f8c1/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java index b7d5b0e..6fd2624 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java @@ -43,6 +43,8 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicLong; import com.google.common.base.Joiner; +import com.google.common.base.Optional; + import org.apache.accumulo.core.client.Durability; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; @@ -384,7 +386,7 @@ public class DfsLogger { log.debug("DfsLogger.open() begin"); VolumeManager fs = conf.getFileSystem(); - logPath = fs.choose(ServerConstants.getBaseUris()) + Path.SEPARATOR + ServerConstants.WAL_DIR + Path.SEPARATOR + logger + Path.SEPARATOR + filename; + logPath = fs.choose(Optional.<String> absent(), ServerConstants.getBaseUris()) + Path.SEPARATOR + ServerConstants.WAL_DIR + Path.SEPARATOR + logger + Path.SEPARATOR + filename; metaReference = toString(); try { http://git-wip-us.apache.org/repos/asf/accumulo/blob/3f44f8c1/server/tserver/src/test/java/org/apache/accumulo/tserver/TabletServerSyncCheckTest.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/TabletServerSyncCheckTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/TabletServerSyncCheckTest.java index dad9a75..d35f07f 100644 --- a/server/tserver/src/test/java/org/apache/accumulo/tserver/TabletServerSyncCheckTest.java +++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/TabletServerSyncCheckTest.java @@ -35,6 +35,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.junit.Test; +import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; public class TabletServerSyncCheckTest { @@ -226,7 +227,7 @@ public class TabletServerSyncCheckTest { } @Override - public String choose(String[] options) { + public String choose(Optional<String> tableID, String[] options) { return null; } http://git-wip-us.apache.org/repos/asf/accumulo/blob/3f44f8c1/test/src/main/java/org/apache/accumulo/test/FairVolumeChooser.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/FairVolumeChooser.java b/test/src/main/java/org/apache/accumulo/test/FairVolumeChooser.java index 9eb0c84..2325086 100644 --- a/test/src/main/java/org/apache/accumulo/test/FairVolumeChooser.java +++ b/test/src/main/java/org/apache/accumulo/test/FairVolumeChooser.java @@ -19,6 +19,7 @@ package org.apache.accumulo.test; import java.util.concurrent.ConcurrentHashMap; import org.apache.accumulo.server.fs.VolumeChooser; +import org.apache.accumulo.server.fs.VolumeChooserEnvironment; /** * Try to assign some fairness to choosing Volumes. Intended for tests, not for production @@ -28,7 +29,7 @@ public class FairVolumeChooser implements VolumeChooser { private final ConcurrentHashMap<Integer,Integer> optionLengthToLastChoice = new ConcurrentHashMap<Integer,Integer>(); @Override - public String choose(String[] options) { + public String choose(VolumeChooserEnvironment env, String[] options) { int currentChoice; Integer lastChoice = optionLengthToLastChoice.get(options.length); if (null == lastChoice) {
