This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 798182a  CASSANDRA-19772: Deprecate option SIDECAR_INSTANCES and replace with SIDECAR_CONTACT_POINTS (#63)
798182a is described below

commit 798182a6fda562538c2f44e4f3f92a7cb68cd81c
Author: Yifan Cai <[email protected]>
AuthorDate: Tue Jul 16 21:44:18 2024 -0700

    CASSANDRA-19772: Deprecate option SIDECAR_INSTANCES and replace with SIDECAR_CONTACT_POINTS (#63)
    
    This patch introduces a new option, SIDECAR_CONTACT_POINTS, for both the bulk writer and the reader. The option name better describes its purpose: specifying the initial contact points used to discover the cluster topology. The existing option SIDECAR_INSTANCES serves the same purpose and is now deprecated.
    In addition, the addresses defined in SIDECAR_CONTACT_POINTS may now include a port value.
    
    Patch by Yifan Cai; Reviewed by Francisco Guerrero for CASSANDRA-19772
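
    For illustration, a minimal sketch of how a bulk writer job could supply the new option
    (mirroring the example jobs updated below; the host names and ports are placeholders):

        Map<String, String> writeOptions = new HashMap<>();
        // New option: comma-separated contact points; each address may carry its own port
        writeOptions.put("sidecar_contact_points", "host1:9043,host2,[2001:db8::1]:9043");
        // Deprecated but still honored; ignored when sidecar_contact_points is also set
        // writeOptions.put("sidecar_instances", "host1,host2");
        writeOptions.put("keyspace", "spark_test");
        writeOptions.put("table", "test");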
---
 README.md                                          |   2 +-
 build.gradle                                       |   8 ++
 .../org/apache/cassandra/spark/utils/MapUtils.java |  40 ++++++
 .../spark/example/DirectWriteAndReadJob.java       |   4 +-
 .../spark/example/LocalS3WriteAndReadJob.java      |   4 +-
 .../cassandra/spark/bulkwriter/BulkSparkConf.java  |  64 +++++----
 .../spark/bulkwriter/CassandraContext.java         |   2 +-
 .../cassandra/spark/bulkwriter/WriterOptions.java  |   6 +-
 .../spark/common/SidecarInstanceFactory.java       |  64 +++++++++
 .../cassandra/spark/data/CassandraDataLayer.java   |   9 +-
 .../apache/cassandra/spark/data/ClientConfig.java  |  25 +++-
 .../spark/bulkwriter/BulkSparkConfTest.java        | 156 ++++++++++++---------
 .../spark/data/CassandraDataLayerTests.java        |   4 +-
 .../data/CassandraDataSourceHelperCacheTest.java   |   2 +-
 .../cassandra/spark/data/ClientConfigTests.java    |   2 +-
 .../spark/common/SidecarInstanceFactoryTest.java   |  55 ++++++++
 .../apache/cassandra/analytics/SparkTestUtils.java |   4 +-
 17 files changed, 330 insertions(+), 121 deletions(-)

diff --git a/README.md b/README.md
index 536bbb0..96e310d 100644
--- a/README.md
+++ b/README.md
@@ -34,7 +34,7 @@ import org.apache.spark.sql.SparkSession
 
 val sparkSession = SparkSession.builder.getOrCreate()
 val df = sparkSession.read.format("org.apache.cassandra.spark.sparksql.CassandraDataSource")
-                          .option("sidecar_instances", "localhost,localhost2,localhost3")
+                          .option("sidecar_contact_points", "localhost,localhost2,localhost3")
                           .option("keyspace", "sbr_tests")
                           .option("table", "basic_test")
                           .option("DC", "datacenter1")
diff --git a/build.gradle b/build.gradle
index f7c8d24..20312ae 100644
--- a/build.gradle
+++ b/build.gradle
@@ -44,6 +44,14 @@ println("Spark version: ${ext.sparkLabel}")
 ext.jdkLabel = System.getenv("JDK_VERSION") ?: "${analyticsJDKLevel}"
 println("Java source/target compatibility level: ${ext.jdkLabel}")
 
+// validate the builder is indeed using the correct JDK_VERSION as specified
+String actualJdkVersion = System.getProperty("java.version")
+if (!actualJdkVersion.startsWith(ext.jdkLabel))
+{
+  def errorMessage = "Invalid jdk version (Found: ${actualJdkVersion}) for jdk-${ext.jdkLabel} build"
+  throw new RuntimeException(errorMessage)
+}
+
 ext.dependencyLocation = (System.getenv("CASSANDRA_DEP_DIR") ?: "${rootDir}/dependencies") + "/"
 
 
diff --git 
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/MapUtils.java
 
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/MapUtils.java
index e899b10..ab51201 100644
--- 
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/MapUtils.java
+++ 
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/MapUtils.java
@@ -22,13 +22,19 @@ package org.apache.cassandra.spark.utils;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
+import java.util.function.Function;
 import java.util.function.Supplier;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * Utility methods for {@link Map}
  */
 public final class MapUtils
 {
+    private static final Logger LOGGER = LoggerFactory.getLogger(MapUtils.class);
+
     private MapUtils()
     {
         throw new IllegalStateException(getClass() + " is static utility class and shall not be instantiated");
@@ -239,4 +245,38 @@ public final class MapUtils
     {
         return options.containsKey(lowerCaseKey(key));
     }
+
+    /**
+     * Resolve the value by consolidating the value associated with the current option and the deprecated option.
+     * The method assumes that both the new and the deprecated options expect the same value type.
+     * The value of the new option takes precedence over the deprecated one, i.e. when both options are present,
+     * the value associated with the new option is returned.
+     *
+     * @param options option keys and values map
+     * @param option new option
+     * @param deprecated deprecated option
+     * @param resolver function to resolve the value. If the input of the function is null, it asks for the default value
+     * @return resolved value
+     * @param <T> value type
+     */
+    public static <T> T resolveDeprecated(Map<String, String> options, String option, String deprecated, Function<String, T> resolver)
+    {
+        T deprecatedOptionValue = null;
+        if (options.containsKey(deprecated))
+        {
+            LOGGER.warn("The option: {} is deprecated. Please use {} instead. See option description for details.", deprecated, option);
+            deprecatedOptionValue = resolver.apply(deprecated);
+        }
+
+        if (options.containsKey(option))
+        {
+            if (deprecatedOptionValue != null)
+            {
+                LOGGER.info("The option: {} is defined. Favor the value over {}", option, deprecated);
+            }
+            return resolver.apply(option);
+        }
+
+        return deprecatedOptionValue == null ? resolver.apply(null) : deprecatedOptionValue;
+    }
 }
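
As an illustration of the resolution order implemented by resolveDeprecated above, a hypothetical
caller could consolidate an integer option like this (a minimal sketch; the option names and the
default value 60 are made up for the example and are not part of this patch):

    Map<String, String> options = new HashMap<>();
    options.put("old_timeout", "10");
    options.put("new_timeout", "30");
    // Both keys are present, so the new option wins and value is 30.
    // With only "old_timeout" set it would be 10 (plus a deprecation warning in the logs);
    // with neither key set, the resolver is invoked with null and returns the default 60.
    int value = MapUtils.resolveDeprecated(options, "new_timeout", "old_timeout",
                                           key -> key == null ? 60 : Integer.parseInt(options.get(key)));
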
diff --git 
a/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/DirectWriteAndReadJob.java
 
b/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/DirectWriteAndReadJob.java
index c94b195..16ed985 100644
--- 
a/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/DirectWriteAndReadJob.java
+++ 
b/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/DirectWriteAndReadJob.java
@@ -41,7 +41,7 @@ public class DirectWriteAndReadJob extends 
AbstractCassandraJob
     protected JobConfiguration configureJob(SparkContext sc, SparkConf 
sparkConf)
     {
         Map<String, String> writeOptions = new HashMap<>();
-        writeOptions.put("sidecar_instances", "localhost,localhost2,localhost3");
+        writeOptions.put("sidecar_contact_points", "localhost,localhost2,localhost3");
         writeOptions.put("keyspace", "spark_test");
         writeOptions.put("table", "test");
         writeOptions.put("local_dc", "datacenter1");
@@ -54,7 +54,7 @@ public class DirectWriteAndReadJob extends 
AbstractCassandraJob
                                             
sparkConf.getInt("spark.executor.instances", 1));
         int numCores = coresPerExecutor * numExecutors;
         Map<String, String> readerOptions = new HashMap<>();
-        readerOptions.put("sidecar_instances", "localhost,localhost2,localhost3");
+        readerOptions.put("sidecar_contact_points", "localhost,localhost2,localhost3");
         readerOptions.put("keyspace", "spark_test");
         readerOptions.put("table", "test");
         readerOptions.put("DC", "datacenter1");
diff --git 
a/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/LocalS3WriteAndReadJob.java
 
b/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/LocalS3WriteAndReadJob.java
index 749a051..a7a946b 100644
--- 
a/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/LocalS3WriteAndReadJob.java
+++ 
b/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/LocalS3WriteAndReadJob.java
@@ -66,7 +66,7 @@ public class LocalS3WriteAndReadJob extends 
AbstractCassandraJob
     protected JobConfiguration configureJob(SparkContext sc, SparkConf 
sparkConf)
     {
         Map<String, String> writeOptions = new HashMap<>();
-        writeOptions.put("sidecar_instances", sidecarInstances);
+        writeOptions.put("sidecar_contact_points", sidecarInstances);
         writeOptions.put("keyspace", "spark_test");
         writeOptions.put("table", "test");
         writeOptions.put("local_dc", dataCenter);
@@ -92,7 +92,7 @@ public class LocalS3WriteAndReadJob extends 
AbstractCassandraJob
                                             
sparkConf.getInt("spark.executor.instances", 1));
         int numCores = coresPerExecutor * numExecutors;
         Map<String, String> readerOptions = new HashMap<>();
-        readerOptions.put("sidecar_instances", "localhost,localhost2,localhost3");
+        readerOptions.put("sidecar_contact_points", "localhost,localhost2,localhost3");
         readerOptions.put("keyspace", "spark_test");
         readerOptions.put("table", "test");
         readerOptions.put("DC", "datacenter1");
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
index 8bc9a7f..442a33f 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
@@ -36,14 +36,15 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.sidecar.client.SidecarInstanceImpl;
 import org.apache.cassandra.sidecar.client.SidecarInstance;
 import org.apache.cassandra.spark.bulkwriter.blobupload.StorageClientConfig;
 import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel;
 import org.apache.cassandra.spark.bulkwriter.util.SbwKryoRegistrator;
+import org.apache.cassandra.spark.common.SidecarInstanceFactory;
 import org.apache.cassandra.spark.utils.BuildInfo;
 import org.apache.cassandra.spark.utils.MapUtils;
 import org.apache.spark.SparkConf;
@@ -145,10 +146,9 @@ public class BulkSparkConf implements Serializable
     protected final String configuredJobId;
     protected boolean useOpenSsl;
     protected int ringRetryCount;
-    // create sidecarInstances from sidecarInstancesValue and effectiveSidecarPort
-    private final String sidecarInstancesValue;
-    private transient Set<? extends SidecarInstance> sidecarInstances; // not serialized
-
+    // create sidecarInstances from sidecarContactPointsValue and effectiveSidecarPort
+    private final String sidecarContactPointsValue; // It takes comma separated values
+    private transient Set<? extends SidecarInstance> sidecarContactPoints; // not serialized
 
     public BulkSparkConf(SparkConf conf, Map<String, String> options)
     {
@@ -156,8 +156,8 @@ public class BulkSparkConf implements Serializable
         Optional<Integer> sidecarPortFromOptions = 
MapUtils.getOptionalInt(options, WriterOptions.SIDECAR_PORT.name(), "sidecar 
port");
         this.userProvidedSidecarPort = sidecarPortFromOptions.isPresent() ? 
sidecarPortFromOptions.get() : getOptionalInt(SIDECAR_PORT).orElse(-1);
         this.effectiveSidecarPort = this.userProvidedSidecarPort == -1 ? 
DEFAULT_SIDECAR_PORT : this.userProvidedSidecarPort;
-        this.sidecarInstancesValue = MapUtils.getOrDefault(options, WriterOptions.SIDECAR_INSTANCES.name(), null);
-        this.sidecarInstances = sidecarInstances();
+        this.sidecarContactPointsValue = resolveSidecarContactPoints(options);
+        this.sidecarContactPoints = sidecarContactPoints();
         this.keyspace = MapUtils.getOrThrow(options, 
WriterOptions.KEYSPACE.name());
         this.table = MapUtils.getOrThrow(options, WriterOptions.TABLE.name());
         this.skipExtendedVerify = MapUtils.getBoolean(options, 
WriterOptions.SKIP_EXTENDED_VERIFY.name(), true,
@@ -240,45 +240,47 @@ public class BulkSparkConf implements Serializable
                      .collect(Collectors.toSet());
     }
 
-    protected int resolveSSTableDataSizeInMiB(Map<String, String> options)
+    public static int resolveSSTableDataSizeInMiB(Map<String, String> options)
     {
-        int legacyOptionValue = -1;
-        if (options.containsKey(WriterOptions.SSTABLE_DATA_SIZE_IN_MB.name()))
-        {
-            LOGGER.warn("The writer option: SSTABLE_DATA_SIZE_IN_MB is deprecated. " +
-                        "Please use SSTABLE_DATA_SIZE_IN_MIB instead. See option description for details.");
-            legacyOptionValue = MapUtils.getInt(options, WriterOptions.SSTABLE_DATA_SIZE_IN_MB.name(),
-                                                DEFAULT_SSTABLE_DATA_SIZE_IN_MIB, "sstable data size in mebibytes");
-        }
+        return MapUtils.resolveDeprecated(options, WriterOptions.SSTABLE_DATA_SIZE_IN_MIB.name(), WriterOptions.SSTABLE_DATA_SIZE_IN_MB.name(), option -> {
+            if (option == null)
+            {
+                return DEFAULT_SSTABLE_DATA_SIZE_IN_MIB;
+            }
 
-        if (options.containsKey(WriterOptions.SSTABLE_DATA_SIZE_IN_MIB.name()))
-        {
-            LOGGER.info("The writer option: SSTABLE_DATA_SIZE_IN_MIB is defined. " +
-                        "Favor the value over SSTABLE_DATA_SIZE_IN_MB");
-            return MapUtils.getInt(options, WriterOptions.SSTABLE_DATA_SIZE_IN_MIB.name(),
-                                   DEFAULT_SSTABLE_DATA_SIZE_IN_MIB, "sstable data size in mebibytes");
+            return MapUtils.getInt(options, option, DEFAULT_SSTABLE_DATA_SIZE_IN_MIB, "sstable data size in mebibytes");
+        });
+    }
 
-        }
+    public static String resolveSidecarContactPoints(Map<String, String> options)
+    {
+        return MapUtils.resolveDeprecated(options, WriterOptions.SIDECAR_CONTACT_POINTS.name(), WriterOptions.SIDECAR_INSTANCES.name(), option -> {
+            if (option == null)
+            {
+                return null;
+            }
 
-        return legacyOptionValue == -1 ? DEFAULT_SSTABLE_DATA_SIZE_IN_MIB : legacyOptionValue;
+            return MapUtils.getOrDefault(options, option, null);
+        });
     }
 
-    protected Set<? extends SidecarInstance> buildSidecarInstances()
+    protected Set<? extends SidecarInstance> buildSidecarContactPoints()
     {
-        String[] split = Objects.requireNonNull(sidecarInstancesValue, "Unable to build sidecar instances from null value")
+        String[] split = Objects.requireNonNull(sidecarContactPointsValue, "Unable to build sidecar instances from null value")
                                 .split(",");
         return Arrays.stream(split)
-                     .map(hostname -> new SidecarInstanceImpl(hostname, effectiveSidecarPort))
+                     .filter(StringUtils::isNotEmpty)
+                     .map(hostname -> SidecarInstanceFactory.createFromString(hostname, effectiveSidecarPort))
                      .collect(Collectors.toSet());
     }
 
-    Set<? extends SidecarInstance> sidecarInstances()
+    Set<? extends SidecarInstance> sidecarContactPoints()
     {
-        if (sidecarInstances == null)
+        if (sidecarContactPoints == null)
         {
         {
-            sidecarInstances = buildSidecarInstances();
+            sidecarContactPoints = buildSidecarContactPoints();
         }
-        return sidecarInstances;
+        return sidecarContactPoints;
     }
 
     protected void validateEnvironment() throws RuntimeException
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraContext.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraContext.java
index 97d19c2..1da6083 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraContext.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraContext.java
@@ -88,7 +88,7 @@ public class CassandraContext implements StartupValidatable, 
Closeable
 
     protected Set<? extends SidecarInstance> createClusterConfig()
     {
-        return conf.sidecarInstances();
+        return conf.sidecarContactPoints();
     }
 
     public SidecarClient getSidecarClient()
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java
index a7578b0..72a310a 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java
@@ -26,7 +26,11 @@ import 
org.apache.cassandra.spark.transports.storage.extensions.StorageTransport
  */
 public enum WriterOptions implements WriterOption
 {
+    @Deprecated // Prefer the equivalent, SIDECAR_CONTACT_POINTS
     SIDECAR_INSTANCES,
+    // The option specifies the initial contact points of sidecar servers to discover the cluster topology
+    // Note that the addresses can include port; when port is present, it takes precedence over SIDECAR_PORT
+    SIDECAR_CONTACT_POINTS,
     KEYSPACE,
     TABLE,
     BULK_WRITER_CL,
@@ -45,7 +49,7 @@ public enum WriterOptions implements WriterOption
     TRUSTSTORE_PATH,
     TRUSTSTORE_BASE64_ENCODED,
     SIDECAR_PORT,
-    @Deprecated // the size unit `MB` is incorrect, use `SSTABLE_DATA_SIZE_IN_MIB` instead
+    @Deprecated // Prefer the equivalent, `SSTABLE_DATA_SIZE_IN_MIB`. The size unit `MB` is incorrect and internally treated as MiB.
     SSTABLE_DATA_SIZE_IN_MB,
     SSTABLE_DATA_SIZE_IN_MIB,
     TTL,
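
To make the interplay with SIDECAR_PORT concrete, a small sketch of writer options under the new
scheme (host names and port numbers are placeholders): an address that carries an explicit port
keeps it, while addresses without one fall back to the SIDECAR_PORT option (or the built-in
default port when neither is given).

    Map<String, String> options = new HashMap<>();
    options.put("sidecar_contact_points", "host-a:7777,host-b");
    options.put("sidecar_port", "9999");
    // host-a resolves to port 7777 (taken from the address itself),
    // host-b resolves to port 9999 (the SIDECAR_PORT fallback).
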
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/SidecarInstanceFactory.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/SidecarInstanceFactory.java
new file mode 100644
index 0000000..cc6fb09
--- /dev/null
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/SidecarInstanceFactory.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.common;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.sidecar.client.SidecarInstanceImpl;
+
+public class SidecarInstanceFactory
+{
+    private SidecarInstanceFactory()
+    {
+        throw new UnsupportedOperationException("Utility class");
+    }
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(SidecarInstanceFactory.class);
+
+    /**
+     * Create SidecarInstance object by parsing the input string, which is an IP address or hostname and optionally includes a port
+     * @param input hostname string that can optionally include the port. If port is present, the defaultPort param is ignored.
+     * @param defaultPort port value used when the input string contains no port
+     * @return SidecarInstanceImpl
+     */
+    public static SidecarInstanceImpl createFromString(String input, int defaultPort)
+    {
+        Preconditions.checkArgument(StringUtils.isNotEmpty(input), "Unable to create sidecar instance from empty input");
+
+        String hostname = input;
+        int port = defaultPort;
+        // has port in the string. The former matches ipv6 and the latter matches ipv4 and hostnames
+        // ipv6 with port example: [2024:a::1]:8080
+        if (input.contains("]:") || (!input.startsWith("[") && input.contains(":")))
+        {
+            int index = input.lastIndexOf(':');
+            hostname = input.substring(0, index); // includes ']' if it is ipv6
+            String portStr = input.substring(index + 1);
+            port = Integer.parseInt(portStr);
+        }
+
+        LOGGER.info("Create sidecar instance. hostname={} port={}", hostname, 
port);
+        return new SidecarInstanceImpl(hostname, port);
+    }
+}
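
A quick sketch of how createFromString above behaves for the supported address forms (the values
mirror the new SidecarInstanceFactoryTest further down; hosts and ports are illustrative only):

    SidecarInstanceFactory.createFromString("localhost", 9043);        // hostname "localhost", port 9043 (default applied)
    SidecarInstanceFactory.createFromString("localhost:8888", 9043);   // hostname "localhost", port 8888
    SidecarInstanceFactory.createFromString("[2024:a::1]", 9043);      // hostname "[2024:a::1]", port 9043
    SidecarInstanceFactory.createFromString("[2024:a::1]:8888", 9043); // hostname "[2024:a::1]", port 8888
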
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
index 8154b01..114b5dd 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
@@ -51,6 +51,7 @@ import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Range;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -75,6 +76,7 @@ import org.apache.cassandra.sidecar.client.SidecarInstance;
 import org.apache.cassandra.sidecar.client.SidecarInstanceImpl;
 import org.apache.cassandra.sidecar.client.SimpleSidecarInstancesProvider;
 import org.apache.cassandra.sidecar.client.exception.RetriesExhaustedException;
+import org.apache.cassandra.spark.common.SidecarInstanceFactory;
 import org.apache.cassandra.spark.config.SchemaFeature;
 import org.apache.cassandra.spark.config.SchemaFeatureSet;
 import org.apache.cassandra.spark.data.partitioner.CassandraInstance;
@@ -154,7 +156,7 @@ public class CassandraDataLayer extends 
PartitionedDataLayer implements StartupV
         this.table = options.table();
         this.quoteIdentifiers = options.quoteIdentifiers();
         this.sidecarClientConfig = sidecarClientConfig;
-        this.sidecarInstances = options.sidecarInstances;
+        this.sidecarInstances = options.sidecarContactPoints;
         this.sidecarPort = options.sidecarPort;
         this.sslConfig = sslConfig;
         this.bigNumberConfigMap = options.bigNumberConfigMap();
@@ -899,14 +901,15 @@ public class CassandraDataLayer extends 
PartitionedDataLayer implements StartupV
 
     protected Set<? extends SidecarInstance> 
initializeClusterConfig(ClientConfig options)
     {
-        return initializeClusterConfig(options.sidecarInstances, options.sidecarPort());
+        return initializeClusterConfig(options.sidecarContactPoints, options.sidecarPort());
     }
 
     // not intended to be overridden
     private Set<? extends SidecarInstance> initializeClusterConfig(String 
sidecarInstances, int sidecarPort)
     {
         return Arrays.stream(sidecarInstances.split(","))
-                     .map(hostname -> new SidecarInstanceImpl(hostname, sidecarPort))
+                     .filter(StringUtils::isNotEmpty)
+                     .map(hostname -> SidecarInstanceFactory.createFromString(hostname, sidecarPort))
                      .collect(Collectors.toSet());
     }
 
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/ClientConfig.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/ClientConfig.java
index dd9675c..32d926a 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/ClientConfig.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/ClientConfig.java
@@ -43,7 +43,11 @@ public class ClientConfig
 {
     protected final Logger logger = LoggerFactory.getLogger(this.getClass());
 
-    public static final String SIDECAR_INSTANCES = "sidecar_instances";
+    // TODO: the key value string is not consistent with WriterOptions and keys in BulkSparkConf
+    @Deprecated // Prefer the equivalent, SIDECAR_CONTACT_POINTS
+    public static final String SIDECAR_INSTANCES = "sidecar_instances";
+    // The option specifies the initial contact points of sidecar servers to discover the cluster topology
+    public static final String SIDECAR_CONTACT_POINTS = "sidecar_contact_points";
     public static final String KEYSPACE_KEY = "keyspace";
     public static final String TABLE_KEY = "table";
     public static final String SNAPSHOT_NAME_KEY = "snapshotName";
@@ -78,7 +82,7 @@ public class ClientConfig
     public static final String QUOTE_IDENTIFIERS = "quote_identifiers";
     public static final int DEFAULT_SIDECAR_PORT = 9043;
 
-    protected String sidecarInstances;
+    protected String sidecarContactPoints;
     @Nullable
     protected String keyspace;
     @Nullable
@@ -105,7 +109,7 @@ public class ClientConfig
 
     protected ClientConfig(Map<String, String> options)
     {
-        this.sidecarInstances = parseSidecarInstances(options);
+        this.sidecarContactPoints = parseSidecarContactPoints(options);
         this.keyspace = MapUtils.getOrThrow(options, KEYSPACE_KEY, "keyspace");
         this.table = MapUtils.getOrThrow(options, TABLE_KEY, "table");
         this.snapshotName = MapUtils.getOrDefault(options, SNAPSHOT_NAME_KEY, 
"sbr_" + UUID.randomUUID().toString().replace("-", ""));
@@ -135,9 +139,16 @@ public class ClientConfig
         this.quoteIdentifiers = MapUtils.getBoolean(options, 
QUOTE_IDENTIFIERS, false);
     }
 
-    protected String parseSidecarInstances(Map<String, String> options)
+    protected String parseSidecarContactPoints(Map<String, String> options)
     {
-        return MapUtils.getOrThrow(options, SIDECAR_INSTANCES, "sidecar_instances");
+        return MapUtils.resolveDeprecated(options, SIDECAR_CONTACT_POINTS, SIDECAR_INSTANCES, option -> {
+            if (option == null)
+            {
+                throw MapUtils.throwable(SIDECAR_CONTACT_POINTS + " or " + SIDECAR_INSTANCES).get();
+            }
+            }
+
+            return MapUtils.getOrThrow(options, option);
+        });
     }
 
     protected ClearSnapshotStrategy parseClearSnapshotStrategy(boolean 
hasDeprecatedOption,
@@ -155,9 +166,9 @@ public class ClientConfig
         return ClearSnapshotStrategy.parse(clearSnapshotStrategyOption);
     }
 
-    public String sidecarInstances()
+    public String sidecarContactPoints()
     {
-        return sidecarInstances;
+        return sidecarContactPoints;
     }
 
     public String keyspace()
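
On the reader side, a minimal sketch of creating a ClientConfig with the new key (modeled on the
existing tests; the keyspace, table, and host values are placeholders):

    Map<String, String> options = new HashMap<>();
    options.put("keyspace", "big-data");
    options.put("table", "customers");
    options.put("sidecar_contact_points", "localhost:9999,localhost2");
    ClientConfig config = ClientConfig.create(options);
    // config.sidecarContactPoints() returns the raw comma-separated string; per-address ports are
    // applied later, when the contact points are turned into SidecarInstance objects.
    // Omitting both sidecar_contact_points and sidecar_instances causes create(...) to throw.
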
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java
index b059479..d48ace8 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java
@@ -35,23 +35,18 @@ import org.jetbrains.annotations.NotNull;
 
 import static 
org.apache.cassandra.spark.bulkwriter.BulkSparkConf.DEFAULT_SIDECAR_PORT;
 import static 
org.apache.cassandra.spark.bulkwriter.BulkSparkConf.MINIMUM_JOB_KEEP_ALIVE_MINUTES;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.emptyString;
-import static org.hamcrest.core.Is.is;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class BulkSparkConfTest
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNoException;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+class BulkSparkConfTest
 {
     private SparkConf sparkConf;
     private BulkSparkConf bulkSparkConf;
     private Map<String, String> defaultOptions;
 
     @BeforeEach
-    public void before()
+    void before()
     {
         sparkConf = new SparkConf();
         defaultOptions = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
@@ -64,28 +59,28 @@ public class BulkSparkConfTest
     }
 
     @Test
-    public void testGetBoolean()
+    void testGetBoolean()
     {
         sparkConf.set("spark.cassandra_analytics.job.skip_clean", "true");
-        assertThat(bulkSparkConf.getSkipClean(), is(true));
+        assertThat(bulkSparkConf.getSkipClean()).isTrue();
     }
 
     @Test
-    public void testGetLong()
+    void testGetLong()
     {
         
sparkConf.set("spark.cassandra_analytics.sidecar.request.retries.delay.milliseconds",
 "2222");
-        assertThat(bulkSparkConf.getSidecarRequestRetryDelayMillis(), 
is(2222L));
+        
assertThat(bulkSparkConf.getSidecarRequestRetryDelayMillis()).isEqualTo(2222L);
     }
 
     @Test
-    public void testGetInt()
+    void testGetInt()
     {
         sparkConf.set("spark.cassandra_analytics.request.max_connections", 
"1234");
-        assertThat(bulkSparkConf.getMaxHttpConnections(), is(1234));
+        assertThat(bulkSparkConf.getMaxHttpConnections()).isEqualTo(1234);
     }
 
     @Test
-    public void deprecatedSettingsAreHonored()
+    void deprecatedSettingsAreHonored()
     {
         // Test that deprecated names of settings are in fact picked up 
correctly
 
@@ -102,155 +97,182 @@ public class BulkSparkConfTest
             }
         };
 
-        assertThat(bulkSparkConf.getMaxHttpConnections(), is(1234));
-        assertThat(bulkSparkConf.getHttpResponseTimeoutMs(), is(5678));
+        assertThat(bulkSparkConf.getMaxHttpConnections()).isEqualTo(1234);
+        assertThat(bulkSparkConf.getHttpResponseTimeoutMs()).isEqualTo(5678);
     }
 
     @Test
-    public void calculatesCoresCorrectlyForStaticAllocation()
+    void calculatesCoresCorrectlyForStaticAllocation()
     {
         sparkConf.set("spark.executor.cores", "5");
         sparkConf.set("spark.executor.instances", "5");
-        assertThat(bulkSparkConf.getCores(), is(25));
+        assertThat(bulkSparkConf.getCores()).isEqualTo(25);
     }
 
     @Test
-    public void calculatesCoresCorrectlyForDynamicAllocation()
+    void calculatesCoresCorrectlyForDynamicAllocation()
     {
         sparkConf.set("spark.executor.cores", "6");
         sparkConf.set("spark.dynamicAllocation.maxExecutors", "7");
-        assertThat(bulkSparkConf.getCores(), is(42));
+        assertThat(bulkSparkConf.getCores()).isEqualTo(42);
     }
 
     @Test
-    public void ensureSetupSparkConfAddsPerformsNecessaryTasks()
+    void ensureSetupSparkConfAddsPerformsNecessaryTasks()
     {
-        assertThat(sparkConf.get("spark.kryo.registrator", ""), 
is(emptyString()));
-        assertThat(sparkConf.get("spark.executor.extraJavaOptions", ""), 
is(emptyString()));
+        assertThat(sparkConf.get("spark.kryo.registrator", "")).isEmpty();
+        assertThat(sparkConf.get("spark.executor.extraJavaOptions", 
"")).isEmpty();
         BulkSparkConf.setupSparkConf(sparkConf, true);
-        assertEquals("," + SbwKryoRegistrator.class.getName(), 
sparkConf.get("spark.kryo.registrator", ""));
+        assertThat(sparkConf.get("spark.kryo.registrator", ""))
+        .isEqualTo("," + SbwKryoRegistrator.class.getName());
         if (BuildInfo.isAtLeastJava11(BuildInfo.javaSpecificationVersion()))
         {
-            assertEquals(BulkSparkConf.JDK11_OPTIONS, 
sparkConf.get("spark.executor.extraJavaOptions", ""));
+            assertThat(sparkConf.get("spark.executor.extraJavaOptions", ""))
+            .isEqualTo(BulkSparkConf.JDK11_OPTIONS);
         }
     }
 
     @Test
-    public void withProperMtlsSettingsWillCreateSuccessfully()
+    void withProperMtlsSettingsWillCreateSuccessfully()
     {
         // mTLS is now required, and the BulkSparkConf constructor fails if 
the options aren't present
         Map<String, String> options = copyDefaultOptions();
         SparkConf sparkConf = new SparkConf();
-        BulkSparkConf bulkSparkConf = new BulkSparkConf(sparkConf, options);
+        assertThatNoException().isThrownBy(() -> new BulkSparkConf(sparkConf, 
options));
     }
 
     @Test
-    public void keystorePathRequiredIfBase64EncodedKeystoreNotSet()
+    void keystorePathRequiredIfBase64EncodedKeystoreNotSet()
     {
         Map<String, String> options = copyDefaultOptions();
         options.remove(WriterOptions.KEYSTORE_PATH.name());
         SparkConf sparkConf = new SparkConf();
-        NullPointerException npe = assertThrows(NullPointerException.class,
-                                                () -> new 
BulkSparkConf(sparkConf, options));
-        assertEquals("Keystore password was set. But both keystore path and 
base64 encoded string are not set. "
-                     + "Please either set option " + 
WriterOptions.KEYSTORE_PATH
-                     + " or option " + WriterOptions.KEYSTORE_BASE64_ENCODED, 
npe.getMessage());
+        assertThatThrownBy(() -> new BulkSparkConf(sparkConf, options))
+        .isExactlyInstanceOf(NullPointerException.class)
+        .hasMessage("Keystore password was set. But both keystore path and 
base64 encoded string are not set. "
+                    + "Please either set option " + WriterOptions.KEYSTORE_PATH
+                    + " or option " + WriterOptions.KEYSTORE_BASE64_ENCODED);
     }
 
     @Test
-    public void testSkipClean()
+    void testSkipClean()
     {
-        assertFalse(bulkSparkConf.getSkipClean());
+        assertThat(bulkSparkConf.getSkipClean()).isFalse();
         sparkConf.set(BulkSparkConf.SKIP_CLEAN, "true");
-        assertTrue(bulkSparkConf.getSkipClean());
+        assertThat(bulkSparkConf.getSkipClean()).isTrue();
     }
 
     @Test
-    public void testDefaultSidecarPort()
+    void testDefaultSidecarPort()
     {
         bulkSparkConf = new BulkSparkConf(new SparkConf(), defaultOptions);
-        assertEquals(-1, bulkSparkConf.getUserProvidedSidecarPort());
-        assertEquals(DEFAULT_SIDECAR_PORT, 
bulkSparkConf.getEffectiveSidecarPort());
+        assertThat(bulkSparkConf.getUserProvidedSidecarPort()).isEqualTo(-1);
+        
assertThat(bulkSparkConf.getEffectiveSidecarPort()).isEqualTo(DEFAULT_SIDECAR_PORT);
     }
 
     @Test
-    public void testSidecarPortSetByOptions()
+    void testSidecarPortSetByOptions()
     {
         Map<String, String> options = copyDefaultOptions();
         options.put(WriterOptions.SIDECAR_PORT.name(), "9999");
         bulkSparkConf = new BulkSparkConf(new SparkConf(), options);
-        assertEquals(9999, bulkSparkConf.getUserProvidedSidecarPort());
-        assertEquals(9999, bulkSparkConf.getEffectiveSidecarPort());
+        assertThat(bulkSparkConf.getUserProvidedSidecarPort()).isEqualTo(9999);
+        assertThat(bulkSparkConf.getEffectiveSidecarPort()).isEqualTo(9999);
     }
 
     @Test
-    public void testSidecarPortSetByProperty()
+    void testSidecarPortSetByProperty()
     {
         // Spark conf loads values from system properties, but we can also 
test by calling `.set` explicitly.
         // This makes the test not pollute global (System.properties) state 
but still tests the same basic path.
         SparkConf conf = new SparkConf()
                          .set(BulkSparkConf.SIDECAR_PORT, "9876");
         bulkSparkConf = new BulkSparkConf(conf, defaultOptions);
-        assertEquals(9876, bulkSparkConf.getUserProvidedSidecarPort());
-        assertEquals(9876, bulkSparkConf.getEffectiveSidecarPort());
+        assertThat(bulkSparkConf.getUserProvidedSidecarPort()).isEqualTo(9876);
+        assertThat(bulkSparkConf.getEffectiveSidecarPort()).isEqualTo(9876);
     }
 
     @Test
-    public void testKeystoreBase64EncodedStringSet()
+    void testKeystoreBase64EncodedStringSet()
     {
         Map<String, String> options = copyDefaultOptions();
         options.remove(WriterOptions.KEYSTORE_PATH.name());
         options.put(WriterOptions.KEYSTORE_BASE64_ENCODED.name(), 
"dummy_base64_encoded_keystore");
-        bulkSparkConf = new BulkSparkConf(sparkConf, defaultOptions);
+        assertThatNoException().isThrownBy(() -> new BulkSparkConf(sparkConf, 
defaultOptions));
     }
 
     @Test
-    public void testTrustStorePasswordSetPathNotSet()
+    void testTrustStorePasswordSetPathNotSet()
     {
         Map<String, String> options = copyDefaultOptions();
         options.put(WriterOptions.TRUSTSTORE_PATH.name(), "dummy");
-        NullPointerException npe = assertThrows(NullPointerException.class,
-                                                () -> new 
BulkSparkConf(sparkConf, options));
-        assertEquals("Trust Store Path was provided, but password is missing. "
-                     + "Please provide option " + 
WriterOptions.TRUSTSTORE_PASSWORD, npe.getMessage());
+        assertThatThrownBy(() -> new BulkSparkConf(sparkConf, options))
+        .isExactlyInstanceOf(NullPointerException.class)
+        .hasMessage("Trust Store Path was provided, but password is missing. "
+                    + "Please provide option " + 
WriterOptions.TRUSTSTORE_PASSWORD);
     }
 
     @Test
     void testQuoteIdentifiers()
     {
-        assertFalse(bulkSparkConf.quoteIdentifiers);
+        assertThat(bulkSparkConf.quoteIdentifiers).isFalse();
         Map<String, String> options = copyDefaultOptions();
         options.put(WriterOptions.QUOTE_IDENTIFIERS.name(), "true");
         BulkSparkConf bulkSparkConf = new BulkSparkConf(sparkConf, options);
-        assertNotNull(bulkSparkConf);
-        assertTrue(bulkSparkConf.quoteIdentifiers);
+        assertThat(bulkSparkConf).isNotNull();
+        assertThat(bulkSparkConf.quoteIdentifiers).isTrue();
     }
 
     @Test
-    public void testInvalidJobKeepAliveMinutes()
+    void testInvalidJobKeepAliveMinutes()
     {
         Map<String, String> options = copyDefaultOptions();
         options.put(WriterOptions.JOB_KEEP_ALIVE_MINUTES.name(), "-100");
-        IllegalArgumentException iae = 
assertThrows(IllegalArgumentException.class, () -> new BulkSparkConf(sparkConf, 
options));
-        assertEquals("Invalid value for the 'JOB_KEEP_ALIVE_MINUTES' Bulk 
Writer option (-100). It cannot be less than the minimum 10",
-                     iae.getMessage());
+        assertThatThrownBy(() -> new BulkSparkConf(sparkConf, options))
+        .isExactlyInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid value for the 'JOB_KEEP_ALIVE_MINUTES' Bulk 
Writer option (-100). It cannot be less than the minimum 10");
     }
 
     @Test
-    public void testDefaultJobKeepAliveMinutes()
+    void testDefaultJobKeepAliveMinutes()
     {
         Map<String, String> options = copyDefaultOptions();
         BulkSparkConf conf = new BulkSparkConf(sparkConf, options);
-        assertEquals(MINIMUM_JOB_KEEP_ALIVE_MINUTES, 
conf.getJobKeepAliveMinutes());
+        
assertThat(conf.getJobKeepAliveMinutes()).isEqualTo(MINIMUM_JOB_KEEP_ALIVE_MINUTES);
     }
 
     @Test
-    public void testJobKeepAliveMinutes()
+    void testJobKeepAliveMinutes()
     {
         Map<String, String> options = copyDefaultOptions();
         options.put(WriterOptions.JOB_KEEP_ALIVE_MINUTES.name(), "30");
         BulkSparkConf conf = new BulkSparkConf(sparkConf, options);
-        assertEquals(30, conf.getJobKeepAliveMinutes());
+        assertThat(conf.getJobKeepAliveMinutes()).isEqualTo(30);
+    }
+
+    @Test
+    void testSidecarContactPoints()
+    {
+        Map<String, String> options = copyDefaultOptions();
+        
assertThat(BulkSparkConf.resolveSidecarContactPoints(options)).isEqualTo("127.0.0.1");
+
+        String sidecarInstances = "127.0.0.1,128.0.0.2";
+        options.put(WriterOptions.SIDECAR_INSTANCES.name(), sidecarInstances);
+        
assertThat(BulkSparkConf.resolveSidecarContactPoints(options)).isEqualTo(sidecarInstances);
+
+        String contactPoints = "localhost1,localhost2";
+        options.put(WriterOptions.SIDECAR_CONTACT_POINTS.name(), 
contactPoints);
+        
assertThat(BulkSparkConf.resolveSidecarContactPoints(options)).isEqualTo(contactPoints);
+
+        String contactPointsWithPort = "localhost1:9999,localhost2:9999";
+        options.put(WriterOptions.SIDECAR_CONTACT_POINTS.name(), 
contactPointsWithPort);
+        
assertThat(BulkSparkConf.resolveSidecarContactPoints(options)).isEqualTo(contactPointsWithPort);
+
+        options.remove(WriterOptions.SIDECAR_INSTANCES.name());
+        options.remove(WriterOptions.SIDECAR_CONTACT_POINTS.name());
+        assertThat(BulkSparkConf.resolveSidecarContactPoints(options))
+        .describedAs("When none of the sidecar options are defined, it resolves to the default value null")
+        .isNull();
     }
 
     private Map<String, String> copyDefaultOptions()
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/CassandraDataLayerTests.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/CassandraDataLayerTests.java
index 6baf693..cf826b7 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/CassandraDataLayerTests.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/CassandraDataLayerTests.java
@@ -36,7 +36,7 @@ class CassandraDataLayerTests
     public static final Map<String, String> REQUIRED_CLIENT_CONFIG_OPTIONS = 
ImmutableMap.of(
     "keyspace", "big-data",
     "table", "customers",
-    "sidecar_instances", "localhost");
+    "sidecar_contact_points", "localhost");
 
     @Test
     void testDefaultClearSnapshotStrategy()
@@ -45,7 +45,7 @@ class CassandraDataLayerTests
         ClientConfig clientConfig = ClientConfig.create(options);
         assertEquals("big-data", clientConfig.keyspace());
         assertEquals("customers", clientConfig.table());
-        assertEquals("localhost", clientConfig.sidecarInstances());
+        assertEquals("localhost", clientConfig.sidecarContactPoints());
         ClientConfig.ClearSnapshotStrategy clearSnapshotStrategy = 
clientConfig.clearSnapshotStrategy();
         assertTrue(clearSnapshotStrategy.shouldClearOnCompletion());
         assertEquals("2d", clearSnapshotStrategy.ttl());
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/CassandraDataSourceHelperCacheTest.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/CassandraDataSourceHelperCacheTest.java
index 71e2758..208f697 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/CassandraDataSourceHelperCacheTest.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/CassandraDataSourceHelperCacheTest.java
@@ -48,7 +48,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 public class CassandraDataSourceHelperCacheTest
 {
     public static final Map<String, String> REQUIRED_CLIENT_CONFIG_OPTIONS =
-            ImmutableMap.of("sidecar_instances", "127.0.0.1",
+            ImmutableMap.of("sidecar_contact_points", "127.0.0.1",
                             "keyspace", "big-data",
                             "table", "customers");
 
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/ClientConfigTests.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/ClientConfigTests.java
index 7e82c91..f9f6b61 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/ClientConfigTests.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/ClientConfigTests.java
@@ -36,7 +36,7 @@ class ClientConfigTests
     public static final Map<String, String> REQUIRED_CLIENT_CONFIG_OPTIONS = 
ImmutableMap.of(
     "keyspace", "big-data",
     "table", "customers",
-    "sidecar_instances", "localhost");
+    "sidecar_contact_points", "localhost");
 
     @ParameterizedTest
     @ValueSource(strings = {"2h", "200s", "4d", "60m", "  60m", "50s ", " 32d 
"})
diff --git 
a/cassandra-analytics-core/src/test/spark3/org/apache/cassandra/spark/common/SidecarInstanceFactoryTest.java
 
b/cassandra-analytics-core/src/test/spark3/org/apache/cassandra/spark/common/SidecarInstanceFactoryTest.java
new file mode 100644
index 0000000..719f5d4
--- /dev/null
+++ 
b/cassandra-analytics-core/src/test/spark3/org/apache/cassandra/spark/common/SidecarInstanceFactoryTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.common;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.cassandra.sidecar.client.SidecarInstance;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+class SidecarInstanceFactoryTest
+{
+    @Test
+    void testCreateSidecarInstance()
+    {
+        assertThatThrownBy(() -> SidecarInstanceFactory.createFromString("", 9999))
+        .isExactlyInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Unable to create sidecar instance from empty input");
+
+        assertSidecarInstance(SidecarInstanceFactory.createFromString("localhost", 9999),
+                              "localhost", 9999);
+        assertSidecarInstance(SidecarInstanceFactory.createFromString("[2024:a::1]", 9999),
+                              "[2024:a::1]", 9999);
+        assertSidecarInstance(SidecarInstanceFactory.createFromString("localhost:8888", 9999),
+                              "localhost", 8888);
+        assertSidecarInstance(SidecarInstanceFactory.createFromString("127.0.0.1:8888", 9999),
+                              "127.0.0.1", 8888);
+        assertSidecarInstance(SidecarInstanceFactory.createFromString("[2024:a::1]:8888", 9999),
+                              "[2024:a::1]", 8888);
+    }
+
+    private void assertSidecarInstance(SidecarInstance sidecarInstance, String expectedHostname, int expectedPort)
+    {
+        assertThat(sidecarInstance.hostname()).isEqualTo(expectedHostname);
+        assertThat(sidecarInstance.port()).isEqualTo(expectedPort);
+    }
+}
diff --git 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java
 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java
index de14a2b..9628bed 100644
--- 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java
+++ 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java
@@ -116,7 +116,7 @@ public class SparkTestUtils
         int numCores = coresPerExecutor * numExecutors;
 
         return 
sql.read().format("org.apache.cassandra.spark.sparksql.CassandraDataSource")
-                  .option("sidecar_instances", sidecarInstancesOption(cluster, dnsResolver))
+                  .option("sidecar_contact_points", sidecarInstancesOption(cluster, dnsResolver))
                   .option("keyspace", tableName.keyspace()) // unquoted
                   .option("table", tableName.table()) // unquoted
                   .option("DC", "datacenter1")
@@ -147,7 +147,7 @@ public class SparkTestUtils
     {
         return df.write()
                  
.format("org.apache.cassandra.spark.sparksql.CassandraDataSink")
-                 .option("sidecar_instances", sidecarInstancesOption(cluster, dnsResolver))
+                 .option("sidecar_contact_points", sidecarInstancesOption(cluster, dnsResolver))
                  .option("keyspace", tableName.keyspace())
                  .option("table", tableName.table())
                  .option("local_dc", "datacenter1")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
