[ https://issues.apache.org/jira/browse/SOLR-8323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15279206#comment-15279206 ]
ASF GitHub Bot commented on SOLR-8323:
--------------------------------------
Github user dragonsinth commented on a diff in the pull request:
https://github.com/apache/lucene-solr/pull/32#discussion_r62770569
--- Diff: solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java ---
@@ -1069,32 +1100,190 @@ public static String getCollectionPath(String coll) {
return COLLECTIONS_ZKNODE+"/"+coll + "/state.json";
}
- public void addCollectionWatch(String coll) {
- if (interestingCollections.add(coll)) {
- LOG.info("addZkWatch [{}]", coll);
- new StateWatcher(coll).refreshAndWatch(false);
+ /**
+ * Notify this reader that a local Core is a member of a collection, and
so that collection
+ * state should be watched.
+ *
+ * Not a public API. This method should only be called from
ZkController.
+ *
+ * The number of cores per-collection is tracked, and adding multiple
cores from the same
+ * collection does not increase the number of watches.
+ *
+ * @param collection the collection that the core is a member of
+ *
+ * @see ZkStateReader#unregisterCore(String)
+ */
+ public void registerCore(String collection) {
+ AtomicBoolean reconstructState = new AtomicBoolean(false);
+ collectionWatches.compute(collection, (k, v) -> {
+ if (v == null) {
+ reconstructState.set(true);
+ v = new CollectionWatch();
+ }
+ v.coreRefCount++;
+ return v;
+ });
+ if (reconstructState.get()) {
+ new StateWatcher(collection).refreshAndWatch();
+ synchronized (getUpdateLock()) {
+ constructState();
+ }
+ }
+ }
+
+ /**
+ * Notify this reader that a local core that is a member of a collection
has been closed.
+ *
+ * Not a public API. This method should only be called from
ZkController.
+ *
+ * If no cores are registered for a collection, and there are no {@link
CollectionStateWatcher}s
+ * for that collection either, the collection watch will be removed.
+ *
+ * @param collection the collection that the core belongs to
+ */
+ public void unregisterCore(String collection) {
+ AtomicBoolean reconstructState = new AtomicBoolean(false);
+ collectionWatches.compute(collection, (k, v) -> {
+ if (v == null)
+ return null;
+ if (v.coreRefCount > 0)
+ v.coreRefCount--;
+ if (v.canBeRemoved()) {
+ watchedCollectionStates.remove(collection);
+ lazyCollectionStates.put(collection, new
LazyCollectionRef(collection));
+ reconstructState.set(true);
+ return null;
+ }
+ return v;
+ });
+ if (reconstructState.get()) {
+ synchronized (getUpdateLock()) {
+ constructState();
+ }
+ }
+ }
+
+ /**
+ * Register a CollectionStateWatcher to be called when the state of a
collection changes
+ *
+ * A given CollectionStateWatcher will be only called once. If you want
to have a persistent watcher,
+ * it should register itself again in its {@link
CollectionStateWatcher#onStateChanged(Set, DocCollection)}
+ * method.
+ */
+ public void registerCollectionStateWatcher(String collection,
CollectionStateWatcher stateWatcher) {
+ AtomicBoolean watchSet = new AtomicBoolean(false);
+ collectionWatches.compute(collection, (k, v) -> {
+ if (v == null) {
+ v = new CollectionWatch();
+ watchSet.set(true);
+ }
+ v.stateWatchers.add(stateWatcher);
+ return v;
+ });
+ if (watchSet.get()) {
+ new StateWatcher(collection).refreshAndWatch();
synchronized (getUpdateLock()) {
constructState();
}
}
}
+ /**
+ * Block until a CollectionStatePredicate returns true, or the wait
times out
+ *
+ * Note that the predicate may be called again even after it has
returned true, so
+ * implementors should avoid changing state within the predicate call
itself.
+ *
+ * @param collection the collection to watch
+ * @param wait how long to wait
+ * @param unit the units of the wait parameter
+ * @param predicate the predicate to call on state changes
+ * @throws InterruptedException on interrupt
+ * @throws TimeoutException on timeout
+ */
+ public void waitForState(final String collection, long wait, TimeUnit
unit, CollectionStatePredicate predicate)
+ throws InterruptedException, TimeoutException {
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ CollectionStateWatcher watcher = new CollectionStateWatcher() {
+ @Override
+ public void onStateChanged(Set<String> liveNodes, DocCollection
collectionState) {
+ if (predicate.matches(liveNodes, collectionState)) {
+ latch.countDown();
+ } else {
+ registerCollectionStateWatcher(collection, this);
+ }
+ }
+ };
+ registerCollectionStateWatcher(collection, watcher);
+
+ try {
+ // check the current state
+ DocCollection dc = clusterState.getCollectionOrNull(collection);
+ if (predicate.matches(liveNodes, dc))
+ return;
+
+ // wait for the watcher predicate to return true, or time out
+ if (!latch.await(wait, unit))
+ throw new TimeoutException();
+
+ }
+ finally {
--- End diff --
Nit: you have a few formatting issues here (e.g. `finally` placed on its own line instead of `} finally {`) and elsewhere.
> Add CollectionWatcher API to ZkStateReader
> ------------------------------------------
>
> Key: SOLR-8323
> URL: https://issues.apache.org/jira/browse/SOLR-8323
> Project: Solr
> Issue Type: Improvement
> Affects Versions: 6.0
> Reporter: Alan Woodward
> Assignee: Alan Woodward
> Attachments: SOLR-8323.patch, SOLR-8323.patch, SOLR-8323.patch,
> SOLR-8323.patch
>
>
> An API to watch for changes to collection state would be a generally useful
> thing, both internally and for client use.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]