This is an automated email from the ASF dual-hosted git repository.

frankgh pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f4014c0  CASSANDRA-19626 Fix NullPointerException when reading static column with null values (#58)
f4014c0 is described below

commit f4014c06d7668541010d59cc932970e9ebfc36f5
Author: Francisco Guerrero <[email protected]>
AuthorDate: Fri May 10 13:17:04 2024 -0700

    CASSANDRA-19626 Fix NullPointerException when reading static column with null values (#58)
    
    Patch by Francisco Guerrero; Reviewed by Yifan Cai for CASSANDRA-19626
---
 CHANGES.txt                                        |  1 +
 .../spark/data/SidecarProvisionedSSTableTest.java  |  3 +-
 .../distributed/impl/CassandraCluster.java         |  6 ++++
 .../cassandra/testing/IClusterExtension.java       |  9 ++++++
 ...ifferentTablesTest.java => BulkReaderTest.java} | 37 ++++++++++++++++++++--
 .../replacement/HostReplacementTestBase.java       | 10 ++----
 .../org/apache/cassandra/spark/data/CqlType.java   |  3 +-
 .../spark/data/complex/CqlCollection.java          |  3 +-
 scripts/build-sidecar.sh                           |  2 +-
 9 files changed, 60 insertions(+), 14 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index c00c881..4c16a54 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.0.0
+ * NullPointerException when reading static column with null values (CASSANDRA-19626)
  * Integrate with the latest sidecar client (CASSANDRA-19616)
  * Support bulk write via S3 (CASSANDRA-19563)
  * Support UDTs in the Bulk Writer (CASSANDRA-19340)
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTableTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTableTest.java
index c418d43..4b5ec5e 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTableTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTableTest.java
@@ -155,8 +155,7 @@ class SidecarProvisionedSSTableTest
                                                                                
              1,
                                                                                
              snapshot,
                                                                                
              keyspace,
-                                                                               
              table,
-                                                                               
              "abc1234",
+                                                                               
              table + "-abc1234",
                                                                                
              dataFileName);
         return new SidecarProvisionedSSTable(mockSidecarClient,
                                              sidecarClientConfig,
diff --git a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java
index f5a5abd..795c758 100644
--- a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java
+++ b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java
@@ -170,6 +170,12 @@ public class CassandraCluster<I extends IInstance> 
implements IClusterExtension<
         ClusterUtils.awaitRingState(instance, expectedInRing, state);
     }
 
