http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/common/cloud/SolrZkClient.java ---------------------------------------------------------------------- diff --git a/ranger_solrj/src/main/java/org/apache/solr/common/cloud/SolrZkClient.java b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/SolrZkClient.java new file mode 100644 index 0000000..f14aedd --- /dev/null +++ b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/SolrZkClient.java @@ -0,0 +1,736 @@ +package org.apache.solr.common.cloud; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +import org.apache.commons.io.FileUtils; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.StringUtils; +import org.apache.solr.common.cloud.ZkClientConnectionStrategy.ZkUpdate; +import org.apache.solr.common.util.ExecutorUtil; +import org.apache.solr.common.util.ObjectReleaseTracker; +import org.apache.solr.common.util.SolrjNamedThreadFactory; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.NoNodeException; +import org.apache.zookeeper.KeeperException.NodeExistsException; +import org.apache.zookeeper.KeeperException.NotEmptyException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.xml.transform.OutputKeys; +import javax.xml.transform.Source; +import javax.xml.transform.Transformer; +import javax.xml.transform.TransformerFactory; +import javax.xml.transform.stream.StreamResult; +import javax.xml.transform.stream.StreamSource; +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.io.StringReader; +import java.io.StringWriter; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * + * All Solr ZooKeeper interactions should go through this class rather than + * ZooKeeper. This class handles synchronous connects and reconnections. + * + */ +public class SolrZkClient implements Closeable { + + static final String NEWL = System.getProperty("line.separator"); + + static final int DEFAULT_CLIENT_CONNECT_TIMEOUT = 30000; + + private static final Logger log = LoggerFactory + .getLogger(SolrZkClient.class); + + private ConnectionManager connManager; + + private volatile SolrZooKeeper keeper; + + private ZkCmdExecutor zkCmdExecutor; + + private final ExecutorService zkCallbackExecutor = Executors.newCachedThreadPool(new SolrjNamedThreadFactory("zkCallback")); + + private volatile boolean isClosed = false; + private ZkClientConnectionStrategy zkClientConnectionStrategy; + private int zkClientTimeout; + private ZkACLProvider zkACLProvider; + private String zkServerAddress; + + public int getZkClientTimeout() { + return zkClientTimeout; + } + + // expert: for tests + public SolrZkClient() { + + } + + public SolrZkClient(String zkServerAddress, int zkClientTimeout) { + this(zkServerAddress, zkClientTimeout, new DefaultConnectionStrategy(), null); + } + + public SolrZkClient(String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout) { + this(zkServerAddress, zkClientTimeout, zkClientConnectTimeout, new DefaultConnectionStrategy(), null); + } + + public SolrZkClient(String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout, OnReconnect onReonnect) { + this(zkServerAddress, zkClientTimeout, zkClientConnectTimeout, new DefaultConnectionStrategy(), onReonnect); + } + + public SolrZkClient(String zkServerAddress, int zkClientTimeout, + ZkClientConnectionStrategy strat, final OnReconnect onReconnect) { + this(zkServerAddress, zkClientTimeout, DEFAULT_CLIENT_CONNECT_TIMEOUT, strat, onReconnect); + } + + public SolrZkClient(String zkServerAddress, int zkClientTimeout, int clientConnectTimeout, + ZkClientConnectionStrategy strat, final OnReconnect onReconnect) { + this(zkServerAddress, zkClientTimeout, clientConnectTimeout, strat, onReconnect, null, null); + } + + public SolrZkClient(String zkServerAddress, int zkClientTimeout, int clientConnectTimeout, + ZkClientConnectionStrategy strat, final OnReconnect onReconnect, BeforeReconnect beforeReconnect) { + this(zkServerAddress, zkClientTimeout, clientConnectTimeout, strat, onReconnect, beforeReconnect, null); + } + + public SolrZkClient(String zkServerAddress, int zkClientTimeout, int clientConnectTimeout, + ZkClientConnectionStrategy strat, final OnReconnect onReconnect, BeforeReconnect beforeReconnect, ZkACLProvider zkACLProvider) { + this.zkClientConnectionStrategy = strat; + this.zkServerAddress = zkServerAddress; + + if (strat == null) { + strat = new DefaultConnectionStrategy(); + } + + if (!strat.hasZkCredentialsToAddAutomatically()) { + ZkCredentialsProvider zkCredentialsToAddAutomatically = createZkCredentialsToAddAutomatically(); + strat.setZkCredentialsToAddAutomatically(zkCredentialsToAddAutomatically); + } + + this.zkClientTimeout = zkClientTimeout; + // we must retry at least as long as the session timeout + zkCmdExecutor = new ZkCmdExecutor(zkClientTimeout); + connManager = new ConnectionManager("ZooKeeperConnection Watcher:" + + zkServerAddress, this, zkServerAddress, strat, onReconnect, beforeReconnect); + + try { + strat.connect(zkServerAddress, zkClientTimeout, wrapWatcher(connManager), + new ZkUpdate() { + @Override + public void update(SolrZooKeeper zooKeeper) { + SolrZooKeeper oldKeeper = keeper; + keeper = zooKeeper; + try { + closeKeeper(oldKeeper); + } finally { + if (isClosed) { + // we may have been closed + closeKeeper(SolrZkClient.this.keeper); + } + } + } + }); + } catch (Exception e) { + connManager.close(); + if (keeper != null) { + try { + keeper.close(); + } catch (InterruptedException e1) { + Thread.currentThread().interrupt(); + } + } + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e); + } + + try { + connManager.waitForConnected(clientConnectTimeout); + } catch (Exception e) { + connManager.close(); + try { + keeper.close(); + } catch (InterruptedException e1) { + Thread.currentThread().interrupt(); + } + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e); + } + assert ObjectReleaseTracker.track(this); + if (zkACLProvider == null) { + this.zkACLProvider = createZkACLProvider(); + } else { + this.zkACLProvider = zkACLProvider; + } + } + + public ConnectionManager getConnectionManager() { + return connManager; + } + + public ZkClientConnectionStrategy getZkClientConnectionStrategy() { + return zkClientConnectionStrategy; + } + + public static final String ZK_CRED_PROVIDER_CLASS_NAME_VM_PARAM_NAME = "zkCredentialsProvider"; + protected ZkCredentialsProvider createZkCredentialsToAddAutomatically() { + String zkCredentialsProviderClassName = System.getProperty(ZK_CRED_PROVIDER_CLASS_NAME_VM_PARAM_NAME); + if (!StringUtils.isEmpty(zkCredentialsProviderClassName)) { + try { + log.info("Using ZkCredentialsProvider: " + zkCredentialsProviderClassName); + return (ZkCredentialsProvider)Class.forName(zkCredentialsProviderClassName).getConstructor().newInstance(); + } catch (Throwable t) { + // just ignore - go default + log.warn("VM param zkCredentialsProvider does not point to a class implementing ZkCredentialsProvider and with a non-arg constructor", t); + } + } + log.info("Using default ZkCredentialsProvider"); + return new DefaultZkCredentialsProvider(); + } + + public static final String ZK_ACL_PROVIDER_CLASS_NAME_VM_PARAM_NAME = "zkACLProvider"; + protected ZkACLProvider createZkACLProvider() { + String zkACLProviderClassName = System.getProperty(ZK_ACL_PROVIDER_CLASS_NAME_VM_PARAM_NAME); + if (!StringUtils.isEmpty(zkACLProviderClassName)) { + try { + log.info("Using ZkACLProvider: " + zkACLProviderClassName); + return (ZkACLProvider)Class.forName(zkACLProviderClassName).getConstructor().newInstance(); + } catch (Throwable t) { + // just ignore - go default + log.warn("VM param zkACLProvider does not point to a class implementing ZkACLProvider and with a non-arg constructor", t); + } + } + log.info("Using default ZkACLProvider"); + return new DefaultZkACLProvider(); + } + + /** + * Returns true if client is connected + */ + public boolean isConnected() { + return keeper != null && keeper.getState() == ZooKeeper.States.CONNECTED; + } + + public void delete(final String path, final int version, boolean retryOnConnLoss) + throws InterruptedException, KeeperException { + if (retryOnConnLoss) { + zkCmdExecutor.retryOperation(new ZkOperation() { + @Override + public Stat execute() throws KeeperException, InterruptedException { + keeper.delete(path, version); + return null; + } + }); + } else { + keeper.delete(path, version); + } + } + + private Watcher wrapWatcher (final Watcher watcher) { + if (watcher == null) return watcher; + + // wrap the watcher so that it doesn't fire off ZK's event queue + return new Watcher() { + @Override + public void process(final WatchedEvent event) { + log.debug("Submitting job to respond to event " + event); + zkCallbackExecutor.submit(new Runnable () { + @Override + public void run () { + watcher.process(event); + } + }); + } + }; + } + + /** + * Return the stat of the node of the given path. Return null if no such a + * node exists. + * <p> + * If the watch is non-null and the call is successful (no exception is thrown), + * a watch will be left on the node with the given path. The watch will be + * triggered by a successful operation that creates/delete the node or sets + * the data on the node. + * + * @param path the node path + * @param watcher explicit watcher + * @return the stat of the node of the given path; return null if no such a + * node exists. + * @throws KeeperException If the server signals an error + * @throws InterruptedException If the server transaction is interrupted. + * @throws IllegalArgumentException if an invalid path is specified + */ + public Stat exists(final String path, final Watcher watcher, boolean retryOnConnLoss) + throws KeeperException, InterruptedException { + if (retryOnConnLoss) { + return zkCmdExecutor.retryOperation(new ZkOperation() { + @Override + public Stat execute() throws KeeperException, InterruptedException { + return keeper.exists(path, wrapWatcher(watcher)); + } + }); + } else { + return keeper.exists(path, wrapWatcher(watcher)); + } + } + + /** + * Returns true if path exists + */ + public Boolean exists(final String path, boolean retryOnConnLoss) + throws KeeperException, InterruptedException { + if (retryOnConnLoss) { + return zkCmdExecutor.retryOperation(new ZkOperation() { + @Override + public Boolean execute() throws KeeperException, InterruptedException { + return keeper.exists(path, null) != null; + } + }); + } else { + return keeper.exists(path, null) != null; + } + } + + /** + * Returns children of the node at the path + */ + public List<String> getChildren(final String path, final Watcher watcher, boolean retryOnConnLoss) + throws KeeperException, InterruptedException { + if (retryOnConnLoss) { + return zkCmdExecutor.retryOperation(new ZkOperation() { + @Override + public List<String> execute() throws KeeperException, InterruptedException { + return keeper.getChildren(path, wrapWatcher(watcher)); + } + }); + } else { + return keeper.getChildren(path, wrapWatcher(watcher)); + } + } + + /** + * Returns node's data + */ + public byte[] getData(final String path, final Watcher watcher, final Stat stat, boolean retryOnConnLoss) + throws KeeperException, InterruptedException { + if (retryOnConnLoss) { + return zkCmdExecutor.retryOperation(new ZkOperation() { + @Override + public byte[] execute() throws KeeperException, InterruptedException { + return keeper.getData(path, wrapWatcher(watcher), stat); + } + }); + } else { + return keeper.getData(path, wrapWatcher(watcher), stat); + } + } + + /** + * Returns node's state + */ + public Stat setData(final String path, final byte data[], final int version, boolean retryOnConnLoss) + throws KeeperException, InterruptedException { + if (retryOnConnLoss) { + return zkCmdExecutor.retryOperation(new ZkOperation() { + @Override + public Stat execute() throws KeeperException, InterruptedException { + return keeper.setData(path, data, version); + } + }); + } else { + return keeper.setData(path, data, version); + } + } + + /** + * Returns path of created node + */ + public String create(final String path, final byte[] data, + final CreateMode createMode, boolean retryOnConnLoss) throws KeeperException, + InterruptedException { + if (retryOnConnLoss) { + return zkCmdExecutor.retryOperation(new ZkOperation() { + @Override + public String execute() throws KeeperException, InterruptedException { + return keeper.create(path, data, zkACLProvider.getACLsToAdd(path), + createMode); + } + }); + } else { + List<ACL> acls = zkACLProvider.getACLsToAdd(path); + return keeper.create(path, data, acls, createMode); + } + } + + /** + * Creates the path in ZooKeeper, creating each node as necessary. + * + * e.g. If <code>path=/solr/group/node</code> and none of the nodes, solr, + * group, node exist, each will be created. + */ + public void makePath(String path, boolean retryOnConnLoss) throws KeeperException, + InterruptedException { + makePath(path, null, CreateMode.PERSISTENT, retryOnConnLoss); + } + + public void makePath(String path, boolean failOnExists, boolean retryOnConnLoss) throws KeeperException, + InterruptedException { + makePath(path, null, CreateMode.PERSISTENT, null, failOnExists, retryOnConnLoss); + } + + public void makePath(String path, File file, boolean failOnExists, boolean retryOnConnLoss) + throws IOException, KeeperException, InterruptedException { + makePath(path, FileUtils.readFileToByteArray(file), + CreateMode.PERSISTENT, null, failOnExists, retryOnConnLoss); + } + + public void makePath(String path, File file, boolean retryOnConnLoss) throws IOException, + KeeperException, InterruptedException { + makePath(path, FileUtils.readFileToByteArray(file), retryOnConnLoss); + } + + public void makePath(String path, CreateMode createMode, boolean retryOnConnLoss) throws KeeperException, + InterruptedException { + makePath(path, null, createMode, retryOnConnLoss); + } + + /** + * Creates the path in ZooKeeper, creating each node as necessary. + * + * @param data to set on the last zkNode + */ + public void makePath(String path, byte[] data, boolean retryOnConnLoss) throws KeeperException, + InterruptedException { + makePath(path, data, CreateMode.PERSISTENT, retryOnConnLoss); + } + + /** + * Creates the path in ZooKeeper, creating each node as necessary. + * + * e.g. If <code>path=/solr/group/node</code> and none of the nodes, solr, + * group, node exist, each will be created. + * + * @param data to set on the last zkNode + */ + public void makePath(String path, byte[] data, CreateMode createMode, boolean retryOnConnLoss) + throws KeeperException, InterruptedException { + makePath(path, data, createMode, null, retryOnConnLoss); + } + + /** + * Creates the path in ZooKeeper, creating each node as necessary. + * + * e.g. If <code>path=/solr/group/node</code> and none of the nodes, solr, + * group, node exist, each will be created. + * + * @param data to set on the last zkNode + */ + public void makePath(String path, byte[] data, CreateMode createMode, + Watcher watcher, boolean retryOnConnLoss) throws KeeperException, InterruptedException { + makePath(path, data, createMode, watcher, true, retryOnConnLoss); + } + + + + /** + * Creates the path in ZooKeeper, creating each node as necessary. + * + * e.g. If <code>path=/solr/group/node</code> and none of the nodes, solr, + * group, node exist, each will be created. + * + * Note: retryOnConnLoss is only respected for the final node - nodes + * before that are always retried on connection loss. + */ + public void makePath(String path, byte[] data, CreateMode createMode, + Watcher watcher, boolean failOnExists, boolean retryOnConnLoss) throws KeeperException, InterruptedException { + if (log.isInfoEnabled()) { + log.info("makePath: " + path); + } + boolean retry = true; + + if (path.startsWith("/")) { + path = path.substring(1, path.length()); + } + String[] paths = path.split("/"); + StringBuilder sbPath = new StringBuilder(); + for (int i = 0; i < paths.length; i++) { + byte[] bytes = null; + String pathPiece = paths[i]; + sbPath.append("/" + pathPiece); + final String currentPath = sbPath.toString(); + Object exists = exists(currentPath, watcher, retryOnConnLoss); + if (exists == null || ((i == paths.length -1) && failOnExists)) { + CreateMode mode = CreateMode.PERSISTENT; + if (i == paths.length - 1) { + mode = createMode; + bytes = data; + if (!retryOnConnLoss) retry = false; + } + try { + if (retry) { + final CreateMode finalMode = mode; + final byte[] finalBytes = bytes; + zkCmdExecutor.retryOperation(new ZkOperation() { + @Override + public Object execute() throws KeeperException, InterruptedException { + keeper.create(currentPath, finalBytes, zkACLProvider.getACLsToAdd(currentPath), finalMode); + return null; + } + }); + } else { + keeper.create(currentPath, bytes, zkACLProvider.getACLsToAdd(currentPath), mode); + } + } catch (NodeExistsException e) { + + if (!failOnExists) { + // TODO: version ? for now, don't worry about race + setData(currentPath, data, -1, retryOnConnLoss); + // set new watch + exists(currentPath, watcher, retryOnConnLoss); + return; + } + + // ignore unless it's the last node in the path + if (i == paths.length - 1) { + throw e; + } + } + if(i == paths.length -1) { + // set new watch + exists(currentPath, watcher, retryOnConnLoss); + } + } else if (i == paths.length - 1) { + // TODO: version ? for now, don't worry about race + setData(currentPath, data, -1, retryOnConnLoss); + // set new watch + exists(currentPath, watcher, retryOnConnLoss); + } + } + } + + public void makePath(String zkPath, CreateMode createMode, Watcher watcher, boolean retryOnConnLoss) + throws KeeperException, InterruptedException { + makePath(zkPath, null, createMode, watcher, retryOnConnLoss); + } + + /** + * Write data to ZooKeeper. + */ + public Stat setData(String path, byte[] data, boolean retryOnConnLoss) throws KeeperException, + InterruptedException { + return setData(path, data, -1, retryOnConnLoss); + } + + /** + * Write file to ZooKeeper - default system encoding used. + * + * @param path path to upload file to e.g. /solr/conf/solrconfig.xml + * @param file path to file to be uploaded + */ + public Stat setData(String path, File file, boolean retryOnConnLoss) throws IOException, + KeeperException, InterruptedException { + if (log.isInfoEnabled()) { + log.info("Write to ZooKeepeer " + file.getAbsolutePath() + " to " + path); + } + + byte[] data = FileUtils.readFileToByteArray(file); + return setData(path, data, retryOnConnLoss); + } + + /** + * Fills string with printout of current ZooKeeper layout. + */ + public void printLayout(String path, int indent, StringBuilder string) + throws KeeperException, InterruptedException { + byte[] data = getData(path, null, null, true); + List<String> children = getChildren(path, null, true); + StringBuilder dent = new StringBuilder(); + for (int i = 0; i < indent; i++) { + dent.append(" "); + } + string.append(dent + path + " (" + children.size() + ")" + NEWL); + if (data != null) { + String dataString = new String(data, StandardCharsets.UTF_8); + if ((!path.endsWith(".txt") && !path.endsWith(".xml")) || path.endsWith(ZkStateReader.CLUSTER_STATE)) { + if (path.endsWith(".xml")) { + // this is the cluster state in xml format - lets pretty print + dataString = prettyPrint(dataString); + } + + string.append(dent + "DATA:\n" + dent + " " + + dataString.replaceAll("\n", "\n" + dent + " ") + NEWL); + } else { + string.append(dent + "DATA: ...supressed..." + NEWL); + } + } + + for (String child : children) { + if (!child.equals("quota")) { + try { + printLayout(path + (path.equals("/") ? "" : "/") + child, indent + 1, + string); + } catch (NoNodeException e) { + // must have gone away + } + } + } + + } + + /** + * Prints current ZooKeeper layout to stdout. + */ + public void printLayoutToStdOut() throws KeeperException, + InterruptedException { + StringBuilder sb = new StringBuilder(); + printLayout("/", 0, sb); + System.out.println(sb.toString()); + } + + public static String prettyPrint(String input, int indent) { + try { + Source xmlInput = new StreamSource(new StringReader(input)); + StringWriter stringWriter = new StringWriter(); + StreamResult xmlOutput = new StreamResult(stringWriter); + TransformerFactory transformerFactory = TransformerFactory.newInstance(); + transformerFactory.setAttribute("indent-number", indent); + Transformer transformer = transformerFactory.newTransformer(); + transformer.setOutputProperty(OutputKeys.INDENT, "yes"); + transformer.transform(xmlInput, xmlOutput); + return xmlOutput.getWriter().toString(); + } catch (Exception e) { + throw new RuntimeException("Problem pretty printing XML", e); + } + } + + private static String prettyPrint(String input) { + return prettyPrint(input, 2); + } + + public void close() { + if (isClosed) return; // it's okay if we over close - same as solrcore + isClosed = true; + try { + closeKeeper(keeper); + } finally { + connManager.close(); + closeCallbackExecutor(); + } + assert ObjectReleaseTracker.release(this); + } + + public boolean isClosed() { + return isClosed; + } + + /** + * Allows package private classes to update volatile ZooKeeper. + */ + void updateKeeper(SolrZooKeeper keeper) throws InterruptedException { + SolrZooKeeper oldKeeper = this.keeper; + this.keeper = keeper; + if (oldKeeper != null) { + oldKeeper.close(); + } + // we might have been closed already + if (isClosed) this.keeper.close(); + } + + public SolrZooKeeper getSolrZooKeeper() { + return keeper; + } + + private void closeKeeper(SolrZooKeeper keeper) { + if (keeper != null) { + try { + keeper.close(); + } catch (InterruptedException e) { + // Restore the interrupted status + Thread.currentThread().interrupt(); + log.error("", e); + throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", + e); + } + } + } + + private void closeCallbackExecutor() { + try { + ExecutorUtil.shutdownAndAwaitTermination(zkCallbackExecutor); + } catch (Exception e) { + SolrException.log(log, e); + } + } + + // yeah, it's recursive :( + public void clean(String path) throws InterruptedException, KeeperException { + List<String> children; + try { + children = getChildren(path, null, true); + } catch (NoNodeException r) { + return; + } + for (String string : children) { + // we can't clean the built-in zookeeper node + if (path.equals("/") && string.equals("zookeeper")) continue; + if (path.equals("/")) { + clean(path + string); + } else { + clean(path + "/" + string); + } + } + try { + if (!path.equals("/")) { + try { + delete(path, -1, true); + } catch (NotEmptyException e) { + clean(path); + } + } + } catch (NoNodeException r) { + return; + } + } + + /** + * Validates if zkHost contains a chroot. See http://zookeeper.apache.org/doc/r3.2.2/zookeeperProgrammers.html#ch_zkSessions + */ + public static boolean containsChroot(String zkHost) { + return zkHost.contains("/"); + } + + /** + * Check to see if a Throwable is an InterruptedException, and if it is, set the thread interrupt flag + * @param e the Throwable + * @return the Throwable + */ + public static Throwable checkInterrupted(Throwable e) { + if (e instanceof InterruptedException) + Thread.interrupted(); + return e; + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/common/cloud/SolrZooKeeper.java ---------------------------------------------------------------------- diff --git a/ranger_solrj/src/main/java/org/apache/solr/common/cloud/SolrZooKeeper.java b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/SolrZooKeeper.java new file mode 100644 index 0000000..35ad8bf --- /dev/null +++ b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/SolrZooKeeper.java @@ -0,0 +1,103 @@ +package org.apache.solr.common.cloud; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.IOException; +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.net.SocketAddress; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; + +import org.apache.zookeeper.ClientCnxn; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; + +// we use this class to expose nasty stuff for tests +public class SolrZooKeeper extends ZooKeeper { + final Set<Thread> spawnedThreads = new CopyOnWriteArraySet<>(); + + // for test debug + //static Map<SolrZooKeeper,Exception> clients = new ConcurrentHashMap<SolrZooKeeper,Exception>(); + + public SolrZooKeeper(String connectString, int sessionTimeout, + Watcher watcher) throws IOException { + super(connectString, sessionTimeout, watcher); + //clients.put(this, new RuntimeException()); + } + + public ClientCnxn getConnection() { + return cnxn; + } + + public SocketAddress getSocketAddress() { + return testableLocalSocketAddress(); + } + + public void closeCnxn() { + final Thread t = new Thread() { + @Override + public void run() { + try { + final ClientCnxn cnxn = getConnection(); + synchronized (cnxn) { + try { + final Field sendThreadFld = cnxn.getClass().getDeclaredField("sendThread"); + sendThreadFld.setAccessible(true); + Object sendThread = sendThreadFld.get(cnxn); + if (sendThread != null) { + Method method = sendThread.getClass().getDeclaredMethod("testableCloseSocket"); + method.setAccessible(true); + try { + method.invoke(sendThread); + } catch (InvocationTargetException e) { + // is fine + } + } + } catch (Exception e) { + throw new RuntimeException("Closing Zookeeper send channel failed.", e); + } + } + } finally { + spawnedThreads.remove(this); + } + } + }; + spawnedThreads.add(t); + t.start(); + } + + @Override + public synchronized void close() throws InterruptedException { + for (Thread t : spawnedThreads) { + if (t.isAlive()) t.interrupt(); + } + super.close(); + } + +// public static void assertCloses() { +// if (clients.size() > 0) { +// Iterator<Exception> stacktraces = clients.values().iterator(); +// Exception cause = null; +// cause = stacktraces.next(); +// throw new RuntimeException("Found a bad one!", cause); +// } +// } + +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/common/cloud/VMParamsAllAndReadonlyDigestZkACLProvider.java ---------------------------------------------------------------------- diff --git a/ranger_solrj/src/main/java/org/apache/solr/common/cloud/VMParamsAllAndReadonlyDigestZkACLProvider.java b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/VMParamsAllAndReadonlyDigestZkACLProvider.java new file mode 100644 index 0000000..0b9ae1d --- /dev/null +++ b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/VMParamsAllAndReadonlyDigestZkACLProvider.java @@ -0,0 +1,89 @@ +package org.apache.solr.common.cloud; + +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.solr.common.StringUtils; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Id; +import org.apache.zookeeper.server.auth.DigestAuthenticationProvider; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +public class VMParamsAllAndReadonlyDigestZkACLProvider extends DefaultZkACLProvider { + + public static final String DEFAULT_DIGEST_READONLY_USERNAME_VM_PARAM_NAME = "zkDigestReadonlyUsername"; + public static final String DEFAULT_DIGEST_READONLY_PASSWORD_VM_PARAM_NAME = "zkDigestReadonlyPassword"; + + final String zkDigestAllUsernameVMParamName; + final String zkDigestAllPasswordVMParamName; + final String zkDigestReadonlyUsernameVMParamName; + final String zkDigestReadonlyPasswordVMParamName; + + public VMParamsAllAndReadonlyDigestZkACLProvider() { + this( + VMParamsSingleSetCredentialsDigestZkCredentialsProvider.DEFAULT_DIGEST_USERNAME_VM_PARAM_NAME, + VMParamsSingleSetCredentialsDigestZkCredentialsProvider.DEFAULT_DIGEST_PASSWORD_VM_PARAM_NAME, + DEFAULT_DIGEST_READONLY_USERNAME_VM_PARAM_NAME, + DEFAULT_DIGEST_READONLY_PASSWORD_VM_PARAM_NAME + ); + } + + public VMParamsAllAndReadonlyDigestZkACLProvider(String zkDigestAllUsernameVMParamName, String zkDigestAllPasswordVMParamName, + String zkDigestReadonlyUsernameVMParamName, String zkDigestReadonlyPasswordVMParamName) { + this.zkDigestAllUsernameVMParamName = zkDigestAllUsernameVMParamName; + this.zkDigestAllPasswordVMParamName = zkDigestAllPasswordVMParamName; + this.zkDigestReadonlyUsernameVMParamName = zkDigestReadonlyUsernameVMParamName; + this.zkDigestReadonlyPasswordVMParamName = zkDigestReadonlyPasswordVMParamName; + } + + + @Override + protected List<ACL> createGlobalACLsToAdd() { + try { + List<ACL> result = new ArrayList<ACL>(); + + // Not to have to provide too much credentials and ACL information to the process it is assumed that you want "ALL"-acls + // added to the user you are using to connect to ZK (if you are using VMParamsSingleSetCredentialsDigestZkCredentialsProvider) + String digestAllUsername = System.getProperty(zkDigestAllUsernameVMParamName); + String digestAllPassword = System.getProperty(zkDigestAllPasswordVMParamName); + if (!StringUtils.isEmpty(digestAllUsername) && !StringUtils.isEmpty(digestAllPassword)) { + result.add(new ACL(ZooDefs.Perms.ALL, new Id("digest", DigestAuthenticationProvider.generateDigest(digestAllUsername + ":" + digestAllPassword)))); + } + + // Besides that support for adding additional "READONLY"-acls for another user + String digestReadonlyUsername = System.getProperty(zkDigestReadonlyUsernameVMParamName); + String digestReadonlyPassword = System.getProperty(zkDigestReadonlyPasswordVMParamName); + if (!StringUtils.isEmpty(digestReadonlyUsername) && !StringUtils.isEmpty(digestReadonlyPassword)) { + result.add(new ACL(ZooDefs.Perms.READ, new Id("digest", DigestAuthenticationProvider.generateDigest(digestReadonlyUsername + ":" + digestReadonlyPassword)))); + } + + if (result.isEmpty()) { + result = super.createGlobalACLsToAdd(); + } + + return result; + } catch (NoSuchAlgorithmException e) { + throw new RuntimeException(e); + } + } + +} + http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/common/cloud/VMParamsSingleSetCredentialsDigestZkCredentialsProvider.java ---------------------------------------------------------------------- diff --git a/ranger_solrj/src/main/java/org/apache/solr/common/cloud/VMParamsSingleSetCredentialsDigestZkCredentialsProvider.java b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/VMParamsSingleSetCredentialsDigestZkCredentialsProvider.java new file mode 100644 index 0000000..1e575fd --- /dev/null +++ b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/VMParamsSingleSetCredentialsDigestZkCredentialsProvider.java @@ -0,0 +1,60 @@ +package org.apache.solr.common.cloud; + +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.solr.common.StringUtils; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +public class VMParamsSingleSetCredentialsDigestZkCredentialsProvider extends DefaultZkCredentialsProvider { + + public static final String DEFAULT_DIGEST_USERNAME_VM_PARAM_NAME = "zkDigestUsername"; + public static final String DEFAULT_DIGEST_PASSWORD_VM_PARAM_NAME = "zkDigestPassword"; + + final String zkDigestUsernameVMParamName; + final String zkDigestPasswordVMParamName; + + public VMParamsSingleSetCredentialsDigestZkCredentialsProvider() { + this(DEFAULT_DIGEST_USERNAME_VM_PARAM_NAME, DEFAULT_DIGEST_PASSWORD_VM_PARAM_NAME); + } + + public VMParamsSingleSetCredentialsDigestZkCredentialsProvider(String zkDigestUsernameVMParamName, String zkDigestPasswordVMParamName) { + this.zkDigestUsernameVMParamName = zkDigestUsernameVMParamName; + this.zkDigestPasswordVMParamName = zkDigestPasswordVMParamName; + } + + @Override + protected Collection<ZkCredentials> createCredentials() { + List<ZkCredentials> result = new ArrayList<ZkCredentials>(); + String digestUsername = System.getProperty(zkDigestUsernameVMParamName); + String digestPassword = System.getProperty(zkDigestPasswordVMParamName); + if (!StringUtils.isEmpty(digestUsername) && !StringUtils.isEmpty(digestPassword)) { + try { + result.add(new ZkCredentials("digest", (digestUsername + ":" + digestPassword).getBytes("UTF-8"))); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); + } + } + return result; + } + +} + http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkACLProvider.java ---------------------------------------------------------------------- diff --git a/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkACLProvider.java b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkACLProvider.java new file mode 100644 index 0000000..03149b3 --- /dev/null +++ b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkACLProvider.java @@ -0,0 +1,28 @@ +package org.apache.solr.common.cloud; + +import java.util.List; + +import org.apache.zookeeper.data.ACL; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +public interface ZkACLProvider { + + List<ACL> getACLsToAdd(String zNodePath); + +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkClientConnectionStrategy.java ---------------------------------------------------------------------- diff --git a/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkClientConnectionStrategy.java b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkClientConnectionStrategy.java new file mode 100644 index 0000000..5f4baa5 --- /dev/null +++ b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkClientConnectionStrategy.java @@ -0,0 +1,113 @@ +package org.apache.solr.common.cloud; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeoutException; + +import org.apache.solr.common.SolrException; +import org.apache.solr.common.cloud.ZkCredentialsProvider.ZkCredentials; +import org.apache.zookeeper.Watcher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + */ +public abstract class ZkClientConnectionStrategy { + private static Logger log = LoggerFactory.getLogger(ZkClientConnectionStrategy.class); + + private volatile ZkCredentialsProvider zkCredentialsToAddAutomatically; + private volatile boolean zkCredentialsToAddAutomaticallyUsed; + + private List<DisconnectedListener> disconnectedListeners = new ArrayList<>(); + private List<ConnectedListener> connectedListeners = new ArrayList<>(); + + public abstract void connect(String zkServerAddress, int zkClientTimeout, Watcher watcher, ZkUpdate updater) throws IOException, InterruptedException, TimeoutException; + public abstract void reconnect(String serverAddress, int zkClientTimeout, Watcher watcher, ZkUpdate updater) throws IOException, InterruptedException, TimeoutException; + + public ZkClientConnectionStrategy() { + zkCredentialsToAddAutomaticallyUsed = false; + } + + public synchronized void disconnected() { + for (DisconnectedListener listener : disconnectedListeners) { + try { + listener.disconnected(); + } catch (Exception e) { + SolrException.log(log, "", e); + } + } + } + + public synchronized void connected() { + for (ConnectedListener listener : connectedListeners) { + try { + listener.connected(); + } catch (Exception e) { + SolrException.log(log, "", e); + } + } + } + + public interface DisconnectedListener { + public void disconnected(); + }; + + public interface ConnectedListener { + public void connected(); + }; + + + public synchronized void addDisconnectedListener(DisconnectedListener listener) { + disconnectedListeners.add(listener); + } + + public synchronized void addConnectedListener(ConnectedListener listener) { + connectedListeners.add(listener); + } + + public static abstract class ZkUpdate { + public abstract void update(SolrZooKeeper zooKeeper) throws InterruptedException, TimeoutException, IOException; + } + + public void setZkCredentialsToAddAutomatically(ZkCredentialsProvider zkCredentialsToAddAutomatically) { + if (zkCredentialsToAddAutomaticallyUsed || (zkCredentialsToAddAutomatically == null)) + throw new RuntimeException("Cannot change zkCredentialsToAddAutomatically after it has been (connect or reconnect was called) used or to null"); + this.zkCredentialsToAddAutomatically = zkCredentialsToAddAutomatically; + } + + public boolean hasZkCredentialsToAddAutomatically() { + return zkCredentialsToAddAutomatically != null; + } + + protected SolrZooKeeper createSolrZooKeeper(final String serverAddress, final int zkClientTimeout, + final Watcher watcher) throws IOException { + SolrZooKeeper result = new SolrZooKeeper(serverAddress, zkClientTimeout, watcher); + + zkCredentialsToAddAutomaticallyUsed = true; + for (ZkCredentials zkCredentials : zkCredentialsToAddAutomatically.getCredentials()) { + result.addAuthInfo(zkCredentials.getScheme(), zkCredentials.getAuth()); + } + + return result; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkCmdExecutor.java ---------------------------------------------------------------------- diff --git a/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkCmdExecutor.java b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkCmdExecutor.java new file mode 100644 index 0000000..d77ad06 --- /dev/null +++ b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkCmdExecutor.java @@ -0,0 +1,111 @@ +package org.apache.solr.common.cloud; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.NodeExistsException; + + +public class ZkCmdExecutor { + private long retryDelay = 1500L; // 1 second would match timeout, so 500 ms over for padding + private int retryCount; + private double timeouts; + + /** + * TODO: At this point, this should probably take a SolrZkClient in + * its constructor. + * + * @param timeoutms + * the client timeout for the ZooKeeper clients that will be used + * with this class. + */ + public ZkCmdExecutor(int timeoutms) { + timeouts = timeoutms / 1000.0; + this.retryCount = Math.round(0.5f * ((float)Math.sqrt(8.0f * timeouts + 1.0f) - 1.0f)) + 1; + } + + public long getRetryDelay() { + return retryDelay; + } + + public void setRetryDelay(long retryDelay) { + this.retryDelay = retryDelay; + } + + + /** + * Perform the given operation, retrying if the connection fails + */ + @SuppressWarnings("unchecked") + public <T> T retryOperation(ZkOperation operation) + throws KeeperException, InterruptedException { + KeeperException exception = null; + for (int i = 0; i < retryCount; i++) { + try { + return (T) operation.execute(); + } catch (KeeperException.ConnectionLossException e) { + if (exception == null) { + exception = e; + } + if (Thread.currentThread().isInterrupted()) { + Thread.currentThread().interrupt(); + throw new InterruptedException(); + } + if (Thread.currentThread() instanceof ClosableThread) { + if (((ClosableThread) Thread.currentThread()).isClosed()) { + throw exception; + } + } + if (i != retryCount -1) { + retryDelay(i); + } + } + } + throw exception; + } + + public void ensureExists(String path, final SolrZkClient zkClient) throws KeeperException, InterruptedException { + ensureExists(path, null, CreateMode.PERSISTENT, zkClient); + } + + public void ensureExists(final String path, final byte[] data, + CreateMode createMode, final SolrZkClient zkClient) throws KeeperException, InterruptedException { + + if (zkClient.exists(path, true)) { + return; + } + try { + zkClient.makePath(path, data, true); + } catch (NodeExistsException e) { + // it's okay if another beats us creating the node + } + + } + + /** + * Performs a retry delay if this is not the first attempt + * + * @param attemptCount + * the number of the attempts performed so far + */ + protected void retryDelay(int attemptCount) throws InterruptedException { + Thread.sleep((attemptCount + 1) * retryDelay); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkConfigManager.java ---------------------------------------------------------------------- diff --git a/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkConfigManager.java b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkConfigManager.java new file mode 100644 index 0000000..a3a8060 --- /dev/null +++ b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkConfigManager.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.common.cloud; + +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.Collections; +import java.util.List; + +/** + * Class that manages named configs in Zookeeper + */ +public class ZkConfigManager { + + private static final Logger logger = LoggerFactory.getLogger(ZkConfigManager.class); + + /** ZkNode where named configs are stored */ + public static final String CONFIGS_ZKNODE = "/configs"; + + private final SolrZkClient zkClient; + + /** + * Creates a new ZkConfigManager + * @param zkClient the {@link SolrZkClient} to use + */ + public ZkConfigManager(SolrZkClient zkClient) { + this.zkClient = zkClient; + } + + private void uploadToZK(final Path rootPath, final String zkPath) throws IOException { + + if (!Files.exists(rootPath)) + throw new IOException("Path " + rootPath + " does not exist"); + + Files.walkFileTree(rootPath, new SimpleFileVisitor<Path>(){ + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { + String filename = file.getFileName().toString(); + if (filename.startsWith(".")) + return FileVisitResult.CONTINUE; + String zkNode = createZkNodeName(zkPath, rootPath, file); + try { + zkClient.makePath(zkNode, file.toFile(), false, true); + } catch (KeeperException | InterruptedException e) { + throw new IOException("Error uploading file " + file.toString() + " to zookeeper path " + zkNode, + SolrZkClient.checkInterrupted(e)); + } + return FileVisitResult.CONTINUE; + } + + @Override + public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException { + return (dir.getFileName().toString().startsWith(".")) ? FileVisitResult.SKIP_SUBTREE : FileVisitResult.CONTINUE; + } + }); + } + + private static String createZkNodeName(String zkRoot, Path root, Path file) { + String relativePath = root.relativize(file).toString(); + // Windows shenanigans + String separator = root.getFileSystem().getSeparator(); + if ("\\".equals(separator)) + relativePath = relativePath.replaceAll("\\\\", "/"); + return zkRoot + "/" + relativePath; + } + + private void downloadFromZK(String zkPath, Path dir) throws IOException { + try { + List<String> files = zkClient.getChildren(zkPath, null, true); + Files.createDirectories(dir); + for (String file : files) { + List<String> children = zkClient.getChildren(zkPath + "/" + file, null, true); + if (children.size() == 0) { + byte[] data = zkClient.getData(zkPath + "/" + file, null, null, true); + Path filename = dir.resolve(file); + logger.info("Writing file {}", filename); + Files.write(filename, data); + } else { + downloadFromZK(zkPath + "/" + file, dir.resolve(file)); + } + } + } + catch (KeeperException | InterruptedException e) { + throw new IOException("Error downloading files from zookeeper path " + zkPath + " to " + dir.toString(), + SolrZkClient.checkInterrupted(e)); + } + } + + /** + * Upload files from a given path to a config in Zookeeper + * @param dir {@link java.nio.file.Path} to the files + * @param configName the name to give the config + * @throws IOException + * if an I/O error occurs or the path does not exist + */ + public void uploadConfigDir(Path dir, String configName) throws IOException { + uploadToZK(dir, CONFIGS_ZKNODE + "/" + configName); + } + + /** + * Download a config from Zookeeper and write it to the filesystem + * @param configName the config to download + * @param dir the {@link Path} to write files under + * @throws IOException + * if an I/O error occurs or the config does not exist + */ + public void downloadConfigDir(String configName, Path dir) throws IOException { + downloadFromZK(CONFIGS_ZKNODE + "/" + configName, dir); + } + + public List<String> listConfigs() throws IOException { + try { + return zkClient.getChildren(ZkConfigManager.CONFIGS_ZKNODE, null, true); + } + catch (KeeperException.NoNodeException e) { + return Collections.emptyList(); + } + catch (KeeperException | InterruptedException e) { + throw new IOException("Error listing configs", SolrZkClient.checkInterrupted(e)); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkCoreNodeProps.java ---------------------------------------------------------------------- diff --git a/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkCoreNodeProps.java b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkCoreNodeProps.java new file mode 100644 index 0000000..131d330 --- /dev/null +++ b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkCoreNodeProps.java @@ -0,0 +1,74 @@ +package org.apache.solr.common.cloud; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +public class ZkCoreNodeProps { + private ZkNodeProps nodeProps; + + public ZkCoreNodeProps(ZkNodeProps nodeProps) { + this.nodeProps = nodeProps; + } + + public String getCoreUrl() { + return getCoreUrl(nodeProps.getStr(ZkStateReader.BASE_URL_PROP), nodeProps.getStr(ZkStateReader.CORE_NAME_PROP)); + } + + public String getNodeName() { + return nodeProps.getStr(ZkStateReader.NODE_NAME_PROP); + } + + public String getState() { + return nodeProps.getStr(ZkStateReader.STATE_PROP); + } + + public String getBaseUrl() { + return nodeProps.getStr(ZkStateReader.BASE_URL_PROP); + } + + public String getCoreName() { + return nodeProps.getStr(ZkStateReader.CORE_NAME_PROP); + } + + public static String getCoreUrl(ZkNodeProps nodeProps) { + return getCoreUrl(nodeProps.getStr(ZkStateReader.BASE_URL_PROP), nodeProps.getStr(ZkStateReader.CORE_NAME_PROP)); + } + + public static String getCoreUrl(String baseUrl, String coreName) { + StringBuilder sb = new StringBuilder(); + sb.append(baseUrl); + if (!baseUrl.endsWith("/")) sb.append("/"); + sb.append(coreName); + if (!(sb.substring(sb.length() - 1).equals("/"))) sb.append("/"); + return sb.toString(); + } + + @Override + public String toString() { + return nodeProps.toString(); + } + + public ZkNodeProps getNodeProps() { + return nodeProps; + } + + public boolean isLeader() { + return nodeProps.containsKey(ZkStateReader.LEADER_PROP); + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkCredentialsProvider.java ---------------------------------------------------------------------- diff --git a/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkCredentialsProvider.java b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkCredentialsProvider.java new file mode 100644 index 0000000..b4ab6d8 --- /dev/null +++ b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkCredentialsProvider.java @@ -0,0 +1,45 @@ +package org.apache.solr.common.cloud; + +import java.util.Collection; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +public interface ZkCredentialsProvider { + + public class ZkCredentials { + String scheme; + byte[] auth; + + public ZkCredentials(String scheme, byte[] auth) { + super(); + this.scheme = scheme; + this.auth = auth; + } + + String getScheme() { + return scheme; + } + + byte[] getAuth() { + return auth; + } + } + + Collection<ZkCredentials> getCredentials(); + +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkNodeProps.java ---------------------------------------------------------------------- diff --git a/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkNodeProps.java b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkNodeProps.java new file mode 100644 index 0000000..5ddfa24 --- /dev/null +++ b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkNodeProps.java @@ -0,0 +1,154 @@ +package org.apache.solr.common.cloud; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.noggit.JSONUtil; +import org.noggit.JSONWriter; + +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Set; + +/** + * ZkNodeProps contains generic immutable properties. + */ +public class ZkNodeProps implements JSONWriter.Writable { + + protected final Map<String,Object> propMap; + + /** + * Construct ZKNodeProps from map. + */ + public ZkNodeProps(Map<String,Object> propMap) { + this.propMap = propMap; + // TODO: store an unmodifiable map, but in a way that guarantees not to wrap more than once. + // Always wrapping introduces a memory leak. + } + + + /** + * Constructor that populates the from array of Strings in form key1, value1, + * key2, value2, ..., keyN, valueN + */ + public ZkNodeProps(String... keyVals) { + this( makeMap((Object[])keyVals) ); + } + + public static ZkNodeProps fromKeyVals(Object... keyVals) { + return new ZkNodeProps( makeMap(keyVals) ); + } + + public static Map<String,Object> makeMap(Object... keyVals) { + if ((keyVals.length & 0x01) != 0) { + throw new IllegalArgumentException("arguments should be key,value"); + } + Map<String,Object> propMap = new LinkedHashMap<>(keyVals.length>>1); + for (int i = 0; i < keyVals.length; i+=2) { + propMap.put(keyVals[i].toString(), keyVals[i+1]); + } + return propMap; + } + + + /** + * Get property keys. + */ + public Set<String> keySet() { + return propMap.keySet(); + } + + /** + * Get all properties as map. + */ + public Map<String, Object> getProperties() { + return propMap; + } + + /** Returns a shallow writable copy of the properties */ + public Map<String,Object> shallowCopy() { + return new LinkedHashMap<>(propMap); + } + + /** + * Create Replica from json string that is typically stored in zookeeper. + */ + public static ZkNodeProps load(byte[] bytes) { + Map<String, Object> props = (Map<String, Object>) ZkStateReader.fromJSON(bytes); + return new ZkNodeProps(props); + } + + @Override + public void write(JSONWriter jsonWriter) { + jsonWriter.write(propMap); + } + + /** + * Get a string property value. + */ + public String getStr(String key) { + Object o = propMap.get(key); + return o == null ? null : o.toString(); + } + + /** + * Get a string property value. + */ + public Integer getInt(String key, Integer def) { + Object o = propMap.get(key); + return o == null ? def : Integer.valueOf(o.toString()); + } + + /** + * Get a string property value. + */ + public String getStr(String key,String def) { + Object o = propMap.get(key); + return o == null ? def : o.toString(); + } + + public Object get(String key) { + return propMap.get(key); + } + + @Override + public String toString() { + return JSONUtil.toJSON(this); + /*** + StringBuilder sb = new StringBuilder(); + Set<Entry<String,Object>> entries = propMap.entrySet(); + for(Entry<String,Object> entry : entries) { + sb.append(entry.getKey() + "=" + entry.getValue() + "\n"); + } + return sb.toString(); + ***/ + } + + /** + * Check if property key exists. + */ + public boolean containsKey(String key) { + return propMap.containsKey(key); + } + + public boolean getBool(String key, boolean b) { + Object o = propMap.get(key); + if(o==null) return b; + return Boolean.parseBoolean(o.toString()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkOperation.java ---------------------------------------------------------------------- diff --git a/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkOperation.java b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkOperation.java new file mode 100644 index 0000000..b4da540 --- /dev/null +++ b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkOperation.java @@ -0,0 +1,37 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.common.cloud; + +import java.io.IOException; + +import org.apache.zookeeper.KeeperException; + +/** + * A callback object which can be used for implementing retry-able operations. + * + */ +public abstract class ZkOperation { + + /** + * Performs the operation - which may be involved multiple times if the connection + * to ZooKeeper closes during this operation + * + * @return the result of the operation or null + */ + public abstract Object execute() throws KeeperException, InterruptedException; +}
