This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 376970106a Fix bugs in DynamicBrokerSelector (#13816)
376970106a is described below
commit 376970106a5177ab6f5065de4a102778b3e06db9
Author: Rajat Venkatesh <[email protected]>
AuthorDate: Sat Aug 17 12:31:12 2024 +0530
Fix bugs in DynamicBrokerSelector (#13816)
---
.../java/org/apache/pinot/client/DynamicBrokerSelector.java | 13 +++++--------
.../org/apache/pinot/client/utils/BrokerSelectorUtils.java | 4 +++-
.../java/org/apache/pinot/client/ConnectionFactoryTest.java | 9 ++++++++-
.../org/apache/pinot/client/DynamicBrokerSelectorTest.java | 2 +-
4 files changed, 17 insertions(+), 11 deletions(-)
diff --git
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/DynamicBrokerSelector.java
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/DynamicBrokerSelector.java
index b349423d83..346e3867c3 100644
---
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/DynamicBrokerSelector.java
+++
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/DynamicBrokerSelector.java
@@ -19,7 +19,6 @@
package org.apache.pinot.client;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
@@ -45,11 +44,10 @@ public class DynamicBrokerSelector implements
BrokerSelector, IZkDataListener {
private static final Logger LOGGER =
LoggerFactory.getLogger(DynamicBrokerSelector.class);
private static final Random RANDOM = new Random();
- private final AtomicReference<Map<String, List<String>>>
_tableToBrokerListMapRef = new AtomicReference<>();
- private final AtomicReference<List<String>> _allBrokerListRef = new
AtomicReference<>();
- private final ZkClient _zkClient;
- private final ExternalViewReader _evReader;
- private final List<String> _brokerList;
+ protected final AtomicReference<Map<String, List<String>>>
_tableToBrokerListMapRef = new AtomicReference<>();
+ protected final AtomicReference<List<String>> _allBrokerListRef = new
AtomicReference<>();
+ protected final ZkClient _zkClient;
+ protected final ExternalViewReader _evReader;
//The preferTlsPort will be mapped to client config in the future, when we
support full TLS
public DynamicBrokerSelector(String zkServers, boolean preferTlsPort) {
_zkClient = getZkClient(zkServers);
@@ -57,7 +55,6 @@ public class DynamicBrokerSelector implements BrokerSelector,
IZkDataListener {
_zkClient.waitUntilConnected(60, TimeUnit.SECONDS);
_zkClient.subscribeDataChanges(ExternalViewReader.BROKER_EXTERNAL_VIEW_PATH,
this);
_evReader = getEvReader(_zkClient, preferTlsPort);
- _brokerList = ImmutableList.of(zkServers);
refresh();
}
public DynamicBrokerSelector(String zkServers) {
@@ -112,7 +109,7 @@ public class DynamicBrokerSelector implements
BrokerSelector, IZkDataListener {
@Override
public List<String> getBrokers() {
- return _brokerList;
+ return _allBrokerListRef.get();
}
@Override
diff --git
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/utils/BrokerSelectorUtils.java
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/utils/BrokerSelectorUtils.java
index 1ca15255cd..e3a1df44db 100644
---
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/utils/BrokerSelectorUtils.java
+++
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/utils/BrokerSelectorUtils.java
@@ -57,7 +57,9 @@ public class BrokerSelectorUtils {
return null;
}
- List<String> commonBrokers = tablesBrokersList.get(0);
+ // Make a copy of the brokersList of the first table. retainAll does
inplace modifications.
+ // So lists from brokerData should not be used directly.
+ List<String> commonBrokers = new ArrayList<>(tablesBrokersList.get(0));
for (int i = 1; i < tablesBrokersList.size(); i++) {
commonBrokers.retainAll(tablesBrokersList.get(i));
}
diff --git
a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ConnectionFactoryTest.java
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ConnectionFactoryTest.java
index 5a755afc55..e12dc873f6 100644
---
a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ConnectionFactoryTest.java
+++
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ConnectionFactoryTest.java
@@ -37,6 +37,8 @@ public class ConnectionFactoryTest {
public void testZkConnection() {
// Create a dummy Helix structure
final String givenZkServers = "127.0.0.1:1234";
+ final String givenBrokerInfo = "localhost:2345";
+
DynamicBrokerSelector dynamicBrokerSelector = Mockito.spy(new
DynamicBrokerSelector(givenZkServers) {
@Override
protected ZkClient getZkClient(String zkServers) {
@@ -47,6 +49,11 @@ public class ConnectionFactoryTest {
protected ExternalViewReader getEvReader(ZkClient zkClient) {
return Mockito.mock(ExternalViewReader.class);
}
+
+ @Override
+ public List<String> getBrokers() {
+ return ImmutableList.of(givenBrokerInfo);
+ }
});
PinotClientTransport pinotClientTransport =
Mockito.mock(PinotClientTransport.class);
@@ -57,7 +64,7 @@ public class ConnectionFactoryTest {
pinotClientTransport);
// Check that the broker list has the right length and has the same servers
- Assert.assertEquals(connection.getBrokerList(),
ImmutableList.of(givenZkServers));
+ Assert.assertEquals(connection.getBrokerList(),
ImmutableList.of(givenBrokerInfo));
}
@Test
diff --git
a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/DynamicBrokerSelectorTest.java
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/DynamicBrokerSelectorTest.java
index dd3ee3bbbf..d52438ab54 100644
---
a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/DynamicBrokerSelectorTest.java
+++
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/DynamicBrokerSelectorTest.java
@@ -143,7 +143,7 @@ public class DynamicBrokerSelectorTest {
@Test
public void testGetBrokers() {
- assertEquals(_dynamicBrokerSelectorUnderTest.getBrokers(),
ImmutableList.of(ZK_SERVER));
+ assertEquals(_dynamicBrokerSelectorUnderTest.getBrokers(),
ImmutableList.of("broker1"));
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]