This is an automated email from the ASF dual-hosted git repository.
ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push:
new 912fbb4 CASSANDRA-18633: Added caching of Node Settings to improve
efficiency
912fbb4 is described below
commit 912fbb47fddc07afcf56f5de97e813593bfb890e
Author: Yuriy Semchyshyn <[email protected]>
AuthorDate: Mon Jun 26 14:40:10 2023 -0500
CASSANDRA-18633: Added caching of Node Settings to improve efficiency
patch by Yuriy Semchyshyn; reviewed by Dinesh Joshi, Yifan Cai for
CASSANDRA-18633
---
.../java/org/apache/cassandra/clients/Sidecar.java | 66 ++++------------------
.../spark/bulkwriter/CassandraClusterInfo.java | 32 +++++++++--
.../apache/cassandra/spark/utils/FutureUtils.java | 43 ++++++++++++++
3 files changed, 83 insertions(+), 58 deletions(-)
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java
index bac09e2..9721c32 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java
@@ -20,17 +20,14 @@
package org.apache.cassandra.clients;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -155,57 +152,32 @@ public final class Sidecar
         return new SidecarClient(clusterConfig, requestExecutor, sidecarConfig, defaultRetryPolicy);
     }

-    public static List<NodeSettings> allNodeSettingsBlocking(BulkSparkConf conf,
-                                                             SidecarClient client,
-                                                             Set<? extends SidecarInstance> instances)
+    /**
+     * Requests node settings from each of the given Sidecar instances
+     *
+     * @param client    Sidecar client used to send the requests
+     * @param instances instances to request node settings from
+     * @return one future per instance; a future whose request failed completes with {@code null}
+     */
+    public static List<CompletableFuture<NodeSettings>> allNodeSettings(SidecarClient client,
+                                                                        Set<? extends SidecarInstance> instances)
     {
-        List<CompletableFuture<Void>> futures = new ArrayList<>();
-        List<NodeSettings> allNodeSettings = Collections.synchronizedList(new ArrayList<>());
-        for (SidecarInstance instance : instances)
-        {
-            futures.add(client.nodeSettings(instance)
-                              .exceptionally(throwable -> {
-                                  LOGGER.warn(String.format("Failed to execute node settings on instance=%s",
-                                                            instance), throwable);
-                                  return null;
-                              })
-                              .thenAccept(nodeSettings -> {
-                                  if (nodeSettings != null)
-                                  {
-                                      allNodeSettings.add(nodeSettings);
-                                  }
-                              }));
-        }
-        try
-        {
-            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
-                             .get(conf.getSidecarRequestMaxRetryDelayInSeconds(), TimeUnit.SECONDS);
-        }
-        catch (ExecutionException | InterruptedException exception)
-        {
-            throw new RuntimeException(exception);
-        }
-        catch (TimeoutException exception)
-        {
-            // Any futures that have already completed will have put their results into `allNodeSettings`
-            // at this point. Cancel any remaining futures and move on.
-            for (CompletableFuture<Void> future : futures)
-            {
-                future.cancel(true);
-            }
-        }
-        long successCount = allNodeSettings.size();
-        if (successCount == 0)
-        {
-            throw new RuntimeException(String.format("Unable to determine the node settings. 0/%d instances available.",
-                                                     instances.size()));
-        }
-        else if (successCount < instances.size())
-        {
-            LOGGER.debug("{}/{} instances were used to determine the node settings",
-                         successCount, instances.size());
-        }
-        return allNodeSettings;
+        return instances.stream()
+                        .map(instance -> requestNodeSettings(client, instance))
+                        .collect(Collectors.toList());
     }
+
+    /**
+     * Requests node settings from a single instance; on failure, logs a warning and completes with {@code null}
+     */
+    private static CompletableFuture<NodeSettings> requestNodeSettings(SidecarClient client, SidecarInstance instance)
+    {
+        return client.nodeSettings(instance)
+                     .exceptionally(throwable -> {
+                         LOGGER.warn(String.format("Failed to execute node settings on instance=%s",
+                                                   instance), throwable);
+                         return null;
+                     });
+    }
public static final class ClientConfig
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java
index e84957f..2768d35 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java
@@ -23,6 +23,7 @@ import java.io.Closeable;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -48,6 +49,7 @@ import
org.apache.cassandra.spark.common.client.InstanceStatus;
import org.apache.cassandra.spark.data.ReplicationFactor;
import org.apache.cassandra.spark.data.partitioner.Partitioner;
import org.apache.cassandra.spark.utils.CqlUtils;
+import org.apache.cassandra.spark.utils.FutureUtils;
import org.jetbrains.annotations.NotNull;
public class CassandraClusterInfo implements ClusterInfo, Closeable
@@ -64,11 +66,14 @@ public class CassandraClusterInfo implements ClusterInfo, Closeable
     protected transient GossipInfoResponse gossipInfo;
     protected transient CassandraContext cassandraContext;
     protected transient NodeSettings nodeSettings;
