This is an automated email from the ASF dual-hosted git repository.
ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push:
new 798182a CASSANDRA-19772: Deprecate option SIDECAR_INSTANCES and
replace with SIDECAR_CONTACT_POINTS (#63)
798182a is described below
commit 798182a6fda562538c2f44e4f3f92a7cb68cd81c
Author: Yifan Cai <[email protected]>
AuthorDate: Tue Jul 16 21:44:18 2024 -0700
CASSANDRA-19772: Deprecate option SIDECAR_INSTANCES and replace with
SIDECAR_CONTACT_POINTS (#63)
This patch introduces a new option SIDECAR_CONTACT_POINTS for both bulk
writer and reader. The option name better describes the purpose, which is to
specify the initial contact points to discover the cluster topology. The
existing option SIDECAR_INSTANCES is used for the same purpose and is now
deprecated.
In addition, it allows including the port value in the addresses when
defining SIDECAR_CONTACT_POINTS.
Patch by Yifan Cai; Reviewed by Francisco Guerrero for CASSANDRA-19772
---
README.md | 2 +-
build.gradle | 8 ++
.../org/apache/cassandra/spark/utils/MapUtils.java | 40 ++++++
.../spark/example/DirectWriteAndReadJob.java | 4 +-
.../spark/example/LocalS3WriteAndReadJob.java | 4 +-
.../cassandra/spark/bulkwriter/BulkSparkConf.java | 64 +++++----
.../spark/bulkwriter/CassandraContext.java | 2 +-
.../cassandra/spark/bulkwriter/WriterOptions.java | 6 +-
.../spark/common/SidecarInstanceFactory.java | 64 +++++++++
.../cassandra/spark/data/CassandraDataLayer.java | 9 +-
.../apache/cassandra/spark/data/ClientConfig.java | 25 +++-
.../spark/bulkwriter/BulkSparkConfTest.java | 156 ++++++++++++---------
.../spark/data/CassandraDataLayerTests.java | 4 +-
.../data/CassandraDataSourceHelperCacheTest.java | 2 +-
.../cassandra/spark/data/ClientConfigTests.java | 2 +-
.../spark/common/SidecarInstanceFactoryTest.java | 55 ++++++++
.../apache/cassandra/analytics/SparkTestUtils.java | 4 +-
17 files changed, 330 insertions(+), 121 deletions(-)
diff --git a/README.md b/README.md
index 536bbb0..96e310d 100644
--- a/README.md
+++ b/README.md
@@ -34,7 +34,7 @@ import org.apache.spark.sql.SparkSession
val sparkSession = SparkSession.builder.getOrCreate()
val df =
sparkSession.read.format("org.apache.cassandra.spark.sparksql.CassandraDataSource")
- .option("sidecar_instances",
"localhost,localhost2,localhost3")
+ .option("sidecar_contact_points",
"localhost,localhost2,localhost3")
.option("keyspace", "sbr_tests")
.option("table", "basic_test")
.option("DC", "datacenter1")
diff --git a/build.gradle b/build.gradle
index f7c8d24..20312ae 100644
--- a/build.gradle
+++ b/build.gradle
@@ -44,6 +44,14 @@ println("Spark version: ${ext.sparkLabel}")
ext.jdkLabel = System.getenv("JDK_VERSION") ?: "${analyticsJDKLevel}"
println("Java source/target compatibility level: ${ext.jdkLabel}")
+// validate the builder is indeed using the correct JDK_VERSION as specified
+String actualJdkVersion = System.getProperty("java.version")
+if (!actualJdkVersion.startsWith(ext.jdkLabel))
+{
+ def errorMessage = "Invalid jdk version (Found: ${actualJdkVersion}) for
jdk-${ext.jdkLabel} build"
+ throw new RuntimeException(errorMessage)
+}
+
ext.dependencyLocation = (System.getenv("CASSANDRA_DEP_DIR") ?:
"${rootDir}/dependencies") + "/"
diff --git
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/MapUtils.java
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/MapUtils.java
index e899b10..ab51201 100644
---
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/MapUtils.java
+++
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/MapUtils.java
@@ -22,13 +22,19 @@ package org.apache.cassandra.spark.utils;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
+import java.util.function.Function;
import java.util.function.Supplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* Utility methods for {@link Map}
*/
public final class MapUtils
{
+ private static final Logger LOGGER =
LoggerFactory.getLogger(MapUtils.class);
+
private MapUtils()
{
throw new IllegalStateException(getClass() + " is static utility class
and shall not be instantiated");
@@ -239,4 +245,38 @@ public final class MapUtils
{
return options.containsKey(lowerCaseKey(key));
}
+
+ /**
+ * Resolve the value by consolidating the value associated with the
current option and the deprecated option.
+ * The method assumes that both the new and the deprecated options expect
the same value type.
+ * The value of the new option takes precedence over the deprecated one;
that is, when both options are present,
+ * the value associated with the new option is returned.
+ *
+ * @param options option keys and values map
+ * @param option new option
+ * @param deprecated deprecated option
+ * @param resolver function to resolve the value. If the input of the
function is null, it asks for the default value
+ * @return resolved value
+ * @param <T> value type
+ */
+ public static <T> T resolveDeprecated(Map<String, String> options, String
option, String deprecated, Function<String, T> resolver)
+ {
+ T deprecatedOptionValue = null;
+ if (options.containsKey(deprecated))
+ {
+ LOGGER.warn("The option: {} is deprecated. Please use {} instead.
See option description for details.", deprecated, option);
+ deprecatedOptionValue = resolver.apply(deprecated);
+ }
+
+ if (options.containsKey(option))
+ {
+ if (deprecatedOptionValue != null)
+ {
+ LOGGER.info("The option: {} is defined. Favor the value over
{}", option, deprecated);
+ }
+ return resolver.apply(option);
+ }
+
+ return deprecatedOptionValue == null ? resolver.apply(null) :
deprecatedOptionValue;
+ }
}
diff --git
a/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/DirectWriteAndReadJob.java
b/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/DirectWriteAndReadJob.java
index c94b195..16ed985 100644
---
a/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/DirectWriteAndReadJob.java
+++
b/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/DirectWriteAndReadJob.java
@@ -41,7 +41,7 @@ public class DirectWriteAndReadJob extends
AbstractCassandraJob
protected JobConfiguration configureJob(SparkContext sc, SparkConf
sparkConf)
{
Map<String, String> writeOptions = new HashMap<>();
- writeOptions.put("sidecar_instances",
"localhost,localhost2,localhost3");
+ writeOptions.put("sidecar_contact_points",
"localhost,localhost2,localhost3");
writeOptions.put("keyspace", "spark_test");
writeOptions.put("table", "test");
writeOptions.put("local_dc", "datacenter1");
@@ -54,7 +54,7 @@ public class DirectWriteAndReadJob extends
AbstractCassandraJob
sparkConf.getInt("spark.executor.instances", 1));
int numCores = coresPerExecutor * numExecutors;
Map<String, String> readerOptions = new HashMap<>();
- readerOptions.put("sidecar_instances",
"localhost,localhost2,localhost3");
+ readerOptions.put("sidecar_contact_points",
"localhost,localhost2,localhost3");
readerOptions.put("keyspace", "spark_test");
readerOptions.put("table", "test");
readerOptions.put("DC", "datacenter1");
diff --git
a/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/LocalS3WriteAndReadJob.java
b/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/LocalS3WriteAndReadJob.java
index 749a051..a7a946b 100644
---
a/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/LocalS3WriteAndReadJob.java
+++
b/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/LocalS3WriteAndReadJob.java
@@ -66,7 +66,7 @@ public class LocalS3WriteAndReadJob extends
AbstractCassandraJob
protected JobConfiguration configureJob(SparkContext sc, SparkConf
sparkConf)
{
Map<String, String> writeOptions = new HashMap<>();
- writeOptions.put("sidecar_instances", sidecarInstances);
+ writeOptions.put("sidecar_contact_points", sidecarInstances);
writeOptions.put("keyspace", "spark_test");
writeOptions.put("table", "test");
writeOptions.put("local_dc", dataCenter);
@@ -92,7 +92,7 @@ public class LocalS3WriteAndReadJob extends
AbstractCassandraJob
sparkConf.getInt("spark.executor.instances", 1));
int numCores = coresPerExecutor * numExecutors;
Map<String, String> readerOptions = new HashMap<>();
- readerOptions.put("sidecar_instances",
"localhost,localhost2,localhost3");
+ readerOptions.put("sidecar_contact_points",
"localhost,localhost2,localhost3");
readerOptions.put("keyspace", "spark_test");
readerOptions.put("table", "test");
readerOptions.put("DC", "datacenter1");
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
index 8bc9a7f..442a33f 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
@@ -36,14 +36,15 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.sidecar.client.SidecarInstanceImpl;
import org.apache.cassandra.sidecar.client.SidecarInstance;
import org.apache.cassandra.spark.bulkwriter.blobupload.StorageClientConfig;
import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel;
import org.apache.cassandra.spark.bulkwriter.util.SbwKryoRegistrator;
+import org.apache.cassandra.spark.common.SidecarInstanceFactory;
import org.apache.cassandra.spark.utils.BuildInfo;
import org.apache.cassandra.spark.utils.MapUtils;
import org.apache.spark.SparkConf;
@@ -145,10 +146,9 @@ public class BulkSparkConf implements Serializable
protected final String configuredJobId;
protected boolean useOpenSsl;
protected int ringRetryCount;
- // create sidecarInstances from sidecarInstancesValue and
effectiveSidecarPort
- private final String sidecarInstancesValue;
- private transient Set<? extends SidecarInstance> sidecarInstances; // not
serialized
-
+ // create sidecarContactPoints from sidecarContactPointsValue and
effectiveSidecarPort
+ private final String sidecarContactPointsValue; // It takes comma
separated values
+ private transient Set<? extends SidecarInstance> sidecarContactPoints; //
not serialized
public BulkSparkConf(SparkConf conf, Map<String, String> options)
{
@@ -156,8 +156,8 @@ public class BulkSparkConf implements Serializable
Optional<Integer> sidecarPortFromOptions =
MapUtils.getOptionalInt(options, WriterOptions.SIDECAR_PORT.name(), "sidecar
port");
this.userProvidedSidecarPort = sidecarPortFromOptions.isPresent() ?
sidecarPortFromOptions.get() : getOptionalInt(SIDECAR_PORT).orElse(-1);
this.effectiveSidecarPort = this.userProvidedSidecarPort == -1 ?
DEFAULT_SIDECAR_PORT : this.userProvidedSidecarPort;
- this.sidecarInstancesValue = MapUtils.getOrDefault(options,
WriterOptions.SIDECAR_INSTANCES.name(), null);
- this.sidecarInstances = sidecarInstances();
+ this.sidecarContactPointsValue = resolveSidecarContactPoints(options);
+ this.sidecarContactPoints = sidecarContactPoints();
this.keyspace = MapUtils.getOrThrow(options,
WriterOptions.KEYSPACE.name());
this.table = MapUtils.getOrThrow(options, WriterOptions.TABLE.name());
this.skipExtendedVerify = MapUtils.getBoolean(options,
WriterOptions.SKIP_EXTENDED_VERIFY.name(), true,
@@ -240,45 +240,47 @@ public class BulkSparkConf implements Serializable
.collect(Collectors.toSet());
}
- protected int resolveSSTableDataSizeInMiB(Map<String, String> options)
+ public static int resolveSSTableDataSizeInMiB(Map<String, String> options)
{
- int legacyOptionValue = -1;
- if (options.containsKey(WriterOptions.SSTABLE_DATA_SIZE_IN_MB.name()))
- {
- LOGGER.warn("The writer option: SSTABLE_DATA_SIZE_IN_MB is
deprecated. " +
- "Please use SSTABLE_DATA_SIZE_IN_MIB instead. See
option description for details.");
- legacyOptionValue = MapUtils.getInt(options,
WriterOptions.SSTABLE_DATA_SIZE_IN_MB.name(),
-
DEFAULT_SSTABLE_DATA_SIZE_IN_MIB, "sstable data size in mebibytes");
- }
+ return MapUtils.resolveDeprecated(options,
WriterOptions.SSTABLE_DATA_SIZE_IN_MIB.name(),
WriterOptions.SSTABLE_DATA_SIZE_IN_MB.name(), option -> {
+ if (option == null)
+ {
+ return DEFAULT_SSTABLE_DATA_SIZE_IN_MIB;
+ }
- if (options.containsKey(WriterOptions.SSTABLE_DATA_SIZE_IN_MIB.name()))
- {
- LOGGER.info("The writer option: SSTABLE_DATA_SIZE_IN_MIB is
defined. " +
- "Favor the value over SSTABLE_DATA_SIZE_IN_MB");
- return MapUtils.getInt(options,
WriterOptions.SSTABLE_DATA_SIZE_IN_MIB.name(),
- DEFAULT_SSTABLE_DATA_SIZE_IN_MIB, "sstable
data size in mebibytes");
+ return MapUtils.getInt(options, option,
DEFAULT_SSTABLE_DATA_SIZE_IN_MIB, "sstable data size in mebibytes");
+ });
+ }
- }
+ public static String resolveSidecarContactPoints(Map<String, String>
options)
+ {
+ return MapUtils.resolveDeprecated(options,
WriterOptions.SIDECAR_CONTACT_POINTS.name(),
WriterOptions.SIDECAR_INSTANCES.name(), option -> {
+ if (option == null)
+ {
+ return null;
+ }
- return legacyOptionValue == -1 ? DEFAULT_SSTABLE_DATA_SIZE_IN_MIB :
legacyOptionValue;
+ return MapUtils.getOrDefault(options, option, null);
+ });
}
- protected Set<? extends SidecarInstance> buildSidecarInstances()
+ protected Set<? extends SidecarInstance> buildSidecarContactPoints()
{
- String[] split = Objects.requireNonNull(sidecarInstancesValue, "Unable
to build sidecar instances from null value")
+ String[] split = Objects.requireNonNull(sidecarContactPointsValue,
"Unable to build sidecar instances from null value")
.split(",");
return Arrays.stream(split)
- .map(hostname -> new SidecarInstanceImpl(hostname,
effectiveSidecarPort))
+ .filter(StringUtils::isNotEmpty)
+ .map(hostname ->
SidecarInstanceFactory.createFromString(hostname, effectiveSidecarPort))
.collect(Collectors.toSet());
}
- Set<? extends SidecarInstance> sidecarInstances()
+ Set<? extends SidecarInstance> sidecarContactPoints()
{
- if (sidecarInstances == null)
+ if (sidecarContactPoints == null)
{
- sidecarInstances = buildSidecarInstances();
+ sidecarContactPoints = buildSidecarContactPoints();
}
- return sidecarInstances;
+ return sidecarContactPoints;
}
protected void validateEnvironment() throws RuntimeException
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraContext.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraContext.java
index 97d19c2..1da6083 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraContext.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraContext.java
@@ -88,7 +88,7 @@ public class CassandraContext implements StartupValidatable,
Closeable
protected Set<? extends SidecarInstance> createClusterConfig()
{
- return conf.sidecarInstances();
+ return conf.sidecarContactPoints();
}
public SidecarClient getSidecarClient()
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java
index a7578b0..72a310a 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java
@@ -26,7 +26,11 @@ import
org.apache.cassandra.spark.transports.storage.extensions.StorageTransport
*/
public enum WriterOptions implements WriterOption
{
+ @Deprecated // Prefer the equivalent, SIDECAR_CONTACT_POINTS
SIDECAR_INSTANCES,
+ // The option specifies the initial contact points of sidecar servers to
discover the cluster topology
+ // Note that the addresses can include port; when port is present, it
takes precedence over SIDECAR_PORT
+ SIDECAR_CONTACT_POINTS,
KEYSPACE,
TABLE,
BULK_WRITER_CL,
@@ -45,7 +49,7 @@ public enum WriterOptions implements WriterOption
TRUSTSTORE_PATH,
TRUSTSTORE_BASE64_ENCODED,
SIDECAR_PORT,
- @Deprecated // the size unit `MB` is incorrect, use
`SSTABLE_DATA_SIZE_IN_MIB` instead
+ @Deprecated // Prefer the equivalent, `SSTABLE_DATA_SIZE_IN_MIB`. The size
unit `MB` is incorrect and internally treated as MiB.
SSTABLE_DATA_SIZE_IN_MB,
SSTABLE_DATA_SIZE_IN_MIB,
TTL,
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/SidecarInstanceFactory.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/SidecarInstanceFactory.java
new file mode 100644
index 0000000..cc6fb09
--- /dev/null
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/SidecarInstanceFactory.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.common;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.sidecar.client.SidecarInstanceImpl;
+
+public class SidecarInstanceFactory
+{
+ private SidecarInstanceFactory()
+ {
+ throw new UnsupportedOperationException("Utility class");
+ }
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SidecarInstanceFactory.class);
+
+ /**
+ * Create SidecarInstance object by parsing the input string, which is IP
address or hostname and optionally includes port
+ * @param input hostname string that can optionally include the port. If
port is present, the defaultPort param is ignored.
+ * @param defaultPort port value used when the input string contains no
port
+ * @return SidecarInstanceImpl
+ */
+ public static SidecarInstanceImpl createFromString(String input, int
defaultPort)
+ {
+ Preconditions.checkArgument(StringUtils.isNotEmpty(input), "Unable to
create sidecar instance from empty input");
+
+ String hostname = input;
+ int port = defaultPort;
+ // has port in the string. The former matches ipv6 and the latter
matches ipv4 and hostnames
+ // ipv6 with port example: [2024:a::1]:8080
+ if (input.contains("]:") || (!input.startsWith("[") &&
input.contains(":")))
+ {
+ int index = input.lastIndexOf(':');
+ hostname = input.substring(0, index); // includes ']' if it is ipv6
+ String portStr = input.substring(index + 1);
+ port = Integer.parseInt(portStr);
+ }
+
+ LOGGER.info("Create sidecar instance. hostname={} port={}", hostname,
port);
+ return new SidecarInstanceImpl(hostname, port);
+ }
+}
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
index 8154b01..114b5dd 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
@@ -51,6 +51,7 @@ import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Range;
+import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,6 +76,7 @@ import org.apache.cassandra.sidecar.client.SidecarInstance;
import org.apache.cassandra.sidecar.client.SidecarInstanceImpl;
import org.apache.cassandra.sidecar.client.SimpleSidecarInstancesProvider;
import org.apache.cassandra.sidecar.client.exception.RetriesExhaustedException;
+import org.apache.cassandra.spark.common.SidecarInstanceFactory;
import org.apache.cassandra.spark.config.SchemaFeature;
import org.apache.cassandra.spark.config.SchemaFeatureSet;
import org.apache.cassandra.spark.data.partitioner.CassandraInstance;
@@ -154,7 +156,7 @@ public class CassandraDataLayer extends
PartitionedDataLayer implements StartupV
this.table = options.table();
this.quoteIdentifiers = options.quoteIdentifiers();
this.sidecarClientConfig = sidecarClientConfig;
- this.sidecarInstances = options.sidecarInstances;
+ this.sidecarInstances = options.sidecarContactPoints;
this.sidecarPort = options.sidecarPort;
this.sslConfig = sslConfig;
this.bigNumberConfigMap = options.bigNumberConfigMap();
@@ -899,14 +901,15 @@ public class CassandraDataLayer extends
PartitionedDataLayer implements StartupV
protected Set<? extends SidecarInstance>
initializeClusterConfig(ClientConfig options)
{
- return initializeClusterConfig(options.sidecarInstances,
options.sidecarPort());
+ return initializeClusterConfig(options.sidecarContactPoints,
options.sidecarPort());
}
// not intended to be overridden
private Set<? extends SidecarInstance> initializeClusterConfig(String
sidecarInstances, int sidecarPort)
{
return Arrays.stream(sidecarInstances.split(","))
- .map(hostname -> new SidecarInstanceImpl(hostname,
sidecarPort))
+ .filter(StringUtils::isNotEmpty)
+ .map(hostname ->
SidecarInstanceFactory.createFromString(hostname, sidecarPort))
.collect(Collectors.toSet());
}
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/ClientConfig.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/ClientConfig.java
index dd9675c..32d926a 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/ClientConfig.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/ClientConfig.java
@@ -43,7 +43,11 @@ public class ClientConfig
{
protected final Logger logger = LoggerFactory.getLogger(this.getClass());
- public static final String SIDECAR_INSTANCES = "sidecar_instances";
+ // TODO: the key value string is not consistent with WriterOptions and
keys in BulkSparkConf
+ @Deprecated // Prefer the equivalent, SIDECAR_CONTACT_POINTS
+ public static final String SIDECAR_INSTANCES = "sidecar_instances";
+ // The option specifies the initial contact points of sidecar servers to
discover the cluster topology
+ public static final String SIDECAR_CONTACT_POINTS =
"sidecar_contact_points";
public static final String KEYSPACE_KEY = "keyspace";
public static final String TABLE_KEY = "table";
public static final String SNAPSHOT_NAME_KEY = "snapshotName";
@@ -78,7 +82,7 @@ public class ClientConfig
public static final String QUOTE_IDENTIFIERS = "quote_identifiers";
public static final int DEFAULT_SIDECAR_PORT = 9043;
- protected String sidecarInstances;
+ protected String sidecarContactPoints;
@Nullable
protected String keyspace;
@Nullable
@@ -105,7 +109,7 @@ public class ClientConfig
protected ClientConfig(Map<String, String> options)
{
- this.sidecarInstances = parseSidecarInstances(options);
+ this.sidecarContactPoints = parseSidecarContactPoints(options);
this.keyspace = MapUtils.getOrThrow(options, KEYSPACE_KEY, "keyspace");
this.table = MapUtils.getOrThrow(options, TABLE_KEY, "table");
this.snapshotName = MapUtils.getOrDefault(options, SNAPSHOT_NAME_KEY,
"sbr_" + UUID.randomUUID().toString().replace("-", ""));
@@ -135,9 +139,16 @@ public class ClientConfig
this.quoteIdentifiers = MapUtils.getBoolean(options,
QUOTE_IDENTIFIERS, false);
}
- protected String parseSidecarInstances(Map<String, String> options)
+ protected String parseSidecarContactPoints(Map<String, String> options)
{
- return MapUtils.getOrThrow(options, SIDECAR_INSTANCES,
"sidecar_instances");
+ return MapUtils.resolveDeprecated(options, SIDECAR_CONTACT_POINTS,
SIDECAR_INSTANCES, option -> {
+ if (option == null)
+ {
+ throw MapUtils.throwable(SIDECAR_CONTACT_POINTS + " or " +
SIDECAR_INSTANCES).get();
+ }
+
+ return MapUtils.getOrThrow(options, option);
+ });
}
protected ClearSnapshotStrategy parseClearSnapshotStrategy(boolean
hasDeprecatedOption,
@@ -155,9 +166,9 @@ public class ClientConfig
return ClearSnapshotStrategy.parse(clearSnapshotStrategyOption);
}
- public String sidecarInstances()
+ public String sidecarContactPoints()
{
- return sidecarInstances;
+ return sidecarContactPoints;
}
public String keyspace()
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java
index b059479..d48ace8 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java
@@ -35,23 +35,18 @@ import org.jetbrains.annotations.NotNull;
import static
org.apache.cassandra.spark.bulkwriter.BulkSparkConf.DEFAULT_SIDECAR_PORT;
import static
org.apache.cassandra.spark.bulkwriter.BulkSparkConf.MINIMUM_JOB_KEEP_ALIVE_MINUTES;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.emptyString;
-import static org.hamcrest.core.Is.is;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class BulkSparkConfTest
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNoException;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+class BulkSparkConfTest
{
private SparkConf sparkConf;
private BulkSparkConf bulkSparkConf;
private Map<String, String> defaultOptions;
@BeforeEach
- public void before()
+ void before()
{
sparkConf = new SparkConf();
defaultOptions = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
@@ -64,28 +59,28 @@ public class BulkSparkConfTest
}
@Test
- public void testGetBoolean()
+ void testGetBoolean()
{
sparkConf.set("spark.cassandra_analytics.job.skip_clean", "true");
- assertThat(bulkSparkConf.getSkipClean(), is(true));
+ assertThat(bulkSparkConf.getSkipClean()).isTrue();
}
@Test
- public void testGetLong()
+ void testGetLong()
{
sparkConf.set("spark.cassandra_analytics.sidecar.request.retries.delay.milliseconds",
"2222");
- assertThat(bulkSparkConf.getSidecarRequestRetryDelayMillis(),
is(2222L));
+
assertThat(bulkSparkConf.getSidecarRequestRetryDelayMillis()).isEqualTo(2222L);
}
@Test
- public void testGetInt()
+ void testGetInt()
{
sparkConf.set("spark.cassandra_analytics.request.max_connections",
"1234");
- assertThat(bulkSparkConf.getMaxHttpConnections(), is(1234));
+ assertThat(bulkSparkConf.getMaxHttpConnections()).isEqualTo(1234);
}
@Test
- public void deprecatedSettingsAreHonored()
+ void deprecatedSettingsAreHonored()
{
// Test that deprecated names of settings are in fact picked up
correctly
@@ -102,155 +97,182 @@ public class BulkSparkConfTest
}
};
- assertThat(bulkSparkConf.getMaxHttpConnections(), is(1234));
- assertThat(bulkSparkConf.getHttpResponseTimeoutMs(), is(5678));
+ assertThat(bulkSparkConf.getMaxHttpConnections()).isEqualTo(1234);
+ assertThat(bulkSparkConf.getHttpResponseTimeoutMs()).isEqualTo(5678);
}
@Test
- public void calculatesCoresCorrectlyForStaticAllocation()
+ void calculatesCoresCorrectlyForStaticAllocation()
{
sparkConf.set("spark.executor.cores", "5");
sparkConf.set("spark.executor.instances", "5");
- assertThat(bulkSparkConf.getCores(), is(25));
+ assertThat(bulkSparkConf.getCores()).isEqualTo(25);
}
@Test
- public void calculatesCoresCorrectlyForDynamicAllocation()
+ void calculatesCoresCorrectlyForDynamicAllocation()
{
sparkConf.set("spark.executor.cores", "6");
sparkConf.set("spark.dynamicAllocation.maxExecutors", "7");
- assertThat(bulkSparkConf.getCores(), is(42));
+ assertThat(bulkSparkConf.getCores()).isEqualTo(42);
}
@Test
- public void ensureSetupSparkConfAddsPerformsNecessaryTasks()
+ void ensureSetupSparkConfAddsPerformsNecessaryTasks()
{
- assertThat(sparkConf.get("spark.kryo.registrator", ""),
is(emptyString()));
- assertThat(sparkConf.get("spark.executor.extraJavaOptions", ""),
is(emptyString()));
+ assertThat(sparkConf.get("spark.kryo.registrator", "")).isEmpty();
+ assertThat(sparkConf.get("spark.executor.extraJavaOptions",
"")).isEmpty();
BulkSparkConf.setupSparkConf(sparkConf, true);
- assertEquals("," + SbwKryoRegistrator.class.getName(),
sparkConf.get("spark.kryo.registrator", ""));
+ assertThat(sparkConf.get("spark.kryo.registrator", ""))
+ .isEqualTo("," + SbwKryoRegistrator.class.getName());
if (BuildInfo.isAtLeastJava11(BuildInfo.javaSpecificationVersion()))
{
- assertEquals(BulkSparkConf.JDK11_OPTIONS,
sparkConf.get("spark.executor.extraJavaOptions", ""));
+ assertThat(sparkConf.get("spark.executor.extraJavaOptions", ""))
+ .isEqualTo(BulkSparkConf.JDK11_OPTIONS);
}
}
@Test
- public void withProperMtlsSettingsWillCreateSuccessfully()
+ void withProperMtlsSettingsWillCreateSuccessfully()
{
// mTLS is now required, and the BulkSparkConf constructor fails if
the options aren't present
Map<String, String> options = copyDefaultOptions();
SparkConf sparkConf = new SparkConf();
- BulkSparkConf bulkSparkConf = new BulkSparkConf(sparkConf, options);
+ assertThatNoException().isThrownBy(() -> new BulkSparkConf(sparkConf,
options));
}
@Test
- public void keystorePathRequiredIfBase64EncodedKeystoreNotSet()
+ void keystorePathRequiredIfBase64EncodedKeystoreNotSet()
{
Map<String, String> options = copyDefaultOptions();
options.remove(WriterOptions.KEYSTORE_PATH.name());
SparkConf sparkConf = new SparkConf();
- NullPointerException npe = assertThrows(NullPointerException.class,
- () -> new
BulkSparkConf(sparkConf, options));
- assertEquals("Keystore password was set. But both keystore path and
base64 encoded string are not set. "
- + "Please either set option " +
WriterOptions.KEYSTORE_PATH
- + " or option " + WriterOptions.KEYSTORE_BASE64_ENCODED,
npe.getMessage());
+ assertThatThrownBy(() -> new BulkSparkConf(sparkConf, options))
+ .isExactlyInstanceOf(NullPointerException.class)
+ .hasMessage("Keystore password was set. But both keystore path and
base64 encoded string are not set. "
+ + "Please either set option " + WriterOptions.KEYSTORE_PATH
+ + " or option " + WriterOptions.KEYSTORE_BASE64_ENCODED);
}
@Test
- public void testSkipClean()
+ void testSkipClean()
{
- assertFalse(bulkSparkConf.getSkipClean());
+ assertThat(bulkSparkConf.getSkipClean()).isFalse();
sparkConf.set(BulkSparkConf.SKIP_CLEAN, "true");
- assertTrue(bulkSparkConf.getSkipClean());
+ assertThat(bulkSparkConf.getSkipClean()).isTrue();
}
@Test
- public void testDefaultSidecarPort()
+ void testDefaultSidecarPort()
{
bulkSparkConf = new BulkSparkConf(new SparkConf(), defaultOptions);
- assertEquals(-1, bulkSparkConf.getUserProvidedSidecarPort());
- assertEquals(DEFAULT_SIDECAR_PORT,
bulkSparkConf.getEffectiveSidecarPort());
+ assertThat(bulkSparkConf.getUserProvidedSidecarPort()).isEqualTo(-1);
+
assertThat(bulkSparkConf.getEffectiveSidecarPort()).isEqualTo(DEFAULT_SIDECAR_PORT);
}
@Test
- public void testSidecarPortSetByOptions()
+ void testSidecarPortSetByOptions()
{
Map<String, String> options = copyDefaultOptions();
options.put(WriterOptions.SIDECAR_PORT.name(), "9999");
bulkSparkConf = new BulkSparkConf(new SparkConf(), options);
- assertEquals(9999, bulkSparkConf.getUserProvidedSidecarPort());
- assertEquals(9999, bulkSparkConf.getEffectiveSidecarPort());
+ assertThat(bulkSparkConf.getUserProvidedSidecarPort()).isEqualTo(9999);
+ assertThat(bulkSparkConf.getEffectiveSidecarPort()).isEqualTo(9999);
}
@Test
- public void testSidecarPortSetByProperty()
+ void testSidecarPortSetByProperty()
{
// Spark conf loads values from system properties, but we can also
test by calling `.set` explicitly.
// This makes the test not pollute global (System.properties) state
but still tests the same basic path.
SparkConf conf = new SparkConf()
.set(BulkSparkConf.SIDECAR_PORT, "9876");
bulkSparkConf = new BulkSparkConf(conf, defaultOptions);
- assertEquals(9876, bulkSparkConf.getUserProvidedSidecarPort());
- assertEquals(9876, bulkSparkConf.getEffectiveSidecarPort());
+ assertThat(bulkSparkConf.getUserProvidedSidecarPort()).isEqualTo(9876);
+ assertThat(bulkSparkConf.getEffectiveSidecarPort()).isEqualTo(9876);
}
@Test
- public void testKeystoreBase64EncodedStringSet()
+ void testKeystoreBase64EncodedStringSet()
{
Map<String, String> options = copyDefaultOptions();
options.remove(WriterOptions.KEYSTORE_PATH.name());
options.put(WriterOptions.KEYSTORE_BASE64_ENCODED.name(),
"dummy_base64_encoded_keystore");
- bulkSparkConf = new BulkSparkConf(sparkConf, defaultOptions);
+ assertThatNoException().isThrownBy(() -> new BulkSparkConf(sparkConf,
defaultOptions));
}
@Test
- public void testTrustStorePasswordSetPathNotSet()
+ void testTrustStorePasswordSetPathNotSet()
{
Map<String, String> options = copyDefaultOptions();
options.put(WriterOptions.TRUSTSTORE_PATH.name(), "dummy");
- NullPointerException npe = assertThrows(NullPointerException.class,
- () -> new
BulkSparkConf(sparkConf, options));
- assertEquals("Trust Store Path was provided, but password is missing. "
- + "Please provide option " +
WriterOptions.TRUSTSTORE_PASSWORD, npe.getMessage());
+ assertThatThrownBy(() -> new BulkSparkConf(sparkConf, options))
+ .isExactlyInstanceOf(NullPointerException.class)
+ .hasMessage("Trust Store Path was provided, but password is missing. "
+ + "Please provide option " +
WriterOptions.TRUSTSTORE_PASSWORD);
}
@Test
void testQuoteIdentifiers()
{
- assertFalse(bulkSparkConf.quoteIdentifiers);
+ assertThat(bulkSparkConf.quoteIdentifiers).isFalse();
Map<String, String> options = copyDefaultOptions();
options.put(WriterOptions.QUOTE_IDENTIFIERS.name(), "true");
BulkSparkConf bulkSparkConf = new BulkSparkConf(sparkConf, options);
- assertNotNull(bulkSparkConf);
- assertTrue(bulkSparkConf.quoteIdentifiers);
+ assertThat(bulkSparkConf).isNotNull();
+ assertThat(bulkSparkConf.quoteIdentifiers).isTrue();
}
@Test
- public void testInvalidJobKeepAliveMinutes()
+ void testInvalidJobKeepAliveMinutes()
{
Map<String, String> options = copyDefaultOptions();
options.put(WriterOptions.JOB_KEEP_ALIVE_MINUTES.name(), "-100");
- IllegalArgumentException iae =
assertThrows(IllegalArgumentException.class, () -> new BulkSparkConf(sparkConf,
options));
- assertEquals("Invalid value for the 'JOB_KEEP_ALIVE_MINUTES' Bulk
Writer option (-100). It cannot be less than the minimum 10",
- iae.getMessage());
+ assertThatThrownBy(() -> new BulkSparkConf(sparkConf, options))
+ .isExactlyInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Invalid value for the 'JOB_KEEP_ALIVE_MINUTES' Bulk
Writer option (-100). It cannot be less than the minimum 10");
}
@Test
- public void testDefaultJobKeepAliveMinutes()
+ void testDefaultJobKeepAliveMinutes()
{
Map<String, String> options = copyDefaultOptions();
BulkSparkConf conf = new BulkSparkConf(sparkConf, options);
- assertEquals(MINIMUM_JOB_KEEP_ALIVE_MINUTES,
conf.getJobKeepAliveMinutes());
+
assertThat(conf.getJobKeepAliveMinutes()).isEqualTo(MINIMUM_JOB_KEEP_ALIVE_MINUTES);
}
@Test
- public void testJobKeepAliveMinutes()
+ void testJobKeepAliveMinutes()
{
Map<String, String> options = copyDefaultOptions();
options.put(WriterOptions.JOB_KEEP_ALIVE_MINUTES.name(), "30");
BulkSparkConf conf = new BulkSparkConf(sparkConf, options);
- assertEquals(30, conf.getJobKeepAliveMinutes());
+ assertThat(conf.getJobKeepAliveMinutes()).isEqualTo(30);
+ }
+
+ @Test
+ void testSidecarContactPoints()
+ {
+ Map<String, String> options = copyDefaultOptions();
+
assertThat(BulkSparkConf.resolveSidecarContactPoints(options)).isEqualTo("127.0.0.1");
+
+ String sidecarInstances = "127.0.0.1,128.0.0.2";
+ options.put(WriterOptions.SIDECAR_INSTANCES.name(), sidecarInstances);
+
assertThat(BulkSparkConf.resolveSidecarContactPoints(options)).isEqualTo(sidecarInstances);
+
+ String contactPoints = "localhost1,localhost2";
+ options.put(WriterOptions.SIDECAR_CONTACT_POINTS.name(),
contactPoints);
+
assertThat(BulkSparkConf.resolveSidecarContactPoints(options)).isEqualTo(contactPoints);
+
+ String contactPointsWithPort = "localhost1:9999,localhost2:9999";
+ options.put(WriterOptions.SIDECAR_CONTACT_POINTS.name(),
contactPointsWithPort);
+
assertThat(BulkSparkConf.resolveSidecarContactPoints(options)).isEqualTo(contactPointsWithPort);
+
+ options.remove(WriterOptions.SIDECAR_INSTANCES.name());
+ options.remove(WriterOptions.SIDECAR_CONTACT_POINTS.name());
+ assertThat(BulkSparkConf.resolveSidecarContactPoints(options))
+ .describedAs("When none of the sidecar options are define. It resolves
to the default value null")
+ .isNull();
}
private Map<String, String> copyDefaultOptions()
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/CassandraDataLayerTests.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/CassandraDataLayerTests.java
index 6baf693..cf826b7 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/CassandraDataLayerTests.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/CassandraDataLayerTests.java
@@ -36,7 +36,7 @@ class CassandraDataLayerTests
public static final Map<String, String> REQUIRED_CLIENT_CONFIG_OPTIONS =
ImmutableMap.of(
"keyspace", "big-data",
"table", "customers",
- "sidecar_instances", "localhost");
+ "sidecar_contact_points", "localhost");
@Test
void testDefaultClearSnapshotStrategy()
@@ -45,7 +45,7 @@ class CassandraDataLayerTests
ClientConfig clientConfig = ClientConfig.create(options);
assertEquals("big-data", clientConfig.keyspace());
assertEquals("customers", clientConfig.table());
- assertEquals("localhost", clientConfig.sidecarInstances());
+ assertEquals("localhost", clientConfig.sidecarContactPoints());
ClientConfig.ClearSnapshotStrategy clearSnapshotStrategy =
clientConfig.clearSnapshotStrategy();
assertTrue(clearSnapshotStrategy.shouldClearOnCompletion());
assertEquals("2d", clearSnapshotStrategy.ttl());
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/CassandraDataSourceHelperCacheTest.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/CassandraDataSourceHelperCacheTest.java
index 71e2758..208f697 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/CassandraDataSourceHelperCacheTest.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/CassandraDataSourceHelperCacheTest.java
@@ -48,7 +48,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class CassandraDataSourceHelperCacheTest
{
public static final Map<String, String> REQUIRED_CLIENT_CONFIG_OPTIONS =
- ImmutableMap.of("sidecar_instances", "127.0.0.1",
+ ImmutableMap.of("sidecar_contact_points", "127.0.0.1",
"keyspace", "big-data",
"table", "customers");
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/ClientConfigTests.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/ClientConfigTests.java
index 7e82c91..f9f6b61 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/ClientConfigTests.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/ClientConfigTests.java
@@ -36,7 +36,7 @@ class ClientConfigTests
public static final Map<String, String> REQUIRED_CLIENT_CONFIG_OPTIONS =
ImmutableMap.of(
"keyspace", "big-data",
"table", "customers",
- "sidecar_instances", "localhost");
+ "sidecar_contact_points", "localhost");
@ParameterizedTest
@ValueSource(strings = {"2h", "200s", "4d", "60m", " 60m", "50s ", " 32d
"})
diff --git
a/cassandra-analytics-core/src/test/spark3/org/apache/cassandra/spark/common/SidecarInstanceFactoryTest.java
b/cassandra-analytics-core/src/test/spark3/org/apache/cassandra/spark/common/SidecarInstanceFactoryTest.java
new file mode 100644
index 0000000..719f5d4
--- /dev/null
+++
b/cassandra-analytics-core/src/test/spark3/org/apache/cassandra/spark/common/SidecarInstanceFactoryTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.common;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.cassandra.sidecar.client.SidecarInstance;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+class SidecarInstanceFactoryTest
+{
+ @Test
+ void testCreateSidecarInstance()
+ {
+ assertThatThrownBy(() -> SidecarInstanceFactory.createFromString("",
9999))
+ .isExactlyInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Unable to create sidecar instance from empty input");
+
+
assertSidecarInstance(SidecarInstanceFactory.createFromString("localhost",
9999),
+ "localhost", 9999);
+
assertSidecarInstance(SidecarInstanceFactory.createFromString("[2024:a::1]",
9999),
+ "[2024:a::1]", 9999);
+
assertSidecarInstance(SidecarInstanceFactory.createFromString("localhost:8888",
9999),
+ "localhost", 8888);
+
assertSidecarInstance(SidecarInstanceFactory.createFromString("127.0.0.1:8888",
9999),
+ "127.0.0.1", 8888);
+
assertSidecarInstance(SidecarInstanceFactory.createFromString("[2024:a::1]:8888",
9999),
+ "[2024:a::1]", 8888);
+ }
+
+ private void assertSidecarInstance(SidecarInstance sidecarInstance, String
expectedHostname, int expectedPort)
+ {
+ assertThat(sidecarInstance.hostname()).isEqualTo(expectedHostname);
+ assertThat(sidecarInstance.port()).isEqualTo(expectedPort);
+ }
+}
diff --git
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java
index de14a2b..9628bed 100644
---
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java
+++
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java
@@ -116,7 +116,7 @@ public class SparkTestUtils
int numCores = coresPerExecutor * numExecutors;
return
sql.read().format("org.apache.cassandra.spark.sparksql.CassandraDataSource")
- .option("sidecar_instances", sidecarInstancesOption(cluster,
dnsResolver))
+ .option("sidecar_contact_points",
sidecarInstancesOption(cluster, dnsResolver))
.option("keyspace", tableName.keyspace()) // unquoted
.option("table", tableName.table()) // unquoted
.option("DC", "datacenter1")
@@ -147,7 +147,7 @@ public class SparkTestUtils
{
return df.write()
.format("org.apache.cassandra.spark.sparksql.CassandraDataSink")
- .option("sidecar_instances", sidecarInstancesOption(cluster,
dnsResolver))
+ .option("sidecar_contact_points",
sidecarInstancesOption(cluster, dnsResolver))
.option("keyspace", tableName.keyspace())
.option("table", tableName.table())
.option("local_dc", "datacenter1")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]