+    @Override
+    public void awaitRingStatus(IInstance instance, IInstance expectedInRing, 
String status)
+    {
+        ClusterUtils.awaitRingStatus(instance, expectedInRing, status);
+    }
+
     @Override
     public void awaitGossipStatus(IInstance instance, IInstance 
expectedInGossip, String targetStatus)
     {
diff --git a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/testing/IClusterExtension.java b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/testing/IClusterExtension.java
index c604e37..915d289 100644
--- a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/testing/IClusterExtension.java
+++ b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/testing/IClusterExtension.java
@@ -81,6 +81,15 @@ public interface IClusterExtension<I extends IInstance> 
extends ICluster<I>
      */
     void awaitRingState(IInstance instance, IInstance expectedInRing, String 
state);
 
+    /**
+     * Wait for the ring to have the target instance with the provided status.
+     *
+     * @param instance       instance to check on
+     * @param expectedInRing to look for
+     * @param status         expected
+     */
+    void awaitRingStatus(IInstance instance, IInstance expectedInRing, String 
status);
+
     /**
      * Waits for the target instance to have the desired status. Target status 
is checked via string contains so works
      * with 'NORMAL' but also can check tokens or full state.
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/ReadDifferentTablesTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderTest.java
similarity index 68%
rename from cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/ReadDifferentTablesTest.java
rename to cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderTest.java
index 88e215d..239af60 100644
--- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/ReadDifferentTablesTest.java
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderTest.java
@@ -38,13 +38,40 @@ import static 
org.apache.cassandra.testing.TestUtils.uniqueTestTableFullName;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /**
- * Test that reads different tables with different schemas within the same test
+ * Tests bulk reader functionality
  */
-class ReadDifferentTablesTest extends SharedClusterSparkIntegrationTestBase
+class BulkReaderTest extends SharedClusterSparkIntegrationTestBase
 {
     static final List<String> DATASET = Arrays.asList("a", "b", "c", "d", "e", 
"f", "g");
     QualifiedName table1 = uniqueTestTableFullName(TEST_KEYSPACE);
     QualifiedName table2 = uniqueTestTableFullName(TEST_KEYSPACE);
+    QualifiedName tableForNullStaticColumn = 
uniqueTestTableFullName(TEST_KEYSPACE);
+
+    @Test
+    void testReadNullStaticColumn()
+    {
+        Dataset<Row> data = 
bulkReaderDataFrame(tableForNullStaticColumn).load();
+
+        List<Row> rows = data.collectAsList().stream()
+                             .sorted(Comparator.comparing(row -> 
row.getString(0)))
+                             .collect(Collectors.toList());
+        assertThat(rows.size()).isEqualTo(DATASET.size());
+
+        for (int i = 0; i < DATASET.size(); i++)
+        {
+            Row row = rows.get(i);
+            assertThat(row.getString(0)).isEqualTo(DATASET.get(i));
+            
assertThat(row.getTimestamp(1).getTime()).isEqualTo(1432815430948560L + i);
+            if (i % 2 == 0)
+            {
+                assertThat(row.getTimestamp(2)).as("Row " + (i + 1) + " is 
expected to have null timestamp").isNull();
+            }
+            else
+            {
+                
assertThat(row.getTimestamp(2).getTime()).isEqualTo(1432815430948560L + i);
+            }
+        }
+    }
 
     @Test
     void testReadingFromTwoDifferentTables()
@@ -78,6 +105,8 @@ class ReadDifferentTablesTest extends 
SharedClusterSparkIntegrationTestBase
         createTestKeyspace(TEST_KEYSPACE, DC1_RF1);
         createTestTable(table1, "CREATE TABLE IF NOT EXISTS %s (id int PRIMARY 
KEY, name text);");
         createTestTable(table2, "CREATE TABLE IF NOT EXISTS %s (name text 
PRIMARY KEY, value bigint);");
+        createTestTable(tableForNullStaticColumn, "CREATE TABLE %s (id text, 
timestamp timestamp,\n" +
+                                                  "   timestamp_static 
timestamp static, PRIMARY KEY (id, timestamp));");
 
         IInstance firstRunningInstance = cluster.getFirstRunningInstance();
         for (int i = 0; i < DATASET.size(); i++)
@@ -85,9 +114,13 @@ class ReadDifferentTablesTest extends 
SharedClusterSparkIntegrationTestBase
             String value = DATASET.get(i);
             String query1 = String.format("INSERT INTO %s (id, name) VALUES 
(%d, '%s');", table1, i, value);
             String query2 = String.format("INSERT INTO %s (name, value) VALUES 
('%s', %d);", table2, value, i);
+            String query3 = String.format("INSERT INTO %s (id, timestamp, 
timestamp_static) VALUES ('%s',%d, %s)",
+                                          tableForNullStaticColumn, value, 
1432815430948560L + i,
+                                          i % 2 == 0 ? "null" : 
String.valueOf(i + 1432815430948560L));
 
             firstRunningInstance.coordinator().execute(query1, 
ConsistencyLevel.ALL);
             firstRunningInstance.coordinator().execute(query2, 
ConsistencyLevel.ALL);
+            firstRunningInstance.coordinator().execute(query3, 
ConsistencyLevel.ALL);
         }
     }
 }
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementTestBase.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementTestBase.java
index 2d031cc..bd12bc1 100644
--- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementTestBase.java
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementTestBase.java
@@ -260,13 +260,9 @@ abstract class HostReplacementTestBase extends 
ResiliencyTestBase
         for (IInstance node : nodesToRemove)
         {
             cluster.stopUnchecked(node);
-            String remAddress = 
node.config().broadcastAddress().getAddress().getHostAddress();
-
-            List<ClusterUtils.RingInstanceDetails> ring = 
ClusterUtils.ring(seed);
-            List<ClusterUtils.RingInstanceDetails> match = ring.stream()
-                                                               .filter((d) -> 
d.getAddress().equals(remAddress))
-                                                               
.collect(Collectors.toList());
-            assertThat(match.stream().anyMatch(r -> 
r.getStatus().equals("Down"))).isTrue();
+            // awaitRingStatus will assert that the node status is down. It 
retries multiple times until a timeout
+            // is reached and fails if the expected status is not seen.
+            cluster.awaitRingStatus(seed, node, "Down");
         }
     }
 
diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/CqlType.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/CqlType.java
index 1e1eac6..cbcd814 100644
--- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/CqlType.java
+++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/CqlType.java
@@ -54,7 +54,8 @@ public abstract class CqlType implements CqlField.CqlType
     @Override
     public Object deserialize(ByteBuffer buffer, boolean isFrozen)
     {
-        return toSparkSqlType(serializer().deserialize(buffer));
+        Object value = serializer().deserialize(buffer);
+        return value != null ? toSparkSqlType(value) : null;
     }
 
     public abstract <T> TypeSerializer<T> serializer();
diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/complex/CqlCollection.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/complex/CqlCollection.java
index 9e0316f..385395e 100644
--- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/complex/CqlCollection.java
+++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/data/complex/CqlCollection.java
@@ -88,7 +88,8 @@ public abstract class CqlCollection extends CqlType 
implements CqlField.CqlColle
     @Override
     public Object deserialize(ByteBuffer buffer, boolean isFrozen)
     {
-        return toSparkSqlType(serializer().deserialize(buffer));
+        Object value = serializer().deserialize(buffer);
+        return value != null ? toSparkSqlType(value) : null;
     }
 
     @Override
diff --git a/scripts/build-sidecar.sh b/scripts/build-sidecar.sh
index 4e30386..194f5c9 100755
--- a/scripts/build-sidecar.sh
+++ b/scripts/build-sidecar.sh
@@ -24,7 +24,7 @@ else
   SCRIPT_DIR=$( dirname -- "$( readlink -f -- "$0"; )"; )
   
SIDECAR_REPO="${SIDECAR_REPO:-https://github.com/apache/cassandra-sidecar.git}";
   SIDECAR_BRANCH="${SIDECAR_BRANCH:-trunk}"
-  SIDECAR_COMMIT="${SIDECAR_COMMIT:-55866caa2c4601b1d59a8532a97310a9e819931f}"
+  SIDECAR_COMMIT="${SIDECAR_COMMIT:-e95786a077e1137dcaae206854986987edc6a71e}"
   SIDECAR_JAR_DIR="$(dirname "${SCRIPT_DIR}/")/dependencies"
   SIDECAR_JAR_DIR=${CASSANDRA_DEP_DIR:-$SIDECAR_JAR_DIR}
   SIDECAR_BUILD_DIR="${SIDECAR_JAR_DIR}/sidecar-build"


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to