+    protected final transient CompletableFuture<List<NodeSettings>> allNodeSettings;

     public CassandraClusterInfo(BulkSparkConf conf)
     {
         this.conf = conf;
         this.cassandraContext = buildCassandraContext();
+        // Start requesting node settings right away; getVersionFromSidecar() reuses this result
+        this.allNodeSettings = CompletableFuture.supplyAsync(this::fetchAllNodeSettings);
     }

     @Override
@@ -339,10 +344,40 @@ public class CassandraClusterInfo implements ClusterInfo, Closeable
     public String getVersionFromSidecar()
     {
         LOGGER.info("Getting Cassandra versions from all nodes");
-        List<NodeSettings> allNodeSettings = Sidecar.allNodeSettingsBlocking(conf,
-                                                                             cassandraContext.getSidecarClient(),
-                                                                             cassandraContext.clusterConfig);
-        return getLowestVersion(allNodeSettings);
+        // Reuse the settings requested by the constructor; allNodeSettings is transient and may therefore
+        // be null (e.g. after deserialization), in which case the settings are fetched on demand
+        List<NodeSettings> settings = allNodeSettings != null ? allNodeSettings.join() : fetchAllNodeSettings();
+        return getLowestVersion(settings);
+    }
+
+    /**
+     * Requests node settings from every instance in the cluster config and waits for them for up to
+     * {@code conf.getSidecarRequestMaxRetryDelayInSeconds()} seconds
+     *
+     * @return settings from the instances that responded in time; never empty
+     * @throws RuntimeException if none of the instances returned its settings in time
+     */
+    private List<NodeSettings> fetchAllNodeSettings()
+    {
+        List<CompletableFuture<NodeSettings>> futures = Sidecar.allNodeSettings(cassandraContext.getSidecarClient(),
+                                                                                cassandraContext.clusterConfig);
+
+        List<NodeSettings> settings = FutureUtils.bestEffortGet(futures,
+                                                                conf.getSidecarRequestMaxRetryDelayInSeconds(),
+                                                                TimeUnit.SECONDS);
+
+        if (settings.isEmpty())
+        {
+            throw new RuntimeException(String.format("Unable to determine the node settings. 0/%d instances available.",
+                                                     futures.size()));
+        }
+        else if (settings.size() < futures.size())
+        {
+            LOGGER.debug("{}/{} instances were used to determine the node settings",
+                         settings.size(), futures.size());
+        }
+
+        return settings;
     }
protected RingResponse getRingResponse()
diff --git
a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/FutureUtils.java
b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/FutureUtils.java
index 785805e..d9a8d6e 100644
---
a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/FutureUtils.java
+++
b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/FutureUtils.java
@@ -21,9 +21,13 @@ package org.apache.cassandra.spark.utils;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
+import java.util.stream.Collectors;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -162,4 +166,53 @@ public final class FutureUtils
         });
         return result;
     }
+
+    /**
+     * Makes a best-effort attempt to obtain results of a collection of completable futures
+     * within the specified timeout, then combines and returns all received non-null values.
+     * Futures that completed exceptionally are skipped, and futures that are still incomplete
+     * once the wait ends are cancelled. If the calling thread is interrupted while waiting,
+     * its interrupt status is restored and only the futures completed so far contribute.
+     *
+     * @param futures  list of futures to obtain results from
+     * @param timeout  duration of the timeout to use
+     * @param timeUnit units of the timeout
+     * @param <T>      type of the values produced by the futures
+     * @return list of non-null values obtained; empty if no future produced a value in time
+     */
+    public static <T> List<T> bestEffortGet(List<CompletableFuture<T>> futures, long timeout, TimeUnit timeUnit)
+    {
+        try
+        {
+            // Used as a barrier: waits until every future has completed, the timeout elapses, or we are interrupted
+            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
+                             .get(timeout, timeUnit);
+        }
+        catch (InterruptedException ignored)
+        {
+            // Restore the interrupt status for the caller and fall through with the partial results
+            Thread.currentThread().interrupt();
+        }
+        catch (ExecutionException | TimeoutException ignored)
+        {
+            // Expected for a best-effort wait: failed futures are skipped and incomplete ones cancelled below
+        }
+
+        return futures.stream()
+                      .map(future -> {
+                          if (future.isDone())
+                          {
+                              // Convert an exceptional (or cancelled) result into null so it is filtered out
+                              return future.exceptionally(throwable -> null)
+                                           .join();
+                          }
+                          else
+                          {
+                              future.cancel(true);
+                              return null;
+                          }
+                      })
+                      .filter(Objects::nonNull)
+                      .collect(Collectors.toList());
+    }
 }
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]