This is an automated email from the ASF dual-hosted git repository.
ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push:
new 88faba4 ninja fix: fix compilation issue in CassandraClusterInfo
88faba4 is described below
commit 88faba42e5cb3f1384c92024a9c3608135d76218
Author: Yifan Cai <[email protected]>
AuthorDate: Tue Jul 11 11:50:12 2023 -0700
ninja fix: fix compilation issue in CassandraClusterInfo
---
CHANGES.txt | 1 +
.../spark/bulkwriter/CassandraClusterInfo.java | 65 ++++++++++++----------
2 files changed, 38 insertions(+), 28 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 947e97a..16c16ee 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
1.0.0
+ * Added caching of Node Settings to improve efficiency (CASSANDRA-18633)
* Upgrade to JUnit 5 (CASSANDRA-18599)
* Add support for TTL & Timestamps for bulk writes (CASSANDRA-18605)
* Add circleci configuration yaml for Cassandra Analytics (CASSANDRA-18578)
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java
index 2768d35..cc04161 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java
@@ -27,6 +27,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
@@ -65,19 +66,17 @@ public class CassandraClusterInfo implements ClusterInfo, Closeable
protected transient volatile RingResponse ringResponse;
protected transient GossipInfoResponse gossipInfo;
protected transient CassandraContext cassandraContext;
- protected transient NodeSettings nodeSettings;
- protected final transient CompletableFuture<List<NodeSettings>>
allNodeSettings;
+ protected final transient AtomicReference<NodeSettings> nodeSettings;
+ protected final transient List<CompletableFuture<NodeSettings>>
allNodeSettingFutures;
public CassandraClusterInfo(BulkSparkConf conf)
{
this.conf = conf;
this.cassandraContext = buildCassandraContext();
- this.allNodeSettings = CompletableFuture.supplyAsync(() -> {
- LOGGER.info("Getting Cassandra versions from all nodes");
- return Sidecar.allNodeSettingsBlocking(conf,
-
cassandraContext.getSidecarClient(),
-
cassandraContext.clusterConfig);
- });
+ LOGGER.info("Getting Cassandra versions from all nodes");
+ this.nodeSettings = new AtomicReference<>(null);
+ this.allNodeSettingFutures =
Sidecar.allNodeSettings(cassandraContext.getSidecarClient(),
+
cassandraContext.clusterConfig);
}
@Override
@@ -182,7 +181,7 @@ public class CassandraClusterInfo implements ClusterInfo, Closeable
try
{
String partitionerString;
- NodeSettings currentNodeSettings = nodeSettings;
+ NodeSettings currentNodeSettings = nodeSettings.get();
if (currentNodeSettings != null)
{
partitionerString = currentNodeSettings.partitioner();
@@ -347,26 +346,28 @@ public class CassandraClusterInfo implements ClusterInfo, Closeable
public String getVersionFromSidecar()
{
- LOGGER.info("Getting Cassandra versions from all nodes");
- List<CompletableFuture<NodeSettings>> futures =
Sidecar.allNodeSettings(cassandraContext.getSidecarClient(),
-
cassandraContext.clusterConfig);
+ NodeSettings nodeSettings = this.nodeSettings.get();
+ if (nodeSettings != null)
+ {
+ return nodeSettings.releaseVersion();
+ }
- List<NodeSettings> nodeSettings = FutureUtils.bestEffortGet(futures,
-
conf.getSidecarRequestMaxRetryDelayInSeconds(),
-
TimeUnit.SECONDS);
+ List<NodeSettings> allNodeSettings =
FutureUtils.bestEffortGet(allNodeSettingFutures,
+
conf.getSidecarRequestMaxRetryDelayInSeconds(),
+
TimeUnit.SECONDS);
- if (nodeSettings.isEmpty())
+ if (allNodeSettings.isEmpty())
{
throw new RuntimeException(String.format("Unable to determine the
node settings. 0/%d instances available.",
- futures.size()));
+
allNodeSettingFutures.size()));
}
- else if (nodeSettings.size() < futures.size())
+ else if (allNodeSettings.size() < allNodeSettingFutures.size())
{
- LOGGER.debug("{}/{} instances were used to determine the node
settings",
- nodeSettings.size(), futures.size());
+ LOGGER.warn("{}/{} instances were used to determine the node
settings",
+ allNodeSettings.size(), allNodeSettingFutures.size());
}
- return getLowestVersion(nodeSettings);
+ return getLowestVersion(allNodeSettings);
}
protected RingResponse getRingResponse()
@@ -472,13 +473,21 @@ public class CassandraClusterInfo implements ClusterInfo, Closeable
@VisibleForTesting
public String getLowestVersion(List<NodeSettings> allNodeSettings)
{
- nodeSettings = allNodeSettings
- .stream()
- .filter(settings ->
!settings.releaseVersion().equalsIgnoreCase("unknown"))
- .min(Comparator.comparing(settings ->
-
CassandraVersionFeatures.cassandraVersionFeaturesFromCassandraVersion(settings.releaseVersion())))
- .orElseThrow(() -> new RuntimeException("No valid Cassandra
Versions were returned from Cassandra Sidecar"));
- return nodeSettings.releaseVersion();
+ NodeSettings ns = this.nodeSettings.get();
+ if (ns != null)
+ {
+ return ns.releaseVersion();
+ }
+
+ // It is possible to run the below computation multiple times. Since
the computation is local-only, it is OK.
+ ns = allNodeSettings
+ .stream()
+ .filter(settings ->
!settings.releaseVersion().equalsIgnoreCase("unknown"))
+ .min(Comparator.comparing(settings ->
+
CassandraVersionFeatures.cassandraVersionFeaturesFromCassandraVersion(settings.releaseVersion())))
+ .orElseThrow(() -> new RuntimeException("No valid Cassandra
Versions were returned from Cassandra Sidecar"));
+ nodeSettings.compareAndSet(null, ns);
+ return ns.releaseVersion();
}
protected boolean instanceIsBlocked(RingInstance ignored)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]