Author: xedin
Date: Fri Sep 2 20:23:42 2011
New Revision: 1164689
URL: http://svn.apache.org/viewvc?rev=1164689&view=rev
Log:
Streams Compression
patch by Pavel Yaskevich; reviewed by Jonathan Ellis for CASSANDRA-3015
Added:
cassandra/trunk/lib/compress-lzf-0.8.4.jar
cassandra/trunk/lib/licenses/compress-lzf-0.8.4.txt (with props)
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
Modified: cassandra/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1164689&r1=1164688&r2=1164689&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Fri Sep 2 20:23:42 2011
@@ -58,6 +58,7 @@
* make the repair of a range repair all replica (CASSANDRA-2610)
* expose the ability to repair the first range (as returned by the
partitioner) of a node (CASSANDRA-2606)
+ * Streams Compression (CASSANDRA-3015)
0.8.5
* fix NPE when encryption_options is unspecified (CASSANDRA-3007)
Added: cassandra/trunk/lib/compress-lzf-0.8.4.jar
URL:
http://svn.apache.org/viewvc/cassandra/trunk/lib/compress-lzf-0.8.4.jar?rev=1164689&view=auto
==============================================================================
Files cassandra/trunk/lib/compress-lzf-0.8.4.jar (added) and
cassandra/trunk/lib/compress-lzf-0.8.4.jar Fri Sep 2 20:23:42 2011 differ
Added: cassandra/trunk/lib/licenses/compress-lzf-0.8.4.txt
URL:
http://svn.apache.org/viewvc/cassandra/trunk/lib/licenses/compress-lzf-0.8.4.txt?rev=1164689&view=auto
==============================================================================
--- cassandra/trunk/lib/licenses/compress-lzf-0.8.4.txt (added)
+++ cassandra/trunk/lib/licenses/compress-lzf-0.8.4.txt Fri Sep 2 20:23:42 2011
@@ -0,0 +1,11 @@
+Copyright 2009-2010 Ning, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License"); you may not
+use this file except in compliance with the License. You may obtain a copy of
+the License at http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+License for the specific language governing permissions and limitations under
+the License.
\ No newline at end of file
Propchange: cassandra/trunk/lib/licenses/compress-lzf-0.8.4.txt
------------------------------------------------------------------------------
svn:executable = *
Modified:
cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=1164689&r1=1164688&r2=1164689&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
Fri Sep 2 20:23:42 2011
@@ -22,6 +22,7 @@ package org.apache.cassandra.net;
import java.io.*;
+import java.net.InetSocketAddress;
import java.net.Socket;
import org.apache.cassandra.gms.Gossiper;
@@ -78,7 +79,7 @@ public class IncomingTcpConnection exten
else
{
// streaming connections are per-session and have a fixed
version. we can't do anything with a new-version stream connection, so drop it.
- logger.error("Received untranslated stream from newer
protcol version. Terminating connection!");
+ logger.error("Received untranslated stream from newer
protocol version. Terminating connection!");
}
// We are done with this connection....
return;
Modified:
cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java?rev=1164689&r1=1164688&r2=1164689&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java
Fri Sep 2 20:23:42 2011
@@ -18,9 +18,9 @@
package org.apache.cassandra.streaming;
-import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
+import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
@@ -40,6 +40,8 @@ import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.Throttle;
import org.apache.cassandra.utils.WrappedRunnable;
+import com.ning.compress.lzf.LZFOutputStream;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,7 +60,7 @@ public class FileStreamTask extends Wrap
// communication socket
private Socket socket;
// socket's output stream
- private DataOutputStream output;
+ private OutputStream output;
// system encryption options if any
private final EncryptionOptions encryptionOptions;
// allocate buffer to use for transfers only once
@@ -119,7 +121,7 @@ public class FileStreamTask extends Wrap
private void stream() throws IOException
{
ByteBuffer HeaderBuffer =
MessagingService.instance().constructStreamHeader(header, false,
Gossiper.instance.getVersion(to));
- // write header
+ // write header (this should not be compressed for compatibility with
other messages)
output.write(ByteBufferUtil.getArray(HeaderBuffer));
if (header.file == null)
@@ -129,6 +131,9 @@ public class FileStreamTask extends Wrap
?
CompressedRandomAccessReader.open(header.file.getFilename(), true)
: RandomAccessReader.open(new
File(header.file.getFilename()), CHUNK_SIZE, true);
+ // setting up data compression stream
+ output = new LZFOutputStream(output);
+
try
{
// stream each of the required sections of the file
@@ -234,12 +239,12 @@ public class FileStreamTask extends Wrap
protected void connect() throws IOException
{
socket.connect(new InetSocketAddress(to,
DatabaseDescriptor.getStoragePort()));
- output = new DataOutputStream(socket.getOutputStream());
+ output = socket.getOutputStream();
}
protected void close() throws IOException
{
- socket.close();
+ output.close();
}
public String toString()
Modified:
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=1164689&r1=1164688&r2=1164689&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
Fri Sep 2 20:23:42 2011
@@ -23,9 +23,6 @@ import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Collections;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.ColumnFamilyStore;
@@ -41,6 +38,11 @@ import org.apache.cassandra.utils.ByteBu
import org.apache.cassandra.utils.BytesReadTracker;
import org.apache.cassandra.utils.Pair;
+import com.ning.compress.lzf.LZFInputStream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
public class IncomingStreamReader
{
private static final Logger logger =
LoggerFactory.getLogger(IncomingStreamReader.class);
@@ -79,7 +81,7 @@ public class IncomingStreamReader
assert remoteFile.estimatedKeys > 0;
SSTableReader reader = null;
logger.debug("Estimated keys {}", remoteFile.estimatedKeys);
- DataInputStream dis = new DataInputStream(socket.getInputStream());
+ DataInputStream dis = new DataInputStream(new
LZFInputStream(socket.getInputStream()));
try
{
reader = streamIn(dis, localFile, remoteFile);