Repository: cassandra Updated Branches: refs/heads/cassandra-3.0 65ee15f98 -> 8afc76ae6
Fix flaky test failure: SSTableLoaderTest Patch by blambov; reviewed by jmckenzie for CASSANDRA-10118 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8afc76ae Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8afc76ae Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8afc76ae Branch: refs/heads/cassandra-3.0 Commit: 8afc76ae63cd97fb7188653b6e58e4b2149f5d77 Parents: 65ee15f Author: Branimir Lambov <[email protected]> Authored: Tue Aug 25 12:20:08 2015 -0400 Committer: Joshua McKenzie <[email protected]> Committed: Tue Aug 25 12:20:08 2015 -0400 ---------------------------------------------------------------------- .../cassandra/io/sstable/SSTableLoaderTest.java | 39 ++++++++++++++++++-- 1 file changed, 36 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/8afc76ae/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java index dfd7821..3370e56 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java @@ -18,9 +18,12 @@ package org.apache.cassandra.io.sstable; import java.io.File; +import java.util.Collections; import java.util.List; +import java.util.concurrent.CountDownLatch; import com.google.common.io.Files; + import org.junit.After; import org.junit.Before; import org.junit.BeforeClass; @@ -38,6 +41,9 @@ import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.streaming.StreamEvent; +import org.apache.cassandra.streaming.StreamEventHandler; +import org.apache.cassandra.streaming.StreamState; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.OutputHandler; @@ -113,8 +119,9 @@ public class SSTableLoaderTest writer.addRow("key1", "col1", "100"); } + final CountDownLatch latch = new CountDownLatch(1); SSTableLoader loader = new SSTableLoader(dataDir, new TestClient(), new OutputHandler.SystemOutput(false, false)); - loader.stream().get(); + loader.stream(Collections.emptySet(), completionStreamListener(latch)).get(); List<FilteredPartition> partitions = Util.getAll(Util.cmd(Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1)).build()); @@ -123,6 +130,10 @@ public class SSTableLoaderTest assertEquals(ByteBufferUtil.bytes("100"), partitions.get(0).getRow(new Clustering(ByteBufferUtil.bytes("col1"))) .getCell(cfmeta.getColumnDefinition(ByteBufferUtil.bytes("val"))) .value()); + + // The stream future is signalled when the work is complete but before releasing references. Wait for release + // before cleanup (CASSANDRA-10118). + latch.await(); } @Test @@ -153,9 +164,10 @@ public class SSTableLoaderTest //make sure we have some tables... assertTrue(dataDir.listFiles().length > 0); + final CountDownLatch latch = new CountDownLatch(2); //writer is still open so loader should not load anything SSTableLoader loader = new SSTableLoader(dataDir, new TestClient(), new OutputHandler.SystemOutput(false, false)); - loader.stream().get(); + loader.stream(Collections.emptySet(), completionStreamListener(latch)).get(); List<FilteredPartition> partitions = Util.getAll(Util.cmd(Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD2)).build()); @@ -165,9 +177,30 @@ public class SSTableLoaderTest writer.close(); loader = new SSTableLoader(dataDir, new TestClient(), new OutputHandler.SystemOutput(false, false)); - loader.stream().get(); + loader.stream(Collections.emptySet(), completionStreamListener(latch)).get(); partitions = Util.getAll(Util.cmd(Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD2)).build()); assertEquals(1000, partitions.size()); + + // The stream future is signalled when the work is complete but before releasing references. Wait for release + // before cleanup (CASSANDRA-10118). + latch.await(); + } + + StreamEventHandler completionStreamListener(final CountDownLatch latch) + { + return new StreamEventHandler() { + public void onFailure(Throwable arg0) + { + latch.countDown(); + } + + public void onSuccess(StreamState arg0) + { + latch.countDown(); + } + + public void handleStreamEvent(StreamEvent event) {} + }; } }
