This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 88faba4  ninja fix: fix compilation issue in CassandraClusterInfo
88faba4 is described below

commit 88faba42e5cb3f1384c92024a9c3608135d76218
Author: Yifan Cai <[email protected]>
AuthorDate: Tue Jul 11 11:50:12 2023 -0700

    ninja fix: fix compilation issue in CassandraClusterInfo
---
 CHANGES.txt                                        |  1 +
 .../spark/bulkwriter/CassandraClusterInfo.java     | 65 ++++++++++++----------
 2 files changed, 38 insertions(+), 28 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 947e97a..16c16ee 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.0.0
+ * Added caching of Node Settings to improve efficiency (CASSANDRA-18633)
  * Upgrade to JUnit 5 (CASSANDRA-18599)
  * Add support for TTL & Timestamps for bulk writes (CASSANDRA-18605)
  * Add circleci configuration yaml for Cassandra Analytics (CASSANDRA-18578)
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java
index 2768d35..cc04161 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java
@@ -27,6 +27,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -65,19 +66,17 @@ public class CassandraClusterInfo implements ClusterInfo, Closeable
     protected transient volatile RingResponse ringResponse;
     protected transient GossipInfoResponse gossipInfo;
     protected transient CassandraContext cassandraContext;
-    protected transient NodeSettings nodeSettings;
-    protected final transient CompletableFuture<List<NodeSettings>> allNodeSettings;
+    protected final transient AtomicReference<NodeSettings> nodeSettings;
+    protected final transient List<CompletableFuture<NodeSettings>> allNodeSettingFutures;
 
     public CassandraClusterInfo(BulkSparkConf conf)
     {
         this.conf = conf;
         this.cassandraContext = buildCassandraContext();
-        this.allNodeSettings = CompletableFuture.supplyAsync(() -> {
-            LOGGER.info("Getting Cassandra versions from all nodes");
-            return Sidecar.allNodeSettingsBlocking(conf,
-                                                   cassandraContext.getSidecarClient(),
-                                                   cassandraContext.clusterConfig);
-        });
+        LOGGER.info("Getting Cassandra versions from all nodes");
+        this.nodeSettings = new AtomicReference<>(null);
+        this.allNodeSettingFutures = Sidecar.allNodeSettings(cassandraContext.getSidecarClient(),
+                                                             cassandraContext.clusterConfig);
     }
 
     @Override
@@ -182,7 +181,7 @@ public class CassandraClusterInfo implements ClusterInfo, Closeable
                 try
                 {
                     String partitionerString;
-                    NodeSettings currentNodeSettings = nodeSettings;
+                    NodeSettings currentNodeSettings = nodeSettings.get();
                     if (currentNodeSettings != null)
                     {
                         partitionerString = currentNodeSettings.partitioner();
@@ -347,26 +346,28 @@ public class CassandraClusterInfo implements ClusterInfo, Closeable
 
     public String getVersionFromSidecar()
     {
-        LOGGER.info("Getting Cassandra versions from all nodes");
-        List<CompletableFuture<NodeSettings>> futures = Sidecar.allNodeSettings(cassandraContext.getSidecarClient(),
-                                                                                cassandraContext.clusterConfig);
+        NodeSettings nodeSettings = this.nodeSettings.get();
+        if (nodeSettings != null)
+        {
+            return nodeSettings.releaseVersion();
+        }
 
-        List<NodeSettings> nodeSettings = FutureUtils.bestEffortGet(futures,
-                                                                    conf.getSidecarRequestMaxRetryDelayInSeconds(),
-                                                                    TimeUnit.SECONDS);
+        List<NodeSettings> allNodeSettings = FutureUtils.bestEffortGet(allNodeSettingFutures,
+                                                                       conf.getSidecarRequestMaxRetryDelayInSeconds(),
+                                                                       TimeUnit.SECONDS);
 
-        if (nodeSettings.isEmpty())
+        if (allNodeSettings.isEmpty())
         {
             throw new RuntimeException(String.format("Unable to determine the node settings. 0/%d instances available.",
-                                                     futures.size()));
+                                                     allNodeSettingFutures.size()));
         }
-        else if (nodeSettings.size() < futures.size())
+        else if (allNodeSettings.size() < allNodeSettingFutures.size())
         {
-            LOGGER.debug("{}/{} instances were used to determine the node settings",
-                         nodeSettings.size(), futures.size());
+            LOGGER.warn("{}/{} instances were used to determine the node settings",
+                        allNodeSettings.size(), allNodeSettingFutures.size());
         }
 
-        return getLowestVersion(nodeSettings);
+        return getLowestVersion(allNodeSettings);
     }
 
     protected RingResponse getRingResponse()
@@ -472,13 +473,21 @@ public class CassandraClusterInfo implements ClusterInfo, Closeable
     @VisibleForTesting
     public String getLowestVersion(List<NodeSettings> allNodeSettings)
     {
-        nodeSettings = allNodeSettings
-                .stream()
-                .filter(settings -> !settings.releaseVersion().equalsIgnoreCase("unknown"))
-                .min(Comparator.comparing(settings ->
-                        CassandraVersionFeatures.cassandraVersionFeaturesFromCassandraVersion(settings.releaseVersion())))
-                .orElseThrow(() -> new RuntimeException("No valid Cassandra Versions were returned from Cassandra Sidecar"));
-        return nodeSettings.releaseVersion();
+        NodeSettings ns = this.nodeSettings.get();
+        if (ns != null)
+        {
+            return ns.releaseVersion();
+        }
+
+        // It is possible to run the below computation multiple times. Since the computation is local-only, it is OK.
+        ns = allNodeSettings
+             .stream()
+             .filter(settings -> !settings.releaseVersion().equalsIgnoreCase("unknown"))
+             .min(Comparator.comparing(settings ->
+                                       CassandraVersionFeatures.cassandraVersionFeaturesFromCassandraVersion(settings.releaseVersion())))
+             .orElseThrow(() -> new RuntimeException("No valid Cassandra Versions were returned from Cassandra Sidecar"));
+        nodeSettings.compareAndSet(null, ns);
+        return ns.releaseVersion();
     }
 
     protected boolean instanceIsBlocked(RingInstance ignored)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to