Author: jbellis
Date: Mon Apr 18 21:58:20 2011
New Revision: 1094789
URL: http://svn.apache.org/viewvc?rev=1094789&view=rev
Log:
preserve version when streaming data from old sstables
patch by jbellis; reviewed by Stu Hood for CASSANDRA-2283
Modified:
cassandra/branches/cassandra-0.7/CHANGES.txt
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamIn.java
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/streaming/BootstrapTest.java
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/streaming/SerializationsTest.java
Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1094789&r1=1094788&r2=1094789&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Mon Apr 18 21:58:20 2011
@@ -30,6 +30,7 @@
* use 64KB flush buffer instead of in_memory_compaction_limit (CASSANDRA-2463)
* fix duplicate results from CFS.scan (CASSANDRA-2406)
* avoid caching token-only decoratedkeys (CASSANDRA-2416)
+ * preserve version when streaming data from old sstables (CASSANDRA-2283)
0.7.4
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1094789&r1=1094788&r2=1094789&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
Mon Apr 18 21:58:20 2011
@@ -657,23 +657,29 @@ public class ColumnFamilyStore implement
return columnFamily;
}
+ public String getFlushPath()
+ {
+ return getFlushPath(Descriptor.CURRENT_VERSION);
+ }
+
/*
* @return a temporary file name for an sstable.
* When the sstable object is closed, it will be renamed to a non-temporary
* format, so incomplete sstables can be recognized and removed on startup.
*/
- public String getFlushPath()
+ public String getFlushPath(String version)
{
long guessedSize = 2L * memsize.value() * 1024*1024; // 2* adds room for keys, column indexes
String location =
DatabaseDescriptor.getDataFileLocationForTable(table.name, guessedSize);
if (location == null)
throw new RuntimeException("Insufficient disk space to flush");
- return getTempSSTablePath(location);
+ return getTempSSTablePath(location, version);
}
- public String getTempSSTablePath(String directory)
+ public String getTempSSTablePath(String directory, String version)
{
- Descriptor desc = new Descriptor(new File(directory),
+ Descriptor desc = new Descriptor(version,
+ new File(directory),
table.name,
columnFamily,
fileIndexGenerator.incrementAndGet(),
@@ -681,6 +687,11 @@ public class ColumnFamilyStore implement
return desc.filenameFor(Component.DATA);
}
+ public String getTempSSTablePath(String directory)
+ {
+ return getTempSSTablePath(directory, Descriptor.CURRENT_VERSION);
+ }
+
/** flush the given memtable and swap in a new one for its CFS, if it
hasn't been frozen already. threadsafe. */
Future<?> maybeSwitchMemtable(Memtable oldMemtable, final boolean
writeCommitLog)
{
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamIn.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamIn.java?rev=1094789&r1=1094788&r2=1094789&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamIn.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamIn.java
Mon Apr 18 21:58:20 2011
@@ -49,11 +49,6 @@ public class StreamIn
/**
* Request ranges to be transferred from source to local node
*/
- public static void requestRanges(InetAddress source, String tableName,
Collection<Range> ranges)
- {
- requestRanges(source, tableName, ranges, null);
- }
-
public static void requestRanges(InetAddress source, String tableName,
Collection<Range> ranges, Runnable callback)
{
assert ranges.size() > 0;
@@ -74,7 +69,7 @@ public class StreamIn
// new local sstable
Table table = Table.open(remotedesc.ksname);
ColumnFamilyStore cfStore =
table.getColumnFamilyStore(remotedesc.cfname);
- Descriptor localdesc = Descriptor.fromFilename(cfStore.getFlushPath());
+ Descriptor localdesc =
Descriptor.fromFilename(cfStore.getFlushPath(remote.desc.version));
return new PendingFile(localdesc, remote);
}
Modified:
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/streaming/BootstrapTest.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/streaming/BootstrapTest.java?rev=1094789&r1=1094788&r2=1094789&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/streaming/BootstrapTest.java
(original)
+++
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/streaming/BootstrapTest.java
Mon Apr 18 21:58:20 2011
@@ -37,6 +37,7 @@ public class BootstrapTest extends Schem
public void testGetNewNames() throws IOException
{
Descriptor desc = Descriptor.fromFilename(new File("Keyspace1",
"Standard1-500-Data.db").toString());
+ assert !desc.isLatestVersion; // deliberately test old version; see CASSANDRA-2283
PendingFile inContext = new PendingFile(null, desc, "Data.db",
Arrays.asList(new Pair<Long,Long>(0L, 1L)));
PendingFile outContext = StreamIn.getContextMapping(inContext);
@@ -45,7 +46,8 @@ public class BootstrapTest extends Schem
// nothing else should
assertEquals(inContext.component, outContext.component);
- assertEquals(inContext.desc.ksname, outContext.desc.ksname);
- assertEquals(inContext.desc.cfname, outContext.desc.cfname);
+ assertEquals(desc.ksname, outContext.desc.ksname);
+ assertEquals(desc.cfname, outContext.desc.cfname);
+ assertEquals(desc.version, outContext.desc.version);
}
}
Modified:
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/streaming/SerializationsTest.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/streaming/SerializationsTest.java?rev=1094789&r1=1094788&r2=1094789&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/streaming/SerializationsTest.java
(original)
+++
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/streaming/SerializationsTest.java
Mon Apr 18 21:58:20 2011
@@ -50,9 +50,9 @@ public class SerializationsTest extends
private void testPendingFileWrite() throws IOException
{
// make sure to test serializing null and a pf with no sstable.
- PendingFile normal = makePendingFile(true, "fake_component", 100);
- PendingFile noSections = makePendingFile(true, "not_real", 0);
- PendingFile noSST = makePendingFile(false, "also_fake", 100);
+ PendingFile normal = makePendingFile(true, 100);
+ PendingFile noSections = makePendingFile(true, 0);
+ PendingFile noSST = makePendingFile(false, 100);
DataOutputStream out = getOutput("streaming.PendingFile.bin");
PendingFile.serializer().serialize(normal, out);
@@ -78,14 +78,14 @@ public class SerializationsTest extends
private void testStreamHeaderWrite() throws IOException
{
- StreamHeader sh0 = new StreamHeader("Keyspace1", 123L,
makePendingFile(true, "zz", 100));
- StreamHeader sh1 = new StreamHeader("Keyspace1", 124L,
makePendingFile(false, "zz", 100));
+ StreamHeader sh0 = new StreamHeader("Keyspace1", 123L,
makePendingFile(true, 100));
+ StreamHeader sh1 = new StreamHeader("Keyspace1", 124L,
makePendingFile(false, 100));
Collection<PendingFile> files = new ArrayList<PendingFile>();
for (int i = 0; i < 50; i++)
- files.add(makePendingFile(i % 2 == 0, "aa", 100));
- StreamHeader sh2 = new StreamHeader("Keyspace1", 125L,
makePendingFile(true, "bb", 100), files);
+ files.add(makePendingFile(i % 2 == 0, 100));
+ StreamHeader sh2 = new StreamHeader("Keyspace1", 125L,
makePendingFile(true, 100), files);
StreamHeader sh3 = new StreamHeader("Keyspace1", 125L, null, files);
- StreamHeader sh4 = new StreamHeader("Keyspace1", 125L,
makePendingFile(true, "bb", 100), new ArrayList<PendingFile>());
+ StreamHeader sh4 = new StreamHeader("Keyspace1", 125L,
makePendingFile(true, 100), new ArrayList<PendingFile>());
DataOutputStream out = getOutput("streaming.StreamHeader.bin");
StreamHeader.serializer().serialize(sh0, out);
@@ -132,13 +132,13 @@ public class SerializationsTest extends
in.close();
}
- private static PendingFile makePendingFile(boolean sst, String comp, int
numSecs)
+ private static PendingFile makePendingFile(boolean sst, int numSecs)
{
Descriptor desc = new Descriptor("z", new File("path/doesn't/matter"),
"Keyspace1", "Standard1", 23, false);
List<Pair<Long, Long>> sections = new ArrayList<Pair<Long, Long>>();
for (int i = 0; i < numSecs; i++)
sections.add(new Pair<Long, Long>(new Long(i), new Long(i * i)));
- return new PendingFile(sst ? makeSSTable() : null, desc, comp,
sections);
+ return new PendingFile(sst ? makeSSTable() : null, desc,
SSTable.COMPONENT_DATA, sections);
}
private void testStreamRequestMessageWrite() throws IOException
@@ -147,8 +147,8 @@ public class SerializationsTest extends
for (int i = 0; i < 5; i++)
ranges.add(new Range(new
BytesToken(ByteBufferUtil.bytes(Integer.toString(10*i))), new
BytesToken(ByteBufferUtil.bytes(Integer.toString(10*i+5)))));
StreamRequestMessage msg0 = new
StreamRequestMessage(FBUtilities.getLocalAddress(), ranges, "Keyspace1", 123L);
- StreamRequestMessage msg1 = new
StreamRequestMessage(FBUtilities.getLocalAddress(), makePendingFile(true, "aa",
100), 124L);
- StreamRequestMessage msg2 = new
StreamRequestMessage(FBUtilities.getLocalAddress(), makePendingFile(false,
"aa", 100), 124L);
+ StreamRequestMessage msg1 = new
StreamRequestMessage(FBUtilities.getLocalAddress(), makePendingFile(true, 100),
124L);
+ StreamRequestMessage msg2 = new
StreamRequestMessage(FBUtilities.getLocalAddress(), makePendingFile(false,
100), 124L);
DataOutputStream out = getOutput("streaming.StreamRequestMessage.bin");
StreamRequestMessage.serializer().serialize(msg0, out);