This is an automated email from the ASF dual-hosted git repository.
frankgh pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push:
new 67281b3 CASSANDRA-20078: Add integration tests for multiple types
during bulk writes (#94)
67281b3 is described below
commit 67281b31010791fa7f0d02dd0f776862e15846d3
Author: Francisco Guerrero <[email protected]>
AuthorDate: Thu Nov 14 21:49:15 2024 -0800
CASSANDRA-20078: Add integration tests for multiple types during bulk
writes (#94)
Patch by Francisco Guerrero; Reviewed by Yifan Cai for CASSANDRA-20078
---
build.gradle | 1 -
cassandra-analytics-core/build.gradle | 2 +-
.../spark/bulkwriter/SqlToCqlTypeConverter.java | 12 +-
.../testing/SharedClusterIntegrationTestBase.java | 2 +-
.../org/apache/cassandra/testing/TestUtils.java | 2 +-
.../analytics/BulkWriteDataTypesTest.java | 495 +++++++++++++++++++++
.../apache/cassandra/analytics/SparkTestUtils.java | 1 +
7 files changed, 505 insertions(+), 10 deletions(-)
diff --git a/build.gradle b/build.gradle
index 20312ae..e788b2b 100644
--- a/build.gradle
+++ b/build.gradle
@@ -54,7 +54,6 @@ if (!actualJdkVersion.startsWith(ext.jdkLabel))
ext.dependencyLocation = (System.getenv("CASSANDRA_DEP_DIR") ?:
"${rootDir}/dependencies") + "/"
-
def buildDir = layout.buildDirectory.get().asFile.toPath()
def ratExcludeFilePath = buildDir.resolve(".rat-excludes.txt")
System.out.println("Rat exclude file:" + ratExcludeFilePath)
diff --git a/cassandra-analytics-core/build.gradle
b/cassandra-analytics-core/build.gradle
index 65685e4..1d0dc5c 100644
--- a/cassandra-analytics-core/build.gradle
+++ b/cassandra-analytics-core/build.gradle
@@ -69,7 +69,7 @@ dependencies {
fourzerobridge(project(path: ':cassandra-four-zero-bridge'))
fourzerotypes(project(path: ':cassandra-four-zero-types'))
fourzerosparksql(project(path:
':cassandra-analytics-spark-four-zero-converter'))
- implementation(project(':cassandra-analytics-spark-converter'))
+ api(project(':cassandra-analytics-spark-converter'))
// Including newer JNA which works with "Modern" Linux/GLIBC.
// End users can exclude this and include their own if necessary.
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SqlToCqlTypeConverter.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SqlToCqlTypeConverter.java
index a80c8c6..d148de7 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SqlToCqlTypeConverter.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SqlToCqlTypeConverter.java
@@ -85,15 +85,15 @@ public final class SqlToCqlTypeConverter implements
Serializable
private static final BytesConverter BYTES_CONVERTER = new BytesConverter();
private static final BigDecimalConverter BIG_DECIMAL_CONVERTER = new
BigDecimalConverter();
private static final IntegerConverter INTEGER_CONVERTER = new
IntegerConverter();
- private static final TimestampConverter TIMESTAMP_CONVERTER = new
TimestampConverter();
+ public static final TimestampConverter TIMESTAMP_CONVERTER = new
TimestampConverter();
private static final MicroSecondsTimestampConverter
MICROSECONDS_TIMESTAMP_CONVERTER =
new MicroSecondsTimestampConverter();
- private static final TimeConverter TIME_CONVERTER = new TimeConverter();
+ public static final TimeConverter TIME_CONVERTER = new TimeConverter();
private static final UUIDConverter UUID_CONVERTER = new UUIDConverter();
private static final BigIntegerConverter BIG_INTEGER_CONVERTER = new
BigIntegerConverter();
private static final TimeUUIDConverter TIME_UUID_CONVERTER = new
TimeUUIDConverter();
private static final InetAddressConverter INET_ADDRESS_CONVERTER = new
InetAddressConverter();
- private static final DateConverter DATE_CONVERTER = new DateConverter();
+ public static final DateConverter DATE_CONVERTER = new DateConverter();
private SqlToCqlTypeConverter()
{
@@ -469,7 +469,7 @@ public final class SqlToCqlTypeConverter implements
Serializable
}
}
- static class TimestampConverter extends NullableConverter<Date>
+ public static class TimestampConverter extends NullableConverter<Date>
{
/**
* Returns a Date representing the number of milliseconds since the
standard base time known as the epoch
@@ -505,7 +505,7 @@ public final class SqlToCqlTypeConverter implements
Serializable
}
}
- static class DateConverter extends NullableConverter<Integer>
+ public static class DateConverter extends NullableConverter<Integer>
{
@Override
public String toString()
@@ -547,7 +547,7 @@ public final class SqlToCqlTypeConverter implements
Serializable
}
}
- static class TimeConverter extends NullableConverter<Long>
+ public static class TimeConverter extends NullableConverter<Long>
{
@Override
@SuppressWarnings("deprecation")
diff --git
a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java
b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java
index ea5b59d..ffd33d4 100644
---
a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java
+++
b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java
@@ -174,7 +174,7 @@ public abstract class SharedClusterIntegrationTestBase
logger.info("Testing with version={}", testVersion);
classLoaderWrapper = new IsolatedDTestClassLoaderWrapper();
- classLoaderWrapper.initializeDTestJarClassLoader(testVersion,
SharedClusterIntegrationTestBase.class);
+ classLoaderWrapper.initializeDTestJarClassLoader(testVersion,
TestVersion.class);
beforeClusterProvisioning();
cluster = provisionClusterWithRetries(this.testVersion);
diff --git
a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/testing/TestUtils.java
b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/testing/TestUtils.java
index 3527145..e98b5f3 100644
---
a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/testing/TestUtils.java
+++
b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/testing/TestUtils.java
@@ -109,7 +109,7 @@ public final class TestUtils
System.setProperty("cassandra.consistent.simultaneousmoves.allow",
"true");
// End gossip delay settings
// Set the location of dtest jars
- System.setProperty("cassandra.test.dtest_jar_path",
System.getProperty("cassandra.test.dtest_jar_path", "dtest-jars"));
+ System.setProperty("cassandra.test.dtest_jar_path",
System.getProperty("cassandra.test.dtest_jar_path", "dependencies"));
// Disable tcnative in netty as it can cause jni issues and logs lots
errors
System.setProperty("cassandra.disable_tcactive_openssl", "true");
// As we enable gossip by default, make the checks happen faster
diff --git
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteDataTypesTest.java
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteDataTypesTest.java
new file mode 100644
index 0000000..c63accd
--- /dev/null
+++
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteDataTypesTest.java
@@ -0,0 +1,495 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.analytics;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.sql.Timestamp;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import com.datastax.driver.core.utils.UUIDs;
+import org.apache.cassandra.sidecar.testing.QualifiedName;
+import org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter;
+import org.apache.cassandra.spark.utils.ByteBufferUtils;
+import org.apache.cassandra.spark.utils.ScalaConversionUtils;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.StructType;
+import scala.collection.mutable.Seq;
+
+import static org.apache.cassandra.testing.TestUtils.DC1_RF1;
+import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE;
+import static org.apache.spark.sql.types.DataTypes.BinaryType;
+import static org.apache.spark.sql.types.DataTypes.DateType;
+import static org.apache.spark.sql.types.DataTypes.IntegerType;
+import static org.apache.spark.sql.types.DataTypes.LongType;
+import static org.apache.spark.sql.types.DataTypes.StringType;
+import static org.apache.spark.sql.types.DataTypes.TimestampType;
+import static org.apache.spark.sql.types.DataTypes.createArrayType;
+import static org.apache.spark.sql.types.DataTypes.createMapType;
+import static org.assertj.core.api.Assertions.assertThatException;
+
+/**
+ * Tests bulk writes with different data types. More types can be added to the
test
+ * as we support more types.
+ */
+class BulkWriteDataTypesTest extends SharedClusterSparkIntegrationTestBase
+{
+ @ParameterizedTest(name = "{index} => {0}")
+ @MethodSource("testArguments")
+ void testType(String tableName, TypeTestSetup typeTestSetup)
+ {
+ SparkSession spark = getOrCreateSparkSession();
+
+ Dataset<Row> df = generateDataset(spark, typeTestSetup);
+
+ QualifiedName table = new QualifiedName(TEST_KEYSPACE, tableName);
+ if (typeTestSetup.expectedFailureMessage != null)
+ {
+ assertThatException()
+ .isThrownBy(() -> bulkWriterDataFrameWriter(df, table).save())
+ .withMessageContaining(typeTestSetup.expectedFailureMessage);
+ }
+ else
+ {
+ bulkWriterDataFrameWriter(df, table).save();
+ sparkTestUtils.validateWrites(df.collectAsList(),
queryAllData(table),
+
typeTestSetup.columnMapperValidation, typeTestSetup.rowMapperValidation);
+ }
+ }
+
+ Dataset<Row> generateDataset(SparkSession spark, TypeTestSetup
typeTestSetup)
+ {
+ StructType schema = new StructType();
+ for (int i = 0; i < typeTestSetup.columns.size(); i++)
+ {
+ schema = schema.add(typeTestSetup.columns.get(i),
typeTestSetup.columnTypes.get(i), false);
+ }
+
+ List<Row> rows = IntStream.range(0, typeTestSetup.numRows)
+ .mapToObj(recordNum -> {
+ List<Object> values = new
ArrayList<>(typeTestSetup.columns.size());
+ for (Function<Integer, Object> fn :
typeTestSetup.valueFunction)
+ {
+ values.add(fn.apply(recordNum));
+ }
+ return
RowFactory.create(values.toArray());
+ }).collect(Collectors.toList());
+ return spark.createDataFrame(rows, schema);
+ }
+
+ /**
+ * Initialize required schemas for the tests upfront before the test starts
+ */
+ @Override
+ protected void initializeSchemaForTest()
+ {
+ createTestKeyspace(TEST_KEYSPACE, DC1_RF1);
+ typesToTest().forEach(typeTestSetup -> {
+ QualifiedName tableName = new QualifiedName(TEST_KEYSPACE,
typeTestSetup.tableName);
+ createTestTable(tableName, typeTestSetup.createTableSchema);
+ });
+ }
+
+ /**
+ * @return cartesian product of the list of consistency levels and
instance down
+ */
+ static Stream<Arguments> testArguments()
+ {
+ return typesToTest().stream().map(typeTestSetup ->
Arguments.of(typeTestSetup.tableName, typeTestSetup));
+ }
+
+ static List<TypeTestSetup> typesToTest()
+ {
+ List<TypeTestSetup> types = new ArrayList<>();
+
+ // Simple schema with a bigint column.
+ types.add(simpleBigIntSchemaSetup());
+
+ // Simple schema with Date as column.
+ types.add(simpleDateSchemaSetup());
+
+ // Simple schema with integers and strings as columns.
+ types.add(integersAndStringsSchemaSetup());
+
+ // Simple schema with TimeUUID as column.
+ types.add(timeUUIDSchemaSetup());
+
+ // Simple schema with TimeUUID as column, attempt to populate with
random UUID.
+ types.add(randomUUIDFailureSchemaSetup());
+
+ // Schema with byte array as columns.
+ types.add(byteArrayColumnSetup());
+
+ // Schema with List of int as columns.
+ types.add(intListSchemaSetup());
+
+ // Schema with List of bytes as column.
+ types.add(byteListSchemaSetup());
+
+ // Schema with set of lists as column.
+ types.add(stringSetSchemaSetup());
+
+ // Schema with Map of bytes as column
+ // NOTE: Also testing CASSANDRA-17623 (map serialization) with this
test, thus the primary key of id and mapdata
+ types.add(mapByteSchemaSetup());
+
+ // Schema with nested datatypes as column
+ types.add(mapListSchemaSetup());
+
+ // Schema with nested datatypes as column
+ types.add(nestedDataTypesSchemaSetup());
+
+ // Time with Timestamp as source.
+ // Note that Spark only supports milliseconds in its timestamp column,
+ // so we only test to millisecond precision here
+ types.add(timeSchemaTimestampSetup());
+
+ // Time with Long as source.
+ types.add(timeWithLongSourceSchemaSetup());
+
+ // Old tables (created in C* 1.2) with Timestamp columns from 1.2 were
changed in 2.0 to return CUSTOM(DATE).
+ // The SBW handles these by detecting CUSTOM columns with a date type
and internally treating them like timestamps.
+ // Note that Spark only supports milliseconds in its timestamp column,
so we only test to millisecond precision here
+ types.add(customDateTypeSchemaSetup());
+
+ return types;
+ }
+
+ static TypeTestSetup simpleBigIntSchemaSetup()
+ {
+ return new TypeTestSetup("bigint_schema",
+ Arrays.asList("id", "marks"),
+ Arrays.asList(IntegerType, LongType),
+ Arrays.asList(INTEGER_MAPPER, LONG_MAPPER),
+ "CREATE TABLE %s (id int, marks bigint,
PRIMARY KEY (id))");
+ }
+
+ static TypeTestSetup simpleDateSchemaSetup()
+ {
+ TypeTestSetup setup = new TypeTestSetup("date_schema",
+ Arrays.asList("id", "course"),
+ Arrays.asList(IntegerType,
DateType),
+ Arrays.asList(INTEGER_MAPPER,
DATE_MAPPER),
+ "CREATE TABLE %s (id int,
course date, PRIMARY KEY (id))");
+ setup.rowMapperValidation = row -> String.format("%s:%s", row.get(0),
SqlToCqlTypeConverter.DATE_CONVERTER.convertInternal(row.get(1)));
+ return setup;
+ }
+
+ static TypeTestSetup integersAndStringsSchemaSetup()
+ {
+ return new TypeTestSetup("simple_schema",
+ Arrays.asList("id", "course", "marks"),
+ Arrays.asList(IntegerType, StringType,
IntegerType),
+ Arrays.asList(INTEGER_MAPPER, STRING_MAPPER,
INTEGER_MAPPER),
+ "CREATE TABLE %s (id int, course text, marks
int, PRIMARY KEY (id))");
+ }
+
+ static TypeTestSetup timeUUIDSchemaSetup()
+ {
+ return new TypeTestSetup("timeuuid_schema",
+ Arrays.asList("id", "course"),
+ Arrays.asList(IntegerType, StringType),
+ Arrays.asList(INTEGER_MAPPER,
TIME_UUID_MAPPER),
+ "CREATE TABLE %s (id int, course timeuuid,
PRIMARY KEY (id))");
+ }
+
+ static TypeTestSetup randomUUIDFailureSchemaSetup()
+ {
+ return new TypeTestSetup("timeuuid_schema_bad_uuid",
+ Arrays.asList("id", "course"),
+ Arrays.asList(IntegerType, StringType),
+ Arrays.asList(INTEGER_MAPPER,
RANDOM_UUID_MAPPER),
+ "CREATE TABLE %s (id int, course timeuuid,
PRIMARY KEY (id))",
+ "Bulk Write to Cassandra has failed");
+ }
+
+ static TypeTestSetup byteArrayColumnSetup()
+ {
+ TypeTestSetup setup = new TypeTestSetup("bytearray_column",
+ Arrays.asList("id",
"binarydata"),
+ Arrays.asList(IntegerType,
BinaryType),
+ Arrays.asList(INTEGER_MAPPER,
BINARY_MAPPER),
+ "CREATE TABLE %s (id int,
binarydata blob, PRIMARY KEY (id))");
+ setup.rowMapperValidation =
+ row -> String.format("%s:%s", row.get(0), new String((byte[])
row.get(1), StandardCharsets.UTF_8));
+ setup.columnMapperValidation = columns -> {
+ Object col1 = new String(ByteBufferUtils.getArray((ByteBuffer)
columns[1]), StandardCharsets.UTF_8);
+ return String.format("%s:%s", columns[0], col1);
+ };
+ return setup;
+ }
+
+ static TypeTestSetup intListSchemaSetup()
+ {
+ TypeTestSetup setup = new TypeTestSetup("list_column",
+ Arrays.asList("id",
"listdata"),
+ Arrays.asList(IntegerType,
createArrayType(IntegerType)),
+ Arrays.asList(INTEGER_MAPPER,
INTEGER_ARRAY_MAPPER),
+ "CREATE TABLE %s (id int,
listdata LIST<int>, PRIMARY KEY (id))");
+ setup.rowMapperValidation = row -> String.format("%s:%s", row.get(0),
row.getList(1));
+ return setup;
+ }
+
+ @SuppressWarnings("unchecked")
+ static TypeTestSetup byteListSchemaSetup()
+ {
+ TypeTestSetup setup = new TypeTestSetup("byte_list_column",
+ Arrays.asList("id",
"listdata"),
+ Arrays.asList(IntegerType,
createArrayType(BinaryType)),
+ Arrays.asList(INTEGER_MAPPER,
BYTE_ARRAY_MAPPER),
+ "CREATE TABLE %s (id int,
listdata LIST<blob>, PRIMARY KEY (id))");
+ setup.rowMapperValidation = row -> {
+ List<byte[]> byteList = row.getList(1);
+ return String.format("%s:%s", row.get(0), byteList.stream()
+ .map(b -> new
String(b, StandardCharsets.UTF_8))
+
.collect(Collectors.toList()));
+ };
+ setup.columnMapperValidation = columns -> {
+ List<ByteBuffer> byteBufferList = (List<ByteBuffer>) columns[1];
+ return String.format("%s:%s", columns[0], byteBufferList.stream()
+ .map(b ->
new String(ByteBufferUtils.getArray(b), StandardCharsets.UTF_8))
+
.collect(Collectors.toList()));
+ };
+ return setup;
+ }
+
+ static TypeTestSetup stringSetSchemaSetup()
+ {
+ TypeTestSetup setup = new TypeTestSetup("set_list_column",
+ Arrays.asList("id", "setdata"),
+ Arrays.asList(IntegerType,
createArrayType(StringType)),
+ Arrays.asList(INTEGER_MAPPER,
STRING_SET_MAPPER),
+ "CREATE TABLE %s (id int,
setdata set<text>, PRIMARY KEY (id))");
+ setup.rowMapperValidation = row -> String.format("%s:%s", row.get(0),
row.getList(1));
+ return setup;
+ }
+
+ @SuppressWarnings("unchecked")
+ static TypeTestSetup mapByteSchemaSetup()
+ {
+ TypeTestSetup setup = new TypeTestSetup("map_byte_column",
+ Arrays.asList("id", "mapdata"),
+ Arrays.asList(IntegerType,
createMapType(StringType, BinaryType)),
+ Arrays.asList(INTEGER_MAPPER,
STRING_BINARY_MAP_MAPPER),
+ "CREATE TABLE %s (id int,
mapdata frozen<map<text,blob>>, PRIMARY KEY (id, mapdata))");
+ setup.rowMapperValidation = row -> {
+ Map<?, ?> map = row.getJavaMap(1);
+ String value = map.entrySet().stream()
+ .map(entry -> String.format("%s=%s",
entry.getKey(),
+ new String((byte[])
entry.getValue(), StandardCharsets.UTF_8)))
+ .collect(Collectors.joining(", ", "[", "]"));
+ return String.format("%s:%s", row.get(0), value);
+ };
+ setup.columnMapperValidation = columns -> {
+ Map<String, ByteBuffer> map = (Map<String, ByteBuffer>) columns[1];
+ String value = map.entrySet().stream()
+ .map(entry -> String.format("%s=%s",
entry.getKey(),
+ new
String(ByteBufferUtils.getArray(entry.getValue()), StandardCharsets.UTF_8)))
+ .collect(Collectors.joining(", ", "[", "]"));
+ return String.format("%s:%s", columns[0], value);
+ };
+ return setup;
+ }
+
+ @SuppressWarnings("unchecked")
+ static TypeTestSetup mapListSchemaSetup()
+ {
+ TypeTestSetup setup = new TypeTestSetup("map_list_column",
+ Arrays.asList("id", "mapdata"),
+ Arrays.asList(IntegerType,
createMapType(StringType, createArrayType(IntegerType))),
+ Arrays.asList(INTEGER_MAPPER,
STRING_ARRAY_INTEGER_MAP_MAPPER),
+ "CREATE TABLE %s (id int,
mapdata map<text,frozen<list<int>>>, PRIMARY KEY (id))");
+ setup.rowMapperValidation = row -> {
+ Map<?, ?> map = row.getJavaMap(1);
+ Map<String, List<Integer>> value = map.entrySet()
+ .stream()
+
.sorted(Comparator.comparing(e -> (String) e.getKey()))
+ .collect(Collectors.toMap(e
-> (String) e.getKey(),
+ e
-> ScalaConversionUtils.mutableSeqAsJavaList((Seq<Integer>) e.getValue()),
+
(x, y) -> y,
+
LinkedHashMap::new));
+ return String.format("%s:%s", row.get(0), value);
+ };
+ return setup;
+ }
+
+ @SuppressWarnings("unchecked")
+ static TypeTestSetup nestedDataTypesSchemaSetup()
+ {
+ TypeTestSetup setup = new TypeTestSetup("map_list_byte_column",
+ Arrays.asList("id", "mapdata"),
+ Arrays.asList(IntegerType,
createMapType(StringType, createArrayType(BinaryType))),
+ Arrays.asList(INTEGER_MAPPER,
STRING_ARRAY_BINARY_MAP_MAPPER),
+ "CREATE TABLE %s (id int,
mapdata map<text,frozen<list<blob>>>, PRIMARY KEY (id))");
+ setup.rowMapperValidation = row -> {
+ Map<String, Seq<byte[]>> map = row.getJavaMap(1);
+ Function<byte[], String> unwrapBytes = b -> new String(b,
StandardCharsets.UTF_8);
+ String value = map.entrySet().stream()
+ .map(entry -> String.format("%s=%s",
entry.getKey(),
+
ScalaConversionUtils.mutableSeqAsJavaList(entry.getValue())
+
.stream()
+
.map(unwrapBytes)
+
.collect(Collectors.toList())))
+ .collect(Collectors.joining(", ", "[", "]"));
+ return String.format("%s:%s", row.get(0), value);
+ };
+ setup.columnMapperValidation = columns -> {
+ Map<String, List<ByteBuffer>> map = (Map<String,
List<ByteBuffer>>) columns[1];
+ Function<ByteBuffer, String> unwrapBytes = b -> new
String(ByteBufferUtils.getArray(b), StandardCharsets.UTF_8);
+ String value = map.entrySet().stream()
+ .map(entry -> String.format("%s=%s",
entry.getKey(),
entry.getValue().stream().map(unwrapBytes).collect(Collectors.toList())))
+ .collect(Collectors.joining(", ", "[", "]"));
+ return String.format("%s:%s", columns[0], value);
+ };
+ return setup;
+ }
+
+ static TypeTestSetup timeSchemaTimestampSetup()
+ {
+ TypeTestSetup setup = new TypeTestSetup("time_schema_timestamp",
+ Arrays.asList("id", "course"),
+ Arrays.asList(IntegerType,
TimestampType),
+ Arrays.asList(INTEGER_MAPPER,
TIMESTAMP_MAPPER),
+ "CREATE TABLE %s (id int,
course time, PRIMARY KEY (id))");
+ setup.rowMapperValidation = row -> String.format("%s:%s", row.get(0),
SqlToCqlTypeConverter.TIME_CONVERTER.convertInternal(row.get(1)));
+
+ return setup;
+ }
+
+ static TypeTestSetup timeWithLongSourceSchemaSetup()
+ {
+ return new TypeTestSetup("time_schema_long",
+ Arrays.asList("id", "course"),
+ Arrays.asList(IntegerType, LongType),
+ Arrays.asList(INTEGER_MAPPER, LONG_MAPPER),
+ "CREATE TABLE %s (id int, course time,
PRIMARY KEY (id))");
+ }
+
+ static TypeTestSetup customDateTypeSchemaSetup()
+ {
+ TypeTestSetup setup = new
TypeTestSetup("c_12_ts_as_custom_schema_timestamp",
+ Arrays.asList("id", "course"),
+ Arrays.asList(IntegerType,
LongType),
+ Arrays.asList(INTEGER_MAPPER,
LONG_MAPPER),
+ "CREATE TABLE %s (id int,
course 'org.apache.cassandra.db.marshal.DateType', PRIMARY KEY (id))");
+ setup.rowMapperValidation = row -> String.format("%s:%s", row.get(0),
SqlToCqlTypeConverter.TIMESTAMP_CONVERTER.convertInternal(row.get(1)));
+ return setup;
+ }
+
+ static final Function<Integer, Object> INTEGER_MAPPER = recordNumber ->
recordNumber;
+ static final Function<Integer, Object> INTEGER_ARRAY_MAPPER = recordNumber
-> Arrays.asList(recordNumber, recordNumber, recordNumber);
+ static final Function<Integer, Object> BYTE_ARRAY_MAPPER = recordNumber ->
Arrays.asList(("course" + recordNumber).getBytes(StandardCharsets.UTF_8),
+
("course" + recordNumber).getBytes(StandardCharsets.UTF_8));
+ static final Function<Integer, Object> STRING_SET_MAPPER = recordNumber ->
ImmutableSet.of(String.format("course%06d", recordNumber),
+
String.format("course%06d", recordNumber + 1));
+ static final Function<Integer, Object> STRING_BINARY_MAP_MAPPER
+ = recordNumber -> ImmutableMap.of(String.format("course%06d",
recordNumber), ("course" + recordNumber).getBytes(StandardCharsets.UTF_8),
+ String.format("course%06d", recordNumber
+ 1), ("course" + (recordNumber + 1)).getBytes(StandardCharsets.UTF_8));
+ static final Function<Integer, Object> STRING_ARRAY_INTEGER_MAP_MAPPER
+ = recordNumber -> ImmutableMap.of(String.format("course%06d",
recordNumber), Collections.singletonList(recordNumber),
+ String.format("course%06d", recordNumber
+ 1), Collections.singletonList(recordNumber + 1));
+ static final Function<Integer, Object> STRING_ARRAY_BINARY_MAP_MAPPER
+ = recordNumber -> ImmutableMap.of(String.format("course%06d",
recordNumber),
+ Collections.singletonList(("course" +
recordNumber).getBytes(StandardCharsets.UTF_8)),
+ String.format("course%06d", recordNumber
+ 1),
+ Collections.singletonList(("course" +
(recordNumber + 1)).getBytes(StandardCharsets.UTF_8)));
+ static final Function<Integer, Object> LONG_MAPPER = recordNumber ->
(long) recordNumber;
+ static final Function<Integer, Object> STRING_MAPPER = recordNumber ->
"course" + recordNumber;
+ static final Function<Integer, Object> BINARY_MAPPER = recordNumber ->
("course" + recordNumber).getBytes(StandardCharsets.UTF_8);
+ static final Function<Integer, Object> TIME_UUID_MAPPER = recordNumber ->
UUIDs.timeBased().toString();
+ static final Function<Integer, Object> RANDOM_UUID_MAPPER = recordNumber
-> UUID.randomUUID().toString();
+ static final Function<Integer, Object> TIMESTAMP_MAPPER
+ = recordNumber -> Timestamp.from(new
Date(1731457509115L).toInstant().plus(recordNumber, ChronoUnit.SECONDS));
+ static final Function<Integer, Object> DATE_MAPPER
+ = recordNumber -> java.sql.Date.valueOf(((Timestamp)
TIMESTAMP_MAPPER.apply(recordNumber)).toLocalDateTime().toLocalDate());
+
+ static class TypeTestSetup
+ {
+ final String tableName;
+ final List<String> columns;
+ final List<DataType> columnTypes;
+ final List<Function<Integer, Object>> valueFunction;
+ final String createTableSchema;
+ final String expectedFailureMessage;
+ final int numRows = 10_000;
+ Function<Object[], String> columnMapperValidation =
+ columns -> String.format(String.join(":",
Collections.nCopies(columns.length, "%s")), columns);
+ Function<Row, String> rowMapperValidation = row -> {
+ int size = row.size();
+ Object[] data = new Object[size];
+ for (int i = 0; i < size; i++)
+ {
+ data[i] = row.get(i);
+ }
+ return String.format(String.join(":", Collections.nCopies(size,
"%s")), data);
+ };
+
+ TypeTestSetup(String tableName,
+ List<String> columns,
+ List<DataType> columnTypes,
+ List<Function<Integer, Object>> valueFunction,
+ String createTableSchema)
+ {
+ this.tableName = tableName;
+ this.columns = columns;
+ this.columnTypes = columnTypes;
+ this.valueFunction = valueFunction;
+ this.createTableSchema = createTableSchema;
+ this.expectedFailureMessage = null;
+ }
+
+ TypeTestSetup(String tableName,
+ List<String> columns,
+ List<DataType> columnTypes,
+ List<Function<Integer, Object>> valueFunction,
+ String createTableSchema,
+ String expectedFailureMessage)
+ {
+ this.tableName = tableName;
+ this.columns = columns;
+ this.columnTypes = columnTypes;
+ this.valueFunction = valueFunction;
+ this.createTableSchema = createTableSchema;
+ this.expectedFailureMessage = expectedFailureMessage;
+ }
+ }
+}
diff --git
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java
index 8b69e5c..0e3a136 100644
---
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java
+++
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java
@@ -187,6 +187,7 @@ public class SparkTestUtils
// Spark is not case-sensitive by default, but
we want to make it case-sensitive for
// the quoted identifiers tests where we test
mixed case
.set("spark.sql.caseSensitive", "True")
+ .set("spark.driver.bindAddress", "127.0.0.1")
.set("spark.master", "local[8,4]")
.set("spark.cassandra_analytics.sidecar.request.retries", "5")
.set("spark.cassandra_analytics.sidecar.request.retries.delay.milliseconds",
"500")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]