This is an automated email from the ASF dual-hosted git repository.
blerer pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9f6fcc3 Avoid race condition when completing stream sessions
9f6fcc3 is described below
commit 9f6fcc340d89eecc000765f6ab93e862f53a02d9
Author: Zhao Yang <[email protected]>
AuthorDate: Fri Mar 20 15:56:53 2020 +0800
Avoid race condition when completing stream sessions
patch by ZhaoYang; reviewed by Sergio Bossa and Benjamin Lerer for
CASSANDRA-15666
---
.circleci/config.yml | 174 +++++-----
CHANGES.txt | 1 +
.../streaming/CassandraCompressedStreamWriter.java | 2 +-
.../db/streaming/CassandraStreamWriter.java | 2 +-
.../org/apache/cassandra/io/util/ChannelProxy.java | 10 +
.../apache/cassandra/net/OutboundConnection.java | 4 +-
.../org/apache/cassandra/service/QueryState.java | 2 +-
.../cassandra/streaming/StreamCoordinator.java | 15 +-
.../apache/cassandra/streaming/StreamManager.java | 28 +-
.../org/apache/cassandra/streaming/StreamPlan.java | 4 +-
.../cassandra/streaming/StreamResultFuture.java | 31 +-
.../apache/cassandra/streaming/StreamSession.java | 352 +++++++++++++++------
.../async/NettyStreamingMessageSender.java | 60 +++-
.../streaming/async/StreamingInboundHandler.java | 17 +-
.../streaming/messages/CompleteMessage.java | 2 +-
.../streaming/messages/IncomingStreamMessage.java | 4 +-
.../streaming/messages/KeepAliveMessage.java | 2 +-
.../streaming/messages/OutgoingStreamMessage.java | 2 +-
.../streaming/messages/PrepareAckMessage.java | 2 +-
.../streaming/messages/PrepareSynAckMessage.java | 2 +-
.../streaming/messages/PrepareSynMessage.java | 2 +-
.../streaming/messages/ReceivedMessage.java | 2 +-
.../streaming/messages/SessionFailedMessage.java | 2 +-
.../streaming/messages/StreamInitMessage.java | 2 +-
.../streaming/messages/StreamMessage.java | 6 +-
.../org/apache/cassandra/utils/FBUtilities.java | 75 +++++
.../cassandra/distributed/test/StreamingTest.java | 107 +++++++
.../microbench/ZeroCopyStreamingBenchmark.java | 4 +-
.../CassandraEntireSSTableStreamWriterTest.java | 4 +-
.../db/streaming/CassandraStreamManagerTest.java | 1 +
.../apache/cassandra/dht/StreamStateStoreTest.java | 4 +-
...ntireSSTableStreamingCorrectFilesCountTest.java | 9 +-
.../streaming/StreamTransferTaskTest.java | 10 +-
.../async/NettyStreamingMessageSenderTest.java | 6 +-
.../async/StreamingInboundHandlerTest.java | 6 +-
35 files changed, 673 insertions(+), 283 deletions(-)
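For context, the heart of the change: completion handling on both sides of a stream session is now funneled through synchronized methods guarded by a one-shot flag, so IO threads and the stream-message processing thread cannot race to close the connection before the peer's CompleteMessage arrives. A hypothetical, much-simplified sketch of that pattern (not the actual StreamSession code shown below in this diff):

    // Hypothetical illustration only; the real logic lives in StreamSession.complete()/maybeCompleted().
    public class CompletionGuardSketch
    {
        private boolean maybeCompleted = false; // flipped at most once, under the object lock

        // synchronized so concurrent callers cannot both run the completion step
        public synchronized boolean maybeComplete(boolean isFollower)
        {
            if (maybeCompleted)
                return true; // another thread already handled completion
            maybeCompleted = true;

            if (isFollower)
                sendCompleteAndClose();      // follower: tell the initiator we are done, then close
            else
                waitForFollowerComplete();   // initiator: wait for the follower's CompleteMessage
            return true;
        }

        private void sendCompleteAndClose() { /* omitted in this sketch */ }
        private void waitForFollowerComplete() { /* omitted in this sketch */ }
    }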
diff --git a/.circleci/config.yml b/.circleci/config.yml
index 7f6c93b..7231d67 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -3,7 +3,7 @@ jobs:
j8_jvm_upgrade_dtests:
docker:
- image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200406
- resource_class: medium
+ resource_class: xlarge
working_directory: ~/
shell: /bin/bash -eo pipefail -l
parallelism: 1
@@ -85,8 +85,8 @@ jobs:
- CASS_DRIVER_NO_EXTENSIONS: true
- CASS_DRIVER_NO_CYTHON: true
- CASSANDRA_SKIP_SYNC: true
- - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
- - DTEST_BRANCH: master
+ - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+ - DTEST_BRANCH: CASSANDRA-15666
- CCM_MAX_HEAP_SIZE: 1024M
- CCM_HEAP_NEWSIZE: 256M
- JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
@@ -94,7 +94,7 @@ jobs:
j8_cqlsh-dtests-py2-with-vnodes:
docker:
- image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200406
- resource_class: medium
+ resource_class: xlarge
working_directory: ~/
shell: /bin/bash -eo pipefail -l
parallelism: 4
@@ -162,8 +162,8 @@ jobs:
- CASS_DRIVER_NO_EXTENSIONS: true
- CASS_DRIVER_NO_CYTHON: true
- CASSANDRA_SKIP_SYNC: true
- - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
- - DTEST_BRANCH: master
+ - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+ - DTEST_BRANCH: CASSANDRA-15666
- CCM_MAX_HEAP_SIZE: 1024M
- CCM_HEAP_NEWSIZE: 256M
- JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
@@ -171,7 +171,7 @@ jobs:
j11_unit_tests:
docker:
- image: nastra/cassandra-testing-ubuntu1910-java11:20200406
- resource_class: medium
+ resource_class: xlarge
working_directory: ~/
shell: /bin/bash -eo pipefail -l
parallelism: 4
@@ -253,8 +253,8 @@ jobs:
- CASS_DRIVER_NO_EXTENSIONS: true
- CASS_DRIVER_NO_CYTHON: true
- CASSANDRA_SKIP_SYNC: true
- - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
- - DTEST_BRANCH: master
+ - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+ - DTEST_BRANCH: CASSANDRA-15666
- CCM_MAX_HEAP_SIZE: 1024M
- CCM_HEAP_NEWSIZE: 256M
- JAVA_HOME: /usr/lib/jvm/java-11-openjdk-amd64
@@ -263,7 +263,7 @@ jobs:
j8_cqlsh-dtests-py38-no-vnodes:
docker:
- image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200406
- resource_class: medium
+ resource_class: xlarge
working_directory: ~/
shell: /bin/bash -eo pipefail -l
parallelism: 4
@@ -331,8 +331,8 @@ jobs:
- CASS_DRIVER_NO_EXTENSIONS: true
- CASS_DRIVER_NO_CYTHON: true
- CASSANDRA_SKIP_SYNC: true
- - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
- - DTEST_BRANCH: master
+ - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+ - DTEST_BRANCH: CASSANDRA-15666
- CCM_MAX_HEAP_SIZE: 1024M
- CCM_HEAP_NEWSIZE: 256M
- JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
@@ -340,7 +340,7 @@ jobs:
j11_cqlsh-dtests-py3-with-vnodes:
docker:
- image: nastra/cassandra-testing-ubuntu1910-java11:20200406
- resource_class: medium
+ resource_class: xlarge
working_directory: ~/
shell: /bin/bash -eo pipefail -l
parallelism: 4
@@ -408,8 +408,8 @@ jobs:
- CASS_DRIVER_NO_EXTENSIONS: true
- CASS_DRIVER_NO_CYTHON: true
- CASSANDRA_SKIP_SYNC: true
- - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
- - DTEST_BRANCH: master
+ - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+ - DTEST_BRANCH: CASSANDRA-15666
- CCM_MAX_HEAP_SIZE: 1024M
- CCM_HEAP_NEWSIZE: 256M
- JAVA_HOME: /usr/lib/jvm/java-11-openjdk-amd64
@@ -418,7 +418,7 @@ jobs:
j11_cqlsh-dtests-py3-no-vnodes:
docker:
- image: nastra/cassandra-testing-ubuntu1910-java11:20200406
- resource_class: medium
+ resource_class: xlarge
working_directory: ~/
shell: /bin/bash -eo pipefail -l
parallelism: 4
@@ -486,8 +486,8 @@ jobs:
- CASS_DRIVER_NO_EXTENSIONS: true
- CASS_DRIVER_NO_CYTHON: true
- CASSANDRA_SKIP_SYNC: true
- - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
- - DTEST_BRANCH: master
+ - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+ - DTEST_BRANCH: CASSANDRA-15666
- CCM_MAX_HEAP_SIZE: 1024M
- CCM_HEAP_NEWSIZE: 256M
- JAVA_HOME: /usr/lib/jvm/java-11-openjdk-amd64
@@ -496,7 +496,7 @@ jobs:
j11_cqlsh-dtests-py38-with-vnodes:
docker:
- image: nastra/cassandra-testing-ubuntu1910-java11:20200406
- resource_class: medium
+ resource_class: xlarge
working_directory: ~/
shell: /bin/bash -eo pipefail -l
parallelism: 4
@@ -564,8 +564,8 @@ jobs:
- CASS_DRIVER_NO_EXTENSIONS: true
- CASS_DRIVER_NO_CYTHON: true
- CASSANDRA_SKIP_SYNC: true
- - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
- - DTEST_BRANCH: master
+ - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+ - DTEST_BRANCH: CASSANDRA-15666
- CCM_MAX_HEAP_SIZE: 1024M
- CCM_HEAP_NEWSIZE: 256M
- JAVA_HOME: /usr/lib/jvm/java-11-openjdk-amd64
@@ -574,7 +574,7 @@ jobs:
j8_cqlsh-dtests-py3-with-vnodes:
docker:
- image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200406
- resource_class: medium
+ resource_class: xlarge
working_directory: ~/
shell: /bin/bash -eo pipefail -l
parallelism: 4
@@ -642,8 +642,8 @@ jobs:
- CASS_DRIVER_NO_EXTENSIONS: true
- CASS_DRIVER_NO_CYTHON: true
- CASSANDRA_SKIP_SYNC: true
- - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
- - DTEST_BRANCH: master
+ - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+ - DTEST_BRANCH: CASSANDRA-15666
- CCM_MAX_HEAP_SIZE: 1024M
- CCM_HEAP_NEWSIZE: 256M
- JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
@@ -651,7 +651,7 @@ jobs:
j8_cqlsh-dtests-py2-no-vnodes:
docker:
- image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200406
- resource_class: medium
+ resource_class: xlarge
working_directory: ~/
shell: /bin/bash -eo pipefail -l
parallelism: 4
@@ -719,8 +719,8 @@ jobs:
- CASS_DRIVER_NO_EXTENSIONS: true
- CASS_DRIVER_NO_CYTHON: true
- CASSANDRA_SKIP_SYNC: true
- - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
- - DTEST_BRANCH: master
+ - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+ - DTEST_BRANCH: CASSANDRA-15666
- CCM_MAX_HEAP_SIZE: 1024M
- CCM_HEAP_NEWSIZE: 256M
- JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
@@ -728,7 +728,7 @@ jobs:
j11_cqlsh-dtests-py2-with-vnodes:
docker:
- image: nastra/cassandra-testing-ubuntu1910-java11:20200406
- resource_class: medium
+ resource_class: xlarge
working_directory: ~/
shell: /bin/bash -eo pipefail -l
parallelism: 4
@@ -796,8 +796,8 @@ jobs:
- CASS_DRIVER_NO_EXTENSIONS: true
- CASS_DRIVER_NO_CYTHON: true
- CASSANDRA_SKIP_SYNC: true
- - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
- - DTEST_BRANCH: master
+ - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+ - DTEST_BRANCH: CASSANDRA-15666
- CCM_MAX_HEAP_SIZE: 1024M
- CCM_HEAP_NEWSIZE: 256M
- JAVA_HOME: /usr/lib/jvm/java-11-openjdk-amd64
@@ -806,7 +806,7 @@ jobs:
j11_dtests-with-vnodes:
docker:
- image: nastra/cassandra-testing-ubuntu1910-java11:20200406
- resource_class: medium
+ resource_class: xlarge
working_directory: ~/
shell: /bin/bash -eo pipefail -l
parallelism: 4
@@ -877,8 +877,8 @@ jobs:
- CASS_DRIVER_NO_EXTENSIONS: true
- CASS_DRIVER_NO_CYTHON: true
- CASSANDRA_SKIP_SYNC: true
- - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
- - DTEST_BRANCH: master
+ - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+ - DTEST_BRANCH: CASSANDRA-15666
- CCM_MAX_HEAP_SIZE: 1024M
- CCM_HEAP_NEWSIZE: 256M
- JAVA_HOME: /usr/lib/jvm/java-11-openjdk-amd64
@@ -887,7 +887,7 @@ jobs:
j8_dtests-no-vnodes:
docker:
- image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200406
- resource_class: medium
+ resource_class: xlarge
working_directory: ~/
shell: /bin/bash -eo pipefail -l
parallelism: 4
@@ -936,8 +936,8 @@ jobs:
- CASS_DRIVER_NO_EXTENSIONS: true
- CASS_DRIVER_NO_CYTHON: true
- CASSANDRA_SKIP_SYNC: true
- - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
- - DTEST_BRANCH: master
+ - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+ - DTEST_BRANCH: CASSANDRA-15666
- CCM_MAX_HEAP_SIZE: 1024M
- CCM_HEAP_NEWSIZE: 256M
- JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
@@ -945,7 +945,7 @@ jobs:
j8_upgradetests-no-vnodes:
docker:
- image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200406
- resource_class: medium
+ resource_class: xlarge
working_directory: ~/
shell: /bin/bash -eo pipefail -l
parallelism: 4
@@ -1035,8 +1035,8 @@ jobs:
- CASS_DRIVER_NO_EXTENSIONS: true
- CASS_DRIVER_NO_CYTHON: true
- CASSANDRA_SKIP_SYNC: true
- - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
- - DTEST_BRANCH: master
+ - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+ - DTEST_BRANCH: CASSANDRA-15666
- CCM_MAX_HEAP_SIZE: 1024M
- CCM_HEAP_NEWSIZE: 256M
- JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
@@ -1044,7 +1044,7 @@ jobs:
utests_stress:
docker:
- image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200406
- resource_class: medium
+ resource_class: xlarge
working_directory: ~/
shell: /bin/bash -eo pipefail -l
parallelism: 1
@@ -1080,8 +1080,8 @@ jobs:
- CASS_DRIVER_NO_EXTENSIONS: true
- CASS_DRIVER_NO_CYTHON: true
- CASSANDRA_SKIP_SYNC: true
- - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
- - DTEST_BRANCH: master
+ - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+ - DTEST_BRANCH: CASSANDRA-15666
- CCM_MAX_HEAP_SIZE: 1024M
- CCM_HEAP_NEWSIZE: 256M
- JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
@@ -1089,7 +1089,7 @@ jobs:
j8_unit_tests:
docker:
- image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200406
- resource_class: medium
+ resource_class: xlarge
working_directory: ~/
shell: /bin/bash -eo pipefail -l
parallelism: 4
@@ -1171,8 +1171,8 @@ jobs:
- CASS_DRIVER_NO_EXTENSIONS: true
- CASS_DRIVER_NO_CYTHON: true
- CASSANDRA_SKIP_SYNC: true
- - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
- - DTEST_BRANCH: master
+ - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+ - DTEST_BRANCH: CASSANDRA-15666
- CCM_MAX_HEAP_SIZE: 1024M
- CCM_HEAP_NEWSIZE: 256M
- JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
@@ -1180,7 +1180,7 @@ jobs:
j11_jvm_dtests:
docker:
- image: nastra/cassandra-testing-ubuntu1910-java11:20200406
- resource_class: medium
+ resource_class: xlarge
working_directory: ~/
shell: /bin/bash -eo pipefail -l
parallelism: 1
@@ -1262,8 +1262,8 @@ jobs:
- CASS_DRIVER_NO_EXTENSIONS: true
- CASS_DRIVER_NO_CYTHON: true
- CASSANDRA_SKIP_SYNC: true
- - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
- - DTEST_BRANCH: master
+ - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+ - DTEST_BRANCH: CASSANDRA-15666
- CCM_MAX_HEAP_SIZE: 1024M
- CCM_HEAP_NEWSIZE: 256M
- JAVA_HOME: /usr/lib/jvm/java-11-openjdk-amd64
@@ -1272,7 +1272,7 @@ jobs:
j11_build:
docker:
- image: nastra/cassandra-testing-ubuntu1910-java11:20200406
- resource_class: medium
+ resource_class: xlarge
working_directory: ~/
shell: /bin/bash -eo pipefail -l
parallelism: 1
@@ -1343,8 +1343,8 @@ jobs:
- CASS_DRIVER_NO_EXTENSIONS: true
- CASS_DRIVER_NO_CYTHON: true
- CASSANDRA_SKIP_SYNC: true
- - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
- - DTEST_BRANCH: master
+ - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+ - DTEST_BRANCH: CASSANDRA-15666
- CCM_MAX_HEAP_SIZE: 1024M
- CCM_HEAP_NEWSIZE: 256M
- JAVA_HOME: /usr/lib/jvm/java-11-openjdk-amd64
@@ -1353,7 +1353,7 @@ jobs:
j11_cqlsh-dtests-py2-no-vnodes:
docker:
- image: nastra/cassandra-testing-ubuntu1910-java11:20200406
- resource_class: medium
+ resource_class: xlarge
working_directory: ~/
shell: /bin/bash -eo pipefail -l
parallelism: 4
@@ -1421,8 +1421,8 @@ jobs:
- CASS_DRIVER_NO_EXTENSIONS: true
- CASS_DRIVER_NO_CYTHON: true
- CASSANDRA_SKIP_SYNC: true
- - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
- - DTEST_BRANCH: master
+ - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+ - DTEST_BRANCH: CASSANDRA-15666
- CCM_MAX_HEAP_SIZE: 1024M
- CCM_HEAP_NEWSIZE: 256M
- JAVA_HOME: /usr/lib/jvm/java-11-openjdk-amd64
@@ -1431,7 +1431,7 @@ jobs:
j8_dtests-with-vnodes:
docker:
- image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200406
- resource_class: medium
+ resource_class: xlarge
working_directory: ~/
shell: /bin/bash -eo pipefail -l
parallelism: 4
@@ -1480,8 +1480,8 @@ jobs:
- CASS_DRIVER_NO_EXTENSIONS: true
- CASS_DRIVER_NO_CYTHON: true
- CASSANDRA_SKIP_SYNC: true
- - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
- - DTEST_BRANCH: master
+ - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+ - DTEST_BRANCH: CASSANDRA-15666
- CCM_MAX_HEAP_SIZE: 1024M
- CCM_HEAP_NEWSIZE: 256M
- JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
@@ -1489,7 +1489,7 @@ jobs:
j11_cqlsh-dtests-py38-no-vnodes:
docker:
- image: nastra/cassandra-testing-ubuntu1910-java11:20200406
- resource_class: medium
+ resource_class: xlarge
working_directory: ~/
shell: /bin/bash -eo pipefail -l
parallelism: 4
@@ -1557,8 +1557,8 @@ jobs:
- CASS_DRIVER_NO_EXTENSIONS: true
- CASS_DRIVER_NO_CYTHON: true
- CASSANDRA_SKIP_SYNC: true
- - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
- - DTEST_BRANCH: master
+ - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+ - DTEST_BRANCH: CASSANDRA-15666
- CCM_MAX_HEAP_SIZE: 1024M
- CCM_HEAP_NEWSIZE: 256M
- JAVA_HOME: /usr/lib/jvm/java-11-openjdk-amd64
@@ -1567,7 +1567,7 @@ jobs:
j8_jvm_dtests:
docker:
- image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200406
- resource_class: medium
+ resource_class: xlarge
working_directory: ~/
shell: /bin/bash -eo pipefail -l
parallelism: 1
@@ -1649,8 +1649,8 @@ jobs:
- CASS_DRIVER_NO_EXTENSIONS: true
- CASS_DRIVER_NO_CYTHON: true
- CASSANDRA_SKIP_SYNC: true
- - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
- - DTEST_BRANCH: master
+ - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+ - DTEST_BRANCH: CASSANDRA-15666
- CCM_MAX_HEAP_SIZE: 1024M
- CCM_HEAP_NEWSIZE: 256M
- JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
@@ -1658,7 +1658,7 @@ jobs:
j8_build:
docker:
- image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200406
- resource_class: medium
+ resource_class: xlarge
working_directory: ~/
shell: /bin/bash -eo pipefail -l
parallelism: 1
@@ -1729,8 +1729,8 @@ jobs:
- CASS_DRIVER_NO_EXTENSIONS: true
- CASS_DRIVER_NO_CYTHON: true
- CASSANDRA_SKIP_SYNC: true
- - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
- - DTEST_BRANCH: master
+ - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+ - DTEST_BRANCH: CASSANDRA-15666
- CCM_MAX_HEAP_SIZE: 1024M
- CCM_HEAP_NEWSIZE: 256M
- JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
@@ -1738,7 +1738,7 @@ jobs:
j8_cqlsh-dtests-py3-no-vnodes:
docker:
- image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200406
- resource_class: medium
+ resource_class: xlarge
working_directory: ~/
shell: /bin/bash -eo pipefail -l
parallelism: 4
@@ -1806,8 +1806,8 @@ jobs:
- CASS_DRIVER_NO_EXTENSIONS: true
- CASS_DRIVER_NO_CYTHON: true
- CASSANDRA_SKIP_SYNC: true
- - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
- - DTEST_BRANCH: master
+ - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+ - DTEST_BRANCH: CASSANDRA-15666
- CCM_MAX_HEAP_SIZE: 1024M
- CCM_HEAP_NEWSIZE: 256M
- JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
@@ -1815,7 +1815,7 @@ jobs:
j8_cqlsh-dtests-py38-with-vnodes:
docker:
- image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200406
- resource_class: medium
+ resource_class: xlarge
working_directory: ~/
shell: /bin/bash -eo pipefail -l
parallelism: 4
@@ -1883,8 +1883,8 @@ jobs:
- CASS_DRIVER_NO_EXTENSIONS: true
- CASS_DRIVER_NO_CYTHON: true
- CASSANDRA_SKIP_SYNC: true
- - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
- - DTEST_BRANCH: master
+ - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+ - DTEST_BRANCH: CASSANDRA-15666
- CCM_MAX_HEAP_SIZE: 1024M
- CCM_HEAP_NEWSIZE: 256M
- JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
@@ -1892,7 +1892,7 @@ jobs:
utests_long:
docker:
- image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200406
- resource_class: medium
+ resource_class: xlarge
working_directory: ~/
shell: /bin/bash -eo pipefail -l
parallelism: 1
@@ -1928,8 +1928,8 @@ jobs:
- CASS_DRIVER_NO_EXTENSIONS: true
- CASS_DRIVER_NO_CYTHON: true
- CASSANDRA_SKIP_SYNC: true
- - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
- - DTEST_BRANCH: master
+ - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+ - DTEST_BRANCH: CASSANDRA-15666
- CCM_MAX_HEAP_SIZE: 1024M
- CCM_HEAP_NEWSIZE: 256M
- JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
@@ -1937,7 +1937,7 @@ jobs:
utests_fqltool:
docker:
- image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200406
- resource_class: medium
+ resource_class: xlarge
working_directory: ~/
shell: /bin/bash -eo pipefail -l
parallelism: 1
@@ -1973,8 +1973,8 @@ jobs:
- CASS_DRIVER_NO_EXTENSIONS: true
- CASS_DRIVER_NO_CYTHON: true
- CASSANDRA_SKIP_SYNC: true
- - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
- - DTEST_BRANCH: master
+ - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+ - DTEST_BRANCH: CASSANDRA-15666
- CCM_MAX_HEAP_SIZE: 1024M
- CCM_HEAP_NEWSIZE: 256M
- JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
@@ -1982,7 +1982,7 @@ jobs:
j11_dtests-no-vnodes:
docker:
- image: nastra/cassandra-testing-ubuntu1910-java11:20200406
- resource_class: medium
+ resource_class: xlarge
working_directory: ~/
shell: /bin/bash -eo pipefail -l
parallelism: 4
@@ -2053,8 +2053,8 @@ jobs:
- CASS_DRIVER_NO_EXTENSIONS: true
- CASS_DRIVER_NO_CYTHON: true
- CASSANDRA_SKIP_SYNC: true
- - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
- - DTEST_BRANCH: master
+ - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+ - DTEST_BRANCH: CASSANDRA-15666
- CCM_MAX_HEAP_SIZE: 1024M
- CCM_HEAP_NEWSIZE: 256M
- JAVA_HOME: /usr/lib/jvm/java-11-openjdk-amd64
@@ -2063,7 +2063,7 @@ jobs:
utests_compression:
docker:
- image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200406
- resource_class: medium
+ resource_class: xlarge
working_directory: ~/
shell: /bin/bash -eo pipefail -l
parallelism: 4
@@ -2145,8 +2145,8 @@ jobs:
- CASS_DRIVER_NO_EXTENSIONS: true
- CASS_DRIVER_NO_CYTHON: true
- CASSANDRA_SKIP_SYNC: true
- - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
- - DTEST_BRANCH: master
+ - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+ - DTEST_BRANCH: CASSANDRA-15666
- CCM_MAX_HEAP_SIZE: 1024M
- CCM_HEAP_NEWSIZE: 256M
- JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
@@ -2154,7 +2154,7 @@ jobs:
j8_dtest_jars_build:
docker:
- image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200406
- resource_class: medium
+ resource_class: xlarge
working_directory: ~/
shell: /bin/bash -eo pipefail -l
parallelism: 1
@@ -2213,8 +2213,8 @@ jobs:
- CASS_DRIVER_NO_EXTENSIONS: true
- CASS_DRIVER_NO_CYTHON: true
- CASSANDRA_SKIP_SYNC: true
- - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
- - DTEST_BRANCH: master
+ - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+ - DTEST_BRANCH: CASSANDRA-15666
- CCM_MAX_HEAP_SIZE: 1024M
- CCM_HEAP_NEWSIZE: 256M
- JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
diff --git a/CHANGES.txt b/CHANGES.txt
index b9c8f8d..cfc1f4c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0-alpha4
+ * Avoid race condition when completing stream sessions (CASSANDRA-15666)
* Flush with fast compressors by default (CASSANDRA-15379)
* Fix CqlInputFormat regression from the switch to system.size_estimates (CASSANDRA-15637)
* Allow sending Entire SSTables over SSL (CASSANDRA-15740)
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamWriter.java b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamWriter.java
index 21406b2..d92314b 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamWriter.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamWriter.java
@@ -62,7 +62,7 @@ public class CassandraCompressedStreamWriter extends CassandraStreamWriter
long totalSize = totalSize();
logger.debug("[Stream #{}] Start streaming file {} to {}, repairedAt = {}, totalSize = {}", session.planId(),
sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt, totalSize);
- try (ChannelProxy fc = sstable.getDataChannel().sharedCopy())
+ try (ChannelProxy fc = sstable.getDataChannel().newChannel())
{
long progress = 0L;
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java
index ffc663d..8382f0a 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java
@@ -83,7 +83,7 @@ public class CassandraStreamWriter
sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt, totalSize);
AsyncStreamingOutputPlus out = (AsyncStreamingOutputPlus) output;
- try(ChannelProxy proxy = sstable.getDataChannel().sharedCopy();
+ try(ChannelProxy proxy = sstable.getDataChannel().newChannel();
ChecksumValidator validator = new File(sstable.descriptor.filenameFor(Component.CRC)).exists()
? DataIntegrityMetadata.checksumValidator(sstable.descriptor)
: null)
diff --git a/src/java/org/apache/cassandra/io/util/ChannelProxy.java b/src/java/org/apache/cassandra/io/util/ChannelProxy.java
index 91bb03b..9ff46b7 100644
--- a/src/java/org/apache/cassandra/io/util/ChannelProxy.java
+++ b/src/java/org/apache/cassandra/io/util/ChannelProxy.java
@@ -111,6 +111,16 @@ public final class ChannelProxy extends SharedCloseableImpl
}
}
+ /**
+ * {@link #sharedCopy()} cannot be used if the thread will be interrupted, as the backing channel will be closed.
+ *
+ * @return a new channel instance
+ */
+ public final ChannelProxy newChannel()
+ {
+ return new ChannelProxy(filePath);
+ }
+
public ChannelProxy sharedCopy()
{
return new ChannelProxy(this);
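A hedged usage sketch of the new method, with an assumed caller (not code from this patch): a writer running on a thread that may be interrupted opens its own channel via newChannel() rather than sharing the existing file channel through sharedCopy(), so an interrupt that closes one channel cannot invalidate the other.

    // Illustration only: 'dataChannel' stands in for something like sstable.getDataChannel().
    void streamOnInterruptibleThread(ChannelProxy dataChannel)
    {
        // sharedCopy() shares the underlying FileChannel, so an interrupt here would close it
        // for every other sharer; newChannel() reopens the file independently instead.
        try (ChannelProxy fc = dataChannel.newChannel())
        {
            // ... transfer sections of the file to the peer ...
        }
    }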
diff --git a/src/java/org/apache/cassandra/net/OutboundConnection.java b/src/java/org/apache/cassandra/net/OutboundConnection.java
index 0503259..b84ebc3 100644
--- a/src/java/org/apache/cassandra/net/OutboundConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundConnection.java
@@ -1561,8 +1561,8 @@ public class OutboundConnection
Established established = state.established();
Channel channel = established.channel;
OutboundConnectionSettings settings = established.settings;
- return SocketFactory.channelId(settings.from, (InetSocketAddress) channel.remoteAddress(),
- settings.to, (InetSocketAddress) channel.localAddress(),
+ return SocketFactory.channelId(settings.from, (InetSocketAddress) channel.localAddress(),
+ settings.to, (InetSocketAddress) channel.remoteAddress(),
type, channel.id().asShortText());
}
diff --git a/src/java/org/apache/cassandra/service/QueryState.java b/src/java/org/apache/cassandra/service/QueryState.java
index 26f58bf..adb13b5 100644
--- a/src/java/org/apache/cassandra/service/QueryState.java
+++ b/src/java/org/apache/cassandra/service/QueryState.java
@@ -68,7 +68,7 @@ public class QueryState
public long getTimestamp()
{
if (timestamp == Long.MIN_VALUE)
- timestamp = clientState.getTimestamp();
+ timestamp = ClientState.getTimestamp();
return timestamp;
}
diff --git a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
index 6d757b6..c8ebabb 100644
--- a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
+++ b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
@@ -42,17 +42,19 @@ public class StreamCoordinator
private final Map<InetAddressAndPort, HostStreamingData> peerSessions = new HashMap<>();
private final StreamOperation streamOperation;
private final int connectionsPerHost;
+ private final boolean follower;
private StreamConnectionFactory factory;
private Iterator<StreamSession> sessionsToConnect = null;
private final UUID pendingRepair;
private final PreviewKind previewKind;
public StreamCoordinator(StreamOperation streamOperation, int connectionsPerHost, StreamConnectionFactory factory,
- boolean connectSequentially, UUID pendingRepair, PreviewKind previewKind)
+ boolean follower, boolean connectSequentially, UUID pendingRepair, PreviewKind previewKind)
{
this.streamOperation = streamOperation;
this.connectionsPerHost = connectionsPerHost;
this.factory = factory;
+ this.follower = follower;
this.connectSequentially = connectSequentially;
this.pendingRepair = pendingRepair;
this.previewKind = previewKind;
@@ -86,9 +88,9 @@ public class StreamCoordinator
return results;
}
- public boolean isReceiving()
+ public boolean isFollower()
{
- return connectionsPerHost == 0;
+ return follower;
}
public void connect(StreamResultFuture future)
@@ -272,8 +274,7 @@ public class StreamCoordinator
{
for (StreamSession session : streamSessions.values())
{
- StreamSession.State state = session.state();
- if (state != StreamSession.State.COMPLETE && state != StreamSession.State.FAILED)
+ if (!session.state().isFinalState())
return true;
}
return false;
@@ -284,7 +285,7 @@ public class StreamCoordinator
// create
if (streamSessions.size() < connectionsPerHost)
{
- StreamSession session = new StreamSession(streamOperation, peer, factory, streamSessions.size(),
+ StreamSession session = new StreamSession(streamOperation, peer, factory, isFollower(), streamSessions.size(),
pendingRepair, previewKind);
streamSessions.put(++lastReturned, session);
return session;
@@ -317,7 +318,7 @@ public class StreamCoordinator
StreamSession session = streamSessions.get(id);
if (session == null)
{
- session = new StreamSession(streamOperation, peer, factory, id, pendingRepair, previewKind);
+ session = new StreamSession(streamOperation, peer, factory, isFollower(), id, pendingRepair, previewKind);
streamSessions.put(id, session);
}
return session;
diff --git a/src/java/org/apache/cassandra/streaming/StreamManager.java b/src/java/org/apache/cassandra/streaming/StreamManager.java
index 81c65c5..da77ad2 100644
--- a/src/java/org/apache/cassandra/streaming/StreamManager.java
+++ b/src/java/org/apache/cassandra/streaming/StreamManager.java
@@ -106,12 +106,12 @@ public class StreamManager implements StreamManagerMBean
* We manage them in two different maps to distinguish plan from initiated ones to
* receiving ones within the same JVM.
*/
- private final Map<UUID, StreamResultFuture> initiatedStreams = new NonBlockingHashMap<>();
- private final Map<UUID, StreamResultFuture> receivingStreams = new NonBlockingHashMap<>();
+ private final Map<UUID, StreamResultFuture> initiatorStreams = new NonBlockingHashMap<>();
+ private final Map<UUID, StreamResultFuture> followerStreams = new NonBlockingHashMap<>();
public Set<CompositeData> getCurrentStreams()
{
- return Sets.newHashSet(Iterables.transform(Iterables.concat(initiatedStreams.values(), receivingStreams.values()), new Function<StreamResultFuture, CompositeData>()
+ return Sets.newHashSet(Iterables.transform(Iterables.concat(initiatorStreams.values(), followerStreams.values()), new Function<StreamResultFuture, CompositeData>()
{
public CompositeData apply(StreamResultFuture input)
{
@@ -120,7 +120,7 @@ public class StreamManager implements StreamManagerMBean
}));
}
- public void register(final StreamResultFuture result)
+ public void registerInitiator(final StreamResultFuture result)
{
result.addEventListener(notifier);
// Make sure we remove the stream on completion (whether successful or not)
@@ -128,14 +128,14 @@ public class StreamManager implements StreamManagerMBean
{
public void run()
{
- initiatedStreams.remove(result.planId);
+ initiatorStreams.remove(result.planId);
}
}, MoreExecutors.directExecutor());
- initiatedStreams.put(result.planId, result);
+ initiatorStreams.put(result.planId, result);
}
- public StreamResultFuture registerReceiving(final StreamResultFuture result)
+ public StreamResultFuture registerFollower(final StreamResultFuture result)
{
result.addEventListener(notifier);
// Make sure we remove the stream on completion (whether successful or not)
@@ -143,17 +143,17 @@ public class StreamManager implements StreamManagerMBean
{
public void run()
{
- receivingStreams.remove(result.planId);
+ followerStreams.remove(result.planId);
}
}, MoreExecutors.directExecutor());
- StreamResultFuture previous = receivingStreams.putIfAbsent(result.planId, result);
+ StreamResultFuture previous = followerStreams.putIfAbsent(result.planId, result);
return previous == null ? result : previous;
}
public StreamResultFuture getReceivingStream(UUID planId)
{
- return receivingStreams.get(planId);
+ return followerStreams.get(planId);
}
public void addNotificationListener(NotificationListener listener, NotificationFilter filter, Object handback)
@@ -178,11 +178,15 @@ public class StreamManager implements StreamManagerMBean
public StreamSession findSession(InetAddressAndPort peer, UUID planId, int sessionIndex)
{
- StreamSession session = findSession(initiatedStreams, peer, planId, sessionIndex);
+ // Search follower session first, because in some tests, eg. StreamingTransferTest, both initiator session
+ // and follower session are listening to local host.
+ // TODO CASSANDRA-15665 it's more robust to add "isFollower" flag into {@link StreamMessageHeader} to distinguish
+ // initiator session and follower session.
+ StreamSession session = findSession(followerStreams, peer, planId, sessionIndex);
if (session != null)
return session;
- return findSession(receivingStreams, peer, planId, sessionIndex);
+ return findSession(initiatorStreams, peer, planId, sessionIndex);
}
private StreamSession findSession(Map<UUID, StreamResultFuture> streams, InetAddressAndPort peer, UUID planId, int sessionIndex)
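A hedged sketch of how the renamed registries are intended to be used (the call site is an assumption, only the method names come from this patch): the plan-building node registers via createInitiator()/registerInitiator(), while the receiving node registers via registerFollower(), which de-duplicates by plan id; findSession() above therefore probes the follower map first.

    // Hedged sketch (assumed wiring, not part of this patch).
    import org.apache.cassandra.streaming.StreamManager;
    import org.apache.cassandra.streaming.StreamResultFuture;

    final class FollowerRegistrationSketch
    {
        static StreamResultFuture registerOrReuse(StreamResultFuture candidate)
        {
            // returns the future already registered for candidate.planId, or 'candidate' itself if none was
            return StreamManager.instance.registerFollower(candidate);
        }
    }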
diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java
index 3fcabd0..60845fa 100644
--- a/src/java/org/apache/cassandra/streaming/StreamPlan.java
+++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java
@@ -64,7 +64,7 @@ public class StreamPlan
{
this.streamOperation = streamOperation;
this.coordinator = new StreamCoordinator(streamOperation, connectionsPerHost, new DefaultConnectionFactory(),
- connectSequentially, pendingRepair, previewKind);
+ false, connectSequentially, pendingRepair, previewKind);
}
/**
@@ -176,7 +176,7 @@ public class StreamPlan
*/
public StreamResultFuture execute()
{
- return StreamResultFuture.init(planId, streamOperation, handlers, coordinator);
+ return StreamResultFuture.createInitiator(planId, streamOperation, handlers, coordinator);
}
/**
diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index 3268ecf..2b5791f 100644
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@ -68,19 +68,19 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
this.coordinator = coordinator;
// if there is no session to listen to, we immediately set result for returning
- if (!coordinator.isReceiving() && !coordinator.hasActiveSessions())
+ if (!coordinator.isFollower() && !coordinator.hasActiveSessions())
set(getCurrentState());
}
private StreamResultFuture(UUID planId, StreamOperation streamOperation, UUID pendingRepair, PreviewKind previewKind)
{
- this(planId, streamOperation, new StreamCoordinator(streamOperation, 0, new DefaultConnectionFactory(), false, pendingRepair, previewKind));
+ this(planId, streamOperation, new StreamCoordinator(streamOperation, 0, new DefaultConnectionFactory(), true, false, pendingRepair, previewKind));
}
- public static StreamResultFuture init(UUID planId, StreamOperation streamOperation, Collection<StreamEventHandler> listeners,
- StreamCoordinator coordinator)
+ public static StreamResultFuture createInitiator(UUID planId, StreamOperation streamOperation, Collection<StreamEventHandler> listeners,
+ StreamCoordinator coordinator)
{
- StreamResultFuture future = createAndRegister(planId, streamOperation, coordinator);
+ StreamResultFuture future = createAndRegisterInitiator(planId, streamOperation, coordinator);
if (listeners != null)
{
for (StreamEventHandler listener : listeners)
@@ -100,13 +100,13 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
return future;
}
- public static synchronized StreamResultFuture initReceivingSide(int sessionIndex,
- UUID planId,
- StreamOperation streamOperation,
- InetAddressAndPort from,
- Channel channel,
- UUID pendingRepair,
- PreviewKind previewKind)
+ public static synchronized StreamResultFuture createFollower(int sessionIndex,
+ UUID planId,
+ StreamOperation streamOperation,
+ InetAddressAndPort from,
+ Channel channel,
+ UUID pendingRepair,
+ PreviewKind previewKind)
{
StreamResultFuture future = StreamManager.instance.getReceivingStream(planId);
if (future == null)
@@ -117,7 +117,7 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
// The main reason we create a StreamResultFuture on the receiving side is for JMX exposure.
future = new StreamResultFuture(planId, streamOperation, pendingRepair, previewKind);
- StreamManager.instance.registerReceiving(future);
+ StreamManager.instance.registerFollower(future);
}
future.attachConnection(from, sessionIndex, channel);
logger.info("[Stream #{}, ID#{}] Received streaming plan for {} from {} channel.remote {} channel.local {} channel.id {}",
@@ -125,10 +125,10 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
return future;
}
- private static StreamResultFuture createAndRegister(UUID planId, StreamOperation streamOperation, StreamCoordinator coordinator)
+ private static StreamResultFuture createAndRegisterInitiator(UUID planId, StreamOperation streamOperation, StreamCoordinator coordinator)
{
StreamResultFuture future = new StreamResultFuture(planId, streamOperation, coordinator);
- StreamManager.instance.register(future);
+ StreamManager.instance.registerInitiator(future);
return future;
}
@@ -141,7 +141,6 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
{
StreamSession session = coordinator.getOrCreateSessionById(from, sessionIndex);
session.init(this);
- session.attach(channel);
}
public void addEventListener(StreamEventHandler listener)
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 05bb5ff..c6ef5f0 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -17,18 +17,18 @@
*/
package org.apache.cassandra.streaming;
+import java.io.EOFException;
import java.net.SocketTimeoutException;
import java.util.*;
import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.*;
-import com.google.common.util.concurrent.Futures;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.locator.RangesAtEndpoint;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,10 +54,10 @@ import static org.apache.cassandra.net.MessagingService.current_version;
/**
* Handles the streaming of one or more streams to and from a specific remote node.
- *
+ *<p/>
* Both this node and the remote one will create a similar symmetrical {@link StreamSession}. A streaming
* session has the following life-cycle:
- *
+ *<pre>
* 1. Session Initialization
*
* (a) A node (the initiator in the following) creates a new {@link StreamSession},
@@ -101,12 +101,15 @@ import static org.apache.cassandra.net.MessagingService.current_version;
*
* 4. Completion phase
*
- * (a) When a node enters the completion phase, it sends a {@link CompleteMessage} to the peer, and then enters the
- * {@link StreamSession.State#WAIT_COMPLETE} state. If it has already received a {@link CompleteMessage}
- * from the peer, the session is complete and is then closed ({@link #closeSession(State)}). Otherwise, the node
- * switches to the {@link StreamSession.State#WAIT_COMPLETE} state and sends a {@link CompleteMessage} to the other side.
+ * (a) When the initiator finishes streaming, it enters the {@link StreamSession.State#WAIT_COMPLETE} state, and waits
+ * for the follower to send a {@link CompleteMessage} once it finishes streaming too. Once the {@link CompleteMessage}
+ * is received, the initiator sets its own state to {@link StreamSession.State#COMPLETE} and closes all channels attached
+ * to this session.
+ *
+ * </pre>
*
* In brief, the message passing looks like this (I for initiator, F for follower):
+ * <pre>
* (session init)
* I: StreamInitMessage
* (session prepare)
@@ -117,7 +120,8 @@ import static org.apache.cassandra.net.MessagingService.current_version;
* I: OutgoingStreamMessage
* F: ReceivedMessage
* (completion)
- * I/F: CompleteMessage
+ * F: CompleteMessage
+ *</pre>
*
* All messages which derive from {@link StreamMessage} are sent by the standard internode messaging
* (via {@link org.apache.cassandra.net.MessagingService}, while the actual files themselves are sent by a special
@@ -127,6 +131,9 @@ public class StreamSession implements IEndpointStateChangeSubscriber
{
private static final Logger logger = LoggerFactory.getLogger(StreamSession.class);
+ // for test purpose to record received message and state transition
+ public volatile static MessageStateSink sink = MessageStateSink.NONE;
+
private final StreamOperation streamOperation;
/**
@@ -153,43 +160,77 @@ public class StreamSession implements IEndpointStateChangeSubscriber
final Map<String, Set<Range<Token>>> transferredRangesPerKeyspace = new HashMap<>();
+ private final boolean isFollower;
private final NettyStreamingMessageSender messageSender;
- private final ConcurrentMap<ChannelId, Channel> incomingChannels = new ConcurrentHashMap<>();
+ // contains both inbound and outbound channels
+ private final ConcurrentMap<ChannelId, Channel> channels = new ConcurrentHashMap<>();
+
+ // "maybeCompleted()" should be executed at most once, because it can be executed asynchronously by IO
+ // threads (serialization/deserialization) and the stream messaging processing thread, which could close the
+ // connection before the peer's CompleteMessage is received.
+ private boolean maybeCompleted = false;
+ private Future closeFuture;
- private final AtomicBoolean isAborted = new AtomicBoolean(false);
private final UUID pendingRepair;
private final PreviewKind previewKind;
+ /**
+ * State Transition:
+ *
+ * <pre>
+ *  +------------------+----------> FAILED <--------------------+
+ *  |                  |               ^                         |
+ *  |                  |               |      initiator          |
+ * INITIALIZED --> PREPARING --> STREAMING ------------> WAIT_COMPLETE ----> COMPLETED
+ *  |                  |               |                         ^              ^
+ *  |                  |               |      follower           |              |
+ *  |                  |               +------------------------)--------------+
+ *  |                  |                                         |              |
+ *  |                  |           if preview                    |              |
+ *  |                  +----------------------------------------+              |
+ *  |                    nothing to request or to transfer                      |
+ *  +----------------------------------------------------------------------------+
+ *                       nothing to request or to transfer
+ *
+ * </pre>
+ */
public enum State
{
- INITIALIZED,
- PREPARING,
- STREAMING,
- WAIT_COMPLETE,
- COMPLETE,
- FAILED,
+ INITIALIZED(false),
+ PREPARING(false),
+ STREAMING(false),
+ WAIT_COMPLETE(false),
+ COMPLETE(true),
+ FAILED(true);
+
+ private final boolean finalState;
+
+ State(boolean finalState)
+ {
+ this.finalState = finalState;
+ }
+
+ /**
+ * @return true if current state is final, either COMPLETE OR FAILED.
+ */
+ public boolean isFinalState()
+ {
+ return finalState;
+ }
}
private volatile State state = State.INITIALIZED;
- private volatile boolean completeSent = false;
/**
* Create new streaming session with the peer.
*/
public StreamSession(StreamOperation streamOperation, InetAddressAndPort peer, StreamConnectionFactory factory,
- int index, UUID pendingRepair, PreviewKind previewKind)
- {
- this(streamOperation, new OutboundConnectionSettings(peer), factory, index, pendingRepair, previewKind);
- }
- /**
- * Create new streaming session with the peer.
- */
- public StreamSession(StreamOperation streamOperation, OutboundConnectionSettings template, StreamConnectionFactory factory,
- int index, UUID pendingRepair, PreviewKind previewKind)
+ boolean isFollower, int index, UUID pendingRepair, PreviewKind previewKind)
{
this.streamOperation = streamOperation;
- this.peer = template.to;
- this.template = template;
+ this.peer = peer;
+ this.template = new OutboundConnectionSettings(peer);
+ this.isFollower = isFollower;
this.index = index;
this.messageSender = new NettyStreamingMessageSender(this, template, factory, current_version, previewKind.isPreview());
@@ -197,7 +238,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
this.pendingRepair = pendingRepair;
this.previewKind = previewKind;
- logger.debug("Creating stream session to {}", template);
+ logger.debug("Creating stream session to {} as {}", template, isFollower ? "follower" : "initiator");
}
public UUID planId()
@@ -253,11 +294,46 @@ public class StreamSession implements IEndpointStateChangeSubscriber
StreamHook.instance.reportStreamFuture(this, streamResult);
}
- public boolean attach(Channel channel)
+ /**
+ * Attach a channel to this session upon receiving the first inbound message.
+ *
+ * @param channel The channel to attach.
+ * @param isControlChannel If the channel is the one to send control messages to.
+ * @return False if the channel was already attached, true otherwise.
+ */
+ public synchronized boolean attachInbound(Channel channel, boolean isControlChannel)
{
- if (!messageSender.hasControlChannel())
+ failIfFinished();
+
+ if (!messageSender.hasControlChannel() && isControlChannel)
messageSender.injectControlMessageChannel(channel);
- return incomingChannels.putIfAbsent(channel.id(), channel) == null;
+
+ channel.closeFuture().addListener(ignored -> onChannelClose(channel));
+ return channels.putIfAbsent(channel.id(), channel) == null;
+ }
+
+ /**
+ * Attach a channel to this session upon sending the first outbound message.
+ *
+ * @param channel The channel to attach.
+ * @return False if the channel was already attached, true otherwise.
+ */
+ public synchronized boolean attachOutbound(Channel channel)
+ {
+ failIfFinished();
+
+ channel.closeFuture().addListener(ignored -> onChannelClose(channel));
+ return channels.putIfAbsent(channel.id(), channel) == null;
+ }
+
+ /**
+ * On channel closing, if no channels are left just close the message sender; this must be closed last to ensure
+ * keep alive messages are sent until the very end of the streaming session.
+ */
+ private synchronized void onChannelClose(Channel channel)
+ {
+ if (channels.remove(channel.id()) != null && channels.isEmpty())
+ messageSender.close();
}
/**
@@ -339,7 +415,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
private void failIfFinished()
{
- if (state() == State.COMPLETE || state() == State.FAILED)
+ if (state().isFinalState())
throw new RuntimeException(String.format("Stream %s is finished with state %s", planId(), state().name()));
}
@@ -399,22 +475,32 @@ public class StreamSession implements IEndpointStateChangeSubscriber
private synchronized Future closeSession(State finalState)
{
- Future abortedTasksFuture = null;
- if (isAborted.compareAndSet(false, true))
- {
- state(finalState);
+ // the session is already closed
+ if (closeFuture != null)
+ return closeFuture;
- // ensure aborting the tasks do not happen on the network IO thread (read: netty event loop)
- // as we don't want any blocking disk IO to stop the network thread
- if (finalState == State.FAILED)
- abortedTasksFuture = ScheduledExecutors.nonPeriodicTasks.submit(this::abortTasks);
+ state(finalState);
- incomingChannels.values().stream().map(channel -> channel.close());
- messageSender.close();
+ List<Future> futures = new ArrayList<>();
- streamResult.handleSessionComplete(this);
+ // ensure aborting the tasks do not happen on the network IO thread (read: netty event loop)
+ // as we don't want any blocking disk IO to stop the network thread
+ if (finalState == State.FAILED)
+ futures.add(ScheduledExecutors.nonPeriodicTasks.submit(this::abortTasks));
+
+ // Channels should only be closed by the initiator; but, if this session closed
+ // due to failure, channels should always be closed regardless, even if this is not the initiator.
+ if (!isFollower || state != State.COMPLETE)
+ {
+ logger.debug("[Stream #{}] Will close attached channels {}", planId(), channels);
+ channels.values().forEach(channel -> futures.add(channel.close()));
}
- return abortedTasksFuture != null ? abortedTasksFuture : Futures.immediateFuture(null);
+
+ sink.onClose(peer);
+ streamResult.handleSessionComplete(this);
+ closeFuture = FBUtilities.allOf(futures);
+
+ return closeFuture;
}
private void abortTasks()
@@ -426,7 +512,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
}
catch (Exception e)
{
- logger.warn("failed to abort some streaming tasks", e);
+ logger.warn("[Stream #{}] failed to abort some streaming tasks", planId(), e);
}
}
@@ -437,6 +523,10 @@ public class StreamSession implements IEndpointStateChangeSubscriber
*/
public void state(State newState)
{
+ if (logger.isTraceEnabled())
+ logger.trace("[Stream #{}] Changing session state from {} to {}", planId(), state, newState);
+
+ sink.recordState(peer, newState);
state = newState;
}
@@ -463,21 +553,29 @@ public class StreamSession implements IEndpointStateChangeSubscriber
return state == State.COMPLETE;
}
- public void messageReceived(StreamMessage message)
+ public synchronized void messageReceived(StreamMessage message)
{
+ if (message.type != StreamMessage.Type.KEEP_ALIVE)
+ failIfFinished();
+
+ sink.recordMessage(peer, message.type);
+
switch (message.type)
{
case STREAM_INIT:
- // nop
+ // at follower, nop
break;
case PREPARE_SYN:
+ // at follower
PrepareSynMessage msg = (PrepareSynMessage) message;
prepare(msg.requests, msg.summaries);
break;
case PREPARE_SYNACK:
+ // at initiator
prepareSynAck((PrepareSynAckMessage) message);
break;
case PREPARE_ACK:
+ // at follower
prepareAck((PrepareAckMessage) message);
break;
case STREAM:
@@ -488,6 +586,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
received(received.tableId, received.sequenceNumber);
break;
case COMPLETE:
+ // at initiator
complete();
break;
case KEEP_ALIVE:
@@ -529,10 +628,32 @@ public class StreamSession implements IEndpointStateChangeSubscriber
}
/**
- * Call back for handling exception during streaming.
+ * Signal an error to this stream session: if it's an EOF exception, it tries to understand if the socket was closed
+ * after completion or because the peer was down, otherwise sends a {@link SessionFailedMessage} and closes
+ * the session as {@link State#FAILED}.
*/
- public Future onError(Throwable e)
+ public synchronized Future onError(Throwable e)
{
+ boolean isEofException = e instanceof EOFException;
+ if (isEofException)
+ {
+ if (state.finalState)
+ {
+ logger.debug("[Stream #{}] Socket closed after session completed with state {}", planId(), state);
+
+ return null;
+ }
+ else
+ {
+ logger.error("[Stream #{}] Socket closed before session completion, peer {} is probably down.",
+ planId(),
+ peer.address.getHostAddress(),
+ e);
+
+ return closeSession(State.FAILED);
+ }
+ }
+
logError(e);
// send session failure message
if (messageSender.connected())
@@ -577,7 +698,6 @@ public class StreamSession implements IEndpointStateChangeSubscriber
*/
private void prepareAsync(Collection<StreamRequest> requests, Collection<StreamSummary> summaries)
{
-
for (StreamRequest request : requests)
addTransferRanges(request.keyspace, RangesAtEndpoint.concat(request.full, request.transientReplicas), request.columnFamilies, true); // always flush on stream request
for (StreamSummary summary : summaries)
@@ -589,9 +709,12 @@ public class StreamSession implements IEndpointStateChangeSubscriber
prepareSynAck.summaries.add(task.getSummary());
messageSender.sendMessage(prepareSynAck);
-
streamResult.handleSessionPrepared(this);
- maybeCompleted();
+
+ if (isPreview())
+ completePreview();
+ else
+ maybeCompleted();
}
private void prepareSynAck(PrepareSynAckMessage msg)
@@ -602,7 +725,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber
prepareReceiving(summary);
// only send the (final) ACK if we are expecting the peer to send this node (the initiator) some files
- messageSender.sendMessage(new PrepareAckMessage());
+ if (!isPreview())
+ messageSender.sendMessage(new PrepareAckMessage());
}
if (isPreview())
@@ -614,9 +738,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber
private void prepareAck(PrepareAckMessage msg)
{
if (isPreview())
- completePreview();
- else
- startStreamingFiles(true);
+ throw new RuntimeException(String.format("[Stream #%s] Cannot receive PrepareAckMessage for preview session", planId()));
+ startStreamingFiles(true);
}
/**
@@ -646,7 +769,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
{
if (isPreview())
{
- throw new RuntimeException("Cannot receive files for preview session");
+ throw new RuntimeException(String.format("[Stream #%s] Cannot receive files for preview session", planId()));
}
long headerSize = message.stream.getSize();
@@ -674,20 +797,49 @@ public class StreamSession implements IEndpointStateChangeSubscriber
*/
public synchronized void complete()
{
- logger.debug("handling Complete message, state = {}, completeSent = {}", state, completeSent);
- if (state == State.WAIT_COMPLETE)
+ logger.debug("[Stream #{}] handling Complete message, state = {}", planId(), state);
+
+ if (!isFollower)
{
- if (!completeSent)
- {
- messageSender.sendMessage(new CompleteMessage());
- completeSent = true;
- }
- closeSession(State.COMPLETE);
+ if (state == State.WAIT_COMPLETE)
+ closeSession(State.COMPLETE);
+ else
+ state(State.WAIT_COMPLETE);
}
else
{
- state(State.WAIT_COMPLETE);
+ // pre-4.0 nodes should not be connected via streaming, see {@link MessagingService#accept_streaming}
+ throw new IllegalStateException(String.format("[Stream #%s] Complete message can be only received by the initiator!", planId()));
+ }
+ }
+
+ /**
+ * Synchronize both {@link #complete()} and {@link #maybeCompleted()} to avoid racing
+ */
+ private synchronized boolean maybeCompleted()
+ {
+ if (!(receivers.isEmpty() && transfers.isEmpty()))
+ return false;
+
+ // if already executed once, skip it
+ if (maybeCompleted)
+ return true;
+
+ maybeCompleted = true;
+ if (!isFollower)
+ {
+ if (state == State.WAIT_COMPLETE)
+ closeSession(State.COMPLETE);
+ else
+ state(State.WAIT_COMPLETE);
+ }
+ else
+ {
+ messageSender.sendMessage(new CompleteMessage());
+ closeSession(State.COMPLETE);
}
+
+ return true;
}
/**
@@ -760,31 +912,6 @@ public class StreamSession implements IEndpointStateChangeSubscriber
}
}
- private boolean maybeCompleted()
- {
- boolean completed = receivers.isEmpty() && transfers.isEmpty();
- if (completed)
- {
- if (state == State.WAIT_COMPLETE)
- {
- if (!completeSent)
- {
- messageSender.sendMessage(new CompleteMessage());
- completeSent = true;
- }
- closeSession(State.COMPLETE);
- }
- else
- {
- // notify peer that this session is completed
- messageSender.sendMessage(new CompleteMessage());
- completeSent = true;
- state(State.WAIT_COMPLETE);
- }
- }
- return completed;
- }
-
/**
* Flushes matching column families from the given keyspace, or all columnFamilies
* if the cf list is empty.
@@ -843,4 +970,43 @@ public class StreamSession implements IEndpointStateChangeSubscriber
{
return transferredRangesPerKeyspace.size();
}
+
+ @VisibleForTesting
+ public static interface MessageStateSink
+ {
+ static final MessageStateSink NONE = new MessageStateSink() {
+ @Override
+ public void recordState(InetAddressAndPort from, State state)
+ {
+ }
+
+ @Override
+ public void recordMessage(InetAddressAndPort from, StreamMessage.Type message)
+ {
+ }
+
+ @Override
+ public void onClose(InetAddressAndPort from)
+ {
+ }
+ };
+
+ /**
+ * @param from peer that is connected in the stream session
+ * @param state new state to change to
+ */
+ public void recordState(InetAddressAndPort from, StreamSession.State state);
+
+ /**
+ * @param from peer that sends the given message
+ * @param message stream message sent by peer
+ */
+ public void recordMessage(InetAddressAndPort from, StreamMessage.Type message);
+
+ /**
+ *
+ * @param from peer that is being disconnected
+ */
+ public void onClose(InetAddressAndPort from);
+ }
}
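The MessageStateSink added above is a test-only hook; a hedged sketch of how a test might plug one in to observe the new completion order (the recording strategy and assertions are assumptions, not part of this patch):

    // Hedged example of wiring the test-only sink; the collection choice is an assumption.
    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;
    import org.apache.cassandra.locator.InetAddressAndPort;
    import org.apache.cassandra.streaming.StreamSession;
    import org.apache.cassandra.streaming.messages.StreamMessage;

    public class RecordingSink implements StreamSession.MessageStateSink
    {
        public final List<String> events = new CopyOnWriteArrayList<>();

        @Override
        public void recordState(InetAddressAndPort from, StreamSession.State state)
        {
            events.add(from + " -> state " + state);
        }

        @Override
        public void recordMessage(InetAddressAndPort from, StreamMessage.Type message)
        {
            events.add(from + " -> message " + message);
        }

        @Override
        public void onClose(InetAddressAndPort from)
        {
            events.add(from + " -> closed");
        }

        // In a test: StreamSession.sink = new RecordingSink(); then assert, for example,
        // that COMPLETE is recorded before the session's channels are reported closed.
    }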
diff --git a/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java b/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java
index 1314e1d..fba56f5 100644
--- a/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java
+++ b/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java
@@ -21,9 +21,8 @@ package org.apache.cassandra.streaming.async;
import java.io.IOError;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
+import java.nio.channels.ClosedByInterruptException;
import java.util.Collection;
-import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
@@ -33,6 +32,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Throwables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,6 +61,7 @@ import org.apache.cassandra.streaming.messages.OutgoingStreamMessage;
import org.apache.cassandra.streaming.messages.StreamInitMessage;
import org.apache.cassandra.streaming.messages.StreamMessage;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.JVMStabilityInspector;
/**
* Responsible for sending {@link StreamMessage}s to a given peer. We manage an array of netty {@link Channel}s
@@ -102,7 +103,7 @@ public class NettyStreamingMessageSender implements StreamingMessageSender
* A special {@link Channel} for sending non-stream streaming messages, basically anything that isn't an
* {@link OutgoingStreamMessage} (or an {@link IncomingStreamMessage}, but a node doesn't send that, it's only received).
*/
- private Channel controlMessageChannel;
+ private volatile Channel controlMessageChannel;
// note: this really doesn't need to be a LBQ, just something that's
thread safe
private final Collection<ScheduledFuture<?>> channelKeepAlives = new
LinkedBlockingQueue<>();
@@ -153,6 +154,9 @@ public class NettyStreamingMessageSender implements
StreamingMessageSender
return controlMessageChannel != null;
}
+ /**
+     * Used by the follower to set up the control message channel created by the initiator.
+ */
public void injectControlMessageChannel(Channel channel)
{
this.controlMessageChannel = channel;
@@ -160,11 +164,20 @@ public class NettyStreamingMessageSender implements
StreamingMessageSender
scheduleKeepAliveTask(channel);
}
+ /**
+     * Used by the initiator to set up the control message channel connecting to the follower.
+ */
private void setupControlMessageChannel() throws IOException
{
if (controlMessageChannel == null)
{
- controlMessageChannel = createChannel();
+ /*
+ * Inbound handlers are needed:
+             * a) for the initiator's control channel (the first outbound channel) to receive the follower's messages.
+             * b) for the streaming receiver (note: both initiator and follower can receive streaming files) to receive files,
+ * in {@link Handler#setupStreamingPipeline}
+ */
+ controlMessageChannel = createChannel(true);
scheduleKeepAliveTask(controlMessageChannel);
}
}
@@ -181,11 +194,16 @@ public class NettyStreamingMessageSender implements
StreamingMessageSender
task.future = scheduledFuture;
}
- private Channel createChannel() throws IOException
+ private Channel createChannel(boolean isInboundHandlerNeeded) throws
IOException
{
Channel channel = factory.createConnection(template, streamingVersion);
- ChannelPipeline pipeline = channel.pipeline();
- pipeline.addLast("stream", new StreamingInboundHandler(template.to,
streamingVersion, session));
+ session.attachOutbound(channel);
+
+ if (isInboundHandlerNeeded)
+ {
+ ChannelPipeline pipeline = channel.pipeline();
+ pipeline.addLast("stream", new
StreamingInboundHandler(template.to, streamingVersion, session));
+ }
channel.attr(TRANSFERRING_FILE_ATTR).set(Boolean.FALSE);
logger.debug("Creating channel id {} local {} remote {}",
channel.id(), channel.localAddress(), channel.remoteAddress());
return channel;
@@ -316,9 +334,10 @@ public class NettyStreamingMessageSender implements
StreamingMessageSender
if (!acquirePermit(SEMAPHORE_UNAVAILABLE_LOG_INTERVAL))
return;
+ Channel channel = null;
try
{
- Channel channel = getOrCreateChannel();
+ channel = getOrCreateChannel();
if (!channel.attr(TRANSFERRING_FILE_ATTR).compareAndSet(false,
true))
throw new IllegalStateException("channel's transferring
state is currently set to true. refusing to start new stream");
@@ -336,6 +355,19 @@ public class NettyStreamingMessageSender implements
StreamingMessageSender
{
session.onError(e);
}
+ catch (Throwable t)
+ {
+ if (closed && Throwables.getRootCause(t) instanceof
ClosedByInterruptException && fileTransferExecutor.isShutdown())
+ {
+ logger.debug("{} Streaming channel was closed due to the
executor pool being shutdown", createLogTag(session, channel));
+ }
+ else
+ {
+ JVMStabilityInspector.inspectThrowable(t);
+ if (!session.state().isFinalState())
+ session.onError(t);
+ }
+ }
finally
{
fileTransferSemaphore.release();
@@ -383,7 +415,7 @@ public class NettyStreamingMessageSender implements
StreamingMessageSender
if (channel != null)
return channel;
- channel = createChannel();
+ channel = createChannel(false);
threadToChannelMap.put(currentThread, channel);
return channel;
}
@@ -513,6 +545,9 @@ public class NettyStreamingMessageSender implements
StreamingMessageSender
@Override
public void close()
{
+ if (closed)
+ return;
+
closed = true;
if (logger.isDebugEnabled())
logger.debug("{} Closing stream connection channels on {}",
createLogTag(session, null), template.to);
@@ -520,14 +555,7 @@ public class NettyStreamingMessageSender implements
StreamingMessageSender
future.cancel(false);
channelKeepAlives.clear();
- List<Future<Void>> futures = new
ArrayList<>(threadToChannelMap.size());
- for (Channel channel : threadToChannelMap.values())
- futures.add(channel.close());
- FBUtilities.waitOnFutures(futures, 10, TimeUnit.SECONDS);
threadToChannelMap.clear();
fileTransferExecutor.shutdownNow();
-
- if (controlMessageChannel != null)
- controlMessageChannel.close();
}
}
diff --git
a/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java
b/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java
index a319fea..edc74e3 100644
--- a/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java
+++ b/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java
@@ -18,12 +18,9 @@
package org.apache.cassandra.streaming.async;
-import java.io.EOFException;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@@ -31,7 +28,6 @@ import java.util.function.Function;
import javax.annotation.Nullable;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Uninterruptibles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,7 +40,6 @@ import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.FastThreadLocalThread;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.AsyncStreamingInputPlus;
-import org.apache.cassandra.net.AsyncStreamingInputPlus.InputTimeoutException;
import org.apache.cassandra.streaming.StreamManager;
import org.apache.cassandra.streaming.StreamReceiveException;
import org.apache.cassandra.streaming.StreamResultFuture;
@@ -183,7 +178,7 @@ public class StreamingInboundHandler extends
ChannelInboundHandlerAdapter
Uninterruptibles.sleepUninterruptibly(400,
TimeUnit.MILLISECONDS);
}
- StreamMessage message = StreamMessage.deserialize(buffers,
protocolVersion, null);
+ StreamMessage message = StreamMessage.deserialize(buffers,
protocolVersion);
            // keep-alives don't necessarily need to be tied to a session (they could arrive before or after
            // wrt the session lifecycle, due to races), just log that we received the message and carry on
@@ -203,10 +198,6 @@ public class StreamingInboundHandler extends
ChannelInboundHandlerAdapter
session.messageReceived(message);
}
}
- catch (InputTimeoutException | EOFException e)
- {
- // ignore
- }
catch (Throwable t)
{
JVMStabilityInspector.inspectThrowable(t);
@@ -248,7 +239,7 @@ public class StreamingInboundHandler extends
ChannelInboundHandlerAdapter
{
assert session == null : "initiator of stream session received
a StreamInitMessage";
StreamInitMessage init = (StreamInitMessage) message;
- StreamResultFuture.initReceivingSide(init.sessionIndex,
init.planId, init.streamOperation, init.from, channel, init.pendingRepair,
init.previewKind);
+ StreamResultFuture.createFollower(init.sessionIndex,
init.planId, init.streamOperation, init.from, channel, init.pendingRepair,
init.previewKind);
streamSession = sessionProvider.apply(new
SessionIdentifier(init.from, init.planId, init.sessionIndex));
}
else if (message instanceof IncomingStreamMessage)
@@ -262,7 +253,9 @@ public class StreamingInboundHandler extends
ChannelInboundHandlerAdapter
if (streamSession == null)
throw new IllegalStateException(createLogTag(null, channel) +
" no session found for message " + message);
- streamSession.attach(channel);
+            // Attach this channel to the session: this only happens upon receiving the first init message as a follower;
+            // in all other cases, no new control channel will be added, as the proper control channel will already be attached.
+ streamSession.attachInbound(channel, message instanceof
StreamInitMessage);
return streamSession;
}
}
diff --git
a/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java
b/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java
index 81e16f7..83d95e0 100644
--- a/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java
@@ -25,7 +25,7 @@ public class CompleteMessage extends StreamMessage
{
public static Serializer<CompleteMessage> serializer = new
Serializer<CompleteMessage>()
{
- public CompleteMessage deserialize(DataInputPlus in, int version,
StreamSession session)
+ public CompleteMessage deserialize(DataInputPlus in, int version)
{
return new CompleteMessage();
}
diff --git
a/src/java/org/apache/cassandra/streaming/messages/IncomingStreamMessage.java
b/src/java/org/apache/cassandra/streaming/messages/IncomingStreamMessage.java
index e17c3ab..5f69a90 100644
---
a/src/java/org/apache/cassandra/streaming/messages/IncomingStreamMessage.java
+++
b/src/java/org/apache/cassandra/streaming/messages/IncomingStreamMessage.java
@@ -35,10 +35,10 @@ public class IncomingStreamMessage extends StreamMessage
public static Serializer<IncomingStreamMessage> serializer = new
Serializer<IncomingStreamMessage>()
{
@SuppressWarnings("resource")
- public IncomingStreamMessage deserialize(DataInputPlus input, int
version, StreamSession session) throws IOException
+ public IncomingStreamMessage deserialize(DataInputPlus input, int
version) throws IOException
{
StreamMessageHeader header =
StreamMessageHeader.serializer.deserialize(input, version);
- session = StreamManager.instance.findSession(header.sender,
header.planId, header.sessionIndex);
+ StreamSession session =
StreamManager.instance.findSession(header.sender, header.planId,
header.sessionIndex);
if (session == null)
throw new IllegalStateException(String.format("unknown stream
session: %s - %d", header.planId, header.sessionIndex));
ColumnFamilyStore cfs =
ColumnFamilyStore.getIfExists(header.tableId);
diff --git
a/src/java/org/apache/cassandra/streaming/messages/KeepAliveMessage.java
b/src/java/org/apache/cassandra/streaming/messages/KeepAliveMessage.java
index f80c617..5352b3b 100644
--- a/src/java/org/apache/cassandra/streaming/messages/KeepAliveMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/KeepAliveMessage.java
@@ -28,7 +28,7 @@ public class KeepAliveMessage extends StreamMessage
{
public static Serializer<KeepAliveMessage> serializer = new
Serializer<KeepAliveMessage>()
{
- public KeepAliveMessage deserialize(DataInputPlus in, int version,
StreamSession session) throws IOException
+ public KeepAliveMessage deserialize(DataInputPlus in, int version)
throws IOException
{
return new KeepAliveMessage();
}
diff --git
a/src/java/org/apache/cassandra/streaming/messages/OutgoingStreamMessage.java
b/src/java/org/apache/cassandra/streaming/messages/OutgoingStreamMessage.java
index 263aabd..8406f80 100644
---
a/src/java/org/apache/cassandra/streaming/messages/OutgoingStreamMessage.java
+++
b/src/java/org/apache/cassandra/streaming/messages/OutgoingStreamMessage.java
@@ -32,7 +32,7 @@ public class OutgoingStreamMessage extends StreamMessage
{
public static Serializer<OutgoingStreamMessage> serializer = new
Serializer<OutgoingStreamMessage>()
{
- public OutgoingStreamMessage deserialize(DataInputPlus in, int
version, StreamSession session)
+ public OutgoingStreamMessage deserialize(DataInputPlus in, int version)
{
throw new UnsupportedOperationException("Not allowed to call
deserialize on an outgoing stream");
}
diff --git
a/src/java/org/apache/cassandra/streaming/messages/PrepareAckMessage.java
b/src/java/org/apache/cassandra/streaming/messages/PrepareAckMessage.java
index f43ff01..97fdff7 100644
--- a/src/java/org/apache/cassandra/streaming/messages/PrepareAckMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/PrepareAckMessage.java
@@ -33,7 +33,7 @@ public class PrepareAckMessage extends StreamMessage
//nop
}
- public PrepareAckMessage deserialize(DataInputPlus in, int version,
StreamSession session) throws IOException
+ public PrepareAckMessage deserialize(DataInputPlus in, int version)
throws IOException
{
return new PrepareAckMessage();
}
diff --git
a/src/java/org/apache/cassandra/streaming/messages/PrepareSynAckMessage.java
b/src/java/org/apache/cassandra/streaming/messages/PrepareSynAckMessage.java
index 2d8026c..4e5e8fb 100644
--- a/src/java/org/apache/cassandra/streaming/messages/PrepareSynAckMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/PrepareSynAckMessage.java
@@ -38,7 +38,7 @@ public class PrepareSynAckMessage extends StreamMessage
StreamSummary.serializer.serialize(summary, out, version);
}
- public PrepareSynAckMessage deserialize(DataInputPlus input, int
version, StreamSession session) throws IOException
+ public PrepareSynAckMessage deserialize(DataInputPlus input, int
version) throws IOException
{
PrepareSynAckMessage message = new PrepareSynAckMessage();
int numSummaries = input.readInt();
diff --git
a/src/java/org/apache/cassandra/streaming/messages/PrepareSynMessage.java
b/src/java/org/apache/cassandra/streaming/messages/PrepareSynMessage.java
index 6fbaafa..e378af7 100644
--- a/src/java/org/apache/cassandra/streaming/messages/PrepareSynMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/PrepareSynMessage.java
@@ -31,7 +31,7 @@ public class PrepareSynMessage extends StreamMessage
{
public static Serializer<PrepareSynMessage> serializer = new
Serializer<PrepareSynMessage>()
{
- public PrepareSynMessage deserialize(DataInputPlus input, int version,
StreamSession session) throws IOException
+ public PrepareSynMessage deserialize(DataInputPlus input, int version)
throws IOException
{
PrepareSynMessage message = new PrepareSynMessage();
// requests
diff --git
a/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java
b/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java
index 3988dcc..ff2cdec 100644
--- a/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java
@@ -29,7 +29,7 @@ public class ReceivedMessage extends StreamMessage
public static Serializer<ReceivedMessage> serializer = new
Serializer<ReceivedMessage>()
{
@SuppressWarnings("resource") // Not closing constructed
DataInputPlus's as the channel needs to remain open.
- public ReceivedMessage deserialize(DataInputPlus input, int version,
StreamSession session) throws IOException
+ public ReceivedMessage deserialize(DataInputPlus input, int version)
throws IOException
{
return new ReceivedMessage(TableId.deserialize(input),
input.readInt());
}
diff --git
a/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java
b/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java
index 59ad90e..ca10bcc 100644
--- a/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java
@@ -25,7 +25,7 @@ public class SessionFailedMessage extends StreamMessage
{
public static Serializer<SessionFailedMessage> serializer = new
Serializer<SessionFailedMessage>()
{
- public SessionFailedMessage deserialize(DataInputPlus in, int version,
StreamSession session)
+ public SessionFailedMessage deserialize(DataInputPlus in, int version)
{
return new SessionFailedMessage();
}
diff --git
a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
index e148790..953f2c4 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
@@ -86,7 +86,7 @@ public class StreamInitMessage extends StreamMessage
out.writeInt(message.previewKind.getSerializationVal());
}
- public StreamInitMessage deserialize(DataInputPlus in, int version,
StreamSession session) throws IOException
+ public StreamInitMessage deserialize(DataInputPlus in, int version)
throws IOException
{
InetAddressAndPort from =
inetAddressAndPortSerializer.deserialize(in, version);
int sessionIndex = in.readInt();
diff --git
a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
index 72e180c..2f42f1b 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
@@ -43,16 +43,16 @@ public abstract class StreamMessage
return 1 + message.type.outSerializer.serializedSize(message, version);
}
- public static StreamMessage deserialize(DataInputPlus in, int version,
StreamSession session) throws IOException
+ public static StreamMessage deserialize(DataInputPlus in, int version)
throws IOException
{
Type type = Type.lookupById(in.readByte());
- return type.inSerializer.deserialize(in, version, session);
+ return type.inSerializer.deserialize(in, version);
}
/** StreamMessage serializer */
public static interface Serializer<V extends StreamMessage>
{
- V deserialize(DataInputPlus in, int version, StreamSession session)
throws IOException;
+ V deserialize(DataInputPlus in, int version) throws IOException;
void serialize(V message, DataOutputStreamPlus out, int version,
StreamSession session) throws IOException;
long serializedSize(V message, int version) throws IOException;
}
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java
b/src/java/org/apache/cassandra/utils/FBUtilities.java
index 1df84ab..115cd43 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -486,6 +486,81 @@ public class FBUtilities
Uninterruptibles.sleepUninterruptibly(delay,
TimeUnit.MILLISECONDS);
}
}
+
+ /**
+     * Returns a new {@link Future} wrapping the given collection of futures and returning a list of their results.
+ */
+ public static Future<List> allOf(Collection<Future> futures)
+ {
+ if (futures.isEmpty())
+ return CompletableFuture.completedFuture(null);
+
+ return new Future<List>()
+ {
+ @Override
+ @SuppressWarnings("unchecked")
+ public List get() throws InterruptedException, ExecutionException
+ {
+ List result = new ArrayList<>(futures.size());
+ for (Future current : futures)
+ {
+ result.add(current.get());
+ }
+ return result;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public List get(long timeout, TimeUnit unit) throws
InterruptedException, ExecutionException, TimeoutException
+ {
+ List result = new ArrayList<>(futures.size());
+ long deadline = System.nanoTime() +
TimeUnit.NANOSECONDS.convert(timeout, unit);
+ for (Future current : futures)
+ {
+ long remaining = deadline - System.nanoTime();
+ if (remaining <= 0)
+ throw new TimeoutException();
+
+ result.add(current.get(remaining, TimeUnit.NANOSECONDS));
+ }
+ return result;
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning)
+ {
+ for (Future current : futures)
+ {
+ if (!current.cancel(mayInterruptIfRunning))
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public boolean isCancelled()
+ {
+ for (Future current : futures)
+ {
+ if (!current.isCancelled())
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public boolean isDone()
+ {
+ for (Future current : futures)
+ {
+ if (!current.isDone())
+ return false;
+ }
+ return true;
+ }
+ };
+ }
+
/**
* Create a new instance of a partitioner defined in an SSTable Descriptor
* @param desc Descriptor of an sstable
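
The FBUtilities.allOf helper added above composes a collection of futures into a single blocking Future whose get() waits on every member, with the timed variant sharing one overall deadline across the collection. A minimal usage sketch under the raw-typed signature shown above (the AllOfExample class, its futures and the printed output are illustrative, not part of the patch):

    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;

    import org.apache.cassandra.utils.FBUtilities;

    public class AllOfExample
    {
        @SuppressWarnings({ "rawtypes", "unchecked" })
        public static void main(String[] args) throws Exception
        {
            // Two already-completed futures stand in for, e.g., channel close futures.
            Future f1 = CompletableFuture.completedFuture("first");
            Future f2 = CompletableFuture.completedFuture("second");

            // Blocks until both are done; the 10-second budget is shared across the collection.
            List results = FBUtilities.allOf(Arrays.asList(f1, f2)).get(10, TimeUnit.SECONDS);
            System.out.println(results); // prints [first, second]
        }
    }
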
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/StreamingTest.java
b/test/distributed/org/apache/cassandra/distributed/test/StreamingTest.java
index bafd03d..956f21e 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/StreamingTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/StreamingTest.java
@@ -18,16 +18,39 @@
package org.apache.cassandra.distributed.test;
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import com.google.common.annotations.VisibleForTesting;
import org.junit.Assert;
import org.junit.Test;
import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.messages.StreamMessage;
import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.apache.cassandra.streaming.StreamSession.State.PREPARING;
+import static org.apache.cassandra.streaming.StreamSession.State.STREAMING;
+import static org.apache.cassandra.streaming.StreamSession.State.WAIT_COMPLETE;
+import static
org.apache.cassandra.streaming.messages.StreamMessage.Type.PREPARE_ACK;
+import static
org.apache.cassandra.streaming.messages.StreamMessage.Type.PREPARE_SYN;
+import static
org.apache.cassandra.streaming.messages.StreamMessage.Type.PREPARE_SYNACK;
+import static
org.apache.cassandra.streaming.messages.StreamMessage.Type.RECEIVED;
+import static
org.apache.cassandra.streaming.messages.StreamMessage.Type.STREAM;
+import static
org.apache.cassandra.streaming.messages.StreamMessage.Type.STREAM_INIT;
public class StreamingTest extends TestBaseImpl
{
@@ -51,6 +74,9 @@ public class StreamingTest extends TestBaseImpl
Assert.assertEquals(0, results.length);
}
+        // collect messages and state transitions on each node
+ registerSink(cluster, nodes);
+
cluster.get(nodes).runOnInstance(() ->
StorageService.instance.rebuild(null, KEYSPACE, null, null));
{
Object[][] results =
cluster.get(nodes).executeInternal(String.format("SELECT k, c1, c2 FROM
%s.cf;", KEYSPACE));
@@ -72,4 +98,85 @@ public class StreamingTest extends TestBaseImpl
testStreaming(2, 2, 1000, "LeveledCompactionStrategy");
}
+ public static void registerSink(Cluster cluster, int initiatorNodeId)
+ {
+ IInvokableInstance initiatorNode = cluster.get(initiatorNodeId);
+ InetSocketAddress initiator = initiatorNode.broadcastAddress();
+ MessageStateSinkImpl initiatorSink = new MessageStateSinkImpl();
+
+ for (int node = 1; node <= cluster.size(); node++)
+ {
+ if (initiatorNodeId == node)
+ continue;
+
+ IInvokableInstance followerNode = cluster.get(node);
+ InetSocketAddress follower = followerNode.broadcastAddress();
+
+ // verify on initiator's stream session
+ initiatorSink.messages(follower, Arrays.asList(PREPARE_SYNACK,
STREAM, StreamMessage.Type.COMPLETE));
+ initiatorSink.states(follower, Arrays.asList(PREPARING, STREAMING,
WAIT_COMPLETE, StreamSession.State.COMPLETE));
+
+ // verify on follower's stream session
+ MessageStateSinkImpl followerSink = new MessageStateSinkImpl();
+ followerSink.messages(initiator, Arrays.asList(STREAM_INIT,
PREPARE_SYN, PREPARE_ACK, RECEIVED));
+ followerSink.states(initiator, Arrays.asList(PREPARING,
STREAMING, StreamSession.State.COMPLETE));
+ followerNode.runOnInstance(() -> StreamSession.sink =
followerSink);
+ }
+
+ cluster.get(initiatorNodeId).runOnInstance(() -> StreamSession.sink =
initiatorSink);
+ }
+
+ @VisibleForTesting
+ public static class MessageStateSinkImpl implements
StreamSession.MessageStateSink, Serializable
+ {
+        // use enum ordinals instead of enums to work around an inter-jvm class loader issue: only classes defined in
+        // InstanceClassLoader#sharedClassNames are shareable between the server jvm and the test jvm
+ public final Map<InetAddress, Queue<Integer>> messageSink = new
ConcurrentHashMap<>();
+ public final Map<InetAddress, Queue<Integer>> stateTransitions = new
ConcurrentHashMap<>();
+
+ public void messages(InetSocketAddress peer, List<StreamMessage.Type>
messages)
+ {
+ messageSink.put(peer.getAddress(),
messages.stream().map(Enum::ordinal).collect(Collectors.toCollection(LinkedList::new)));
+ }
+
+ public void states(InetSocketAddress peer, List<StreamSession.State>
states)
+ {
+ stateTransitions.put(peer.getAddress(),
states.stream().map(Enum::ordinal).collect(Collectors.toCollection(LinkedList::new)));
+ }
+
+ @Override
+ public void recordState(InetAddressAndPort from, StreamSession.State
state)
+ {
+ Queue<Integer> states = stateTransitions.get(from.address);
+ if (states.peek() == null)
+ Assert.fail("Unexpected state " + state);
+
+ int expected = states.poll();
+ Assert.assertEquals(StreamSession.State.values()[expected], state);
+ }
+
+ @Override
+ public void recordMessage(InetAddressAndPort from, StreamMessage.Type
message)
+ {
+ if (message == StreamMessage.Type.KEEP_ALIVE)
+ return;
+
+ Queue<Integer> messages = messageSink.get(from.address);
+ if (messages.peek() == null)
+ Assert.fail("Unexpected message " + message);
+
+ int expected = messages.poll();
+ Assert.assertEquals(StreamMessage.Type.values()[expected],
message);
+ }
+
+ @Override
+ public void onClose(InetAddressAndPort from)
+ {
+ Queue<Integer> states = stateTransitions.get(from.address);
+ Assert.assertTrue("Missing states: " + states, states.isEmpty());
+
+ Queue<Integer> messages = messageSink.get(from.address);
+ Assert.assertTrue("Missing messages: " + messages,
messages.isEmpty());
+ }
+ }
}
diff --git
a/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java
b/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java
index 2b642a8..744750e 100644
---
a/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java
+++
b/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java
@@ -213,8 +213,8 @@ public class ZeroCopyStreamingBenchmark
private StreamSession setupStreamingSessionForTest()
{
- StreamCoordinator streamCoordinator = new
StreamCoordinator(StreamOperation.BOOTSTRAP, 1, new DefaultConnectionFactory(),
false, null, PreviewKind.NONE);
- StreamResultFuture future =
StreamResultFuture.init(UUID.randomUUID(), StreamOperation.BOOTSTRAP,
Collections.<StreamEventHandler>emptyList(), streamCoordinator);
+ StreamCoordinator streamCoordinator = new
StreamCoordinator(StreamOperation.BOOTSTRAP, 1, new DefaultConnectionFactory(),
false, false, null, PreviewKind.NONE);
+ StreamResultFuture future =
StreamResultFuture.createInitiator(UUID.randomUUID(),
StreamOperation.BOOTSTRAP, Collections.<StreamEventHandler>emptyList(),
streamCoordinator);
InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort();
streamCoordinator.addSessionInfo(new SessionInfo(peer, 0, peer,
Collections.emptyList(), Collections.emptyList(),
StreamSession.State.INITIALIZED));
diff --git
a/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
b/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
index c722738..b8115f4 100644
---
a/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
+++
b/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
@@ -196,8 +196,8 @@ public class CassandraEntireSSTableStreamWriterTest
private StreamSession setupStreamingSessionForTest()
{
- StreamCoordinator streamCoordinator = new
StreamCoordinator(StreamOperation.BOOTSTRAP, 1, new DefaultConnectionFactory(),
false, null, PreviewKind.NONE);
- StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(),
StreamOperation.BOOTSTRAP, Collections.<StreamEventHandler>emptyList(),
streamCoordinator);
+ StreamCoordinator streamCoordinator = new
StreamCoordinator(StreamOperation.BOOTSTRAP, 1, new DefaultConnectionFactory(),
false, false, null, PreviewKind.NONE);
+ StreamResultFuture future =
StreamResultFuture.createInitiator(UUID.randomUUID(),
StreamOperation.BOOTSTRAP, Collections.<StreamEventHandler>emptyList(),
streamCoordinator);
InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort();
streamCoordinator.addSessionInfo(new SessionInfo(peer, 0, peer,
Collections.emptyList(), Collections.emptyList(),
StreamSession.State.INITIALIZED));
diff --git
a/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java
b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java
index eb15e9a..ae3ff92 100644
---
a/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java
+++
b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java
@@ -99,6 +99,7 @@ public class CassandraStreamManagerTest
return new StreamSession(StreamOperation.REPAIR,
InetAddressAndPort.getByName("127.0.0.1"),
connectionFactory,
+ false,
0,
pendingRepair,
PreviewKind.NONE);
diff --git a/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
b/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
index 61adb58..b18d249 100644
--- a/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
+++ b/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
@@ -55,7 +55,7 @@ public class StreamStateStoreTest
Range<Token> range = new Range<>(factory.fromString("0"),
factory.fromString("100"));
InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort();
- StreamSession session = new StreamSession(StreamOperation.BOOTSTRAP,
local, new DefaultConnectionFactory(), 0, null, PreviewKind.NONE);
+ StreamSession session = new StreamSession(StreamOperation.BOOTSTRAP,
local, new DefaultConnectionFactory(), false, 0, null, PreviewKind.NONE);
session.addStreamRequest("keyspace1",
RangesAtEndpoint.toDummyList(Collections.singleton(range)),
RangesAtEndpoint.toDummyList(Collections.emptyList()),
Collections.singleton("cf"));
StreamStateStore store = new StreamStateStore();
@@ -76,7 +76,7 @@ public class StreamStateStoreTest
// add different range within the same keyspace
Range<Token> range2 = new Range<>(factory.fromString("100"),
factory.fromString("200"));
- session = new StreamSession(StreamOperation.BOOTSTRAP, local, new
DefaultConnectionFactory(), 0, null, PreviewKind.NONE);
+ session = new StreamSession(StreamOperation.BOOTSTRAP, local, new
DefaultConnectionFactory(), false, 0, null, PreviewKind.NONE);
session.addStreamRequest("keyspace1",
RangesAtEndpoint.toDummyList(Collections.singleton(range2)),
RangesAtEndpoint.toDummyList(Collections.emptyList()),
Collections.singleton("cf"));
session.state(StreamSession.State.COMPLETE);
store.handleStreamEvent(new StreamEvent.SessionCompleteEvent(session));
diff --git
a/test/unit/org/apache/cassandra/streaming/EntireSSTableStreamingCorrectFilesCountTest.java
b/test/unit/org/apache/cassandra/streaming/EntireSSTableStreamingCorrectFilesCountTest.java
index a57fcbc..262a200 100644
---
a/test/unit/org/apache/cassandra/streaming/EntireSSTableStreamingCorrectFilesCountTest.java
+++
b/test/unit/org/apache/cassandra/streaming/EntireSSTableStreamingCorrectFilesCountTest.java
@@ -187,13 +187,14 @@ public class EntireSSTableStreamingCorrectFilesCountTest
1,
new
DefaultConnectionFactory(),
false,
+ false,
null,
PreviewKind.NONE);
- StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(),
-
StreamOperation.BOOTSTRAP,
-
Collections.singleton(streamEventHandler),
- streamCoordinator);
+ StreamResultFuture future =
StreamResultFuture.createInitiator(UUID.randomUUID(),
+
StreamOperation.BOOTSTRAP,
+
Collections.singleton(streamEventHandler),
+
streamCoordinator);
InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort();
streamCoordinator.addSessionInfo(new SessionInfo(peer,
diff --git
a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index 2f4feff..0bf7f20 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@ -77,7 +77,7 @@ public class StreamTransferTaskTest
public void testScheduleTimeout() throws Exception
{
InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort();
- StreamSession session = new StreamSession(StreamOperation.BOOTSTRAP,
peer, (template, messagingVersion) -> new EmbeddedChannel(), 0,
UUID.randomUUID(), PreviewKind.ALL);
+ StreamSession session = new StreamSession(StreamOperation.BOOTSTRAP,
peer, (template, messagingVersion) -> new EmbeddedChannel(), false, 0,
UUID.randomUUID(), PreviewKind.ALL);
ColumnFamilyStore cfs =
Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
// create two sstables
@@ -88,6 +88,7 @@ public class StreamTransferTaskTest
}
// create streaming task that streams those two sstables
+ session.state(StreamSession.State.PREPARING);
StreamTransferTask task = new StreamTransferTask(session,
cfs.metadata.id);
for (SSTableReader sstable : cfs.getLiveSSTables())
{
@@ -98,6 +99,7 @@ public class StreamTransferTaskTest
assertEquals(14, task.getTotalNumberOfFiles());
// if file sending completes before timeout then the task should be
canceled.
+ session.state(StreamSession.State.STREAMING);
Future f = task.scheduleTimeout(0, 0, TimeUnit.NANOSECONDS);
f.get();
@@ -123,9 +125,9 @@ public class StreamTransferTaskTest
public void testFailSessionDuringTransferShouldNotReleaseReferences()
throws Exception
{
InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort();
- StreamCoordinator streamCoordinator = new
StreamCoordinator(StreamOperation.BOOTSTRAP, 1, new DefaultConnectionFactory(),
false, null, PreviewKind.NONE);
- StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(),
StreamOperation.OTHER, Collections.<StreamEventHandler>emptyList(),
streamCoordinator);
- StreamSession session = new StreamSession(StreamOperation.BOOTSTRAP,
peer, null, 0, null, PreviewKind.NONE);
+ StreamCoordinator streamCoordinator = new
StreamCoordinator(StreamOperation.BOOTSTRAP, 1, new DefaultConnectionFactory(),
false, false, null, PreviewKind.NONE);
+ StreamResultFuture future =
StreamResultFuture.createInitiator(UUID.randomUUID(), StreamOperation.OTHER,
Collections.<StreamEventHandler>emptyList(), streamCoordinator);
+ StreamSession session = new StreamSession(StreamOperation.BOOTSTRAP,
peer, null, false, 0, null, PreviewKind.NONE);
session.init(future);
ColumnFamilyStore cfs =
Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
diff --git
a/test/unit/org/apache/cassandra/streaming/async/NettyStreamingMessageSenderTest.java
b/test/unit/org/apache/cassandra/streaming/async/NettyStreamingMessageSenderTest.java
index 957869b..76bfa76 100644
---
a/test/unit/org/apache/cassandra/streaming/async/NettyStreamingMessageSenderTest.java
+++
b/test/unit/org/apache/cassandra/streaming/async/NettyStreamingMessageSenderTest.java
@@ -63,9 +63,11 @@ public class NettyStreamingMessageSenderTest
channel = new TestChannel(Integer.MAX_VALUE);
channel.attr(NettyStreamingMessageSender.TRANSFERRING_FILE_ATTR).set(Boolean.FALSE);
UUID pendingRepair = UUID.randomUUID();
- session = new StreamSession(StreamOperation.BOOTSTRAP, REMOTE_ADDR,
(template, messagingVersion) -> null, 0, pendingRepair, PreviewKind.ALL);
- StreamResultFuture future = StreamResultFuture.initReceivingSide(0,
UUID.randomUUID(), StreamOperation.REPAIR, REMOTE_ADDR, channel, pendingRepair,
session.getPreviewKind());
+ session = new StreamSession(StreamOperation.BOOTSTRAP, REMOTE_ADDR,
(template, messagingVersion) -> null, true, 0, pendingRepair, PreviewKind.ALL);
+ StreamResultFuture future = StreamResultFuture.createFollower(0,
UUID.randomUUID(), StreamOperation.REPAIR, REMOTE_ADDR, channel, pendingRepair,
session.getPreviewKind());
session.init(future);
+ session.attachOutbound(channel);
+
sender = session.getMessageSender();
sender.setControlMessageChannel(channel);
}
diff --git
a/test/unit/org/apache/cassandra/streaming/async/StreamingInboundHandlerTest.java
b/test/unit/org/apache/cassandra/streaming/async/StreamingInboundHandlerTest.java
index 6a2afe8..11f6757 100644
---
a/test/unit/org/apache/cassandra/streaming/async/StreamingInboundHandlerTest.java
+++
b/test/unit/org/apache/cassandra/streaming/async/StreamingInboundHandlerTest.java
@@ -127,7 +127,7 @@ public class StreamingInboundHandlerTest
private StreamSession createSession(SessionIdentifier sid)
{
- return new StreamSession(StreamOperation.BOOTSTRAP, sid.from,
(template, messagingVersion) -> null, sid.sessionIndex, UUID.randomUUID(),
PreviewKind.ALL);
+ return new StreamSession(StreamOperation.BOOTSTRAP, sid.from,
(template, messagingVersion) -> null, true, sid.sessionIndex,
UUID.randomUUID(), PreviewKind.ALL);
}
@Test (expected = IllegalStateException.class)
@@ -152,8 +152,8 @@ public class StreamingInboundHandlerTest
public void StreamDeserializingTask_deriveSession_IFM_HasSession()
{
UUID planId = UUID.randomUUID();
- StreamResultFuture future = StreamResultFuture.initReceivingSide(0,
planId, StreamOperation.REPAIR, REMOTE_ADDR, channel, UUID.randomUUID(),
PreviewKind.ALL);
- StreamManager.instance.register(future);
+ StreamResultFuture future = StreamResultFuture.createFollower(0,
planId, StreamOperation.REPAIR, REMOTE_ADDR, channel, UUID.randomUUID(),
PreviewKind.ALL);
+ StreamManager.instance.registerFollower(future);
StreamMessageHeader header = new
StreamMessageHeader(TableId.generate(), REMOTE_ADDR, planId, 0,
0, 0,
UUID.randomUUID());
IncomingStreamMessage msg = new IncomingStreamMessage(null, header);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]