http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/common/cloud/CompositeIdRouter.java ---------------------------------------------------------------------- diff --git a/ranger_solrj/src/main/java/org/apache/solr/common/cloud/CompositeIdRouter.java b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/CompositeIdRouter.java new file mode 100644 index 0000000..5b1652d --- /dev/null +++ b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/CompositeIdRouter.java @@ -0,0 +1,327 @@ +package org.apache.solr.common.cloud; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.solr.common.SolrException; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.params.SolrParams; +import org.apache.solr.common.util.Hash; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +// +// user!uniqueid +// app!user!uniqueid +// user/4!uniqueid +// app/2!user/4!uniqueid +// +public class CompositeIdRouter extends HashBasedRouter { + public static final String NAME = "compositeId"; + + public static final String SEPARATOR = "!"; + + // separator used to optionally specify number of bits to allocate toward first part. + public static final int bitsSeparator = '/'; + private int bits = 16; + + @Override + public int sliceHash(String id, SolrInputDocument doc, SolrParams params, DocCollection collection) { + String shardFieldName = getRouteField(collection); + if (shardFieldName != null && doc != null) { + Object o = doc.getFieldValue(shardFieldName); + if (o == null) + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No value for :" + shardFieldName + ". Unable to identify shard"); + id = o.toString(); + } + if (id.indexOf(SEPARATOR) < 0) { + return Hash.murmurhash3_x86_32(id, 0, id.length(), 0); + } + + return new KeyParser(id).getHash(); + } + + + /** + * Get Range for a given CompositeId based route key + * + * @param routeKey to return Range for + * @return Range for given routeKey + */ + public Range keyHashRange(String routeKey) { + if (routeKey.indexOf(SEPARATOR) < 0) { + int hash = sliceHash(routeKey, null, null, null); + return new Range(hash, hash); + } + + return new KeyParser(routeKey).getRange(); + } + + @Override + public Collection<Slice> getSearchSlicesSingle(String shardKey, SolrParams params, DocCollection collection) { + if (shardKey == null) { + // search across whole collection + // TODO: this may need modification in the future when shard splitting could cause an overlap + return collection.getActiveSlices(); + } + String id = shardKey; + + if (shardKey.indexOf(SEPARATOR) < 0) { + // shardKey is a simple id, so don't do a range + return Collections.singletonList(hashToSlice(Hash.murmurhash3_x86_32(id, 0, id.length(), 0), collection)); + } + + Range completeRange = new KeyParser(id).getRange(); + + List<Slice> targetSlices = new ArrayList<>(1); + for (Slice slice : collection.getActiveSlices()) { + Range range = slice.getRange(); + if (range != null && range.overlaps(completeRange)) { + targetSlices.add(slice); + } + } + + return targetSlices; + } + + public List<Range> partitionRangeByKey(String key, Range range) { + List<Range> result = new ArrayList<>(3); + Range keyRange = keyHashRange(key); + if (!keyRange.overlaps(range)) { + throw new IllegalArgumentException("Key range does not overlap given range"); + } + if (keyRange.equals(range)) { + return Collections.singletonList(keyRange); + } else if (keyRange.isSubsetOf(range)) { + result.add(new Range(range.min, keyRange.min - 1)); + result.add(keyRange); + result.add((new Range(keyRange.max + 1, range.max))); + } else if (range.includes(keyRange.max)) { + result.add(new Range(range.min, keyRange.max)); + result.add(new Range(keyRange.max + 1, range.max)); + } else { + result.add(new Range(range.min, keyRange.min - 1)); + result.add(new Range(keyRange.min, range.max)); + } + return result; + } + + @Override + public List<Range> partitionRange(int partitions, Range range) { + int min = range.min; + int max = range.max; + + assert max >= min; + if (partitions == 0) return Collections.EMPTY_LIST; + long rangeSize = (long) max - (long) min; + long rangeStep = Math.max(1, rangeSize / partitions); + + List<Range> ranges = new ArrayList<>(partitions); + + long start = min; + long end = start; + + // keep track of the idealized target to avoid accumulating rounding errors + long targetStart = min; + long targetEnd = targetStart; + + // Round to avoid splitting hash domains across ranges if such rounding is not significant. + // With default bits==16, one would need to create more than 4000 shards before this + // becomes false by default. + int mask = 0x0000ffff; + boolean round = rangeStep >= (1 << bits) * 16; + + while (end < max) { + targetEnd = targetStart + rangeStep; + end = targetEnd; + + if (round && ((end & mask) != mask)) { + // round up or down? + int increment = 1 << bits; // 0x00010000 + long roundDown = (end | mask) - increment; + long roundUp = (end | mask) + increment; + if (end - roundDown < roundUp - end && roundDown > start) { + end = roundDown; + } else { + end = roundUp; + } + } + + // make last range always end exactly on MAX_VALUE + if (ranges.size() == partitions - 1) { + end = max; + } + ranges.add(new Range((int) start, (int) end)); + start = end + 1L; + targetStart = targetEnd + 1L; + } + + return ranges; + } + + /** + * Helper class to calculate parts, masks etc for an id. + */ + static class KeyParser { + String key; + int[] numBits; + int[] hashes; + int[] masks; + boolean triLevel; + int pieces; + + public KeyParser(final String key) { + this.key = key; + List<String> partsList = new ArrayList<>(3); + int firstSeparatorPos = key.indexOf(SEPARATOR); + if (-1 == firstSeparatorPos) { + partsList.add(key); + } else { + partsList.add(key.substring(0, firstSeparatorPos)); + int lastPos = key.length() - 1; + // Don't make any more parts if the first separator is the last char + if (firstSeparatorPos < lastPos) { + int secondSeparatorPos = key.indexOf(SEPARATOR, firstSeparatorPos + 1); + if (-1 == secondSeparatorPos) { + partsList.add(key.substring(firstSeparatorPos + 1)); + } else if (secondSeparatorPos == lastPos) { + // Don't make any more parts if the key has exactly two separators and + // they're the last two chars - back-compatibility with the behavior of + // String.split() - see SOLR-6257. + if (firstSeparatorPos < secondSeparatorPos - 1) { + partsList.add(key.substring(firstSeparatorPos + 1, secondSeparatorPos)); + } + } else { // The second separator is not the last char + partsList.add(key.substring(firstSeparatorPos + 1, secondSeparatorPos)); + partsList.add(key.substring(secondSeparatorPos + 1)); + } + // Ignore any further separators beyond the first two + } + } + pieces = partsList.size(); + String[] parts = partsList.toArray(new String[pieces]); + numBits = new int[2]; + if (key.endsWith("!") && pieces < 3) + pieces++; + hashes = new int[pieces]; + + if (pieces == 3) { + numBits[0] = 8; + numBits[1] = 8; + triLevel = true; + } else { + numBits[0] = 16; + triLevel = false; + } + + for (int i = 0; i < pieces; i++) { + if (i < pieces - 1) { + int commaIdx = parts[i].indexOf(bitsSeparator); + + if (commaIdx > 0) { + numBits[i] = getNumBits(parts[i], commaIdx); + parts[i] = parts[i].substring(0, commaIdx); + } + } + //Last component of an ID that ends with a '!' + if(i >= parts.length) + hashes[i] = Hash.murmurhash3_x86_32("", 0, "".length(), 0); + else + hashes[i] = Hash.murmurhash3_x86_32(parts[i], 0, parts[i].length(), 0); + } + masks = getMasks(); + } + + Range getRange() { + int lowerBound; + int upperBound; + + if (triLevel) { + lowerBound = hashes[0] & masks[0] | hashes[1] & masks[1]; + upperBound = lowerBound | masks[2]; + } else { + lowerBound = hashes[0] & masks[0]; + upperBound = lowerBound | masks[1]; + } + // If the upper bits are 0xF0000000, the range we want to cover is + // 0xF0000000 0xFfffffff + + if ((masks[0] == 0 && !triLevel) || (masks[0] == 0 && masks[1] == 0 && triLevel)) { + // no bits used from first part of key.. the code above will produce 0x000000000->0xffffffff + // which only works on unsigned space, but we're using signed space. + lowerBound = Integer.MIN_VALUE; + upperBound = Integer.MAX_VALUE; + } + Range r = new Range(lowerBound, upperBound); + return r; + } + + /** + * Get bit masks for routing based on routing level + */ + private int[] getMasks() { + int[] masks; + if (triLevel) + masks = getBitMasks(numBits[0], numBits[1]); + else + masks = getBitMasks(numBits[0]); + + return masks; + } + + private int[] getBitMasks(int firstBits, int secondBits) { + // java can't shift 32 bits + int[] masks = new int[3]; + masks[0] = firstBits == 0 ? 0 : (-1 << (32 - firstBits)); + masks[1] = (firstBits + secondBits) == 0 ? 0 : (-1 << (32 - firstBits - secondBits)); + masks[1] = masks[0] ^ masks[1]; + masks[2] = (firstBits + secondBits) == 32 ? 0 : ~(masks[0] | masks[1]); + return masks; + } + + private int getNumBits(String firstPart, int commaIdx) { + int v = 0; + for (int idx = commaIdx + 1; idx < firstPart.length(); idx++) { + char ch = firstPart.charAt(idx); + if (ch < '0' || ch > '9') return -1; + v = v * 10 + (ch - '0'); + } + return v > 32 ? -1 : v; + } + + private int[] getBitMasks(int firstBits) { + // java can't shift 32 bits + int[] masks; + masks = new int[2]; + masks[0] = firstBits == 0 ? 0 : (-1 << (32 - firstBits)); + masks[1] = firstBits == 32 ? 0 : (-1 >>> firstBits); + return masks; + } + + int getHash() { + int result = hashes[0] & masks[0]; + + for (int i = 1; i < pieces; i++) + result = result | (hashes[i] & masks[i]); + return result; + } + + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ConnectionManager.java ---------------------------------------------------------------------- diff --git a/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ConnectionManager.java b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ConnectionManager.java new file mode 100644 index 0000000..fd43f00 --- /dev/null +++ b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ConnectionManager.java @@ -0,0 +1,237 @@ +package org.apache.solr.common.cloud; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.solr.common.SolrException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ConnectionManager implements Watcher { + protected static final Logger log = LoggerFactory + .getLogger(ConnectionManager.class); + + private final String name; + + private volatile boolean connected = false; + + private final ZkClientConnectionStrategy connectionStrategy; + + private final String zkServerAddress; + + private final SolrZkClient client; + + private final OnReconnect onReconnect; + private final BeforeReconnect beforeReconnect; + + private volatile boolean isClosed = false; + + // Track the likely expired state + private static class LikelyExpiredState { + private static LikelyExpiredState NOT_EXPIRED = new LikelyExpiredState(StateType.NOT_EXPIRED, 0); + private static LikelyExpiredState EXPIRED = new LikelyExpiredState(StateType.EXPIRED, 0); + + public enum StateType { + NOT_EXPIRED, // definitely not expired + EXPIRED, // definitely expired + TRACKING_TIME // not sure, tracking time of last disconnect + } + + private StateType stateType; + private long lastDisconnectTime; + public LikelyExpiredState(StateType stateType, long lastDisconnectTime) { + this.stateType = stateType; + this.lastDisconnectTime = lastDisconnectTime; + } + + public boolean isLikelyExpired(long timeToExpire) { + return stateType == StateType.EXPIRED + || ( stateType == StateType.TRACKING_TIME && (System.nanoTime() - lastDisconnectTime > TimeUnit.NANOSECONDS.convert(timeToExpire, TimeUnit.MILLISECONDS))); + } + } + + private volatile LikelyExpiredState likelyExpiredState = LikelyExpiredState.EXPIRED; + + public ConnectionManager(String name, SolrZkClient client, String zkServerAddress, ZkClientConnectionStrategy strat, OnReconnect onConnect, BeforeReconnect beforeReconnect) { + this.name = name; + this.client = client; + this.connectionStrategy = strat; + this.zkServerAddress = zkServerAddress; + this.onReconnect = onConnect; + this.beforeReconnect = beforeReconnect; + } + + private synchronized void connected() { + connected = true; + likelyExpiredState = LikelyExpiredState.NOT_EXPIRED; + notifyAll(); + } + + private synchronized void disconnected() { + connected = false; + // record the time we expired unless we are already likely expired + if (!likelyExpiredState.isLikelyExpired(0)) { + likelyExpiredState = new LikelyExpiredState(LikelyExpiredState.StateType.TRACKING_TIME, System.nanoTime()); + } + notifyAll(); + } + + @Override + public void process(WatchedEvent event) { + if (log.isInfoEnabled()) { + log.info("Watcher " + this + " name:" + name + " got event " + event + + " path:" + event.getPath() + " type:" + event.getType()); + } + + if (isClosed) { + log.info("Client->ZooKeeper status change trigger but we are already closed"); + return; + } + + KeeperState state = event.getState(); + + if (state == KeeperState.SyncConnected) { + connected(); + connectionStrategy.connected(); + } else if (state == KeeperState.Expired) { + // we don't call disconnected here, because we know we are expired + connected = false; + likelyExpiredState = LikelyExpiredState.EXPIRED; + + log.info("Our previous ZooKeeper session was expired. Attempting to reconnect to recover relationship with ZooKeeper..."); + + if (beforeReconnect != null) { + try { + beforeReconnect.command(); + } catch (Exception e) { + log.warn("Exception running beforeReconnect command", e); + } + } + + try { + connectionStrategy.reconnect(zkServerAddress, + client.getZkClientTimeout(), this, + new ZkClientConnectionStrategy.ZkUpdate() { + @Override + public void update(SolrZooKeeper keeper) { + try { + waitForConnected(Long.MAX_VALUE); + } catch (Exception e1) { + closeKeeper(keeper); + throw new RuntimeException(e1); + } + + log.info("Connection with ZooKeeper reestablished."); + try { + client.updateKeeper(keeper); + } catch (InterruptedException e) { + closeKeeper(keeper); + Thread.currentThread().interrupt(); + // we must have been asked to stop + throw new RuntimeException(e); + } catch (Exception t) { + closeKeeper(keeper); + throw new RuntimeException(t); + } + + if (onReconnect != null) { + onReconnect.command(); + } + } + }); + } catch (Exception e) { + SolrException.log(log, "", e); + } + log.info("Connected:" + connected); + } else if (state == KeeperState.Disconnected) { + log.info("zkClient has disconnected"); + disconnected(); + connectionStrategy.disconnected(); + } else if (state == KeeperState.AuthFailed) { + log.warn("zkClient received AuthFailed"); + } + } + + public synchronized boolean isConnected() { + return !isClosed && connected; + } + + // we use a volatile rather than sync + // to avoid possible deadlock on shutdown + public void close() { + this.isClosed = true; + this.likelyExpiredState = LikelyExpiredState.EXPIRED; + } + + public boolean isLikelyExpired() { + return isClosed || likelyExpiredState.isLikelyExpired((long) (client.getZkClientTimeout() * 0.90)); + } + + public synchronized void waitForConnected(long waitForConnection) + throws TimeoutException { + log.info("Waiting for client to connect to ZooKeeper"); + long expire = System.nanoTime() + TimeUnit.NANOSECONDS.convert(waitForConnection, TimeUnit.MILLISECONDS); + long left = 1; + while (!connected && left > 0) { + if (isClosed) { + break; + } + try { + wait(500); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } + left = expire - System.nanoTime(); + } + if (!connected) { + throw new TimeoutException("Could not connect to ZooKeeper " + zkServerAddress + " within " + waitForConnection + " ms"); + } + log.info("Client is connected to ZooKeeper"); + } + + public synchronized void waitForDisconnected(long timeout) + throws InterruptedException, TimeoutException { + long expire = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS); + long left = timeout; + while (connected && left > 0) { + wait(left); + left = expire - System.nanoTime(); + } + if (connected) { + throw new TimeoutException("Did not disconnect"); + } + } + + private void closeKeeper(SolrZooKeeper keeper) { + try { + keeper.close(); + } catch (InterruptedException e) { + // Restore the interrupted status + Thread.currentThread().interrupt(); + log.error("", e); + throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, + "", e); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/common/cloud/DefaultConnectionStrategy.java ---------------------------------------------------------------------- diff --git a/ranger_solrj/src/main/java/org/apache/solr/common/cloud/DefaultConnectionStrategy.java b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/DefaultConnectionStrategy.java new file mode 100644 index 0000000..c2f4a31 --- /dev/null +++ b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/DefaultConnectionStrategy.java @@ -0,0 +1,75 @@ +package org.apache.solr.common.cloud; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +import org.apache.solr.common.SolrException; +import org.apache.zookeeper.Watcher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * TODO: improve backoff retry impl + */ +public class DefaultConnectionStrategy extends ZkClientConnectionStrategy { + + private static Logger log = LoggerFactory.getLogger(DefaultConnectionStrategy.class); + + @Override + public void connect(String serverAddress, int timeout, Watcher watcher, ZkUpdate updater) throws IOException, InterruptedException, TimeoutException { + SolrZooKeeper zk = createSolrZooKeeper(serverAddress, timeout, watcher); + boolean success = false; + try { + updater.update(zk); + success = true; + } finally { + if (!success) { + zk.close(); + } + } + } + + @Override + public void reconnect(final String serverAddress, final int zkClientTimeout, + final Watcher watcher, final ZkUpdate updater) throws IOException { + log.info("Connection expired - starting a new one..."); + SolrZooKeeper zk = createSolrZooKeeper(serverAddress, zkClientTimeout, watcher); + boolean success = false; + try { + updater + .update(zk); + success = true; + log.info("Reconnected to ZooKeeper"); + } catch (Exception e) { + SolrException.log(log, "Reconnect to ZooKeeper failed", e); + log.info("Reconnect to ZooKeeper failed"); + } finally { + if (!success) { + try { + zk.close(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/common/cloud/DefaultZkACLProvider.java ---------------------------------------------------------------------- diff --git a/ranger_solrj/src/main/java/org/apache/solr/common/cloud/DefaultZkACLProvider.java b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/DefaultZkACLProvider.java new file mode 100644 index 0000000..9b0301a --- /dev/null +++ b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/DefaultZkACLProvider.java @@ -0,0 +1,45 @@ +package org.apache.solr.common.cloud; + +import java.util.List; + +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +public class DefaultZkACLProvider implements ZkACLProvider { + + private List<ACL> globalACLsToAdd; + + @Override + public List<ACL> getACLsToAdd(String zNodePath) { + // In default (simple) implementation use the same set of ACLs for all znodes + if (globalACLsToAdd == null) { + synchronized (this) { + if (globalACLsToAdd == null) globalACLsToAdd = createGlobalACLsToAdd(); + } + } + return globalACLsToAdd; + + } + + protected List<ACL> createGlobalACLsToAdd() { + return ZooDefs.Ids.OPEN_ACL_UNSAFE; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/common/cloud/DefaultZkCredentialsProvider.java ---------------------------------------------------------------------- diff --git a/ranger_solrj/src/main/java/org/apache/solr/common/cloud/DefaultZkCredentialsProvider.java b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/DefaultZkCredentialsProvider.java new file mode 100644 index 0000000..ca09068 --- /dev/null +++ b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/DefaultZkCredentialsProvider.java @@ -0,0 +1,41 @@ +package org.apache.solr.common.cloud; + +import java.util.ArrayList; +import java.util.Collection; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +public class DefaultZkCredentialsProvider implements ZkCredentialsProvider { + + private Collection<ZkCredentials> zkCredentials; + + @Override + public Collection<ZkCredentials> getCredentials() { + if (zkCredentials == null) { + synchronized (this) { + if (zkCredentials == null) zkCredentials = createCredentials(); + } + } + return zkCredentials; + } + + protected Collection<ZkCredentials> createCredentials() { + return new ArrayList<ZkCredentials>(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/common/cloud/DocCollection.java ---------------------------------------------------------------------- diff --git a/ranger_solrj/src/main/java/org/apache/solr/common/cloud/DocCollection.java b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/DocCollection.java new file mode 100644 index 0000000..a5e4252 --- /dev/null +++ b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/DocCollection.java @@ -0,0 +1,201 @@ +package org.apache.solr.common.cloud; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.Map; + +import org.apache.solr.common.SolrException; +import org.apache.solr.common.SolrException.ErrorCode; +import org.noggit.JSONUtil; +import org.noggit.JSONWriter; + +/** + * Models a Collection in zookeeper (but that Java name is obviously taken, hence "DocCollection") + */ +public class DocCollection extends ZkNodeProps { + public static final String DOC_ROUTER = "router"; + public static final String SHARDS = "shards"; + public static final String STATE_FORMAT = "stateFormat"; + private int znodeVersion = -1; // sentinel + + private final String name; + private final Map<String, Slice> slices; + private final Map<String, Slice> activeSlices; + private final DocRouter router; + private final String znode; + + private final Integer replicationFactor; + private final Integer maxShardsPerNode; + private final boolean autoAddReplicas; + + + public DocCollection(String name, Map<String, Slice> slices, Map<String, Object> props, DocRouter router) { + this(name, slices, props, router, -1, ZkStateReader.CLUSTER_STATE); + } + + /** + * @param name The name of the collection + * @param slices The logical shards of the collection. This is used directly and a copy is not made. + * @param props The properties of the slice. This is used directly and a copy is not made. + */ + public DocCollection(String name, Map<String, Slice> slices, Map<String, Object> props, DocRouter router, int zkVersion, String znode) { + super(props==null ? props = new HashMap<String,Object>() : props); + this.znodeVersion = zkVersion; + this.name = name; + + this.slices = slices; + this.activeSlices = new HashMap<>(); + Object replicationFactorObject = (Object) props.get(ZkStateReader.REPLICATION_FACTOR); + if (replicationFactorObject != null) { + this.replicationFactor = Integer.parseInt(replicationFactorObject.toString()); + } else { + this.replicationFactor = null; + } + Object maxShardsPerNodeObject = (Object) props.get(ZkStateReader.MAX_SHARDS_PER_NODE); + if (maxShardsPerNodeObject != null) { + this.maxShardsPerNode = Integer.parseInt(maxShardsPerNodeObject.toString()); + } else { + this.maxShardsPerNode = null; + } + Object autoAddReplicasObject = (Object) props.get(ZkStateReader.AUTO_ADD_REPLICAS); + if (autoAddReplicasObject != null) { + this.autoAddReplicas = Boolean.parseBoolean(autoAddReplicasObject.toString()); + } else { + this.autoAddReplicas = false; + } + + Iterator<Map.Entry<String, Slice>> iter = slices.entrySet().iterator(); + + while (iter.hasNext()) { + Map.Entry<String, Slice> slice = iter.next(); + if (slice.getValue().getState().equals(Slice.ACTIVE)) + this.activeSlices.put(slice.getKey(), slice.getValue()); + } + this.router = router; + this.znode = znode == null? ZkStateReader.CLUSTER_STATE : znode; + assert name != null && slices != null; + } + + /**Use this to make an exact copy of DocCollection with a new set of Slices and every other property as is + * @param slices the new set of Slices + * @return the resulting DocCollection + */ + public DocCollection copyWithSlices(Map<String, Slice> slices){ + return new DocCollection(getName(), slices, propMap, router, znodeVersion,znode); + } + + /** + * Return collection name. + */ + public String getName() { + return name; + } + + public Slice getSlice(String sliceName) { + return slices.get(sliceName); + } + + /** + * Gets the list of all slices for this collection. + */ + public Collection<Slice> getSlices() { + return slices.values(); + } + + + /** + * Return the list of active slices for this collection. + */ + public Collection<Slice> getActiveSlices() { + return activeSlices.values(); + } + + /** + * Get the map of all slices (sliceName->Slice) for this collection. + */ + public Map<String, Slice> getSlicesMap() { + return slices; + } + + /** + * Get the map of active slices (sliceName->Slice) for this collection. + */ + public Map<String, Slice> getActiveSlicesMap() { + return activeSlices; + } + + public int getZNodeVersion(){ + return znodeVersion; + } + + public int getStateFormat() { + return ZkStateReader.CLUSTER_STATE.equals(znode) ? 1 : 2; + } + /** + * @return replication factor for this collection or null if no + * replication factor exists. + */ + public Integer getReplicationFactor() { + return replicationFactor; + } + + public boolean getAutoAddReplicas() { + return autoAddReplicas; + } + + public int getMaxShardsPerNode() { + if (maxShardsPerNode == null) { + throw new SolrException(ErrorCode.BAD_REQUEST, ZkStateReader.MAX_SHARDS_PER_NODE + " is not in the cluster state."); + } + return maxShardsPerNode; + } + + public String getZNode(){ + return znode; + } + + + public DocRouter getRouter() { + return router; + } + + @Override + public String toString() { + return "DocCollection("+name+")=" + JSONUtil.toJSON(this); + } + + @Override + public void write(JSONWriter jsonWriter) { + LinkedHashMap<String, Object> all = new LinkedHashMap<>(slices.size() + 1); + all.putAll(propMap); + all.put(SHARDS, slices); + jsonWriter.write(all); + } + + public Replica getReplica(String coreNodeName) { + for (Slice slice : slices.values()) { + Replica replica = slice.getReplica(coreNodeName); + if (replica != null) return replica; + } + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/common/cloud/DocRouter.java ---------------------------------------------------------------------- diff --git a/ranger_solrj/src/main/java/org/apache/solr/common/cloud/DocRouter.java b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/DocRouter.java new file mode 100644 index 0000000..24d00dd --- /dev/null +++ b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/DocRouter.java @@ -0,0 +1,227 @@ +package org.apache.solr.common.cloud; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.solr.common.SolrException; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.params.SolrParams; +import org.apache.solr.common.util.StrUtils; +import org.noggit.JSONWriter; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.solr.common.cloud.DocCollection.DOC_ROUTER; + +/** + * Class to partition int range into n ranges. + * @lucene.experimental + */ +public abstract class DocRouter { + public static final String DEFAULT_NAME = CompositeIdRouter.NAME; + public static final DocRouter DEFAULT = new CompositeIdRouter(); + + + public static DocRouter getDocRouter(String routerName) { + DocRouter router = routerMap.get(routerName); + if (router != null) return router; + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unknown document router '"+ routerName + "'"); + } + + protected String getRouteField(DocCollection coll){ + if(coll == null) return null; + Object o = coll.get(DOC_ROUTER); + if (o instanceof String) { + return null; + //old format. cannot have a routefield. Ignore it + } + Map m = (Map) o; + if(m == null) return null; + return (String) m.get("field"); + + } + + public static Map<String,Object> getRouterSpec(ZkNodeProps props){ + Map<String,Object> map = new LinkedHashMap<>(); + for (String s : props.keySet()) { + if(s.startsWith("router.")){ + map.put(s.substring(7), props.get(s)); + } + } + Object o = props.get("router"); + if (o instanceof String) { + map.put("name", o); + } else if (map.get("name") == null) { + map.put("name", DEFAULT_NAME); + } + return map; + } + + // currently just an implementation detail... + private final static Map<String, DocRouter> routerMap; + static { + routerMap = new HashMap<>(); + PlainIdRouter plain = new PlainIdRouter(); + // instead of doing back compat this way, we could always convert the clusterstate on first read to "plain" if it doesn't have any properties. + routerMap.put(null, plain); // back compat with 4.0 + routerMap.put(PlainIdRouter.NAME, plain); + routerMap.put(CompositeIdRouter.NAME, DEFAULT_NAME.equals(CompositeIdRouter.NAME) ? DEFAULT : new CompositeIdRouter()); + routerMap.put(ImplicitDocRouter.NAME, new ImplicitDocRouter()); + // NOTE: careful that the map keys (the static .NAME members) are filled in by making them final + } + + + // Hash ranges can't currently "wrap" - i.e. max must be greater or equal to min. + // TODO: ranges may not be all contiguous in the future (either that or we will + // need an extra class to model a collection of ranges) + public static class Range implements JSONWriter.Writable, Comparable<Range> { + public int min; // inclusive + public int max; // inclusive + + public Range(int min, int max) { + assert min <= max; + this.min = min; + this.max = max; + } + + public boolean includes(int hash) { + return hash >= min && hash <= max; + } + + public boolean isSubsetOf(Range superset) { + return superset.min <= min && superset.max >= max; + } + + public boolean overlaps(Range other) { + return includes(other.min) || includes(other.max) || isSubsetOf(other); + } + + @Override + public String toString() { + return Integer.toHexString(min) + '-' + Integer.toHexString(max); + } + + + @Override + public int hashCode() { + // difficult numbers to hash... only the highest bits will tend to differ. + // ranges will only overlap during a split, so we can just hash the lower range. + return (min>>28) + (min>>25) + (min>>21) + min; + } + + @Override + public boolean equals(Object obj) { + if (obj.getClass() != getClass()) return false; + Range other = (Range)obj; + return this.min == other.min && this.max == other.max; + } + + @Override + public void write(JSONWriter writer) { + writer.write(toString()); + } + + @Override + public int compareTo(Range that) { + int mincomp = Integer.valueOf(this.min).compareTo(that.min); + return mincomp == 0 ? Integer.valueOf(this.max).compareTo(that.max) : mincomp; + } + } + + public Range fromString(String range) { + int middle = range.indexOf('-'); + String minS = range.substring(0, middle); + String maxS = range.substring(middle+1); + long min = Long.parseLong(minS, 16); // use long to prevent the parsing routines from potentially worrying about overflow + long max = Long.parseLong(maxS, 16); + return new Range((int)min, (int)max); + } + + public Range fullRange() { + return new Range(Integer.MIN_VALUE, Integer.MAX_VALUE); + } + + /** + * Returns the range for each partition + */ + public List<Range> partitionRange(int partitions, Range range) { + int min = range.min; + int max = range.max; + + assert max >= min; + if (partitions == 0) return Collections.EMPTY_LIST; + long rangeSize = (long)max - (long)min; + long rangeStep = Math.max(1, rangeSize / partitions); + + List<Range> ranges = new ArrayList<>(partitions); + + long start = min; + long end = start; + + while (end < max) { + end = start + rangeStep; + // make last range always end exactly on MAX_VALUE + if (ranges.size() == partitions - 1) { + end = max; + } + ranges.add(new Range((int)start, (int)end)); + start = end + 1L; + } + + return ranges; + } + + /** Returns the Slice that the document should reside on, or null if there is not enough information */ + public abstract Slice getTargetSlice(String id, SolrInputDocument sdoc, String route, SolrParams params, DocCollection collection); + + /** This method is consulted to determine what slices should be queried for a request when + * an explicit shards parameter was not used. + * This method only accepts a single shard key (or null). If you have a comma separated list of shard keys, + * call getSearchSlices + **/ + public abstract Collection<Slice> getSearchSlicesSingle(String shardKey, SolrParams params, DocCollection collection); + + public abstract boolean isTargetSlice(String id, SolrInputDocument sdoc, SolrParams params, String shardId, DocCollection collection); + + + /** This method is consulted to determine what slices should be queried for a request when + * an explicit shards parameter was not used. + * This method accepts a multi-valued shardKeys parameter (normally comma separated from the shard.keys request parameter) + * and aggregates the slices returned by getSearchSlicesSingle for each shardKey. + **/ + public Collection<Slice> getSearchSlices(String shardKeys, SolrParams params, DocCollection collection) { + if (shardKeys == null || shardKeys.indexOf(',') < 0) { + return getSearchSlicesSingle(shardKeys, params, collection); + } + + List<String> shardKeyList = StrUtils.splitSmart(shardKeys, ",", true); + HashSet<Slice> allSlices = new HashSet<>(); + for (String shardKey : shardKeyList) { + allSlices.addAll( getSearchSlicesSingle(shardKey, params, collection) ); + } + return allSlices; + } + +} + http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/common/cloud/HashBasedRouter.java ---------------------------------------------------------------------- diff --git a/ranger_solrj/src/main/java/org/apache/solr/common/cloud/HashBasedRouter.java b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/HashBasedRouter.java new file mode 100644 index 0000000..4132f01 --- /dev/null +++ b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/HashBasedRouter.java @@ -0,0 +1,81 @@ +package org.apache.solr.common.cloud; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.solr.common.SolrException; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.params.SolrParams; +import org.apache.solr.common.util.Hash; + +import java.util.Collection; +import java.util.Collections; + +public abstract class HashBasedRouter extends DocRouter { + + @Override + public Slice getTargetSlice(String id, SolrInputDocument sdoc, String route, SolrParams params, DocCollection collection) { + int hash; + if (route != null) { + hash = sliceHash(route, sdoc, params, collection); + } else { + if (id == null) id = getId(sdoc, params); + hash = sliceHash(id, sdoc, params, collection); + } + return hashToSlice(hash, collection); + } + + @Override + public boolean isTargetSlice(String id, SolrInputDocument sdoc, SolrParams params, String shardId, DocCollection collection) { + if (id == null) id = getId(sdoc, params); + int hash = sliceHash(id, sdoc, params, collection); + Range range = collection.getSlice(shardId).getRange(); + return range != null && range.includes(hash); + } + + public int sliceHash(String id, SolrInputDocument sdoc, SolrParams params, DocCollection collection) { + return Hash.murmurhash3_x86_32(id, 0, id.length(), 0); + } + + protected String getId(SolrInputDocument sdoc, SolrParams params) { + Object idObj = sdoc.getFieldValue("id"); // blech + String id = idObj != null ? idObj.toString() : "null"; // should only happen on client side + return id; + } + + protected Slice hashToSlice(int hash, DocCollection collection) { + for (Slice slice : collection.getActiveSlices()) { + Range range = slice.getRange(); + if (range != null && range.includes(hash)) return slice; + } + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No active slice servicing hash code " + Integer.toHexString(hash) + " in " + collection); + } + + + @Override + public Collection<Slice> getSearchSlicesSingle(String shardKey, SolrParams params, DocCollection collection) { + if (shardKey == null) { + // search across whole collection + // TODO: this may need modification in the future when shard splitting could cause an overlap + return collection.getActiveSlices(); + } + + // use the shardKey as an id for plain hashing + Slice slice = getTargetSlice(shardKey, null, null, params, collection); + return slice == null ? Collections.<Slice>emptyList() : Collections.singletonList(slice); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ImplicitDocRouter.java ---------------------------------------------------------------------- diff --git a/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ImplicitDocRouter.java b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ImplicitDocRouter.java new file mode 100644 index 0000000..3b2e83d --- /dev/null +++ b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ImplicitDocRouter.java @@ -0,0 +1,104 @@ +package org.apache.solr.common.cloud; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.solr.common.SolrException; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.params.SolrParams; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import static org.apache.solr.common.params.ShardParams._ROUTE_; + +/** This document router is for custom sharding + */ +public class ImplicitDocRouter extends DocRouter { + + public static final String NAME = "implicit"; + + private static Logger log = LoggerFactory + .getLogger(ImplicitDocRouter.class); + + @Override + public Slice getTargetSlice(String id, SolrInputDocument sdoc, String route, SolrParams params, DocCollection collection) { + String shard = null; + + if (route != null) // if a route is already passed in, try to use it + shard = route; + else if (sdoc != null) { + String f = getRouteField(collection); + if(f !=null) { + Object o = sdoc.getFieldValue(f); + if (o != null) shard = o.toString(); + else throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No value for field "+f +" in " + sdoc); + } + if(shard == null) { + Object o = sdoc.getFieldValue(_ROUTE_); + if (o != null) { + shard = o.toString(); + } + } + } + + if (shard == null) { + shard = params.get(_ROUTE_); + } + + if (shard != null) { + + Slice slice = collection.getSlice(shard); + if (slice == null) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No shard called =" + shard + " in " + collection); + } + return slice; + } + + return null; // no shard specified... use default. + } + + @Override + public boolean isTargetSlice(String id, SolrInputDocument sdoc, SolrParams params, String shardId, DocCollection collection) { + + // todo : how to handle this? + return false; + } + + @Override + public Collection<Slice> getSearchSlicesSingle(String shardKey, SolrParams params, DocCollection collection) { + + if (shardKey == null) { + return collection.getActiveSlices(); + } + + // assume the shardKey is just a slice name + Slice slice = collection.getSlice(shardKey); + if (slice == null) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "implicit router can't find shard " + shardKey + " in collection " + collection.getName()); + } + + return Collections.singleton(slice); + } + + @Override + public List<Range> partitionRange(int partitions, Range range) { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/common/cloud/OnReconnect.java ---------------------------------------------------------------------- diff --git a/ranger_solrj/src/main/java/org/apache/solr/common/cloud/OnReconnect.java b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/OnReconnect.java new file mode 100644 index 0000000..e447982 --- /dev/null +++ b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/OnReconnect.java @@ -0,0 +1,22 @@ +package org.apache.solr.common.cloud; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +public interface OnReconnect { + public void command(); +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/common/cloud/PlainIdRouter.java ---------------------------------------------------------------------- diff --git a/ranger_solrj/src/main/java/org/apache/solr/common/cloud/PlainIdRouter.java b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/PlainIdRouter.java new file mode 100644 index 0000000..bd14089 --- /dev/null +++ b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/PlainIdRouter.java @@ -0,0 +1,23 @@ +package org.apache.solr.common.cloud; + + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +public class PlainIdRouter extends HashBasedRouter { + public static final String NAME = "plain"; +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/common/cloud/Replica.java ---------------------------------------------------------------------- diff --git a/ranger_solrj/src/main/java/org/apache/solr/common/cloud/Replica.java b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/Replica.java new file mode 100644 index 0000000..ecd4837 --- /dev/null +++ b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/Replica.java @@ -0,0 +1,48 @@ +package org.apache.solr.common.cloud; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.noggit.JSONUtil; + +import java.util.Map; + + +public class Replica extends ZkNodeProps { + private final String name; + private final String nodeName; + + public Replica(String name, Map<String,Object> propMap) { + super(propMap); + this.name = name; + nodeName = (String)propMap.get(ZkStateReader.NODE_NAME_PROP); + } + + public String getName() { + return name; + } + + /** The name of the node this replica resides on */ + public String getNodeName() { + return nodeName; + } + + @Override + public String toString() { + return name + ':' + JSONUtil.toJSON(propMap, -1); // small enough, keep it on one line (i.e. no indent) + } +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/common/cloud/RoutingRule.java ---------------------------------------------------------------------- diff --git a/ranger_solrj/src/main/java/org/apache/solr/common/cloud/RoutingRule.java b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/RoutingRule.java new file mode 100644 index 0000000..69f810f --- /dev/null +++ b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/RoutingRule.java @@ -0,0 +1,71 @@ +package org.apache.solr.common.cloud; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.noggit.JSONUtil; + +/** + * Used for routing docs with particular keys into another collection + */ +public class RoutingRule extends ZkNodeProps { + private final List<DocRouter.Range> routeRanges; + private final String routeRangesStr; + private final String targetCollectionName; + private final Long expireAt; + + public RoutingRule(String routeKey, Map<String, Object> propMap) { + super(propMap); + this.routeRangesStr = (String) propMap.get("routeRanges"); + String[] rangesArr = this.routeRangesStr.split(","); + if (rangesArr != null && rangesArr.length > 0) { + this.routeRanges = new ArrayList<>(); + for (String r : rangesArr) { + routeRanges.add(DocRouter.DEFAULT.fromString(r)); + } + } else { + this.routeRanges = null; + } + this.targetCollectionName = (String) propMap.get("targetCollection"); + this.expireAt = Long.parseLong((String) propMap.get("expireAt")); + } + + public List<DocRouter.Range> getRouteRanges() { + return routeRanges; + } + + public String getTargetCollectionName() { + return targetCollectionName; + } + + public Long getExpireAt() { + return expireAt; + } + + public String getRouteRangesStr() { + return routeRangesStr; + } + + @Override + public String toString() { + return JSONUtil.toJSON(propMap, -1); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/common/cloud/SaslZkACLProvider.java ---------------------------------------------------------------------- diff --git a/ranger_solrj/src/main/java/org/apache/solr/common/cloud/SaslZkACLProvider.java b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/SaslZkACLProvider.java new file mode 100644 index 0000000..86ab25f --- /dev/null +++ b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/SaslZkACLProvider.java @@ -0,0 +1,49 @@ +package org.apache.solr.common.cloud; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.util.ArrayList; +import java.util.List; + +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Id; + +/** + * ZkACLProvider that gives all permissions for the user specified in System + * property "solr.authorization.superuser" (default: "solr") when using sasl, + * and gives read permissions for anyone else. Designed for a setup where + * configurations have already been set up and will not be modified, or + * where configuration changes are controlled via Solr APIs. + */ +public class SaslZkACLProvider extends DefaultZkACLProvider { + + private static String superUser = System.getProperty("solr.authorization.superuser", "solr"); + + @Override + protected List<ACL> createGlobalACLsToAdd() { + List<ACL> result = new ArrayList<ACL>(); + result.add(new ACL(ZooDefs.Perms.ALL, new Id("sasl", superUser))); + result.add(new ACL(ZooDefs.Perms.READ, ZooDefs.Ids.ANYONE_ID_UNSAFE)); + + if (result.isEmpty()) { + result = super.createGlobalACLsToAdd(); + } + return result; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/common/cloud/Slice.java ---------------------------------------------------------------------- diff --git a/ranger_solrj/src/main/java/org/apache/solr/common/cloud/Slice.java b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/Slice.java new file mode 100644 index 0000000..70abf6e --- /dev/null +++ b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/Slice.java @@ -0,0 +1,196 @@ +package org.apache.solr.common.cloud; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.noggit.JSONUtil; +import org.noggit.JSONWriter; + +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; + +/** + * A Slice contains immutable information about a logical shard (all replicas that share the same shard id). + */ +public class Slice extends ZkNodeProps { + public static String REPLICAS = "replicas"; + public static String RANGE = "range"; + public static String STATE = "state"; + public static String LEADER = "leader"; // FUTURE: do we want to record the leader as a slice property in the JSON (as opposed to isLeader as a replica property?) + public static String ACTIVE = "active"; + public static String INACTIVE = "inactive"; + public static String CONSTRUCTION = "construction"; + public static String RECOVERY = "recovery"; + public static String PARENT = "parent"; + + private final String name; + private final DocRouter.Range range; + private final Integer replicationFactor; // FUTURE: optional per-slice override of the collection replicationFactor + private final Map<String,Replica> replicas; + private final Replica leader; + private final String state; + private final String parent; + private final Map<String, RoutingRule> routingRules; + + /** + * @param name The name of the slice + * @param replicas The replicas of the slice. This is used directly and a copy is not made. If null, replicas will be constructed from props. + * @param props The properties of the slice - a shallow copy will always be made. + */ + public Slice(String name, Map<String,Replica> replicas, Map<String,Object> props) { + super( props==null ? new LinkedHashMap<String,Object>(2) : new LinkedHashMap<>(props)); + this.name = name; + + Object rangeObj = propMap.get(RANGE); + if (propMap.containsKey(STATE) && propMap.get(STATE) != null) + this.state = (String) propMap.get(STATE); + else { + this.state = ACTIVE; //Default to ACTIVE + propMap.put(STATE, this.state); + } + DocRouter.Range tmpRange = null; + if (rangeObj instanceof DocRouter.Range) { + tmpRange = (DocRouter.Range)rangeObj; + } else if (rangeObj != null) { + // Doesn't support custom implementations of Range, but currently not needed. + tmpRange = DocRouter.DEFAULT.fromString(rangeObj.toString()); + } + range = tmpRange; + + /** debugging. this isn't an error condition for custom sharding. + if (range == null) { + System.out.println("###### NO RANGE for " + name + " props=" + props); + } + **/ + + if (propMap.containsKey(PARENT) && propMap.get(PARENT) != null) + this.parent = (String) propMap.get(PARENT); + else + this.parent = null; + + replicationFactor = null; // future + + // add the replicas *after* the other properties (for aesthetics, so it's easy to find slice properties in the JSON output) + this.replicas = replicas != null ? replicas : makeReplicas((Map<String,Object>)propMap.get(REPLICAS)); + propMap.put(REPLICAS, this.replicas); + + Map<String, Object> rules = (Map<String, Object>) propMap.get("routingRules"); + if (rules != null) { + this.routingRules = new HashMap<>(); + for (Map.Entry<String, Object> entry : rules.entrySet()) { + Object o = entry.getValue(); + if (o instanceof Map) { + Map map = (Map) o; + RoutingRule rule = new RoutingRule(entry.getKey(), map); + routingRules.put(entry.getKey(), rule); + } else { + routingRules.put(entry.getKey(), (RoutingRule) o); + } + } + } else { + this.routingRules = null; + } + + leader = findLeader(); + } + + + private Map<String,Replica> makeReplicas(Map<String,Object> genericReplicas) { + if (genericReplicas == null) return new HashMap<>(1); + Map<String,Replica> result = new LinkedHashMap<>(genericReplicas.size()); + for (Map.Entry<String,Object> entry : genericReplicas.entrySet()) { + String name = entry.getKey(); + Object val = entry.getValue(); + Replica r; + if (val instanceof Replica) { + r = (Replica)val; + } else { + r = new Replica(name, (Map<String,Object>)val); + } + result.put(name, r); + } + return result; + } + + private Replica findLeader() { + for (Replica replica : replicas.values()) { + if (replica.getStr(LEADER) != null) return replica; + } + return null; + } + + /** + * Return slice name (shard id). + */ + public String getName() { + return name; + } + + /** + * Gets the list of replicas for this slice. + */ + public Collection<Replica> getReplicas() { + return replicas.values(); + } + + /** + * Get the map of coreNodeName to replicas for this slice. + */ + public Map<String, Replica> getReplicasMap() { + return replicas; + } + + public Map<String,Replica> getReplicasCopy() { + return new LinkedHashMap<>(replicas); + } + + public Replica getLeader() { + return leader; + } + + public Replica getReplica(String replicaName) { + return replicas.get(replicaName); + } + + public DocRouter.Range getRange() { + return range; + } + + public String getState() { + return state; + } + + public String getParent() { + return parent; + } + + public Map<String, RoutingRule> getRoutingRules() { + return routingRules; + } + + @Override + public String toString() { + return name + ':' + JSONUtil.toJSON(propMap); + } + + @Override + public void write(JSONWriter jsonWriter) { + jsonWriter.write(propMap); + } +}
