This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 376970106a Fix bugs in DynamicBrokerSelector (#13816)
376970106a is described below

commit 376970106a5177ab6f5065de4a102778b3e06db9
Author: Rajat Venkatesh <[email protected]>
AuthorDate: Sat Aug 17 12:31:12 2024 +0530

    Fix bugs in DynamicBrokerSelector (#13816)
---
 .../java/org/apache/pinot/client/DynamicBrokerSelector.java | 13 +++++--------
 .../org/apache/pinot/client/utils/BrokerSelectorUtils.java  |  4 +++-
 .../java/org/apache/pinot/client/ConnectionFactoryTest.java |  9 ++++++++-
 .../org/apache/pinot/client/DynamicBrokerSelectorTest.java  |  2 +-
 4 files changed, 17 insertions(+), 11 deletions(-)

diff --git 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/DynamicBrokerSelector.java
 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/DynamicBrokerSelector.java
index b349423d83..346e3867c3 100644
--- 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/DynamicBrokerSelector.java
+++ 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/DynamicBrokerSelector.java
@@ -19,7 +19,6 @@
 package org.apache.pinot.client;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableList;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
@@ -45,11 +44,10 @@ public class DynamicBrokerSelector implements 
BrokerSelector, IZkDataListener {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(DynamicBrokerSelector.class);
   private static final Random RANDOM = new Random();
 
-  private final AtomicReference<Map<String, List<String>>> 
_tableToBrokerListMapRef = new AtomicReference<>();
-  private final AtomicReference<List<String>> _allBrokerListRef = new 
AtomicReference<>();
-  private final ZkClient _zkClient;
-  private final ExternalViewReader _evReader;
-  private final List<String> _brokerList;
+  protected final AtomicReference<Map<String, List<String>>> 
_tableToBrokerListMapRef = new AtomicReference<>();
+  protected final AtomicReference<List<String>> _allBrokerListRef = new 
AtomicReference<>();
+  protected final ZkClient _zkClient;
+  protected final ExternalViewReader _evReader;
   //The preferTlsPort will be mapped to client config in the future, when we 
support full TLS
   public DynamicBrokerSelector(String zkServers, boolean preferTlsPort) {
     _zkClient = getZkClient(zkServers);
@@ -57,7 +55,6 @@ public class DynamicBrokerSelector implements BrokerSelector, 
IZkDataListener {
     _zkClient.waitUntilConnected(60, TimeUnit.SECONDS);
     
_zkClient.subscribeDataChanges(ExternalViewReader.BROKER_EXTERNAL_VIEW_PATH, 
this);
     _evReader = getEvReader(_zkClient, preferTlsPort);
-    _brokerList = ImmutableList.of(zkServers);
     refresh();
   }
   public DynamicBrokerSelector(String zkServers) {
@@ -112,7 +109,7 @@ public class DynamicBrokerSelector implements 
BrokerSelector, IZkDataListener {
 
   @Override
   public List<String> getBrokers() {
-    return _brokerList;
+    return _allBrokerListRef.get();
   }
 
   @Override
diff --git 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/utils/BrokerSelectorUtils.java
 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/utils/BrokerSelectorUtils.java
index 1ca15255cd..e3a1df44db 100644
--- 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/utils/BrokerSelectorUtils.java
+++ 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/utils/BrokerSelectorUtils.java
@@ -57,7 +57,9 @@ public class BrokerSelectorUtils {
       return null;
     }
 
-    List<String> commonBrokers = tablesBrokersList.get(0);
+    // Make a copy of the brokersList of the first table. retainAll does 
inplace modifications.
+    // So lists from brokerData should not be used directly.
+    List<String> commonBrokers = new ArrayList<>(tablesBrokersList.get(0));
     for (int i = 1; i < tablesBrokersList.size(); i++) {
       commonBrokers.retainAll(tablesBrokersList.get(i));
     }
diff --git 
a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ConnectionFactoryTest.java
 
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ConnectionFactoryTest.java
index 5a755afc55..e12dc873f6 100644
--- 
a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ConnectionFactoryTest.java
+++ 
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ConnectionFactoryTest.java
@@ -37,6 +37,8 @@ public class ConnectionFactoryTest {
   public void testZkConnection() {
     // Create a dummy Helix structure
     final String givenZkServers = "127.0.0.1:1234";
+    final String givenBrokerInfo = "localhost:2345";
+
     DynamicBrokerSelector dynamicBrokerSelector = Mockito.spy(new 
DynamicBrokerSelector(givenZkServers) {
       @Override
       protected ZkClient getZkClient(String zkServers) {
@@ -47,6 +49,11 @@ public class ConnectionFactoryTest {
       protected ExternalViewReader getEvReader(ZkClient zkClient) {
         return Mockito.mock(ExternalViewReader.class);
       }
+
+      @Override
+      public List<String> getBrokers() {
+        return ImmutableList.of(givenBrokerInfo);
+      }
     });
 
     PinotClientTransport pinotClientTransport = 
Mockito.mock(PinotClientTransport.class);
@@ -57,7 +64,7 @@ public class ConnectionFactoryTest {
             pinotClientTransport);
 
     // Check that the broker list has the right length and has the same servers
-    Assert.assertEquals(connection.getBrokerList(), 
ImmutableList.of(givenZkServers));
+    Assert.assertEquals(connection.getBrokerList(), 
ImmutableList.of(givenBrokerInfo));
   }
 
   @Test
diff --git 
a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/DynamicBrokerSelectorTest.java
 
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/DynamicBrokerSelectorTest.java
index dd3ee3bbbf..d52438ab54 100644
--- 
a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/DynamicBrokerSelectorTest.java
+++ 
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/DynamicBrokerSelectorTest.java
@@ -143,7 +143,7 @@ public class DynamicBrokerSelectorTest {
 
   @Test
   public void testGetBrokers() {
-    assertEquals(_dynamicBrokerSelectorUnderTest.getBrokers(), 
ImmutableList.of(ZK_SERVER));
+    assertEquals(_dynamicBrokerSelectorUnderTest.getBrokers(), 
ImmutableList.of("broker1"));
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to