Hi Mark,
I fixed a failure of this test with Java 9. The test leaked a
CloudSolrClient in a static field that was not closed and nulled in @AfterClass.
This led to the following exception, which is quite self-explanatory since we
updated randomized-testing (thanks, Dawid, for the help!):
junit.framework.AssertionFailedError: Clean up static fields (in @AfterClass?)
and null them, your test still has references to classes of which the sizes
cannot be measured due to security restrictions or Java 9 module encapsulation:
  - private static org.apache.solr.client.solrj.impl.CloudSolrClient org.apache.solr.cloud.TestStressLiveNodes.CLOUD_CLIENT
  at __randomizedtesting.SeedInfo.seed([95D541ECF64FB480]:0)
  at com.carrotsearch.randomizedtesting.rules.StaticFieldsInvariantRule$1.afterAlways(StaticFieldsInvariantRule.java:146)
  at com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:43)
  at com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
  at com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
  at org.apache.lucene.util.TestRuleAssertionsRequired$1.evaluate(TestRuleAssertionsRequired.java:53)
  at org.apache.lucene.util.TestRuleMarkFailure$1.evaluate(TestRuleMarkFailure.java:47)
  at org.apache.lucene.util.TestRuleIgnoreAfterMaxFailures$1.evaluate(TestRuleIgnoreAfterMaxFailures.java:64)
  at org.apache.lucene.util.TestRuleIgnoreTestSuites$1.evaluate(TestRuleIgnoreTestSuites.java:54)
  at com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
  at com.carrotsearch.randomizedtesting.ThreadLeakControl$StatementRunner.run(ThreadLeakControl.java:367)
  at java.lang.Thread.run(java.base@9-ea/Thread.java:804)
Caused by: java.lang.IllegalStateException: Unable to access 'private final java.lang.reflect.Layer java.lang.reflect.Module.layer' to estimate memory usage
  at com.carrotsearch.randomizedtesting.rules.RamUsageEstimator.createCacheEntry(RamUsageEstimator.java:602)
  at com.carrotsearch.randomizedtesting.rules.RamUsageEstimator.measureSizeOf(RamUsageEstimator.java:545)
  at com.carrotsearch.randomizedtesting.rules.RamUsageEstimator.sizeOfAll(RamUsageEstimator.java:387)
  at com.carrotsearch.randomizedtesting.rules.StaticFieldsInvariantRule$1.afterAlways(StaticFieldsInvariantRule.java:129)
  ... 10 more
Caused by: java.lang.reflect.InaccessibleObjectException: Cannot make a non-public member of class java.lang.reflect.Module accessible
  at sun.reflect.Reflection.throwInaccessibleObjectException(java.base@9-ea/Reflection.java:420)
  at java.lang.reflect.AccessibleObject.checkCanSetAccessible(java.base@9-ea/AccessibleObject.java:190)
  at java.lang.reflect.Field.checkCanSetAccessible(java.base@9-ea/Field.java:170)
  at java.lang.reflect.Field.setAccessible(java.base@9-ea/Field.java:164)
  at com.carrotsearch.randomizedtesting.rules.RamUsageEstimator$3.run(RamUsageEstimator.java:597)
  at com.carrotsearch.randomizedtesting.rules.RamUsageEstimator$3.run(RamUsageEstimator.java:594)
  at java.security.AccessController.doPrivileged(java.base@9-ea/Native Method)
  at com.carrotsearch.randomizedtesting.rules.RamUsageEstimator.createCacheEntry(RamUsageEstimator.java:594)
  ... 13 more
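For background on why the size estimation fails at all: RamUsageEstimator walks object graphs with reflection, and since Java 9 calling setAccessible on fields of JDK-internal classes is refused by the module system. A minimal sketch of that failure mode (the 9-ea trace above names java.lang.reflect.Module; released JDKs have java.lang.Module, which this sketch assumes, and the class/method names here are just illustrative):

```java
import java.lang.reflect.Field;

// Illustrates why a reflective heap walk fails on Java 9+: setAccessible(true)
// on a field of a java.base class is refused with InaccessibleObjectException.
class ModuleAccessDemo {

    // Returns true if the JDK-internal Module.layer field (the one named in
    // the stack trace above) can be opened for reflection from here.
    static boolean canOpenModuleLayerField() {
        try {
            Field f = Class.forName("java.lang.Module").getDeclaredField("layer");
            f.setAccessible(true); // throws InaccessibleObjectException on Java 9+
            return true;
        } catch (ReflectiveOperationException | RuntimeException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Expected to print false on Java 9+ unless java.lang is explicitly
        // opened, e.g. --add-opens java.base/java.lang=ALL-UNNAMED.
        System.out.println("can open Module.layer: " + canOpenModuleLayerField());
    }
}
```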
Please be sure not to hold on to large objects or network resources at test
shutdown. The tricky part is that you only see this failure with Java 9's module
system, so keep an eye on Jenkins test failures after committing.
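The cleanup pattern looks roughly like this (a plain-Java sketch without the JUnit/Solr dependencies; in the real test the body lives in a public static @AfterClass method and the field is the CloudSolrClient):

```java
// Sketch of the "close and null static fields" pattern the rule enforces.
// AutoCloseable stands in for the CloudSolrClient static field here.
class LeakFreeFixture {
    static AutoCloseable CLOUD_CLIENT;

    // In the real test, annotate this with @AfterClass so the runner
    // invokes it once the suite finishes.
    static void cleanUpStatics() {
        try {
            if (CLOUD_CLIENT != null) {
                CLOUD_CLIENT.close(); // release sockets/threads held by the client
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            // Always null the static, even if close() fails, so
            // StaticFieldsInvariantRule finds nothing left to measure.
            CLOUD_CLIENT = null;
        }
    }

    public static void main(String[] args) {
        CLOUD_CLIENT = () -> { };  // stand-in resource
        cleanUpStatics();
        System.out.println("field nulled: " + (CLOUD_CLIENT == null));
    }
}
```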
Uwe
-----
Uwe Schindler
H.-H.-Meier-Allee 63, D-28213 Bremen
http://www.thetaphi.de
eMail: [email protected]
> -----Original Message-----
> From: [email protected] [mailto:[email protected]]
> Sent: Wednesday, April 13, 2016 3:55 PM
> To: [email protected]
> Subject: [2/2] lucene-solr:master: SOLR-8914: ZkStateReader's refreshLiveNodes(Watcher) is not thread safe.
>
> SOLR-8914: ZkStateReader's refreshLiveNodes(Watcher) is not thread safe.
>
>
> Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
> Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/744b419b
> Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/744b419b
> Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/744b419b
>
> Branch: refs/heads/master
> Commit: 744b419b42a5797700c0a3a5f859d86ae9d05325
> Parents: 6c9391d
> Author: markrmiller <[email protected]>
> Authored: Wed Apr 13 09:54:40 2016 -0400
> Committer: markrmiller <[email protected]>
> Committed: Wed Apr 13 09:54:40 2016 -0400
>
> ----------------------------------------------------------------------
>  solr/CHANGES.txt                                |   3 +
>  .../apache/solr/cloud/TestStressLiveNodes.java  | 252 +++++++++++++++++++
>  .../apache/solr/common/cloud/ZkStateReader.java |  85 ++++---
>  3 files changed, 311 insertions(+), 29 deletions(-)
> ----------------------------------------------------------------------
>
>
> http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/744b419b/solr/CHANGES.txt
> ----------------------------------------------------------------------
> diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
> index 0b98fa0..fbe4698 100644
> --- a/solr/CHANGES.txt
> +++ b/solr/CHANGES.txt
> @@ -108,6 +108,9 @@ Bug Fixes
> * SOLR-8948: OverseerTaskQueue.containsTaskWithRequestId encounters json parse error if a
>   SolrResponse node is in the overseer queue. (Jessica Cheng Mallet via shalin)
>
> +* SOLR-8914: ZkStateReader's refreshLiveNodes(Watcher) is not thread safe. (Scott Blum, hoss,
> +  sarowe, Erick Erickson, Mark Miller, shalin)
> +
> Optimizations
> ----------------------
> * SOLR-8722: Don't force a full ZkStateReader refresh on every Overseer operation.
>
> http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/744b419b/solr/core/src/test/org/apache/solr/cloud/TestStressLiveNodes.java
> ----------------------------------------------------------------------
> diff --git a/solr/core/src/test/org/apache/solr/cloud/TestStressLiveNodes.java b/solr/core/src/test/org/apache/solr/cloud/TestStressLiveNodes.java
> new file mode 100644
> index 0000000..28dcc82
> --- /dev/null
> +++ b/solr/core/src/test/org/apache/solr/cloud/TestStressLiveNodes.java
> @@ -0,0 +1,252 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements. See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License. You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.solr.cloud;
> +
> +import java.lang.invoke.MethodHandles;
> +import java.util.ArrayList;
> +import java.util.Collections;
> +import java.util.List;
> +import java.util.Random;
> +import java.util.concurrent.Callable;
> +import java.util.concurrent.ExecutorService;
> +import java.util.concurrent.TimeUnit;
> +
> +import org.apache.lucene.util.LuceneTestCase.Slow;
> +import org.apache.lucene.util.TestUtil;
> +import org.apache.solr.cloud.SolrCloudTestCase;
> +import org.apache.solr.client.solrj.embedded.JettySolrRunner;
> +import org.apache.solr.client.solrj.impl.CloudSolrClient;
> +import org.apache.solr.common.cloud.SolrZkClient;
> +import org.apache.solr.common.cloud.ZkStateReader;
> +import org.apache.solr.common.util.ExecutorUtil;
> +import org.apache.solr.core.CloudConfig.CloudConfigBuilder;
> +import org.apache.solr.util.DefaultSolrThreadFactory;
> +
> +import org.apache.zookeeper.CreateMode;
> +import org.apache.zookeeper.KeeperException;
> +
> +import org.junit.BeforeClass;
> +
> +import org.slf4j.Logger;
> +import org.slf4j.LoggerFactory;
> +
> +/**
> + * Stress test LiveNodes watching.
> + *
> + * Does bursts of adds to live_nodes using parallel threads to and verifies that after each
> + * burst a ZkStateReader detects the correct set.
> + */
> +@Slow
> +public class TestStressLiveNodes extends SolrCloudTestCase {
> +
> +  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
> +
> +  /** A basic cloud client, we'll be testing the behavior of it's ZkStateReader */
> +  private static CloudSolrClient CLOUD_CLIENT;
> +
> +  /** The addr of the zk server used in this test */
> +  private static String ZK_SERVER_ADDR;
> +
> +  /* how many seconds we're willing to wait for our executor tasks to finish before failing the test */
> +  private final static int WAIT_TIME = TEST_NIGHTLY ? 60 : 30;
> +
> +  @BeforeClass
> +  private static void createMiniSolrCloudCluster() throws Exception {
> +
> +    // we only need 1 node, and we don't care about any configs or collections
> +    // we're going to fake all the live_nodes changes we want to fake.
> +    configureCluster(1).configure();
> +
> +    // give all nodes a chance to come alive
> +    TestTolerantUpdateProcessorCloud.assertSpinLoopAllJettyAreRunning(cluster);
> +
> +    CLOUD_CLIENT = cluster.getSolrClient();
> +    CLOUD_CLIENT.connect(); // force connection even though we aren't sending any requests
> +
> +    ZK_SERVER_ADDR = cluster.getZkServer().getZkAddress();
> +
> +  }
> +
> +  private static SolrZkClient newSolrZkClient() {
> +    assertNotNull(ZK_SERVER_ADDR);
> +    // WTF is CloudConfigBuilder.DEFAULT_ZK_CLIENT_TIMEOUT private?
> +    return new SolrZkClient(ZK_SERVER_ADDR, 15000);
> +  }
> +
> +  /** returns the true set of live nodes (currently in zk) as a sorted list */
> +  private static List<String> getTrueLiveNodesFromZk() throws Exception {
> +    SolrZkClient client = newSolrZkClient();
> +    try {
> +      ArrayList<String> result = new ArrayList<>(client.getChildren(ZkStateReader.LIVE_NODES_ZKNODE, null, true));
> +      Collections.sort(result);
> +      return result;
> +    } finally {
> +      client.close();
> +    }
> +  }
> +
> +  /**
> +   * returns the cached set of live nodes (according to the ZkStateReader in our CloudSolrClient)
> +   * as a sorted list.
> +   * This is done in a sleep+retry loop until the result matches the expectedCount, or a few iters have passed
> +   * (this way we aren't testing how fast the watchers complete, just that they got the correct result)
> +   */
> +  private static List<String> getCachedLiveNodesFromLocalState(final int expectedCount) throws Exception {
> +    ArrayList<String> result = null;
> +
> +    for (int i = 0; i < 10; i++) {
> +      result = new ArrayList<>(CLOUD_CLIENT.getZkStateReader().getClusterState().getLiveNodes());
> +      if (expectedCount != result.size()) {
> +        log.info("sleeping #{} to give watchers a chance to finish: {} != {}",
> +                 i, expectedCount, result.size());
> +        Thread.sleep(200);
> +      } else {
> +        break;
> +      }
> +    }
> +    if (expectedCount != result.size()) {
> +      log.error("gave up waiting for live nodes to match expected size: {} != {}",
> +                expectedCount, result.size());
> +    }
> +    Collections.sort(result);
> +    return result;
> +  }
> +
> +  public void testStress() throws Exception {
> +
> +    // do many iters, so we have "bursts" of adding nodes that we then check
> +    final int numIters = atLeast(1000);
> +    for (int iter = 0; iter < numIters; iter++) {
> +
> +      // sanity check that ZK says there is in fact 1 live node
> +      List<String> actualLiveNodes = getTrueLiveNodesFromZk();
> +      assertEquals("iter"+iter+": " + actualLiveNodes.toString(),
> +                   1, actualLiveNodes.size());
> +
> +      // only here do we forcibly update the cached live nodes so we don't have to wait for it to catch up
> +      // with all the ephemeral nodes that vanished after the last iteration
> +      CLOUD_CLIENT.getZkStateReader().updateLiveNodes();
> +
> +      // sanity check that our Cloud Client's local state knows about the 1 (real) live node in our cluster
> +      List<String> cachedLiveNodes = getCachedLiveNodesFromLocalState(actualLiveNodes.size());
> +      assertEquals("iter"+iter+" " + actualLiveNodes.size() + " != " + cachedLiveNodes.size(),
> +                   actualLiveNodes, cachedLiveNodes);
> +
> +
> +      // start spining up some threads to add some live_node children in parallel
> +
> +      // we don't need a lot of threads or nodes (we don't want to swamp the CPUs
> +      // just bursts of conccurent adds) but we do want to randomize it a bit so we increase the
> +      // odds of concurrent watchers firing regardless of the num CPUs or load on the machine running
> +      // the test (but we deliberately don't look at availableProcessors() since we want randomization
> +      // consistency across all machines for a given seed)
> +      final int numThreads = TestUtil.nextInt(random(), 2, 5);
> +
> +      // use same num for all thrashers, to increase likely hood of them all competing
> +      // (diff random number would mean heavy concurency only for ~ the first N=lowest num requetss)
> +      //
> +      // this does not need to be a large number -- in fact, the higher it is, the more
> +      // likely we are to see a mistake in early watcher triggers get "corrected" by a later one
> +      // and overlook a possible bug
> +      final int numNodesPerThrasher = TestUtil.nextInt(random(), 1, 5);
> +
> +      log.info("preparing parallel adds to live nodes: iter={}, numThreads={} numNodesPerThread={}",
> +               iter, numThreads, numNodesPerThrasher);
> +
> +      // NOTE: using ephemeral nodes
> +      // so we can't close any of these thrashers until we are done with our assertions
> +      final List<LiveNodeTrasher> thrashers = new ArrayList<>(numThreads);
> +      for (int i = 0; i < numThreads; i++) {
> +        thrashers.add(new LiveNodeTrasher("T"+iter+"_"+i, numNodesPerThrasher));
> +      }
> +      try {
> +        final ExecutorService executorService = ExecutorUtil.newMDCAwareFixedThreadPool
> +          (thrashers.size()+1, new DefaultSolrThreadFactory("test_live_nodes_thrasher_iter"+iter));
> +
> +        executorService.invokeAll(thrashers);
> +        executorService.shutdown();
> +        if (! executorService.awaitTermination(WAIT_TIME, TimeUnit.SECONDS)) {
> +          for (LiveNodeTrasher thrasher : thrashers) {
> +            thrasher.stop();
> +          }
> +        }
> +        assertTrue("iter"+iter+": thrashers didn't finish even after explicitly stopping",
> +                   executorService.awaitTermination(WAIT_TIME, TimeUnit.SECONDS));
> +
> +        // sanity check the *real* live_nodes entries from ZK match what the thrashers added
> +        int totalAdded = 1; // 1 real live node when we started
> +        for (LiveNodeTrasher thrasher : thrashers) {
> +          totalAdded += thrasher.getNumAdded();
> +        }
> +        actualLiveNodes = getTrueLiveNodesFromZk();
> +        assertEquals("iter"+iter, totalAdded, actualLiveNodes.size());
> +
> +        // verify our local client knows the correct set of live nodes
> +        cachedLiveNodes = getCachedLiveNodesFromLocalState(actualLiveNodes.size());
> +        assertEquals("iter"+iter+" " + actualLiveNodes.size() + " != " + cachedLiveNodes.size(),
> +                     actualLiveNodes, cachedLiveNodes);
> +
> +      } finally {
> +        for (LiveNodeTrasher thrasher : thrashers) {
> +          // shutdown our zk connection, freeing our ephemeral nodes
> +          thrasher.close();
> +        }
> +      }
> +    }
> +  }
> +
> +  /** NOTE: has internal counter which is not thread safe, only call() in one thread at a time */
> +  public static final class LiveNodeTrasher implements Callable<Integer> {
> +    private final String id;
> +    private final int numNodesToAdd;
> +    private final SolrZkClient client;
> +
> +    private boolean running = false;
> +    private int numAdded = 0;
> +
> +    /** ID should ideally be unique amonst any other instances */
> +    public LiveNodeTrasher(String id, int numNodesToAdd) {
> +      this.id = id;
> +      this.numNodesToAdd = numNodesToAdd;
> +      this.client = newSolrZkClient();
> +    }
> +    /** returns the number of nodes actually added w/o error */
> +    public Integer call() {
> +      running = true;
> +      // NOTE: test includes 'running'
> +      for (int i = 0; running && i < numNodesToAdd; i++) {
> +        final String nodePath = ZkStateReader.LIVE_NODES_ZKNODE + "/thrasher-" + id + "-" + i;
> +        try {
> +          client.makePath(nodePath, CreateMode.EPHEMERAL, true);
> +          numAdded++;
> +        } catch (Exception e) {
> +          log.error("failed to create: " + nodePath, e);
> +        }
> +      }
> +      return numAdded;
> +    }
> +    public int getNumAdded() {
> +      return numAdded;
> +    }
> +    public void close() {
> +      client.close();
> +    }
> +    public void stop() {
> +      running = false;
> +    }
> +  }
> +}
>
> http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/744b419b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
> ----------------------------------------------------------------------
> diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
> index 308b3e0..e8d95c3 100644
> --- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
> +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
> @@ -32,6 +32,7 @@ import java.util.Set;
> import java.util.TreeSet;
> import java.util.concurrent.ConcurrentHashMap;
> import java.util.concurrent.TimeUnit;
> +import java.util.concurrent.atomic.AtomicReference;
>
> import org.apache.solr.common.Callable;
> import org.apache.solr.common.SolrException;
> @@ -487,6 +488,10 @@ public class ZkStateReader implements Closeable {
>        final byte[] data = zkClient.getData(CLUSTER_STATE, watcher, stat, true);
>        final ClusterState loadedData = ClusterState.load(stat.getVersion(), data, emptySet(), CLUSTER_STATE);
>        synchronized (getUpdateLock()) {
> +        if (this.legacyClusterStateVersion >= stat.getVersion()) {
> +          // Nothing to do, someone else updated same or newer.
> +          return;
> +        }
>          this.legacyCollectionStates = loadedData.getCollectionStates();
>          this.legacyClusterStateVersion = stat.getVersion();
>        }
> @@ -509,6 +514,9 @@ public class ZkStateReader implements Closeable {
> }
> }
>
> +  // We don't get a Stat or track versions on getChildren() calls, so force linearization.
> +  private final Object refreshCollectionListLock = new Object();
> +
> /**
> * Search for any lazy-loadable state format2 collections.
> *
> @@ -522,29 +530,32 @@ public class ZkStateReader implements Closeable {
>     * {@link ClusterState#getCollections()} method as a safeguard against exposing wrong collection names to the users
>     */
>    private void refreshCollectionList(Watcher watcher) throws KeeperException, InterruptedException {
> -    List<String> children = null;
> -    try {
> -      children = zkClient.getChildren(COLLECTIONS_ZKNODE, watcher, true);
> -    } catch (KeeperException.NoNodeException e) {
> -      LOG.warn("Error fetching collection names: [{}]", e.getMessage());
> -      // fall through
> -    }
> -    if (children == null || children.isEmpty()) {
> -      lazyCollectionStates.clear();
> -      return;
> -    }
> -
> -    // Don't mess with watchedCollections, they should self-manage.
> +    synchronized (refreshCollectionListLock) {
> +      List<String> children = null;
> +      try {
> +        children = zkClient.getChildren(COLLECTIONS_ZKNODE, watcher, true);
> +      } catch (KeeperException.NoNodeException e) {
> +        LOG.warn("Error fetching collection names: [{}]", e.getMessage());
> +        // fall through
> +      }
> +      if (children == null || children.isEmpty()) {
> +        lazyCollectionStates.clear();
> +        return;
> +      }
>
> -    // First, drop any children that disappeared.
> -    this.lazyCollectionStates.keySet().retainAll(children);
> -    for (String coll : children) {
> -      // We will create an eager collection for any interesting collections, so don't add to lazy.
> -      if (!interestingCollections.contains(coll)) {
> -        // Double check contains just to avoid allocating an object.
> -        LazyCollectionRef existing = lazyCollectionStates.get(coll);
> -        if (existing == null) {
> -          lazyCollectionStates.putIfAbsent(coll, new LazyCollectionRef(coll));
> +      // Don't lock getUpdateLock() here, we don't need it and it would cause deadlock.
> +      // Don't mess with watchedCollections, they should self-manage.
> +
> +      // First, drop any children that disappeared.
> +      this.lazyCollectionStates.keySet().retainAll(children);
> +      for (String coll : children) {
> +        // We will create an eager collection for any interesting collections, so don't add to lazy.
> +        if (!interestingCollections.contains(coll)) {
> +          // Double check contains just to avoid allocating an object.
> +          LazyCollectionRef existing = lazyCollectionStates.get(coll);
> +          if (existing == null) {
> +            lazyCollectionStates.putIfAbsent(coll, new LazyCollectionRef(coll));
> +          }
>          }
>        }
>      }
> @@ -576,19 +587,35 @@ public class ZkStateReader implements Closeable {
>      }
>    }
>
> +  // We don't get a Stat or track versions on getChildren() calls, so force linearization.
> +  private final Object refreshLiveNodesLock = new Object();
> +  // Ensures that only the latest getChildren fetch gets applied.
> +  private final AtomicReference<Set<String>> lastFetchedLiveNodes = new AtomicReference<>();
> +
>    /**
>     * Refresh live_nodes.
>     */
>    private void refreshLiveNodes(Watcher watcher) throws KeeperException, InterruptedException {
> -    Set<String> newLiveNodes;
> -    try {
> -      List<String> nodeList = zkClient.getChildren(LIVE_NODES_ZKNODE, watcher, true);
> -      newLiveNodes = new HashSet<>(nodeList);
> -    } catch (KeeperException.NoNodeException e) {
> -      newLiveNodes = emptySet();
> +    synchronized (refreshLiveNodesLock) {
> +      Set<String> newLiveNodes;
> +      try {
> +        List<String> nodeList = zkClient.getChildren(LIVE_NODES_ZKNODE, watcher, true);
> +        newLiveNodes = new HashSet<>(nodeList);
> +      } catch (KeeperException.NoNodeException e) {
> +        newLiveNodes = emptySet();
> +      }
> +      lastFetchedLiveNodes.set(newLiveNodes);
>      }
> -    Set<String> oldLiveNodes;
> +
> +    // Can't lock getUpdateLock() until we release the other, it would cause deadlock.
> +    Set<String> oldLiveNodes, newLiveNodes;
>      synchronized (getUpdateLock()) {
> +      newLiveNodes = lastFetchedLiveNodes.getAndSet(null);
> +      if (newLiveNodes == null) {
> +        // Someone else won the race to apply the last update, just exit.
> +        return;
> +      }
> +
>        oldLiveNodes = this.liveNodes;
>        this.liveNodes = newLiveNodes;
>        if (clusterState != null) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]