Repository: accumulo Updated Branches: refs/heads/1.6.0-SNAPSHOT a8572fe57 -> 40299f89a
http://git-wip-us.apache.org/repos/asf/accumulo/blob/40299f89/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloConfigImpl.java ---------------------------------------------------------------------- diff --git a/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloConfigImpl.java b/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloConfigImpl.java index b2de7db..897b55b 100644 --- a/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloConfigImpl.java +++ b/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloConfigImpl.java @@ -99,7 +99,7 @@ public class MiniAccumuloConfigImpl { logDir = new File(dir, "logs"); walogDir = new File(dir, "walogs"); - // TODO ACCUMULO-XXXX replace usage of instance.dfs.{dir,uri} with instance.volumes + // TODO ACCUMULO-XXXX replace usage of instance.dfs.{dir,uri} with instance.volumes setInstanceLocation(); mergeProp(Property.INSTANCE_SECRET.getKey(), DEFAULT_INSTANCE_SECRET); @@ -110,7 +110,7 @@ public class MiniAccumuloConfigImpl { mergeProp(Property.TSERV_MAXMEM.getKey(), "50M"); mergeProp(Property.TSERV_WALOG_MAX_SIZE.getKey(), "100M"); mergeProp(Property.TSERV_NATIVEMAP_ENABLED.getKey(), "false"); - mergeProp(Property.TRACE_TOKEN_PROPERTY_PREFIX.getKey() + ".password", getRootPassword()); + mergeProp(Property.TRACE_TOKEN_PROPERTY_PREFIX.getKey() + "password", getRootPassword()); // since there is a small amount of memory, check more frequently for majc... setting may not be needed in 1.5 mergeProp(Property.TSERV_MAJC_DELAY.getKey(), "3"); mergeProp(Property.GENERAL_CLASSPATHS.getKey(), libDir.getAbsolutePath() + "/[^.].*[.]jar"); @@ -389,7 +389,7 @@ public class MiniAccumuloConfigImpl { public File getClientConfFile() { return new File(getConfDir(), "client.conf"); } - + /** * sets system properties set for service processes * http://git-wip-us.apache.org/repos/asf/accumulo/blob/40299f89/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/trace/Basic.java ---------------------------------------------------------------------- diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/trace/Basic.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/trace/Basic.java index 31a8f0f..90bb0d4 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/trace/Basic.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/trace/Basic.java @@ -77,7 +77,7 @@ abstract class Basic extends BasicServlet { at = new PasswordToken(conf.get(p).getBytes(Constants.UTF8)); } else { Properties props = new Properties(); - int prefixLength = Property.TRACE_TOKEN_PROPERTY_PREFIX.getKey().length() + 1; + int prefixLength = Property.TRACE_TOKEN_PROPERTY_PREFIX.getKey().length(); for (Entry<String,String> entry : loginMap.entrySet()) { props.put(entry.getKey().substring(prefixLength), entry.getValue()); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/40299f89/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java ---------------------------------------------------------------------- diff --git a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java index 30f1ae7..67f2739 100644 --- a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java +++ b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java @@ -30,10 +30,10 @@ import org.apache.accumulo.core.client.BatchWriterConfig; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.MutationsRejectedException; import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.Properties; import org.apache.accumulo.core.client.security.tokens.PasswordToken; -import org.apache.accumulo.core.client.MutationsRejectedException; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Mutation; @@ -74,51 +74,51 @@ import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; public class TraceServer implements Watcher { - + final private static Logger log = Logger.getLogger(TraceServer.class); final private ServerConfiguration serverConfiguration; final private TServer server; final private AtomicReference<BatchWriter> writer; final private Connector connector; final String table; - + private static void put(Mutation m, String cf, String cq, byte[] bytes, int len) { m.put(new Text(cf), new Text(cq), new Value(bytes, 0, len)); } - + static class ByteArrayTransport extends TTransport { TByteArrayOutputStream out = new TByteArrayOutputStream(); - + @Override public boolean isOpen() { return true; } - + @Override public void open() throws TTransportException {} - + @Override public void close() {} - + @Override public int read(byte[] buf, int off, int len) { return 0; } - + @Override public void write(byte[] buf, int off, int len) throws TTransportException { out.write(buf, off, len); } - + public byte[] get() { return out.get(); } - + public int len() { return out.len(); } } - + class Receiver implements Iface { @Override public void span(RemoteSpan s) throws TException { @@ -143,8 +143,8 @@ public class TraceServer implements Watcher { } try { final BatchWriter writer = TraceServer.this.writer.get(); - /* Check for null, because we expect spans to come in much faster than flush calls. - In the case of failure, we'd rather avoid logging tons of NPEs. + /* + * Check for null, because we expect spans to come in much faster than flush calls. In the case of failure, we'd rather avoid logging tons of NPEs. */ if (null == writer) { log.warn("writer is not ready; discarding span."); @@ -159,15 +159,15 @@ public class TraceServer implements Watcher { if (log.isDebugEnabled()) { log.debug("discarded span due to rejection of mutation: " + spanMutation, exception); } - /* XXX this could be e.g. an IllegalArgumentExceptoion if we're trying to write this mutation to a writer that has been closed since we retrieved it */ + /* XXX this could be e.g. an IllegalArgumentExceptoion if we're trying to write this mutation to a writer that has been closed since we retrieved it */ } catch (RuntimeException exception) { log.warn("Unable to write mutation to table; discarding span. set log level to DEBUG for stacktrace. cause: " + exception); log.debug("unable to write mutation to table due to exception.", exception); } } - + } - + public TraceServer(ServerConfiguration serverConfiguration, String hostname) throws Exception { this.serverConfiguration = serverConfiguration; AccumuloConfiguration conf = serverConfiguration.getConfiguration(); @@ -186,16 +186,16 @@ public class TraceServer implements Watcher { AuthenticationToken token = AccumuloClassLoader.getClassLoader().loadClass(conf.get(Property.TRACE_TOKEN_TYPE)).asSubclass(AuthenticationToken.class) .newInstance(); - int prefixLength = Property.TRACE_TOKEN_PROPERTY_PREFIX.getKey().length() + 1; + int prefixLength = Property.TRACE_TOKEN_PROPERTY_PREFIX.getKey().length(); for (Entry<String,String> entry : loginMap.entrySet()) { props.put(entry.getKey().substring(prefixLength), entry.getValue()); } token.init(props); - + at = token; } - + connector = serverConfiguration.getInstance().getConnector(principal, at); if (!connector.tableOperations().exists(table)) { connector.tableOperations().create(table); @@ -213,7 +213,7 @@ public class TraceServer implements Watcher { this.connector = connector; // make sure we refer to the final variable from now on. connector = null; - + int port = conf.getPort(Property.TRACE_PORT); final ServerSocket sock = ServerSocketChannel.open().socket(); sock.setReuseAddress(true); @@ -225,7 +225,7 @@ public class TraceServer implements Watcher { registerInZooKeeper(sock.getInetAddress().getHostAddress() + ":" + sock.getLocalPort()); writer = new AtomicReference<BatchWriter>(this.connector.createBatchWriter(table, new BatchWriterConfig().setMaxLatency(5, TimeUnit.SECONDS))); } - + public void run() throws Exception { SimpleTimer.getInstance().schedule(new Runnable() { @Override @@ -235,7 +235,7 @@ public class TraceServer implements Watcher { }, 1000, 1000); server.serve(); } - + private void flush() { try { final BatchWriter writer = this.writer.get(); @@ -246,14 +246,14 @@ public class TraceServer implements Watcher { log.warn("Problem flushing traces, resetting writer. Set log level to DEBUG to see stacktrace. cause: " + exception); log.debug("flushing traces failed due to exception", exception); resetWriter(); - /* XXX e.g. if the writer was closed between when we grabbed it and when we called flush. */ + /* XXX e.g. if the writer was closed between when we grabbed it and when we called flush. */ } catch (RuntimeException exception) { log.warn("Problem flushing traces, resetting writer. Set log level to DEBUG to see stacktrace. cause: " + exception); log.debug("flushing traces failed due to exception", exception); resetWriter(); } } - + private void resetWriter() { BatchWriter writer = null; try { @@ -274,14 +274,14 @@ public class TraceServer implements Watcher { } } } - + private void registerInZooKeeper(String name) throws Exception { String root = ZooUtil.getRoot(serverConfiguration.getInstance()) + Constants.ZTRACERS; IZooReaderWriter zoo = ZooReaderWriter.getInstance(); String path = zoo.putEphemeralSequential(root + "/trace-", name.getBytes(Constants.UTF8)); zoo.exists(path, this); } - + public static void main(String[] args) throws Exception { SecurityUtil.serverLogin(ServerConfiguration.getSiteConfiguration()); ServerOpts opts = new ServerOpts(); @@ -296,7 +296,7 @@ public class TraceServer implements Watcher { server.run(); log.info("tracer stopping"); } - + @Override public void process(WatchedEvent event) { log.debug("event " + event.getPath() + " " + event.getType() + " " + event.getState()); @@ -318,5 +318,5 @@ public class TraceServer implements Watcher { server.stop(); } } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/40299f89/start/src/main/java/org/apache/accumulo/start/classloader/vfs/AccumuloVFSClassLoader.java ---------------------------------------------------------------------- diff --git a/start/src/main/java/org/apache/accumulo/start/classloader/vfs/AccumuloVFSClassLoader.java b/start/src/main/java/org/apache/accumulo/start/classloader/vfs/AccumuloVFSClassLoader.java index 79105ed..fa20725 100644 --- a/start/src/main/java/org/apache/accumulo/start/classloader/vfs/AccumuloVFSClassLoader.java +++ b/start/src/main/java/org/apache/accumulo/start/classloader/vfs/AccumuloVFSClassLoader.java @@ -59,9 +59,10 @@ import org.apache.log4j.Logger; * */ public class AccumuloVFSClassLoader { - + public static class AccumuloVFSClassLoaderShutdownThread implements Runnable { - + + @Override public void run() { try { AccumuloVFSClassLoader.close(); @@ -69,70 +70,70 @@ public class AccumuloVFSClassLoader { // do nothing, we are shutting down anyway } } - + } - + private static List<WeakReference<DefaultFileSystemManager>> vfsInstances = Collections .synchronizedList(new ArrayList<WeakReference<DefaultFileSystemManager>>()); - + public static final String DYNAMIC_CLASSPATH_PROPERTY_NAME = "general.dynamic.classpaths"; - - public static final String DEFAULT_DYNAMIC_CLASSPATH_VALUE = "$ACCUMULO_HOME/lib/ext/[^.].*.jar\n"; - + + public static final String DEFAULT_DYNAMIC_CLASSPATH_VALUE = "$ACCUMULO_HOME/lib/ext/[^.].*.jar"; + public static final String VFS_CLASSLOADER_SYSTEM_CLASSPATH_PROPERTY = "general.vfs.classpaths"; - + public static final String VFS_CONTEXT_CLASSPATH_PROPERTY = "general.vfs.context.classpath."; - + public static final String VFS_CACHE_DIR = "general.vfs.cache.dir"; - + private static ClassLoader parent = null; private static volatile ReloadingClassLoader loader = null; private static final Object lock = new Object(); - + private static ContextManager contextManager; - + private static Logger log = Logger.getLogger(AccumuloVFSClassLoader.class); - + static { // Register the shutdown hook Runtime.getRuntime().addShutdownHook(new Thread(new AccumuloVFSClassLoaderShutdownThread())); } - + public synchronized static <U> Class<? extends U> loadClass(String classname, Class<U> extension) throws ClassNotFoundException { try { - return (Class<? extends U>) getClassLoader().loadClass(classname).asSubclass(extension); + return getClassLoader().loadClass(classname).asSubclass(extension); } catch (IOException e) { throw new ClassNotFoundException("IO Error loading class " + classname, e); } } - + public static Class<?> loadClass(String classname) throws ClassNotFoundException { return loadClass(classname, Object.class).asSubclass(Object.class); } - + static FileObject[] resolve(FileSystemManager vfs, String uris) throws FileSystemException { return resolve(vfs, uris, new ArrayList<FileObject>()); } - + static FileObject[] resolve(FileSystemManager vfs, String uris, ArrayList<FileObject> pathsToMonitor) throws FileSystemException { if (uris == null) return new FileObject[0]; - + ArrayList<FileObject> classpath = new ArrayList<FileObject>(); - + pathsToMonitor.clear(); - + for (String path : uris.split(",")) { - + path = path.trim(); - + if (path.equals("")) continue; - + path = AccumuloClassLoader.replaceEnvVars(path, System.getenv()); - + FileObject fo = vfs.resolveFile(path); - + switch (fo.getType()) { case FILE: case FOLDER: @@ -158,67 +159,67 @@ public class AccumuloVFSClassLoader { log.warn("ignoring classpath entry " + fo); break; } - + } - + return classpath.toArray(new FileObject[classpath.size()]); } - + private static ReloadingClassLoader createDynamicClassloader(final ClassLoader parent) throws FileSystemException, IOException { String dynamicCPath = AccumuloClassLoader.getAccumuloString(DYNAMIC_CLASSPATH_PROPERTY_NAME, DEFAULT_DYNAMIC_CLASSPATH_VALUE); - + String envJars = System.getenv("ACCUMULO_XTRAJARS"); if (null != envJars && !envJars.equals("")) if (dynamicCPath != null && !dynamicCPath.equals("")) dynamicCPath = dynamicCPath + "," + envJars; else dynamicCPath = envJars; - + ReloadingClassLoader wrapper = new ReloadingClassLoader() { @Override public ClassLoader getClassLoader() { return parent; } }; - + if (dynamicCPath == null || dynamicCPath.equals("")) return wrapper; - + // TODO monitor time for lib/ext was 1 sec... should this be configurable? - ACCUMULO-1301 return new AccumuloReloadingVFSClassLoader(dynamicCPath, generateVfs(), wrapper, 1000, true); } - + public static ClassLoader getClassLoader() throws IOException { ReloadingClassLoader localLoader = loader; while (null == localLoader) { synchronized (lock) { if (null == loader) { - + FileSystemManager vfs = generateVfs(); - + // Set up the 2nd tier class loader if (null == parent) { parent = AccumuloClassLoader.getClassLoader(); } - + FileObject[] vfsCP = resolve(vfs, AccumuloClassLoader.getAccumuloString(VFS_CLASSLOADER_SYSTEM_CLASSPATH_PROPERTY, "")); - + if (vfsCP.length == 0) { localLoader = createDynamicClassloader(parent); loader = localLoader; return localLoader.getClassLoader(); } - + // Create the Accumulo Context ClassLoader using the DEFAULT_CONTEXT localLoader = createDynamicClassloader(new VFSClassLoader(vfsCP, vfs, parent)); loader = localLoader; } } } - + return localLoader.getClassLoader(); } - + public static FileSystemManager generateVfs() throws FileSystemException { DefaultFileSystemManager vfs = new FinalCloseDefaultFileSystemManager(); vfs.addProvider("res", new org.apache.commons.vfs2.provider.res.ResourceFileProvider()); @@ -265,11 +266,11 @@ public class AccumuloVFSClassLoader { vfsInstances.add(new WeakReference<DefaultFileSystemManager>(vfs)); return vfs; } - + public interface Printer { void print(String s); } - + public static void printClassPath() { printClassPath(new Printer() { @Override @@ -278,28 +279,28 @@ public class AccumuloVFSClassLoader { } }); } - + public static void printClassPath(Printer out) { try { ClassLoader cl = getClassLoader(); ArrayList<ClassLoader> classloaders = new ArrayList<ClassLoader>(); - + while (cl != null) { classloaders.add(cl); cl = cl.getParent(); } - + Collections.reverse(classloaders); - + int level = 0; - + for (ClassLoader classLoader : classloaders) { if (level > 0) out.print(""); level++; - + String classLoaderDescription; - + switch (level) { case 1: classLoaderDescription = level + ": Java System Classloader (loads Java system resources)"; @@ -318,16 +319,16 @@ public class AccumuloVFSClassLoader { + AccumuloVFSClassLoader.class.getName() + ")"; break; } - + if (classLoader instanceof URLClassLoader) { // If VFS class loader enabled, but no contexts defined. URLClassLoader ucl = (URLClassLoader) classLoader; out.print("Level " + classLoaderDescription + " URL classpath items are:"); - + for (URL u : ucl.getURLs()) { out.print("\t" + u.toExternalForm()); } - + } else if (classLoader instanceof VFSClassLoader) { out.print("Level " + classLoaderDescription + " VFS classpaths items are:"); VFSClassLoader vcl = (VFSClassLoader) classLoader; @@ -338,12 +339,12 @@ public class AccumuloVFSClassLoader { out.print("Unknown classloader configuration " + classLoader.getClass()); } } - + } catch (Throwable t) { throw new RuntimeException(t); } } - + public static synchronized ContextManager getContextManager() throws IOException { if (contextManager == null) { getClassLoader(); @@ -358,7 +359,7 @@ public class AccumuloVFSClassLoader { } }); } - + return contextManager; }