This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 912fbb4  CASSANDRA-18633: Added caching of Node Settings to improve efficiency
912fbb4 is described below

commit 912fbb47fddc07afcf56f5de97e813593bfb890e
Author: Yuriy Semchyshyn <[email protected]>
AuthorDate: Mon Jun 26 14:40:10 2023 -0500

    CASSANDRA-18633: Added caching of Node Settings to improve efficiency
    
    patch by Yuriy Semchyshyn; reviewed by Dinesh Joshi, Yifan Cai for CASSANDRA-18633
---
 .../java/org/apache/cassandra/clients/Sidecar.java | 66 ++++------------------
 .../spark/bulkwriter/CassandraClusterInfo.java     | 32 +++++++++--
 .../apache/cassandra/spark/utils/FutureUtils.java  | 43 ++++++++++++++
 3 files changed, 83 insertions(+), 58 deletions(-)

diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java
index bac09e2..9721c32 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java
@@ -20,17 +20,14 @@
 package org.apache.cassandra.clients;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -155,57 +152,18 @@ public final class Sidecar
         return new SidecarClient(clusterConfig, requestExecutor, 
sidecarConfig, defaultRetryPolicy);
     }
 
-    public static List<NodeSettings> allNodeSettingsBlocking(BulkSparkConf 
conf,
-                                                             SidecarClient 
client,
-                                                             Set<? extends 
SidecarInstance> instances)
+    public static List<CompletableFuture<NodeSettings>> 
allNodeSettings(SidecarClient client,
+                                                                        Set<? 
extends SidecarInstance> instances)
     {
-        List<CompletableFuture<Void>> futures = new ArrayList<>();
-        List<NodeSettings> allNodeSettings = Collections.synchronizedList(new 
ArrayList<>());
-        for (SidecarInstance instance : instances)
-        {
-            futures.add(client.nodeSettings(instance)
-                              .exceptionally(throwable -> {
-                                  LOGGER.warn(String.format("Failed to execute 
node settings on instance=%s",
-                                                            instance), 
throwable);
-                                  return null;
-                              })
-                              .thenAccept(nodeSettings -> {
-                                  if (nodeSettings != null)
-                                  {
-                                      allNodeSettings.add(nodeSettings);
-                                  }
-                              }));
-        }
-        try
-        {
-            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
-                             
.get(conf.getSidecarRequestMaxRetryDelayInSeconds(), TimeUnit.SECONDS);
-        }
-        catch (ExecutionException | InterruptedException exception)
-        {
-            throw new RuntimeException(exception);
-        }
-        catch (TimeoutException exception)
-        {
-            // Any futures that have already completed will have put their 
results into `allNodeSettings`
-            // at this point. Cancel any remaining futures and move on.
-            for (CompletableFuture<Void> future : futures)
-            {
-                future.cancel(true);
-            }
-        }
-        long successCount = allNodeSettings.size();
-        if (successCount == 0)
-        {
-            throw new RuntimeException(String.format("Unable to determine the 
node settings. 0/%d instances available.",
-                                                     instances.size()));
-        }
-        else if (successCount < instances.size())
-        {
-            LOGGER.debug("{}/{} instances were used to determine the node 
settings",
-                         successCount, instances.size());
-        }
-        return allNodeSettings;
+        return instances.stream()
+                        .map(instance -> client
+                                .nodeSettings(instance)
+                                .exceptionally(throwable -> {
+                                    LOGGER.warn(String.format("Failed to 
execute node settings on instance=%s",
+                                                              instance), 
throwable);
+                                    return null;
+                                }))
+                        .collect(Collectors.toList());
     }
 
     public static final class ClientConfig
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java
index e84957f..2768d35 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java
@@ -23,6 +23,7 @@ import java.io.Closeable;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -48,6 +49,7 @@ import org.apache.cassandra.spark.common.client.InstanceStatus;
 import org.apache.cassandra.spark.data.ReplicationFactor;
 import org.apache.cassandra.spark.data.partitioner.Partitioner;
 import org.apache.cassandra.spark.utils.CqlUtils;
