Merge branch 'cassandra-3.0' into cassandra-3.3
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/59560005 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/59560005 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/59560005 Branch: refs/heads/cassandra-3.3 Commit: 5956000507ce41e969373c3e8f4d84107b4212dd Parents: 58c7061 3e37b4a Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Wed Jan 27 15:49:45 2016 +0100 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Wed Jan 27 15:49:45 2016 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/db/LegacyLayout.java | 53 ++++++++- .../org/apache/cassandra/db/ReadCommand.java | 54 +++++---- .../cassandra/db/ReadCommandVerbHandler.java | 2 +- .../org/apache/cassandra/db/ReadResponse.java | 110 ++++++++----------- .../apache/cassandra/db/filter/DataLimits.java | 16 ++- .../db/partitions/PartitionUpdate.java | 4 +- .../UnfilteredPartitionIterators.java | 6 +- .../db/rows/UnfilteredRowIterators.java | 6 +- .../org/apache/cassandra/repair/Validator.java | 2 +- .../apache/cassandra/service/DataResolver.java | 6 +- .../cassandra/service/DigestResolver.java | 6 +- .../apache/cassandra/service/StorageProxy.java | 2 +- .../cassandra/cache/CacheProviderTest.java | 4 +- .../org/apache/cassandra/db/PartitionTest.java | 18 +-- .../apache/cassandra/db/ReadResponseTest.java | 10 +- .../db/SinglePartitionSliceCommandTest.java | 14 +-- .../rows/DigestBackwardCompatibilityTest.java | 5 +- .../cassandra/service/DataResolverTest.java | 2 +- 19 files changed, 191 insertions(+), 130 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/59560005/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 89749f8,8daeb2d..c78dd10 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,6 -1,5 +1,7 @@@ -3.0.3 +3.3 + * Avoid bootstrap hanging when existing nodes have no data to stream (CASSANDRA-11010) +Merged from 3.0: + * Fix DISTINCT queries in mixed version clusters (CASSANDRA-10762) * Migrate build status for indexes along with legacy schema (CASSANDRA-11046) * Ensure SSTables for legacy KEYS indexes can be read (CASSANDRA-11045) * Added support for IBM zSystems architecture (CASSANDRA-11054) http://git-wip-us.apache.org/repos/asf/cassandra/blob/59560005/src/java/org/apache/cassandra/db/LegacyLayout.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/59560005/src/java/org/apache/cassandra/db/ReadCommand.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/59560005/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java index b2fb876,9cde8dc..42cd309 --- a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java +++ b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java @@@ -41,24 -41,15 +41,24 @@@ public class ReadCommandVerbHandler imp } ReadCommand command = message.payload; + command.setMonitoringTime(message.constructionTime, message.getTimeout()); + ReadResponse response; - try (ReadOrderGroup opGroup = command.startOrderGroup(); UnfilteredPartitionIterator iterator = command.executeLocally(opGroup)) + try (ReadExecutionController executionController = command.executionController(); + UnfilteredPartitionIterator iterator = command.executeLocally(executionController)) { - response = command.createResponse(iterator, command.columnFilter()); + response = command.createResponse(iterator); } - MessageOut<ReadResponse> reply = new MessageOut<>(MessagingService.Verb.REQUEST_RESPONSE, response, serializer()); + if (!command.complete()) + { + Tracing.trace("Discarding partial response to {} (timed out)", message.from); + MessagingService.instance().incrementDroppedMessages(message, System.currentTimeMillis() - message.constructionTime.timestamp); + return; + } Tracing.trace("Enqueuing response to {}", message.from); + MessageOut<ReadResponse> reply = new MessageOut<>(MessagingService.Verb.REQUEST_RESPONSE, response, ReadResponse.serializer); MessagingService.instance().sendReply(reply, id, message.from); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59560005/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/59560005/src/java/org/apache/cassandra/repair/Validator.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/59560005/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/StorageProxy.java index 27da486,8fa2082..b84128e --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@@ -1780,25 -1777,10 +1780,25 @@@ public class StorageProxy implements St { try { - try (ReadOrderGroup orderGroup = command.startOrderGroup(); UnfilteredPartitionIterator iterator = command.executeLocally(orderGroup)) + command.setMonitoringTime(new ConstructionTime(constructionTime), timeout); + + ReadResponse response; + try (ReadExecutionController executionController = command.executionController(); + UnfilteredPartitionIterator iterator = command.executeLocally(executionController)) + { - response = command.createResponse(iterator, command.columnFilter()); ++ response = command.createResponse(iterator); + } + + if (command.complete()) { - handler.response(command.createResponse(iterator)); + handler.response(response); } + else + { + MessagingService.instance().incrementDroppedMessages(verb, System.currentTimeMillis() - constructionTime); + handler.onFailure(FBUtilities.getBroadcastAddress()); + } + MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)); } catch (Throwable t) http://git-wip-us.apache.org/repos/asf/cassandra/blob/59560005/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java index 749ce2e,9af6028..1e4f696 --- a/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java +++ b/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java @@@ -110,11 -110,11 +110,11 @@@ public class SinglePartitionSliceComman cmd = ReadCommand.legacyReadCommandSerializer.deserialize(in, MessagingService.VERSION_21); logger.debug("ReadCommand: {}", cmd); - UnfilteredPartitionIterator partitionIterator = cmd.executeLocally(ReadOrderGroup.emptyGroup()); + UnfilteredPartitionIterator partitionIterator = cmd.executeLocally(ReadExecutionController.empty()); - ReadResponse response = ReadResponse.createDataResponse(partitionIterator, cmd.columnFilter()); + ReadResponse response = ReadResponse.createDataResponse(partitionIterator, cmd); logger.debug("creating response: {}", response); - partitionIterator = response.makeIterator(cfm, null); // <- cmd is null + partitionIterator = response.makeIterator(cmd); assert partitionIterator.hasNext(); UnfilteredRowIterator partition = partitionIterator.next(); @@@ -166,9 -166,9 +166,9 @@@ ReadResponse dst; // check (de)serialized iterator for memtable static cell - try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); UnfilteredPartitionIterator pi = cmd.executeLocally(orderGroup)) + try (ReadExecutionController executionController = cmd.executionController(); UnfilteredPartitionIterator pi = cmd.executeLocally(executionController)) { - response = ReadResponse.createDataResponse(pi, cmd.columnFilter()); + response = ReadResponse.createDataResponse(pi, cmd); } out = new DataOutputBuffer((int) ReadResponse.serializer.serializedSize(response, MessagingService.VERSION_30)); @@@ -182,9 -182,9 +182,9 @@@ // check (de)serialized iterator for sstable static cell Schema.instance.getColumnFamilyStoreInstance(cfm.cfId).forceBlockingFlush(); - try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); UnfilteredPartitionIterator pi = cmd.executeLocally(orderGroup)) + try (ReadExecutionController executionController = cmd.executionController(); UnfilteredPartitionIterator pi = cmd.executeLocally(executionController)) { - response = ReadResponse.createDataResponse(pi, cmd.columnFilter()); + response = ReadResponse.createDataResponse(pi, cmd); } out = new DataOutputBuffer((int) ReadResponse.serializer.serializedSize(response, MessagingService.VERSION_30)); ReadResponse.serializer.serialize(response, out, MessagingService.VERSION_30); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59560005/test/unit/org/apache/cassandra/service/DataResolverTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/service/DataResolverTest.java index b94db67,997f4e4..b753362 --- a/test/unit/org/apache/cassandra/service/DataResolverTest.java +++ b/test/unit/org/apache/cassandra/service/DataResolverTest.java @@@ -712,11 -712,10 +712,11 @@@ public class DataResolverTes public MessageIn<ReadResponse> readResponseMessage(InetAddress from, UnfilteredPartitionIterator partitionIterator, ReadCommand cmd) { return MessageIn.create(from, - ReadResponse.createRemoteDataResponse(partitionIterator, cmd.columnFilter()), + ReadResponse.createRemoteDataResponse(partitionIterator, cmd), Collections.EMPTY_MAP, MessagingService.Verb.REQUEST_RESPONSE, - MessagingService.current_version); + MessagingService.current_version, + MessageIn.createTimestamp()); } private RangeTombstone tombstone(Object start, Object end, long markedForDeleteAt, int localDeletionTime)