This is an automated email from the ASF dual-hosted git repository. noble pushed a commit to branch jira/SOLR-16580_9_1 in repository https://gitbox.apache.org/repos/asf/solr.git
commit ce9101db9ff032ccb724edfd68f490badb61158e Author: Noble Paul <[email protected]> AuthorDate: Tue Jan 3 21:51:51 2023 +1100 All changes from github.com/apache/solr/pull/1242 --- .../solr/cloud/DistributedClusterStateUpdater.java | 5 +- .../solr/cloud/overseer/ClusterStateMutator.java | 4 +- .../solr/cloud/overseer/CollectionMutator.java | 7 +- .../apache/solr/cloud/overseer/ZkStateWriter.java | 15 +++- .../org/apache/solr/core/backup/BackupManager.java | 2 +- .../apache/solr/handler/admin/ClusterStatus.java | 5 ++ .../solr/handler/admin/CollectionsHandler.java | 6 +- .../org/apache/solr/cloud/ClusterStateTest.java | 10 +-- .../OverseerCollectionConfigSetProcessorTest.java | 8 +- .../test/org/apache/solr/cloud/SliceStateTest.java | 2 +- .../solr/cloud/overseer/ZkStateReaderTest.java | 70 ++++++++++++---- .../solr/cloud/overseer/ZkStateWriterTest.java | 11 ++- .../client/solrj/cloud/DistribStateManager.java | 12 +++ .../solrj/impl/ZkClientClusterStateProvider.java | 8 +- .../solr/common/cloud/PerReplicaStatesFetcher.java | 10 +++ .../apache/solr/common/cloud/ZkStateReader.java | 17 ---- .../solrj/impl/BaseHttpClusterStateProvider.java | 27 +++++- .../org/apache/solr/common/cloud/ClusterState.java | 92 +++++--------------- .../apache/solr/common/cloud/DocCollection.java | 97 ++++++++++++++-------- .../apache/solr/common/cloud/PerReplicaStates.java | 20 +++++ .../java/org/apache/solr/common/cloud/Replica.java | 49 ++++++----- .../java/org/apache/solr/common/cloud/Slice.java | 26 +++++- .../solrj/impl/CloudSolrClientCacheTest.java | 2 +- .../cloud/PerReplicaStatesIntegrationTest.java | 8 +- 24 files changed, 326 insertions(+), 187 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/cloud/DistributedClusterStateUpdater.java b/solr/core/src/java/org/apache/solr/cloud/DistributedClusterStateUpdater.java index ca745b55ff6..2ee7cba3239 100644 --- a/solr/core/src/java/org/apache/solr/cloud/DistributedClusterStateUpdater.java +++ 
b/solr/core/src/java/org/apache/solr/cloud/DistributedClusterStateUpdater.java @@ -628,13 +628,16 @@ public class DistributedClusterStateUpdater { // This factory method can detect a missing configName and supply it by reading it from the // old ZK location. // TODO in Solr 10 remove that factory method - ClusterState clusterState = + + ClusterState clusterState; + clusterState = ZkClientClusterStateProvider.createFromJsonSupportingLegacyConfigName( stat.getVersion(), data, Collections.emptySet(), updater.getCollectionName(), zkStateReader.getZkClient()); + return clusterState; } } diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ClusterStateMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ClusterStateMutator.java index f979d47a9af..f95959e33e5 100644 --- a/solr/core/src/java/org/apache/solr/cloud/overseer/ClusterStateMutator.java +++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ClusterStateMutator.java @@ -125,7 +125,9 @@ public class ClusterStateMutator { } assert !collectionProps.containsKey(CollectionAdminParams.COLL_CONF); - DocCollection newCollection = new DocCollection(cName, slices, collectionProps, router, -1); + DocCollection newCollection = + new DocCollection( + cName, slices, collectionProps, router, -1, stateManager.getPrsSupplier(cName)); return new ZkWriteCommand(cName, newCollection); } diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java index 6269e7ba329..492fe9efeb5 100644 --- a/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java +++ b/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java @@ -172,7 +172,12 @@ public class CollectionMutator { DocCollection collection = new DocCollection( - coll.getName(), coll.getSlicesMap(), props, coll.getRouter(), coll.getZNodeVersion()); + coll.getName(), + coll.getSlicesMap(), + props, + coll.getRouter(), + 
coll.getZNodeVersion(), + stateManager.getPrsSupplier(coll.getName())); if (replicaOps == null) { return new ZkWriteCommand(coll.getName(), collection); } else { diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java index 8fb0ce287fc..6aa561632cd 100644 --- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java +++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java @@ -280,13 +280,24 @@ public class ZkStateWriter { Stat stat = reader.getZkClient().setData(path, data, c.getZNodeVersion(), true); DocCollection newCollection = new DocCollection( - name, c.getSlicesMap(), c.getProperties(), c.getRouter(), stat.getVersion()); + name, + c.getSlicesMap(), + c.getProperties(), + c.getRouter(), + stat.getVersion(), + new PerReplicaStatesFetcher.LazyPrsSupplier(reader.getZkClient(), path)); clusterState = clusterState.copyWith(name, newCollection); } else { log.debug("going to create_collection {}", path); reader.getZkClient().create(path, data, CreateMode.PERSISTENT, true); DocCollection newCollection = - new DocCollection(name, c.getSlicesMap(), c.getProperties(), c.getRouter(), 0); + new DocCollection( + name, + c.getSlicesMap(), + c.getProperties(), + c.getRouter(), + 0, + new PerReplicaStatesFetcher.LazyPrsSupplier(reader.getZkClient(), path)); clusterState = clusterState.copyWith(name, newCollection); } } diff --git a/solr/core/src/java/org/apache/solr/core/backup/BackupManager.java b/solr/core/src/java/org/apache/solr/core/backup/BackupManager.java index 647886bca6a..660ea80cbaa 100644 --- a/solr/core/src/java/org/apache/solr/core/backup/BackupManager.java +++ b/solr/core/src/java/org/apache/solr/core/backup/BackupManager.java @@ -219,7 +219,7 @@ public class BackupManager { repository.openInput(zkStateDir, COLLECTION_PROPS_FILE, IOContext.DEFAULT)) { byte[] arr = new byte[(int) is.length()]; // probably ok since the json file should 
be small. is.readBytes(arr, 0, (int) is.length()); - ClusterState c_state = ClusterState.createFromJson(-1, arr, Collections.emptySet()); + ClusterState c_state = ClusterState.createFromJson(-1, arr, Collections.emptySet(), null); return c_state.getCollection(collectionName); } } diff --git a/solr/core/src/java/org/apache/solr/handler/admin/ClusterStatus.java b/solr/core/src/java/org/apache/solr/handler/admin/ClusterStatus.java index c1a7c31a02d..9130fcc317a 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/ClusterStatus.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/ClusterStatus.java @@ -31,6 +31,7 @@ import org.apache.solr.common.cloud.Aliases; import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.DocRouter; +import org.apache.solr.common.cloud.PerReplicaStates; import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.ZkNodeProps; @@ -188,6 +189,10 @@ public class ClusterStatus { } String configName = clusterStateCollection.getConfigName(); collectionStatus.put("configName", configName); + if (message.getBool("prs", false) && clusterStateCollection.isPerReplicaState()) { + PerReplicaStates prs = clusterStateCollection.getPerReplicaStates(); + collectionStatus.put("PRS", prs); + } collectionProps.add(name, collectionStatus); } diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java index 801b56d743c..cdaf70fb453 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java @@ -1237,7 +1237,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission CLUSTERSTATUS, (req, rsp, h) -> { Map<String, Object> all = - copy(req.getParams(), null, COLLECTION_PROP, 
SHARD_ID_PROP, _ROUTE_); + copy(req.getParams(), null, COLLECTION_PROP, SHARD_ID_PROP, _ROUTE_, "prs"); new ClusterStatus( h.coreContainer.getZkController().getZkStateReader(), new ZkNodeProps(all)) .getClusterStatus(rsp.getValues()); @@ -2010,7 +2010,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission replicas.add(shard.getLeader()); } for (Replica replica : replicas) { - String state = replica.getStr(ZkStateReader.STATE_PROP); + State state = replica.getState(); if (log.isDebugEnabled()) { log.debug( "Checking replica status, collection={} replica={} state={}", @@ -2019,7 +2019,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission state); } if (!n.contains(replica.getNodeName()) - || !state.equals(Replica.State.ACTIVE.toString())) { + || !state.equals(Replica.State.ACTIVE)) { replicaNotAliveCnt++; return false; } diff --git a/solr/core/src/test/org/apache/solr/cloud/ClusterStateTest.java b/solr/core/src/test/org/apache/solr/cloud/ClusterStateTest.java index 1af173271e0..7acb960828a 100644 --- a/solr/core/src/test/org/apache/solr/cloud/ClusterStateTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/ClusterStateTest.java @@ -59,14 +59,14 @@ public class ClusterStateTest extends SolrTestCaseJ4 { Slice slice2 = new Slice("shard2", sliceToProps, null, "collection1"); slices.put("shard2", slice2); collectionStates.put( - "collection1", new DocCollection("collection1", slices, props, DocRouter.DEFAULT)); + "collection1", new DocCollection("collection1", slices, props, DocRouter.DEFAULT, 0, null)); collectionStates.put( - "collection2", new DocCollection("collection2", slices, props, DocRouter.DEFAULT)); + "collection2", new DocCollection("collection2", slices, props, DocRouter.DEFAULT, 0, null)); ClusterState clusterState = new ClusterState(liveNodes, collectionStates); byte[] bytes = Utils.toJSON(clusterState); // System.out.println("#################### " + new String(bytes)); - ClusterState 
loadedClusterState = ClusterState.createFromJson(-1, bytes, liveNodes); + ClusterState loadedClusterState = ClusterState.createFromJson(-1, bytes, liveNodes, null); assertEquals( "Provided liveNodes not used properly", 2, loadedClusterState.getLiveNodes().size()); @@ -90,13 +90,13 @@ public class ClusterStateTest extends SolrTestCaseJ4 { .get("node1") .getStr("prop2")); - loadedClusterState = ClusterState.createFromJson(-1, new byte[0], liveNodes); + loadedClusterState = ClusterState.createFromJson(-1, new byte[0], liveNodes, null); assertEquals( "Provided liveNodes not used properly", 2, loadedClusterState.getLiveNodes().size()); assertEquals("Should not have collections", 0, loadedClusterState.getCollectionsMap().size()); - loadedClusterState = ClusterState.createFromJson(-1, (byte[]) null, liveNodes); + loadedClusterState = ClusterState.createFromJson(-1, (byte[]) null, liveNodes, null); assertEquals( "Provided liveNodes not used properly", 2, loadedClusterState.getLiveNodes().size()); diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java index f90e3805405..c2fcc138c2c 100644 --- a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java @@ -661,6 +661,7 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 { handleCreateCollMessageProps(ZkNodeProps.load(bytes)); } + @SuppressWarnings("DirectInvocationOnMock") private void handleCreateCollMessageProps(ZkNodeProps props) { log.info("track created replicas / collections"); try { @@ -676,7 +677,12 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 { collName, new ClusterState.CollectionRef( new DocCollection( - collName, new HashMap<>(), props.getProperties(), DocRouter.DEFAULT))); + collName, + new 
HashMap<>(), + props.getProperties(), + DocRouter.DEFAULT, + 0, + distribStateManagerMock.getPrsSupplier(collName)))); } if (CollectionParams.CollectionAction.ADDREPLICA.isEqual(props.getStr("operation"))) { replicas.add(props); diff --git a/solr/core/src/test/org/apache/solr/cloud/SliceStateTest.java b/solr/core/src/test/org/apache/solr/cloud/SliceStateTest.java index f12b9597ccb..4517cc1bc15 100644 --- a/solr/core/src/test/org/apache/solr/cloud/SliceStateTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/SliceStateTest.java @@ -59,7 +59,7 @@ public class SliceStateTest extends SolrTestCaseJ4 { ClusterState clusterState = new ClusterState(liveNodes, collectionStates); byte[] bytes = Utils.toJSON(clusterState); - ClusterState loadedClusterState = ClusterState.createFromJson(-1, bytes, liveNodes); + ClusterState loadedClusterState = ClusterState.createFromJson(-1, bytes, liveNodes, null); assertSame( "Default state not set to active", diff --git a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java index 73093e41f3f..a839f3e6854 100644 --- a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java @@ -51,12 +51,15 @@ import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.common.util.SolrNamedThreadFactory; import org.apache.solr.common.util.TimeSource; import org.apache.solr.handler.admin.ConfigSetsHandler; +import org.apache.solr.util.LogLevel; import org.apache.solr.util.TimeOut; import org.junit.After; import org.junit.Before; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@LogLevel( + "org.apache.solr.common.cloud.ZkStateReader=DEBUG;org.apache.solr.common.cloud.PerReplicaStatesOps=DEBUG") public class ZkStateReaderTest extends SolrTestCaseJ4 { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); 
private static final long TIMEOUT = 30; @@ -132,7 +135,9 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 { new HashMap<>(), Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME), DocRouter.DEFAULT, - 0)); + 0, + new PerReplicaStatesFetcher.LazyPrsSupplier( + fixture.zkClient, DocCollection.getCollectionPath("c1")))); writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c1), null); writer.writePendingUpdates(); @@ -157,7 +162,9 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 { new HashMap<>(), Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME), DocRouter.DEFAULT, - 0); + 0, + new PerReplicaStatesFetcher.LazyPrsSupplier( + fixture.zkClient, DocCollection.getCollectionPath("c1"))); ZkWriteCommand wc = new ZkWriteCommand("c1", state); writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null); writer.writePendingUpdates(); @@ -168,7 +175,15 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 { Map<String, Object> props = new HashMap<>(); props.put("x", "y"); props.put(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME); - state = new DocCollection("c1", new HashMap<>(), props, DocRouter.DEFAULT, 0); + state = + new DocCollection( + "c1", + new HashMap<>(), + props, + DocRouter.DEFAULT, + 0, + new PerReplicaStatesFetcher.LazyPrsSupplier( + fixture.zkClient, DocCollection.getCollectionPath("c1"))); wc = new ZkWriteCommand("c1", state); writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null); writer.writePendingUpdates(); @@ -207,7 +222,9 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 { new HashMap<>(), Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME), DocRouter.DEFAULT, - 0); + 0, + new PerReplicaStatesFetcher.LazyPrsSupplier( + fixture.zkClient, DocCollection.getCollectionPath("c1"))); ZkWriteCommand wc = new ZkWriteCommand("c1", state); 
writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null); writer.writePendingUpdates(); @@ -239,7 +256,9 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 { new HashMap<>(), Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME), DocRouter.DEFAULT, - 0); + 0, + new PerReplicaStatesFetcher.LazyPrsSupplier( + fixture.zkClient, DocCollection.getCollectionPath("c1"))); ZkWriteCommand wc = new ZkWriteCommand("c1", state); writer.enqueueUpdate(clusterState, Collections.singletonList(wc), null); clusterState = writer.writePendingUpdates(); @@ -256,7 +275,8 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 { ClusterState.CollectionRef ref = reader.getClusterState().getCollectionRef("c1"); assertFalse(ref.isLazilyLoaded()); assertEquals(0, ref.get().getZNodeVersion()); - assertEquals(-1, ref.get().getChildNodesVersion()); + // dummy node created +1 and deleted +1 so 2 + assertEquals(2, ref.get().getChildNodesVersion()); DocCollection collection = ref.get(); PerReplicaStates prs = @@ -275,7 +295,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 { ref = reader.getClusterState().getCollectionRef("c1"); assertEquals(0, ref.get().getZNodeVersion()); // no change in Znode version - assertEquals(1, ref.get().getChildNodesVersion()); // but child version should be 1 now + assertEquals(3, ref.get().getChildNodesVersion()); // but child version should be 3 now prs = ref.get().getPerReplicaStates(); PerReplicaStatesOps.flipState("r1", Replica.State.ACTIVE, prs) @@ -289,7 +309,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 { ref = reader.getClusterState().getCollectionRef("c1"); assertEquals(0, ref.get().getZNodeVersion()); // no change in Znode version // but child version should be 3 now (1 del + 1 add) - assertEquals(3, ref.get().getChildNodesVersion()); + assertEquals(5, ref.get().getChildNodesVersion()); // now delete the collection wc = new ZkWriteCommand("c1", null); @@ -314,7 +334,7 @@ public
class ZkStateReaderTest extends SolrTestCaseJ4 { ref = reader.getClusterState().getCollectionRef("c1"); assertFalse(ref.isLazilyLoaded()); assertEquals(0, ref.get().getZNodeVersion()); - assertEquals(-1, ref.get().getChildNodesVersion()); // child node version is reset + assertEquals(2, ref.get().getChildNodesVersion()); // child node version is reset // re-add PRS collection = ref.get(); @@ -335,7 +355,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 { ref = reader.getClusterState().getCollectionRef("c1"); // child version should be reset since the state.json node was deleted and re-created - assertEquals(1, ref.get().getChildNodesVersion()); + assertEquals(3, ref.get().getChildNodesVersion()); } public void testForciblyRefreshAllClusterState() throws Exception { @@ -354,9 +374,15 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 { new DocCollection( "c1", new HashMap<>(), - Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME), + Map.of( + ZkStateReader.CONFIGNAME_PROP, + ConfigSetsHandler.DEFAULT_CONFIGSET_NAME, + DocCollection.CollectionStateProps.PER_REPLICA_STATE, + "true"), DocRouter.DEFAULT, - 0); + 0, + new PerReplicaStatesFetcher.LazyPrsSupplier( + fixture.zkClient, DocCollection.getCollectionPath("c1"))); ZkWriteCommand wc = new ZkWriteCommand("c1", state); writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null); writer.writePendingUpdates(); @@ -376,7 +402,9 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 { new HashMap<>(), Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME), DocRouter.DEFAULT, - ref.get().getZNodeVersion()); + ref.get().getZNodeVersion(), + new PerReplicaStatesFetcher.LazyPrsSupplier( + fixture.zkClient, DocCollection.getCollectionPath("c1"))); wc = new ZkWriteCommand("c1", state); writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null); writer.writePendingUpdates(); @@ -397,7 +425,9 @@ public class 
ZkStateReaderTest extends SolrTestCaseJ4 { new HashMap<>(), Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME), DocRouter.DEFAULT, - 0); + 0, + new PerReplicaStatesFetcher.LazyPrsSupplier( + fixture.zkClient, DocCollection.getCollectionPath("c2"))); ZkWriteCommand wc2 = new ZkWriteCommand("c2", state); writer.enqueueUpdate(reader.getClusterState(), Arrays.asList(wc1, wc2), null); @@ -432,7 +462,9 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 { new HashMap<>(), Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME), DocRouter.DEFAULT, - 0); + 0, + new PerReplicaStatesFetcher.LazyPrsSupplier( + fixture.zkClient, DocCollection.getCollectionPath("c1"))); ZkWriteCommand wc1 = new ZkWriteCommand("c1", state1); DocCollection state2 = new DocCollection( @@ -440,7 +472,9 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 { new HashMap<>(), Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME), DocRouter.DEFAULT, - 0); + 0, + new PerReplicaStatesFetcher.LazyPrsSupplier( + fixture.zkClient, DocCollection.getCollectionPath("c1"))); // do not listen to c2 fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true); @@ -493,7 +527,9 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 { ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME), DocRouter.DEFAULT, - currentVersion); + currentVersion, + new PerReplicaStatesFetcher.LazyPrsSupplier( + fixture.zkClient, DocCollection.getCollectionPath("c1"))); ZkWriteCommand wc = new ZkWriteCommand("c1", state); writer.enqueueUpdate(clusterState, Collections.singletonList(wc), null); clusterState = writer.writePendingUpdates(); diff --git a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java index be38ec88fe6..27ba3dda4b8 100644 --- a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java +++ 
b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java @@ -32,6 +32,7 @@ import org.apache.solr.cloud.ZkTestServer; import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.DocRouter; +import org.apache.solr.common.cloud.PerReplicaStatesFetcher; import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.SolrZkClient; import org.apache.solr.common.cloud.ZkStateReader; @@ -161,7 +162,15 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 { prsProps.put("perReplicaState", Boolean.TRUE); ZkWriteCommand prs1 = new ZkWriteCommand( - "prs1", new DocCollection("prs1", new HashMap<>(), prsProps, DocRouter.DEFAULT, 0)); + "prs1", + new DocCollection( + "prs1", + new HashMap<>(), + prsProps, + DocRouter.DEFAULT, + 0, + new PerReplicaStatesFetcher.LazyPrsSupplier( + zkClient, DocCollection.getCollectionPath("c1")))); ZkStateWriter writer = new ZkStateWriter(reader, new Stats()); // First write is flushed immediately diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/client/solrj/cloud/DistribStateManager.java b/solr/solrj-zookeeper/src/java/org/apache/solr/client/solrj/cloud/DistribStateManager.java index 09be10c73c9..7f2189a63f1 100644 --- a/solr/solrj-zookeeper/src/java/org/apache/solr/client/solrj/cloud/DistribStateManager.java +++ b/solr/solrj-zookeeper/src/java/org/apache/solr/client/solrj/cloud/DistribStateManager.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import org.apache.solr.common.SolrCloseable; +import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.PerReplicaStates; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -123,6 +124,17 @@ public interface DistribStateManager extends SolrCloseable { throw new UnsupportedOperationException("Not implemented"); } + default DocCollection.PrsSupplier 
getPrsSupplier(String collName) { + return new DocCollection.PrsSupplier( + () -> { + try { + return getReplicaStates(DocCollection.getCollectionPath(collName)); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + /** * Remove data recursively. * diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/client/solrj/impl/ZkClientClusterStateProvider.java b/solr/solrj-zookeeper/src/java/org/apache/solr/client/solrj/impl/ZkClientClusterStateProvider.java index cfb0911a142..98c89b1ee0d 100644 --- a/solr/solrj-zookeeper/src/java/org/apache/solr/client/solrj/impl/ZkClientClusterStateProvider.java +++ b/solr/solrj-zookeeper/src/java/org/apache/solr/client/solrj/impl/ZkClientClusterStateProvider.java @@ -28,6 +28,7 @@ import org.apache.solr.common.AlreadyClosedException; import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.PerReplicaStatesFetcher; import org.apache.solr.common.cloud.SolrZkClient; import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.cloud.ZkStateReader; @@ -113,7 +114,12 @@ public class ZkClientClusterStateProvider implements ClusterStateProvider { } } } - return ClusterState.createFromCollectionMap(version, stateMap, liveNodes); + return ClusterState.createFromCollectionMap( + version, + stateMap, + liveNodes, + new PerReplicaStatesFetcher.LazyPrsSupplier( + zkClient, DocCollection.getCollectionPath(coll))); } @Override diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/PerReplicaStatesFetcher.java b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/PerReplicaStatesFetcher.java index f959cab3e03..c8baa8b4f07 100644 --- a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/PerReplicaStatesFetcher.java +++ b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/PerReplicaStatesFetcher.java @@ -17,13 +17,17 @@ package 
org.apache.solr.common.cloud; +import java.lang.invoke.MethodHandles; import java.util.Collections; import java.util.List; import org.apache.solr.common.SolrException; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class PerReplicaStatesFetcher { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); /** * Fetch the latest {@link PerReplicaStates} . It fetches data after checking the {@link * Stat#getCversion()} of state.json. If this is not modified, the same object is returned @@ -50,4 +54,10 @@ public class PerReplicaStatesFetcher { e); } } + + public static class LazyPrsSupplier extends DocCollection.PrsSupplier { + public LazyPrsSupplier(SolrZkClient zkClient, String collectionPath) { + super(() -> PerReplicaStatesFetcher.fetch(collectionPath, zkClient, null)); + } + } } diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java index 26239784115..d5f69e8a219 100644 --- a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java +++ b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java @@ -1684,21 +1684,6 @@ public class ZkStateReader implements SolrCloseable { throws KeeperException, InterruptedException { String collectionPath = DocCollection.getCollectionPath(coll); while (true) { - ClusterState.initReplicaStateProvider( - () -> { - try { - PerReplicaStates replicaStates = - PerReplicaStatesFetcher.fetch(collectionPath, zkClient, null); - log.debug( - "per-replica-state ver: {} fetched for initializing {} ", - replicaStates.cversion, - collectionPath); - return replicaStates; - } catch (Exception e) { - // TODO - throw new RuntimeException(e); - } - }); try { Stat stat = new Stat(); byte[] data = zkClient.getData(collectionPath, watcher, stat, true); @@ 
-1723,8 +1708,6 @@ public class ZkStateReader implements SolrCloseable { } } return null; - } finally { - ClusterState.clearReplicaStateProvider(); } } } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseHttpClusterStateProvider.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseHttpClusterStateProvider.java index 875f21a8ade..a2420ec37b2 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseHttpClusterStateProvider.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseHttpClusterStateProvider.java @@ -35,6 +35,8 @@ import org.apache.solr.client.solrj.request.QueryRequest; import org.apache.solr.client.solrj.response.CollectionAdminResponse; import org.apache.solr.common.cloud.Aliases; import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.PerReplicaStates; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.SimpleOrderedMap; @@ -123,6 +125,7 @@ public abstract class BaseHttpClusterStateProvider implements ClusterStateProvid params.set("collection", collection); } params.set("action", "CLUSTERSTATUS"); + params.set("prs", "true"); QueryRequest request = new QueryRequest(params); request.setPath("/admin/collections"); SimpleOrderedMap<?> cluster = (SimpleOrderedMap<?>) client.request(request).get("cluster"); @@ -147,7 +150,13 @@ public abstract class BaseHttpClusterStateProvider implements ClusterStateProvid Set<String> liveNodes = new HashSet<>((List<String>) (cluster.get("live_nodes"))); this.liveNodes = liveNodes; liveNodesTimestamp = System.nanoTime(); - ClusterState cs = ClusterState.createFromCollectionMap(znodeVersion, collectionsMap, liveNodes); + ClusterState cs = new ClusterState(liveNodes, new HashMap<>()); + for (Map.Entry<String, Object> e : collectionsMap.entrySet()) { + @SuppressWarnings("rawtypes") + Map m = (Map) 
e.getValue(); + cs = cs.copyWith(e.getKey(), fillPrs(znodeVersion, e, m)); + } + if (clusterProperties != null) { Map<String, Object> properties = (Map<String, Object>) cluster.get("properties"); if (properties != null) { @@ -157,6 +166,22 @@ public abstract class BaseHttpClusterStateProvider implements ClusterStateProvid return cs; } + @SuppressWarnings({"rawtypes", "unchecked"}) + private DocCollection fillPrs(int znodeVersion, Map.Entry<String, Object> e, Map m) { + DocCollection.PrsSupplier prsSupplier = null; + if (m.containsKey("PRS")) { + Map prs = (Map) m.remove("PRS"); + prsSupplier = + new DocCollection.PrsSupplier( + () -> + new PerReplicaStates( + (String) prs.get("path"), + (Integer) prs.get("cversion"), + (List<String>) prs.get("states"))); + } + return ClusterState.collectionFromObjects(e.getKey(), m, znodeVersion, prsSupplier); + } + @Override public Set<String> getLiveNodes() { if (liveNodes == null) { diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java index 3a96b8b8dcd..67adc9667dd 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java @@ -26,12 +26,10 @@ import java.util.HashSet; import java.util.LinkedHashMap; import java.util.Map; import java.util.Map.Entry; -import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.function.Function; -import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; @@ -227,33 +225,49 @@ public class ClusterState implements JSONWriter.Writable { * @param liveNodes list of live nodes * @return the ClusterState */ - public static ClusterState createFromJson(int version, byte[] bytes, Set<String> liveNodes) { + public static 
ClusterState createFromJson( + int version, byte[] bytes, Set<String> liveNodes, DocCollection.PrsSupplier prsSupplier) { if (bytes == null || bytes.length == 0) { return new ClusterState(liveNodes, Collections.<String, DocCollection>emptyMap()); } @SuppressWarnings({"unchecked"}) Map<String, Object> stateMap = (Map<String, Object>) Utils.fromJSON(bytes, 0, bytes.length, STR_INTERNER_OBJ_BUILDER); - return createFromCollectionMap(version, stateMap, liveNodes); + return createFromCollectionMap(version, stateMap, liveNodes, prsSupplier); + } + + @Deprecated + public static ClusterState createFromJson(int version, byte[] bytes, Set<String> liveNodes) { + return createFromJson(version, bytes, liveNodes, null); } public static ClusterState createFromCollectionMap( - int version, Map<String, Object> stateMap, Set<String> liveNodes) { + int version, + Map<String, Object> stateMap, + Set<String> liveNodes, + DocCollection.PrsSupplier prsSupplier) { Map<String, CollectionRef> collections = new LinkedHashMap<>(stateMap.size()); for (Entry<String, Object> entry : stateMap.entrySet()) { String collectionName = entry.getKey(); @SuppressWarnings({"unchecked"}) DocCollection coll = - collectionFromObjects(collectionName, (Map<String, Object>) entry.getValue(), version); + collectionFromObjects( + collectionName, (Map<String, Object>) entry.getValue(), version, prsSupplier); collections.put(collectionName, new CollectionRef(coll)); } return new ClusterState(collections, liveNodes); } + @Deprecated + public static ClusterState createFromCollectionMap( + int version, Map<String, Object> stateMap, Set<String> liveNodes) { + return createFromCollectionMap(version, stateMap, liveNodes, null); + } + // TODO move to static DocCollection.loadFromMap - private static DocCollection collectionFromObjects( - String name, Map<String, Object> objs, int version) { + public static DocCollection collectionFromObjects( + String name, Map<String, Object> objs, int version, DocCollection.PrsSupplier 
prsSupplier) { Map<String, Object> props; Map<String, Slice> slices; @@ -261,9 +275,6 @@ public class ClusterState implements JSONWriter.Writable { if (log.isDebugEnabled()) { log.debug("a collection {} has per-replica state", name); } - // this collection has replica states stored outside - ReplicaStatesProvider rsp = REPLICASTATES_PROVIDER.get(); - if (rsp instanceof StatesProvider) ((StatesProvider) rsp).isPerReplicaState = true; } @SuppressWarnings({"unchecked"}) Map<String, Object> sliceObjs = (Map<String, Object>) objs.get(CollectionStateProps.SHARDS); @@ -291,7 +302,7 @@ public class ClusterState implements JSONWriter.Writable { router = DocRouter.getDocRouter((String) routerProps.get("name")); } - return new DocCollection(name, slices, props, router, version); + return new DocCollection(name, slices, props, router, version, prsSupplier); } @Override @@ -427,63 +438,6 @@ public class ClusterState implements JSONWriter.Writable { return collectionStates.size(); } - interface ReplicaStatesProvider { - - Optional<ReplicaStatesProvider> get(); - - PerReplicaStates getStates(); - } - - private static final ReplicaStatesProvider EMPTYSTATEPROVIDER = - new ReplicaStatesProvider() { - @Override - public Optional<ReplicaStatesProvider> get() { - return Optional.empty(); - } - - @Override - public PerReplicaStates getStates() { - throw new RuntimeException("Invalid operation"); - } - }; - - private static ThreadLocal<ReplicaStatesProvider> REPLICASTATES_PROVIDER = new ThreadLocal<>(); - - public static ReplicaStatesProvider getReplicaStatesProvider() { - return (REPLICASTATES_PROVIDER.get() == null) - ? 
EMPTYSTATEPROVIDER - : REPLICASTATES_PROVIDER.get(); - } - - public static void initReplicaStateProvider(Supplier<PerReplicaStates> replicaStatesSupplier) { - REPLICASTATES_PROVIDER.set(new StatesProvider(replicaStatesSupplier)); - } - - public static void clearReplicaStateProvider() { - REPLICASTATES_PROVIDER.remove(); - } - - private static class StatesProvider implements ReplicaStatesProvider { - private final Supplier<PerReplicaStates> replicaStatesSupplier; - private PerReplicaStates perReplicaStates; - private boolean isPerReplicaState = false; - - public StatesProvider(Supplier<PerReplicaStates> replicaStatesSupplier) { - this.replicaStatesSupplier = replicaStatesSupplier; - } - - @Override - public Optional<ReplicaStatesProvider> get() { - return isPerReplicaState ? Optional.of(this) : Optional.empty(); - } - - @Override - public PerReplicaStates getStates() { - if (perReplicaStates == null) perReplicaStates = replicaStatesSupplier.get(); - return perReplicaStates; - } - } - private static volatile Function<JSONParser, ObjectBuilder> STR_INTERNER_OBJ_BUILDER = STANDARDOBJBUILDER; diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java index 6d4f6394d45..4de7503b403 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java @@ -31,6 +31,7 @@ import java.util.Objects; import java.util.Set; import java.util.function.BiConsumer; import java.util.function.BiPredicate; +import java.util.function.Supplier; import org.apache.solr.common.cloud.Replica.ReplicaStateProps; import org.noggit.JSONWriter; import org.slf4j.Logger; @@ -63,13 +64,23 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> { private final Boolean readOnly; private final Boolean perReplicaState; private final Map<String, Replica> replicaMap = new HashMap<>(); - private volatile 
PerReplicaStates perReplicaStates; + private PrsSupplier prsSupplier; + @Deprecated public DocCollection( String name, Map<String, Slice> slices, Map<String, Object> props, DocRouter router) { - this(name, slices, props, router, Integer.MAX_VALUE); + this(name, slices, props, router, Integer.MAX_VALUE, null); } + @Deprecated + public DocCollection( + String name, + Map<String, Slice> slices, + Map<String, Object> props, + DocRouter router, + int zkVersion) { + this(name, slices, props, router, zkVersion, null); + } /** * @param name The name of the collection * @param slices The logical shards of the collection. This is used directly and a copy is not @@ -83,7 +94,8 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> { Map<String, Slice> slices, Map<String, Object> props, DocRouter router, - int zkVersion) { + int zkVersion, + PrsSupplier prsSupplier) { super(props); // -1 means any version in ZK CAS, so we choose Integer.MAX_VALUE instead to avoid accidental // overwrites @@ -100,9 +112,17 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> { this.numPullReplicas = (Integer) verifyProp(props, CollectionStateProps.PULL_REPLICAS, 0); this.perReplicaState = (Boolean) verifyProp(props, CollectionStateProps.PER_REPLICA_STATE, Boolean.FALSE); - ClusterState.getReplicaStatesProvider() - .get() - .ifPresent(it -> perReplicaStates = it.getStates()); + if (this.perReplicaState) { + if (prsSupplier == null) { + throw new RuntimeException( + CollectionStateProps.PER_REPLICA_STATE + + " = true , but per-replica state supplier is not provided"); + } + this.prsSupplier = prsSupplier; + for (Slice s : this.slices.values()) { + s.setPrsSupplier(prsSupplier); + } + } Boolean readOnly = (Boolean) verifyProp(props, CollectionStateProps.READ_ONLY); this.readOnly = readOnly == null ? 
Boolean.FALSE : readOnly; @@ -139,30 +159,11 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> { * only a replica is updated */ public DocCollection copyWith(PerReplicaStates newPerReplicaStates) { - if (log.isDebugEnabled()) { - log.debug( - "collection :{} going to be updated : per-replica state :{} -> {}", - name, - getChildNodesVersion(), - newPerReplicaStates.cversion); + if (this.prsSupplier != null) { + log.info("In-place update of PRS: {}", newPerReplicaStates); + this.prsSupplier.prs = newPerReplicaStates; } - if (getChildNodesVersion() >= newPerReplicaStates.cversion) return this; - Set<String> modifiedReplicas = - PerReplicaStates.findModifiedReplicas(newPerReplicaStates, this.perReplicaStates); - if (modifiedReplicas.isEmpty()) return this; // nothing is modified - Map<String, Slice> modifiedShards = new HashMap<>(getSlicesMap()); - for (String s : modifiedReplicas) { - Replica replica = getReplica(s); - if (replica != null) { - Replica newReplica = replica.copyWith(newPerReplicaStates.get(s)); - Slice shard = modifiedShards.get(replica.shard); - modifiedShards.put(replica.shard, shard.copyWith(newReplica)); - } - } - DocCollection result = - new DocCollection(getName(), modifiedShards, propMap, router, znodeVersion); - result.perReplicaStates = newPerReplicaStates; - return result; + return this; } private void addNodeNameReplica(Replica replica) { @@ -213,8 +214,8 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> { * @return the resulting DocCollection */ public DocCollection copyWithSlices(Map<String, Slice> slices) { - DocCollection result = new DocCollection(getName(), slices, propMap, router, znodeVersion); - result.perReplicaStates = perReplicaStates; + DocCollection result = + new DocCollection(getName(), slices, propMap, router, znodeVersion, prsSupplier); return result; } /** Return collection name. 
*/ @@ -281,7 +282,8 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> { } public int getChildNodesVersion() { - return perReplicaStates == null ? -1 : perReplicaStates.cversion; + PerReplicaStates prs = prsSupplier == null ? null : prsSupplier.get(); + return prs == null ? -1 : prs.cversion; } public boolean isModified(int dataVersion, int childVersion) { @@ -318,7 +320,7 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> { + "/" + znodeVersion + " " - + (perReplicaStates == null ? "" : perReplicaStates.toString()) + + (prsSupplier == null ? "" : prsSupplier.get()) + ")=" + toJSONString(this); } @@ -465,7 +467,11 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> { } public PerReplicaStates getPerReplicaStates() { - return perReplicaStates; + return prsSupplier != null ? prsSupplier.get() : null; + } + + public PrsSupplier getPrsSupplier() { + return prsSupplier; } public int getExpectedReplicaCount(Replica.Type type, int def) { @@ -488,4 +494,27 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> { String SHARDS = "shards"; String PER_REPLICA_STATE = "perReplicaState"; } + + public static class PrsSupplier implements Supplier<PerReplicaStates> { + + private volatile PerReplicaStates prs; + + private Supplier<PerReplicaStates> supplier; + + public PrsSupplier(Supplier<PerReplicaStates> supplier) { + this.supplier = supplier; + } + + public PrsSupplier(PerReplicaStates prs) { + this.prs = prs; + } + + @Override + public PerReplicaStates get() { + if (prs == null) { + prs = supplier.get(); + } + return prs; + } + } } diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java b/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java index f756ea1883f..698da842845 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java @@ 
-32,6 +32,7 @@ import java.util.Objects; import java.util.Set; import java.util.function.BiConsumer; import org.apache.solr.cluster.api.SimpleMap; +import org.apache.solr.common.IteratorWriter; import org.apache.solr.common.MapWriter; import org.apache.solr.common.annotation.JsonProperty; import org.apache.solr.common.cloud.Replica.ReplicaStateProps; @@ -302,4 +303,23 @@ public class PerReplicaStates implements ReflectMapWriter { return duplicate; } } + + @Override + public void writeMap(EntryWriter ew) throws IOException { + ReflectMapWriter.super.writeMap( + new EntryWriter() { + @Override + public EntryWriter put(CharSequence k, Object v) throws IOException { + if ("states".equals(k.toString())) { + ew.put( + "states", + (IteratorWriter) + iw -> states.forEachEntry((s, state) -> iw.addNoEx(state.toString()))); + } else { + ew.put(k, v); + } + return this; + } + }); + } } diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java index 308d37faeca..dc90e6b975a 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java @@ -137,11 +137,15 @@ public class Replica extends ZkNodeProps implements MapWriter { public final String core; public final Type type; public final String shard, collection; - private PerReplicaStates.State replicaState; + private DocCollection.PrsSupplier prsSupplier; // mutable private State state; + void setPrsSupplier(DocCollection.PrsSupplier prsSupplier) { + this.prsSupplier = prsSupplier; + } + public Replica(String name, Map<String, Object> map, String collection, String shard) { super(new HashMap<>()); propMap.putAll(map); @@ -151,7 +155,6 @@ public class Replica extends ZkNodeProps implements MapWriter { this.node = (String) propMap.get(ReplicaStateProps.NODE_NAME); this.core = (String) propMap.get(ReplicaStateProps.CORE_NAME); this.type = Type.get((String) 
propMap.get(ReplicaStateProps.TYPE)); - readPrs(); // default to ACTIVE this.state = State.getState( @@ -180,7 +183,6 @@ public class Replica extends ZkNodeProps implements MapWriter { if (props != null) { this.propMap.putAll(props); } - readPrs(); validate(); } @@ -202,7 +204,6 @@ public class Replica extends ZkNodeProps implements MapWriter { this.node = String.valueOf(details.get("node_name")); this.propMap.putAll(details); - readPrs(); type = Replica.Type.valueOf(String.valueOf(propMap.getOrDefault(ReplicaStateProps.TYPE, "NRT"))); if (state == null) @@ -211,22 +212,6 @@ public class Replica extends ZkNodeProps implements MapWriter { validate(); } - private void readPrs() { - ClusterState.getReplicaStatesProvider() - .get() - .ifPresent( - it -> { - log.debug("A replica {} state fetched from per-replica state", name); - replicaState = it.getStates().get(name); - if (replicaState != null) { - propMap.put( - ReplicaStateProps.STATE, - replicaState.state.toString().toLowerCase(Locale.ROOT)); - if (replicaState.isLeader) propMap.put(ReplicaStateProps.LEADER, "true"); - } - }); - } - private final void validate() { Objects.requireNonNull(this.name, "'name' must not be null"); Objects.requireNonNull(this.core, "'core' must not be null"); @@ -303,6 +288,14 @@ public class Replica extends ZkNodeProps implements MapWriter { /** Returns the {@link State} of this replica. 
*/ public State getState() { + if (prsSupplier != null) { + PerReplicaStates.State s = prsSupplier.get().get(name); + if (s != null) { + return s.state; + } else { + return State.DOWN; + } + } return state; } @@ -312,7 +305,7 @@ public class Replica extends ZkNodeProps implements MapWriter { } public boolean isActive(Set<String> liveNodes) { - return this.node != null && liveNodes.contains(this.node) && this.state == State.ACTIVE; + return this.node != null && liveNodes.contains(this.node) && getState() == State.ACTIVE; } public Type getType() { @@ -320,6 +313,10 @@ public class Replica extends ZkNodeProps implements MapWriter { } public boolean isLeader() { + if (prsSupplier != null) { + PerReplicaStates.State st = prsSupplier.get().get(name); + return st == null ? false : st.isLeader; + } return getBool(ReplicaStateProps.LEADER, false); } @@ -354,16 +351,18 @@ public class Replica extends ZkNodeProps implements MapWriter { if (state.isLeader) props.put(ReplicaStateProps.LEADER, "true"); } Replica r = new Replica(name, props, collection, shard); - r.replicaState = state; return r; } public PerReplicaStates.State getReplicaState() { - return replicaState; + if (prsSupplier != null) { + return prsSupplier.get().get(name); + } + return null; } public Object clone() { - return new Replica(name, node, collection, shard, core, state, type, propMap); + return new Replica(name, node, collection, shard, core, getState(), type, propMap); } @Override @@ -401,7 +400,7 @@ public class Replica extends ZkNodeProps implements MapWriter { .put(ReplicaStateProps.COLLECTION, collection, p) .put(ReplicaStateProps.NODE_NAME, node, p) .put(ReplicaStateProps.TYPE, type.toString(), p) - .put(ReplicaStateProps.STATE, state.toString(), p); + .put(ReplicaStateProps.STATE, getState().toString(), p); }; } diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java b/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java index 5414942b480..76539e70ead 100644 ---
a/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java @@ -45,6 +45,18 @@ public class Slice extends ZkNodeProps implements Iterable<Replica> { public final String collection; + private DocCollection.PrsSupplier prsSupplier; + + void setPrsSupplier(DocCollection.PrsSupplier prsSupplier) { + this.prsSupplier = prsSupplier; + for (Replica r : replicas.values()) { + r.setPrsSupplier(prsSupplier); + } + if (leader == null) { + leader = findLeader(); + } + } + /** * Loads multiple slices into a Map from a generic Map that probably came from deserialized JSON. */ @@ -132,7 +144,7 @@ public class Slice extends ZkNodeProps implements Iterable<Replica> { // FUTURE: optional per-slice override of the collection replicationFactor private final Integer replicationFactor; private final Map<String, Replica> replicas; - private final Replica leader; + private Replica leader; private final State state; private final String parent; private final Map<String, RoutingRule> routingRules; @@ -204,8 +216,6 @@ public class Slice extends ZkNodeProps implements Iterable<Replica> { } else { this.routingRules = null; } - - leader = findLeader(); } @SuppressWarnings({"unchecked"}) @@ -275,7 +285,15 @@ public class Slice extends ZkNodeProps implements Iterable<Replica> { } public Replica getLeader() { - return leader; + if (prsSupplier != null) { + // this is a PRS collection. 
leader may keep changing + return findLeader(); + } else { + if (leader == null) { + leader = findLeader(); + } + return leader; + } } public int getNumLeaderReplicas() { diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientCacheTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientCacheTest.java index 5be6a44e76d..44053fbeb57 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientCacheTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientCacheTest.java @@ -87,7 +87,7 @@ public class CloudSolrClientCacheTest extends SolrTestCaseJ4 { .build()) { livenodes.addAll(ImmutableSet.of("192.168.1.108:7574_solr", "192.168.1.108:8983_solr")); ClusterState cs = - ClusterState.createFromJson(1, coll1State.getBytes(UTF_8), Collections.emptySet()); + ClusterState.createFromJson(1, coll1State.getBytes(UTF_8), Collections.emptySet(), null); refs.put(collName, new Ref(collName)); colls.put(collName, cs.getCollectionOrNull(collName)); responses.put( diff --git a/solr/solrj/src/test/org/apache/solr/common/cloud/PerReplicaStatesIntegrationTest.java b/solr/solrj/src/test/org/apache/solr/common/cloud/PerReplicaStatesIntegrationTest.java index f5081ae0820..66b8cf121df 100644 --- a/solr/solrj/src/test/org/apache/solr/common/cloud/PerReplicaStatesIntegrationTest.java +++ b/solr/solrj/src/test/org/apache/solr/common/cloud/PerReplicaStatesIntegrationTest.java @@ -39,7 +39,13 @@ import org.slf4j.LoggerFactory; /** This test would be faster if we simulated the zk state instead. 
*/ @LogLevel( - "org.apache.solr.common.cloud.PerReplicaStatesOps=DEBUG;org.apache.solr.cloud.Overseer=INFO;org.apache.solr.common.cloud=INFO;org.apache.solr.cloud.api.collections=INFO;org.apache.solr.cloud.overseer=INFO") + "org.apache.solr.common.cloud.ZkStateReader=DEBUG;" + + "org.apache.solr.handler.admin.CollectionsHandler=DEBUG;" + + "org.apache.solr.common.cloud.PerReplicaStatesOps=DEBUG;" + + "org.apache.solr.cloud.Overseer=INFO;" + + "org.apache.solr.common.cloud=INFO;" + + "org.apache.solr.cloud.api.collections=INFO;" + + "org.apache.solr.cloud.overseer=INFO") public class PerReplicaStatesIntegrationTest extends SolrCloudTestCase { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