+import org.apache.cassandra.spark.utils.FutureUtils;
 import org.jetbrains.annotations.NotNull;
 
 public class CassandraClusterInfo implements ClusterInfo, Closeable
@@ -64,11 +66,18 @@ public class CassandraClusterInfo implements ClusterInfo, Closeable
     protected transient GossipInfoResponse gossipInfo;
     protected transient CassandraContext cassandraContext;
     protected transient NodeSettings nodeSettings;
+    protected final transient CompletableFuture<List<NodeSettings>> 
allNodeSettings;
 
     public CassandraClusterInfo(BulkSparkConf conf)
     {
         this.conf = conf;
         this.cassandraContext = buildCassandraContext();
+        this.allNodeSettings = CompletableFuture.supplyAsync(() -> {
+            LOGGER.info("Getting Cassandra versions from all nodes");
+            return Sidecar.allNodeSettingsBlocking(conf,
+                                                   
cassandraContext.getSidecarClient(),
+                                                   
cassandraContext.clusterConfig);
+        });
     }
 
     @Override
@@ -339,10 +348,25 @@ public class CassandraClusterInfo implements ClusterInfo, Closeable
     public String getVersionFromSidecar()
     {
         LOGGER.info("Getting Cassandra versions from all nodes");
-        List<NodeSettings> allNodeSettings = 
Sidecar.allNodeSettingsBlocking(conf,
-                                                                             
cassandraContext.getSidecarClient(),
-                                                                             
cassandraContext.clusterConfig);
-        return getLowestVersion(allNodeSettings);
+        List<CompletableFuture<NodeSettings>> futures = 
Sidecar.allNodeSettings(cassandraContext.getSidecarClient(),
+                                                                               
 cassandraContext.clusterConfig);
+
+        List<NodeSettings> nodeSettings = FutureUtils.bestEffortGet(futures,
+                                                                    
conf.getSidecarRequestMaxRetryDelayInSeconds(),
+                                                                    
TimeUnit.SECONDS);
+
+        if (nodeSettings.isEmpty())
+        {
+            throw new RuntimeException(String.format("Unable to determine the 
node settings. 0/%d instances available.",
+                                                     futures.size()));
+        }
+        else if (nodeSettings.size() < futures.size())
+        {
+            LOGGER.debug("{}/{} instances were used to determine the node 
settings",
+                         nodeSettings.size(), futures.size());
+        }
+
+        return getLowestVersion(nodeSettings);
     }
 
     protected RingResponse getRingResponse()
diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/FutureUtils.java b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/FutureUtils.java
index 785805e..d9a8d6e 100644
--- a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/FutureUtils.java
+++ b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/FutureUtils.java
@@ -21,9 +21,13 @@ package org.apache.cassandra.spark.utils;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
@@ -162,4 +166,43 @@ public final class FutureUtils
                          });
         return result;
     }
+
+    /**
+     * Makes a best-effort attempt to obtain results of a collection of 
completable futures
+     * within the specified timeout, then combines and returns all received 
non-null values
+     *
+     * @param futures  list of futures to obtain results from
+     * @param timeout  duration of the timeout to use
+     * @param timeUnit units of the timeout
+     * @return list of non-null values obtained
+     */
+    public static <T> List<T> bestEffortGet(List<CompletableFuture<T>> 
futures, long timeout, TimeUnit timeUnit)
+    {
+        try
+        {
+            // As a barrier
+            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
+                             .get(timeout, timeUnit);
+        }
+        catch (InterruptedException | ExecutionException | TimeoutException 
exception)
+        {
+            // Do nothing, cancel later
+        }
+        return futures.stream()
+                      .map(future -> {
+                          if (future.isDone())
+                          {
+                              // Convert exception into null and ignore later
+                              return future.exceptionally(t -> null)
+                                      .join();
+                          }
+                          else
+                          {
+                              future.cancel(true);
+                              return null;
+                          }
+                      })
+                      .filter(Objects::nonNull)
+                      .collect(Collectors.toList());
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to