This is an automated email from the ASF dual-hosted git repository.
frankgh pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push:
new f4014c0 CASSANDRA-19626 Fix NullPointerException when reading static column with null values (#58)
f4014c0 is described below
commit f4014c06d7668541010d59cc932970e9ebfc36f5
Author: Francisco Guerrero <[email protected]>
AuthorDate: Fri May 10 13:17:04 2024 -0700
CASSANDRA-19626 Fix NullPointerException when reading static column with null values (#58)
Patch by Francisco Guerrero; Reviewed by Yifan Cai for CASSANDRA-19626
---
CHANGES.txt | 1 +
.../spark/data/SidecarProvisionedSSTableTest.java | 3 +-
.../distributed/impl/CassandraCluster.java | 6 ++++
.../cassandra/testing/IClusterExtension.java | 9 ++++++
...ifferentTablesTest.java => BulkReaderTest.java} | 37 ++++++++++++++++++++--
.../replacement/HostReplacementTestBase.java | 10 ++----
.../org/apache/cassandra/spark/data/CqlType.java | 3 +-
.../spark/data/complex/CqlCollection.java | 3 +-
scripts/build-sidecar.sh | 2 +-
9 files changed, 60 insertions(+), 14 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index c00c881..4c16a54 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
1.0.0
+ * NullPointerException when reading static column with null values (CASSANDRA-19626)
* Integrate with the latest sidecar client (CASSANDRA-19616)
* Support bulk write via S3 (CASSANDRA-19563)
* Support UDTs in the Bulk Writer (CASSANDRA-19340)
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTableTest.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTableTest.java
index c418d43..4b5ec5e 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTableTest.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTableTest.java
@@ -155,8 +155,7 @@ class SidecarProvisionedSSTableTest
1,
snapshot,
keyspace,
-
table,
-
"abc1234",
+
table + "-abc1234",
dataFileName);
return new SidecarProvisionedSSTable(mockSidecarClient,
sidecarClientConfig,
diff --git
a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java
b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java
index f5a5abd..795c758 100644
---
a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java
+++
b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java
@@ -170,6 +170,12 @@ public class CassandraCluster<I extends IInstance>
implements IClusterExtension<
ClusterUtils.awaitRingState(instance, expectedInRing, state);
}
+ @Override
+ public void awaitRingStatus(IInstance instance, IInstance expectedInRing,
String status)
+ {
+ ClusterUtils.awaitRingStatus(instance, expectedInRing, status);
+ }
+
@Override
public void awaitGossipStatus(IInstance instance, IInstance
expectedInGossip, String targetStatus)
{
diff --git
a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/testing/IClusterExtension.java
b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/testing/IClusterExtension.java
index c604e37..915d289 100644
---
a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/testing/IClusterExtension.java
+++
b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/testing/IClusterExtension.java
@@ -81,6 +81,15 @@ public interface IClusterExtension<I extends IInstance>
extends ICluster<I>
*/
void awaitRingState(IInstance instance, IInstance expectedInRing, String
state);
+ /**
+ * Wait for the ring to have the target instance with the provided status.
+ *
+ * @param instance instance to check on
+ * @param expectedInRing to look for
+ * @param status expected
+ */
+ void awaitRingStatus(IInstance instance, IInstance expectedInRing, String
status);
+
/**
* Waits for the target instance to have the desired status. Target status
is checked via string contains so works
* with 'NORMAL' but also can check tokens or full state.
diff --git
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/ReadDifferentTablesTest.java
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderTest.java
similarity index 68%
rename from
cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/ReadDifferentTablesTest.java
rename to
cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderTest.java
index 88e215d..239af60 100644
---
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/ReadDifferentTablesTest.java
+++
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderTest.java
@@ -38,13 +38,40 @@ import static
org.apache.cassandra.testing.TestUtils.uniqueTestTableFullName;
import static org.assertj.core.api.Assertions.assertThat;
/**
- * Test that reads different tables with different schemas within the same test
+ * Tests bulk reader functionality
*/
-class ReadDifferentTablesTest extends SharedClusterSparkIntegrationTestBase
+class BulkReaderTest extends SharedClusterSparkIntegrationTestBase
{
static final List<String> DATASET = Arrays.asList("a", "b", "c", "d", "e",
"f", "g");
QualifiedName table1 = uniqueTestTableFullName(TEST_KEYSPACE);
QualifiedName table2 = uniqueTestTableFullName(TEST_KEYSPACE);
+ QualifiedName tableForNullStaticColumn =
uniqueTestTableFullName(TEST_KEYSPACE);
+
+ @Test
+ void testReadNullStaticColumn()
+ {
+ Dataset<Row> data =
bulkReaderDataFrame(tableForNullStaticColumn).load();
+
+ List<Row> rows = data.collectAsList().stream()
+ .sorted(Comparator.comparing(row ->
row.getString(0)))
+ .collect(Collectors.toList());
+ assertThat(rows.size()).isEqualTo(DATASET.size());
+
+ for (int i = 0; i < DATASET.size(); i++)
+ {
+ Row row = rows.get(i);
+ assertThat(row.getString(0)).isEqualTo(DATASET.get(i));
+
assertThat(row.getTimestamp(1).getTime()).isEqualTo(1432815430948560L + i);
+ if (i % 2 == 0)
+ {
+ assertThat(row.getTimestamp(2)).as("Row " + (i + 1) + " is
expected to have null timestamp").isNull();
+ }
+ else
+ {
+
assertThat(row.getTimestamp(2).getTime()).isEqualTo(1432815430948560L + i);
+ }
+ }
+ }
@Test
void testReadingFromTwoDifferentTables()
@@ -78,6 +105,8 @@ class ReadDifferentTablesTest extends
SharedClusterSparkIntegrationTestBase
createTestKeyspace(TEST_KEYSPACE, DC1_RF1);
createTestTable(table1, "CREATE TABLE IF NOT EXISTS %s (id int PRIMARY
KEY, name text);");
createTestTable(table2, "CREATE TABLE IF NOT EXISTS %s (name text
PRIMARY KEY, value bigint);");
+ createTestTable(tableForNullStaticColumn, "CREATE TABLE %s (id text,
timestamp timestamp,\n" +
+ " timestamp_static
timestamp static, PRIMARY KEY (id, timestamp));");
IInstance firstRunningInstance = cluster.getFirstRunningInstance();
for (int i = 0; i < DATASET.size(); i++)
@@ -85,9 +114,13 @@ class ReadDifferentTablesTest extends
SharedClusterSparkIntegrationTestBase
String value = DATASET.get(i);
String query1 = String.format("INSERT INTO %s (id, name) VALUES
(%d, '%s');", table1, i, value);
String query2 = String.format("INSERT INTO %s (name, value) VALUES
('%s', %d);", table2, value, i);
+ String query3 = String.format("INSERT INTO %s (id, timestamp,
timestamp_static) VALUES ('%s',%d, %s)",
+ tableForNullStaticColumn, value,
1432815430948560L + i,
+ i % 2 == 0 ? "null" :
String.valueOf(i + 1432815430948560L));
firstRunningInstance.coordinator().execute(query1,
ConsistencyLevel.ALL);
firstRunningInstance.coordinator().execute(query2,
ConsistencyLevel.ALL);
+ firstRunningInstance.coordinator().execute(query3,
ConsistencyLevel.ALL);
}
}
}
diff --git
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementTestBase.java
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementTestBase.java
index 2d031cc..bd12bc1 100644
---
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementTestBase.java
+++
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementTestBase.java
@@ -260,13 +260,9 @@ abstract class HostReplacementTestBase extends
ResiliencyTestBase
for (IInstance node : nodesToRemove)
{
cluster.stopUnchecked(node);
- String remAddress =
node.config().broadcastAddress().getAddress().getHostAddress();
-
- List<ClusterUtils.RingInstanceDetails> ring =
ClusterUtils.ring(seed);
- List<ClusterUtils.RingInstanceDetails> match = ring.stream()
- .filter((d) ->
d.getAddress().equals(remAddress))
-
.collect(Collectors.toList());
- assertThat(match.stream().anyMatch(r ->
r.getStatus().equals("Down"))).isTrue();
+ // awaitRingStatus will assert that the node status is down. It
retries multiple times until a timeout
+ // is reached and fails if the expected status is not seen.
+ cluster.awaitRingStatus(seed, node, "Down");
}
}
diff --git
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/CqlType.java
b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/CqlType.java
index 1e1eac6..cbcd814 100644
---
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/CqlType.java
+++
b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/CqlType.java
@@ -54,7 +54,8 @@ public abstract class CqlType implements CqlField.CqlType
@Override
public Object deserialize(ByteBuffer buffer, boolean isFrozen)
{
- return toSparkSqlType(serializer().deserialize(buffer));
+ Object value = serializer().deserialize(buffer);
+ return value != null ? toSparkSqlType(value) : null;
}
public abstract <T> TypeSerializer<T> serializer();
diff --git
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/complex/CqlCollection.java
b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/complex/CqlCollection.java
index 9e0316f..385395e 100644
---
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/complex/CqlCollection.java
+++
b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/complex/CqlCollection.java
@@ -88,7 +88,8 @@ public abstract class CqlCollection extends CqlType
implements CqlField.CqlColle
@Override
public Object deserialize(ByteBuffer buffer, boolean isFrozen)
{
- return toSparkSqlType(serializer().deserialize(buffer));
+ Object value = serializer().deserialize(buffer);
+ return value != null ? toSparkSqlType(value) : null;
}
@Override
diff --git a/scripts/build-sidecar.sh b/scripts/build-sidecar.sh
index 4e30386..194f5c9 100755
--- a/scripts/build-sidecar.sh
+++ b/scripts/build-sidecar.sh
@@ -24,7 +24,7 @@ else
SCRIPT_DIR=$( dirname -- "$( readlink -f -- "$0"; )"; )
SIDECAR_REPO="${SIDECAR_REPO:-https://github.com/apache/cassandra-sidecar.git}"
SIDECAR_BRANCH="${SIDECAR_BRANCH:-trunk}"
- SIDECAR_COMMIT="${SIDECAR_COMMIT:-55866caa2c4601b1d59a8532a97310a9e819931f}"
+ SIDECAR_COMMIT="${SIDECAR_COMMIT:-e95786a077e1137dcaae206854986987edc6a71e}"
SIDECAR_JAR_DIR="$(dirname "${SCRIPT_DIR}/")/dependencies"
SIDECAR_JAR_DIR=${CASSANDRA_DEP_DIR:-$SIDECAR_JAR_DIR}
SIDECAR_BUILD_DIR="${SIDECAR_JAR_DIR}/sidecar-build"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]