Fix NPE when table dropped during streaming patch by yukim; reviewed by krummas for CASSANDRA-7946
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d143487c Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d143487c Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d143487c Branch: refs/heads/trunk Commit: d143487cb198051d0eaccc7f587f35cc63fc85a9 Parents: 6198a75 Author: Yuki Morishita <[email protected]> Authored: Fri Sep 19 17:11:25 2014 -0500 Committer: Yuki Morishita <[email protected]> Committed: Fri Sep 19 17:11:25 2014 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + src/java/org/apache/cassandra/streaming/StreamReader.java | 5 +++++ .../org/apache/cassandra/streaming/StreamReceiveTask.java | 8 ++++++++ .../cassandra/streaming/compress/CompressedStreamReader.java | 5 +++++ 4 files changed, 19 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d143487c/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index abd7c68..fd49b09 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -20,6 +20,7 @@ * Always send Paxos commit to all replicas (CASSANDRA-7479) * Make disruptor_thrift_server invocation pool configurable (CASSANDRA-7594) * Make repair no-op when RF=1 (CASSANDRA-7864) + * Fix NPE when table dropped during streaming (CASSANDRA-7946) Merged from 1.2: * Don't index tombstones (CASSANDRA-7828) http://git-wip-us.apache.org/repos/asf/cassandra/blob/d143487c/src/java/org/apache/cassandra/streaming/StreamReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java index 15aa3cb..3b2a924 100644 --- a/src/java/org/apache/cassandra/streaming/StreamReader.java +++ b/src/java/org/apache/cassandra/streaming/StreamReader.java @@ -75,6 +75,11 @@ public class StreamReader long totalSize = totalSize(); Pair<String, String> kscf = Schema.instance.getCF(cfId); + if (kscf == null) + { + // schema was dropped during streaming + throw new IOException("CF " + cfId + " was dropped during streaming"); + } ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right); SSTableWriter writer = createWriter(cfs, totalSize); http://git-wip-us.apache.org/repos/asf/cassandra/blob/d143487c/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java index 223a46e..33da3d1 100644 --- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java +++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java @@ -103,6 +103,14 @@ public class StreamReceiveTask extends StreamTask public void run() { Pair<String, String> kscf = Schema.instance.getCF(task.cfId); + if (kscf == null) + { + // schema was dropped during streaming + for (SSTableWriter writer : task.sstables) + writer.abort(); + task.sstables.clear(); + return; + } ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right); StreamLockfile lockfile = new StreamLockfile(cfs.directories.getWriteableLocationAsFile(), UUID.randomUUID()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/d143487c/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java index 4aac941..219cabb 100644 --- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java +++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java @@ -60,6 +60,11 @@ public class CompressedStreamReader extends StreamReader long totalSize = totalSize(); Pair<String, String> kscf = Schema.instance.getCF(cfId); + if (kscf == null) + { + // schema was dropped during streaming + throw new IOException("CF " + cfId + " was dropped during streaming"); + } ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right); SSTableWriter writer = createWriter(cfs, totalSize);
