Updated Branches:
refs/heads/cassandra-1.1 99b245d31 -> d401d9f1d
refs/heads/trunk c320b82be -> c631cc7c3
Merge branch 'cassandra-1.1' into trunk
Conflicts:
src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c631cc7c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c631cc7c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c631cc7c
Branch: refs/heads/trunk
Commit: c631cc7c3a9f75d9d408038d88532a59a3b51c5e
Parents: c320b82 d401d9f
Author: Yuki Morishita <[email protected]>
Authored: Wed Oct 31 11:10:54 2012 -0500
Committer: Yuki Morishita <[email protected]>
Committed: Wed Oct 31 11:10:54 2012 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/streaming/IncomingStreamReader.java | 16 +++++++++++++++
.../cassandra/streaming/StreamInSession.java | 6 +++++
3 files changed, 23 insertions(+), 0 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c631cc7c/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index fed3bc1,5f5ea89..11aaea1
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -58,77 -11,9 +58,78 @@@ Merged from 1.1
* fix compositeType.{get/from}String methods (CASSANDRA-4842)
* (CQL) fix CREATE COLUMNFAMILY permissions check (CASSANDRA-4864)
* Fix DynamicCompositeType same type comparison (CASSANDRA-4711)
+ * Fix duplicate SSTable reference when stream session failed (CASSANDRA-3306)
+
+1.2-beta1
+ * add atomic_batch_mutate (CASSANDRA-4542, -4635)
+ * increase default max_hint_window_in_ms to 3h (CASSANDRA-4632)
+ * include message initiation time to replicas so they can more
+ accurately drop timed-out requests (CASSANDRA-2858)
+ * fix clientutil.jar dependencies (CASSANDRA-4566)
+ * optimize WriteResponse (CASSANDRA-4548)
+ * new metrics (CASSANDRA-4009)
+ * redesign KEYS indexes to avoid read-before-write (CASSANDRA-2897)
+ * debug tracing (CASSANDRA-1123)
+ * parallelize row cache loading (CASSANDRA-4282)
+ * Make compaction, flush JBOD-aware (CASSANDRA-4292)
+ * run local range scans on the read stage (CASSANDRA-3687)
+ * clean up ioexceptions (CASSANDRA-2116)
+ * add disk_failure_policy (CASSANDRA-2118)
+ * Introduce new json format with row level deletion (CASSANDRA-4054)
+ * remove redundant "name" column from schema_keyspaces (CASSANDRA-4433)
+ * improve "nodetool ring" handling of multi-dc clusters (CASSANDRA-3047)
+ * update NTS calculateNaturalEndpoints to be O(N log N) (CASSANDRA-3881)
+ * add UseCondCardMark XX jvm settings on jdk 1.7 (CASSANDRA-4366)
+ * split up rpc timeout by operation type (CASSANDRA-2819)
+ * rewrite key cache save/load to use only sequential i/o (CASSANDRA-3762)
+ * update MS protocol with a version handshake + broadcast address id
+ (CASSANDRA-4311)
+ * multithreaded hint replay (CASSANDRA-4189)
+ * add inter-node message compression (CASSANDRA-3127)
+ * remove COPP (CASSANDRA-2479)
+ * Track tombstone expiration and compact when tombstone content is
+ higher than a configurable threshold, default 20% (CASSANDRA-3442, 4234)
+ * update MurmurHash to version 3 (CASSANDRA-2975)
+ * (CLI) track elapsed time for `delete' operation (CASSANDRA-4060)
+ * (CLI) jline version is bumped to 1.0 to properly support
+ 'delete' key function (CASSANDRA-4132)
+ * Save IndexSummary into new SSTable 'Summary' component (CASSANDRA-2392,
4289)
+ * Add support for range tombstones (CASSANDRA-3708)
+ * Improve MessagingService efficiency (CASSANDRA-3617)
+ * Avoid ID conflicts from concurrent schema changes (CASSANDRA-3794)
+ * Set thrift HSHA server thread limit to unlimited by default
(CASSANDRA-4277)
+ * Avoids double serialization of CF id in RowMutation messages
+ (CASSANDRA-4293)
+ * stream compressed sstables directly with java nio (CASSANDRA-4297)
+ * Support multiple ranges in SliceQueryFilter (CASSANDRA-3885)
+ * Add column metadata to system column families (CASSANDRA-4018)
+ * (cql3) Always use composite types by default (CASSANDRA-4329)
+ * (cql3) Add support for set, map and list (CASSANDRA-3647)
+ * Validate date type correctly (CASSANDRA-4441)
+ * (cql3) Allow definitions with only a PK (CASSANDRA-4361)
+ * (cql3) Add support for row key composites (CASSANDRA-4179)
+ * improve DynamicEndpointSnitch by using reservoir sampling (CASSANDRA-4038)
+ * (cql3) Add support for 2ndary indexes (CASSANDRA-3680)
+ * (cql3) fix defining more than one PK to be invalid (CASSANDRA-4477)
+ * remove schema agreement checking from all external APIs (Thrift, CQL and
CQL3) (CASSANDRA-4487)
+ * add Murmur3Partitioner and make it default for new installations
(CASSANDRA-3772, 4621)
+ * (cql3) update pseudo-map syntax to use map syntax (CASSANDRA-4497)
+ * Finer grained exceptions hierarchy and provides error code with exceptions
(CASSANDRA-3979)
+ * Adds events push to binary protocol (CASSANDRA-4480)
+ * Rewrite nodetool help (CASSANDRA-2293)
+ * Make CQL3 the default for CQL (CASSANDRA-4640)
+ * update stress tool to be able to use CQL3 (CASSANDRA-4406)
+ * Accept all thrift update on CQL3 cf but don't expose their metadata
(CASSANDRA-4377)
+ * Replace Throttle with Guava's RateLimiter for HintedHandOff
(CASSANDRA-4541)
+ * fix counter add/get using CQL2 and CQL3 in stress tool (CASSANDRA-4633)
+ * Add sstable count per level to cfstats (CASSANDRA-4537)
+ * (cql3) Add ALTER KEYSPACE statement (CASSANDRA-4611)
+ * (cql3) Allow defining default consistency levels (CASSANDRA-4448)
+ * (cql3) Fix queries using LIMIT missing results (CASSANDRA-4579)
+ * fix cross-version gossip messaging (CASSANDRA-4576)
+
1.1.6
* Wait for writes on synchronous read digest mismatch (CASSANDRA-4792)
* fix commitlog replay for nanotime-infected sstables (CASSANDRA-4782)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c631cc7c/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
index 1c3fc4e,bfb046f..0859eaa
--- a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
@@@ -37,14 -34,19 +37,16 @@@ import org.apache.cassandra.db.compacti
import org.apache.cassandra.io.IColumnSerializer;
import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.metrics.StreamingMetrics;
++import org.apache.cassandra.net.MessagingService;
+ import org.apache.cassandra.net.OutboundTcpConnection;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.streaming.compress.CompressedInputStream;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.BytesReadTracker;
-import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
-
import com.ning.compress.lzf.LZFInputStream;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
public class IncomingStreamReader
{
private static final Logger logger =
LoggerFactory.getLogger(IncomingStreamReader.class);
@@@ -58,8 -59,19 +60,22 @@@
public IncomingStreamReader(StreamHeader header, Socket socket) throws
IOException
{
socket.setSoTimeout(DatabaseDescriptor.getStreamingSocketTimeout());
- this.socket = socket;
InetAddress host = header.broadcastAddress != null ?
header.broadcastAddress
:
((InetSocketAddress)socket.getRemoteSocketAddress()).getAddress();
+ if (header.pendingFiles.isEmpty() && header.file != null)
+ {
+ // StreamInSession should be created already when receiving 2nd
and after files
+ if (!StreamInSession.hasSession(host, header.sessionId))
+ {
+ StreamReply reply = new StreamReply("", header.sessionId,
StreamReply.Status.SESSION_FAILURE);
-
OutboundTcpConnection.write(reply.getMessage(Gossiper.instance.getVersion(host)),
Long.toString(header.sessionId), new
DataOutputStream(socket.getOutputStream()));
++ OutboundTcpConnection.write(reply.createMessage(),
++ Long.toString(header.sessionId),
++ System.currentTimeMillis(),
++ new
DataOutputStream(socket.getOutputStream()),
++
MessagingService.instance().getVersion(host));
+ throw new IOException("Session " + header.sessionId + "
already closed.");
+ }
+ }
session = StreamInSession.get(host, header.sessionId);
session.setSocket(socket);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c631cc7c/src/java/org/apache/cassandra/streaming/StreamInSession.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamInSession.java
index 5753a15,e1bbe41..f8bde91
--- a/src/java/org/apache/cassandra/streaming/StreamInSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamInSession.java
@@@ -101,6 -102,12 +101,12 @@@ public class StreamInSession extends Ab
return session;
}
+ public static boolean hasSession(InetAddress host, long sessionId)
+ {
- Pair<InetAddress, Long> context = new Pair<InetAddress, Long>(host,
sessionId);
++ Pair<InetAddress, Long> context = Pair.create(host, sessionId);
+ return sessions.get(context) != null;
+ }
+
public void setCurrentFile(PendingFile file)
{
this.current = file;