Merge branch 'cassandra-3.0' into cassandra-3.11

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/826ae9c9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/826ae9c9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/826ae9c9

Branch: refs/heads/cassandra-3.11
Commit: 826ae9c91e11ebb889b3f1788b9357c2c717f9a0
Parents: 809f3b3 a7cb009
Author: Aleksey Yeschenko <[email protected]>
Authored: Tue Aug 29 12:30:40 2017 +0100
Committer: Aleksey Yeschenko <[email protected]>
Committed: Tue Aug 29 12:31:27 2017 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../UnfilteredPartitionIterators.java           |  7 ---
 .../db/transform/EmptyPartitionsDiscarder.java  | 35 ++++++++++++++
 .../apache/cassandra/db/transform/Filter.java   | 28 +++--------
 .../db/transform/FilteredPartitions.java        | 18 +++++--
 .../cassandra/db/transform/FilteredRows.java    |  2 +-
 .../apache/cassandra/metrics/TableMetrics.java  |  4 ++
 .../apache/cassandra/service/DataResolver.java  | 51 ++++++++++++++------
 .../apache/cassandra/db/ReadCommandTest.java    | 23 ++++-----
 9 files changed, 107 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/826ae9c9/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index b0dbd60,6609b05..c4aee3a
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,11 -1,5 +1,12 @@@
 -3.0.15
 +3.11.1
 + * Fix cassandra-stress hang issues when an error during cluster connection 
happens (CASSANDRA-12938)
 + * Better bootstrap failure message when blocked by (potential) range 
movement (CASSANDRA-13744)
 + * "ignore" option is ignored in sstableloader (CASSANDRA-13721)
 + * Deadlock in AbstractCommitLogSegmentManager (CASSANDRA-13652)
 + * Duplicate the buffer before passing it to analyser in SASI operation 
(CASSANDRA-13512)
 + * Properly evict pstmts from prepared statements cache (CASSANDRA-13641)
 +Merged from 3.0:
+  * Fix AssertionError in short read protection (CASSANDRA-13747)
   * Don't skip corrupted sstables on startup (CASSANDRA-13620)
   * Fix the merging of cells with different user type versions 
(CASSANDRA-13776)
   * Copy session properties on cqlsh.py do_login (CASSANDRA-13640)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/826ae9c9/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
----------------------------------------------------------------------
diff --cc 
src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
index fc225e8,4e0ac1b..778c71d
--- 
a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
+++ 
b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
@@@ -78,31 -77,6 +78,24 @@@ public abstract class UnfilteredPartiti
          return Transformation.apply(toReturn, new Close());
      }
  
 +    public static UnfilteredPartitionIterator concat(final 
List<UnfilteredPartitionIterator> iterators)
 +    {
 +        if (iterators.size() == 1)
 +            return iterators.get(0);
 +
 +        class Extend implements MorePartitions<UnfilteredPartitionIterator>
 +        {
 +            int i = 1;
 +            public UnfilteredPartitionIterator moreContents()
 +            {
 +                if (i >= iterators.size())
 +                    return null;
 +                return iterators.get(i++);
 +            }
 +        }
 +        return MorePartitions.extend(iterators.get(0), new Extend());
 +    }
 +
- 
-     public static PartitionIterator 
mergeAndFilter(List<UnfilteredPartitionIterator> iterators, int nowInSec, 
MergeListener listener)
-     {
-         // TODO: we could have a somewhat faster version if we were to merge 
the UnfilteredRowIterators directly as RowIterators
-         return filter(merge(iterators, nowInSec, listener), nowInSec);
-     }
- 
      public static PartitionIterator filter(final UnfilteredPartitionIterator 
iterator, final int nowInSec)
      {
          return FilteredPartitions.filter(iterator, nowInSec);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/826ae9c9/src/java/org/apache/cassandra/metrics/TableMetrics.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/metrics/TableMetrics.java
index 7a84eca,fe88a63..b0f667c
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@@ -167,40 -151,8 +167,42 @@@ public class TableMetric
      public final static LatencyMetrics globalWriteLatency = new 
LatencyMetrics(globalFactory, globalAliasFactory, "Write");
      public final static LatencyMetrics globalRangeLatency = new 
LatencyMetrics(globalFactory, globalAliasFactory, "Range");
  
 +    public final static Gauge<Double> globalPercentRepaired = 
Metrics.register(globalFactory.createMetricName("PercentRepaired"),
 +            new Gauge<Double>()
 +    {
 +        public Double getValue()
 +        {
 +            double repaired = 0;
 +            double total = 0;
 +            for (String keyspace : Schema.instance.getNonSystemKeyspaces())
 +            {
 +                Keyspace k = Schema.instance.getKeyspaceInstance(keyspace);
 +                if 
(SchemaConstants.DISTRIBUTED_KEYSPACE_NAME.equals(k.getName()))
 +                    continue;
 +                if (k.getReplicationStrategy().getReplicationFactor() < 2)
 +                    continue;
 +
 +                for (ColumnFamilyStore cf : k.getColumnFamilyStores())
 +                {
 +                    if (!SecondaryIndexManager.isIndexColumnFamily(cf.name))
 +                    {
 +                        for (SSTableReader sstable : 
cf.getSSTables(SSTableSet.CANONICAL))
 +                        {
 +                            if (sstable.isRepaired())
 +                            {
 +                                repaired += sstable.uncompressedLength();
 +                            }
 +                            total += sstable.uncompressedLength();
 +                        }
 +                    }
 +                }
 +            }
 +            return total > 0 ? (repaired / total) * 100 : 100.0;
 +        }
 +    });
 +
+     public final Meter shortReadProtectionRequests;
+ 
      public final Map<Sampler, TopKSampler<ByteBuffer>> samplers;
      /**
       * stores metrics that will be rolled into a single global metric

http://git-wip-us.apache.org/repos/asf/cassandra/blob/826ae9c9/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/DataResolver.java
index 116dadd,72c4950..32b6d79
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@@ -27,18 -27,13 +27,13 @@@ import com.google.common.collect.Iterab
  
  import org.apache.cassandra.concurrent.Stage;
  import org.apache.cassandra.concurrent.StageManager;
- import org.apache.cassandra.config.CFMetaData;
- import org.apache.cassandra.config.ColumnDefinition;
- import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.config.*;
  import org.apache.cassandra.db.*;
--import org.apache.cassandra.db.filter.ClusteringIndexFilter;
- import org.apache.cassandra.db.filter.ColumnFilter;
--import org.apache.cassandra.db.filter.DataLimits;
++import org.apache.cassandra.db.filter.*;
 +import org.apache.cassandra.db.filter.DataLimits.Counter;
  import org.apache.cassandra.db.partitions.*;
  import org.apache.cassandra.db.rows.*;
- import org.apache.cassandra.db.transform.MoreRows;
- import org.apache.cassandra.db.transform.Transformation;
+ import org.apache.cassandra.db.transform.*;
  import org.apache.cassandra.exceptions.ReadTimeoutException;
  import org.apache.cassandra.net.*;
  import org.apache.cassandra.tracing.Tracing;
@@@ -104,10 -118,10 +120,10 @@@ public class DataResolver extends Respo
          if (!command.limits().isUnlimited())
          {
              for (int i = 0; i < results.size(); i++)
 -                results.set(i, Transformation.apply(results.get(i), new 
ShortReadProtection(sources[i], resultCounter)));
 +                results.set(i, Transformation.apply(results.get(i), new 
ShortReadProtection(sources[i], resultCounter, queryStartNanoTime)));
          }
  
-         return UnfilteredPartitionIterators.mergeAndFilter(results, 
command.nowInSec(), listener);
+         return UnfilteredPartitionIterators.merge(results, 
command.nowInSec(), listener);
      }
  
      private class RepairMergeListener implements 
UnfilteredPartitionIterators.MergeListener
@@@ -526,9 -526,9 +542,9 @@@
                  // we should request m rows so that m * x/n = n-x, that is m 
= (n^2/x) - n.
                  // Also note that it's ok if we retrieve more results that 
necessary since our top level iterator is a
                  // counting iterator.
 -                int n = postReconciliationCounter.countedInCurrentPartition();
 -                int x = counter.countedInCurrentPartition();
 +                int n = countedInCurrentPartition(postReconciliationCounter);
 +                int x = countedInCurrentPartition(counter);
-                 int toQuery = Math.max(((n * n) / x) - n, 1);
+                 int toQuery = Math.max(((n * n) / Math.max(x, 1)) - n, 1);
  
                  DataLimits retryLimits = 
command.limits().forShortReadRetry(toQuery);
                  ClusteringIndexFilter filter = 
command.clusteringIndexFilter(partitionKey);
@@@ -544,44 -547,12 +563,44 @@@
                  return doShortReadRetry(cmd);
              }
  
 +            /**
 +             * Returns the number of results counted by the counter.
 +             *
 +             * @param counter the counter.
 +             * @return the number of results counted by the counter
 +             */
 +            private int counted(Counter counter)
 +            {
 +                // We are interested by the number of rows but for GROUP BY 
queries 'counted' returns the number of
 +                // groups.
 +                if (command.limits().isGroupByLimit())
 +                    return counter.rowCounted();
 +
 +                return counter.counted();
 +            }
 +
 +            /**
 +             * Returns the number of results counted in the partition by the 
counter.
 +             *
 +             * @param counter the counter.
 +             * @return the number of results counted in the partition by the 
counter
 +             */
 +            private int countedInCurrentPartition(Counter counter)
 +            {
 +                // We are interested by the number of rows but for GROUP BY 
queries 'countedInCurrentPartition' returns
 +                // the number of groups in the current partition.
 +                if (command.limits().isGroupByLimit())
 +                    return counter.rowCountedInCurrentPartition();
 +
 +                return counter.countedInCurrentPartition();
 +            }
 +
              private UnfilteredRowIterator 
doShortReadRetry(SinglePartitionReadCommand retryCommand)
              {
 -                DataResolver resolver = new DataResolver(keyspace, 
retryCommand, ConsistencyLevel.ONE, 1);
 -                ReadCallback handler = new ReadCallback(resolver, 
ConsistencyLevel.ONE, retryCommand, Collections.singletonList(source));
 +                DataResolver resolver = new DataResolver(keyspace, 
retryCommand, ConsistencyLevel.ONE, 1, queryStartNanoTime);
 +                ReadCallback handler = new ReadCallback(resolver, 
ConsistencyLevel.ONE, retryCommand, Collections.singletonList(source), 
queryStartNanoTime);
                  if (StorageProxy.canDoLocalRequest(source))
-                       
StageManager.getStage(Stage.READ).maybeExecuteImmediately(new 
StorageProxy.LocalReadRunnable(retryCommand, handler));
+                     
StageManager.getStage(Stage.READ).maybeExecuteImmediately(new 
StorageProxy.LocalReadRunnable(retryCommand, handler));
                  else
                      
MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(MessagingService.current_version),
 source, handler);
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/826ae9c9/test/unit/org/apache/cassandra/db/ReadCommandTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/ReadCommandTest.java
index 2aef2a7,0000000..9264297
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/db/ReadCommandTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
@@@ -1,311 -1,0 +1,312 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.db;
 +
 +import java.nio.ByteBuffer;
 +import java.util.ArrayList;
 +import java.util.List;
 +
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +
 +import org.apache.cassandra.SchemaLoader;
 +import org.apache.cassandra.Util;
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter;
 +import org.apache.cassandra.db.filter.ColumnFilter;
 +import org.apache.cassandra.db.filter.DataLimits;
 +import org.apache.cassandra.db.filter.RowFilter;
 +import org.apache.cassandra.db.marshal.AsciiType;
 +import org.apache.cassandra.db.marshal.BytesType;
 +import org.apache.cassandra.db.partitions.FilteredPartition;
 +import org.apache.cassandra.db.partitions.PartitionIterator;
 +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
 +import org.apache.cassandra.db.rows.Row;
 +import org.apache.cassandra.db.rows.RowIterator;
 +import org.apache.cassandra.db.rows.SerializationHelper;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterators;
 +import org.apache.cassandra.exceptions.ConfigurationException;
 +import org.apache.cassandra.io.util.DataInputBuffer;
 +import org.apache.cassandra.io.util.DataOutputBuffer;
 +import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.schema.KeyspaceParams;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.utils.FBUtilities;
 +
 +import static org.junit.Assert.assertEquals;
 +
 +public class ReadCommandTest
 +{
 +    private static final String KEYSPACE = "ReadCommandTest";
 +    private static final String CF1 = "Standard1";
 +    private static final String CF2 = "Standard2";
 +    private static final String CF3 = "Standard3";
 +
 +    @BeforeClass
 +    public static void defineSchema() throws ConfigurationException
 +    {
 +        DatabaseDescriptor.daemonInitialization();
 +
 +        CFMetaData metadata1 = SchemaLoader.standardCFMD(KEYSPACE, CF1);
 +
 +        CFMetaData metadata2 = CFMetaData.Builder.create(KEYSPACE, CF2)
 +                                                         
.addPartitionKey("key", BytesType.instance)
 +                                                         
.addClusteringColumn("col", AsciiType.instance)
 +                                                         
.addRegularColumn("a", AsciiType.instance)
 +                                                         
.addRegularColumn("b", AsciiType.instance).build();
 +
 +        CFMetaData metadata3 = CFMetaData.Builder.create(KEYSPACE, CF3)
 +                                                 .addPartitionKey("key", 
BytesType.instance)
 +                                                 .addClusteringColumn("col", 
AsciiType.instance)
 +                                                 .addRegularColumn("a", 
AsciiType.instance)
 +                                                 .addRegularColumn("b", 
AsciiType.instance)
 +                                                 .addRegularColumn("c", 
AsciiType.instance)
 +                                                 .addRegularColumn("d", 
AsciiType.instance)
 +                                                 .addRegularColumn("e", 
AsciiType.instance)
 +                                                 .addRegularColumn("f", 
AsciiType.instance).build();
 +
 +        SchemaLoader.prepareServer();
 +        SchemaLoader.createKeyspace(KEYSPACE,
 +                                    KeyspaceParams.simple(1),
 +                                    metadata1,
 +                                    metadata2,
 +                                    metadata3);
 +    }
 +
 +    @Test
 +    public void testPartitionRangeAbort() throws Exception
 +    {
 +        ColumnFamilyStore cfs = 
Keyspace.open(KEYSPACE).getColumnFamilyStore(CF1);
 +
 +        new RowUpdateBuilder(cfs.metadata, 0, ByteBufferUtil.bytes("key1"))
 +                .clustering("Column1")
 +                .add("val", ByteBufferUtil.bytes("abcd"))
 +                .build()
 +                .apply();
 +
 +        cfs.forceBlockingFlush();
 +
 +        new RowUpdateBuilder(cfs.metadata, 0, ByteBufferUtil.bytes("key2"))
 +                .clustering("Column1")
 +                .add("val", ByteBufferUtil.bytes("abcd"))
 +                .build()
 +                .apply();
 +
 +        ReadCommand readCommand = Util.cmd(cfs).build();
 +        assertEquals(2, Util.getAll(readCommand).size());
 +
 +        readCommand.abort();
 +        assertEquals(0, Util.getAll(readCommand).size());
 +    }
 +
 +    @Test
 +    public void testSinglePartitionSliceAbort() throws Exception
 +    {
 +        ColumnFamilyStore cfs = 
Keyspace.open(KEYSPACE).getColumnFamilyStore(CF2);
 +
 +        cfs.truncateBlocking();
 +
 +        new RowUpdateBuilder(cfs.metadata, 0, ByteBufferUtil.bytes("key"))
 +                .clustering("cc")
 +                .add("a", ByteBufferUtil.bytes("abcd"))
 +                .build()
 +                .apply();
 +
 +        cfs.forceBlockingFlush();
 +
 +        new RowUpdateBuilder(cfs.metadata, 0, ByteBufferUtil.bytes("key"))
 +                .clustering("dd")
 +                .add("a", ByteBufferUtil.bytes("abcd"))
 +                .build()
 +                .apply();
 +
 +        ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).build();
 +
 +        List<FilteredPartition> partitions = Util.getAll(readCommand);
 +        assertEquals(1, partitions.size());
 +        assertEquals(2, partitions.get(0).rowCount());
 +
 +        readCommand.abort();
 +        assertEquals(0, Util.getAll(readCommand).size());
 +    }
 +
 +    @Test
 +    public void testSinglePartitionNamesAbort() throws Exception
 +    {
 +        ColumnFamilyStore cfs = 
Keyspace.open(KEYSPACE).getColumnFamilyStore(CF2);
 +
 +        cfs.truncateBlocking();
 +
 +        new RowUpdateBuilder(cfs.metadata, 0, ByteBufferUtil.bytes("key"))
 +                .clustering("cc")
 +                .add("a", ByteBufferUtil.bytes("abcd"))
 +                .build()
 +                .apply();
 +
 +        cfs.forceBlockingFlush();
 +
 +        new RowUpdateBuilder(cfs.metadata, 0, ByteBufferUtil.bytes("key"))
 +                .clustering("dd")
 +                .add("a", ByteBufferUtil.bytes("abcd"))
 +                .build()
 +                .apply();
 +
 +        ReadCommand readCommand = Util.cmd(cfs, 
Util.dk("key")).includeRow("cc").includeRow("dd").build();
 +
 +        List<FilteredPartition> partitions = Util.getAll(readCommand);
 +        assertEquals(1, partitions.size());
 +        assertEquals(2, partitions.get(0).rowCount());
 +
 +        readCommand.abort();
 +        assertEquals(0, Util.getAll(readCommand).size());
 +    }
 +
 +    @Test
 +    public void testSinglePartitionGroupMerge() throws Exception
 +    {
 +        ColumnFamilyStore cfs = 
Keyspace.open(KEYSPACE).getColumnFamilyStore(CF3);
 +
 +        String[][][] groups = new String[][][] {
 +            new String[][] {
 +                new String[] { "1", "key1", "aa", "a" }, // "1" indicates to 
create the data, "-1" to delete the row
 +                new String[] { "1", "key2", "bb", "b" },
 +                new String[] { "1", "key3", "cc", "c" }
 +            },
 +            new String[][] {
 +                new String[] { "1", "key3", "dd", "d" },
 +                new String[] { "1", "key2", "ee", "e" },
 +                new String[] { "1", "key1", "ff", "f" }
 +            },
 +            new String[][] {
 +                new String[] { "1", "key6", "aa", "a" },
 +                new String[] { "1", "key5", "bb", "b" },
 +                new String[] { "1", "key4", "cc", "c" }
 +            },
 +            new String[][] {
 +                new String[] { "-1", "key6", "aa", "a" },
 +                new String[] { "-1", "key2", "bb", "b" }
 +            }
 +        };
 +
 +        // Given the data above, when the keys are sorted and the deletions 
removed, we should
 +        // get these clustering rows in this order
 +        String[] expectedRows = new String[] { "aa", "ff", "ee", "cc", "dd", 
"cc", "bb"};
 +
 +        List<ByteBuffer> buffers = new ArrayList<>(groups.length);
 +        int nowInSeconds = FBUtilities.nowInSeconds();
 +        ColumnFilter columnFilter = 
ColumnFilter.allColumnsBuilder(cfs.metadata).build();
 +        RowFilter rowFilter = RowFilter.create();
 +        Slice slice = Slice.make(ClusteringBound.BOTTOM, ClusteringBound.TOP);
 +        ClusteringIndexSliceFilter sliceFilter = new 
ClusteringIndexSliceFilter(Slices.with(cfs.metadata.comparator, slice), false);
 +
 +        for (String[][] group : groups)
 +        {
 +            cfs.truncateBlocking();
 +
 +            List<SinglePartitionReadCommand> commands = new 
ArrayList<>(group.length);
 +
 +            for (String[] data : group)
 +            {
 +                if (data[0].equals("1"))
 +                {
 +                    new RowUpdateBuilder(cfs.metadata, 0, 
ByteBufferUtil.bytes(data[1]))
 +                    .clustering(data[2])
 +                    .add(data[3], ByteBufferUtil.bytes("blah"))
 +                    .build()
 +                    .apply();
 +                }
 +                else
 +                {
 +                    RowUpdateBuilder.deleteRow(cfs.metadata, 
FBUtilities.timestampMicros(), ByteBufferUtil.bytes(data[1]), data[2]).apply();
 +                }
 +                commands.add(SinglePartitionReadCommand.create(cfs.metadata, 
nowInSeconds, columnFilter, rowFilter, DataLimits.NONE, Util.dk(data[1]), 
sliceFilter));
 +            }
 +
 +            cfs.forceBlockingFlush();
 +
 +            ReadQuery query = new SinglePartitionReadCommand.Group(commands, 
DataLimits.NONE);
 +
 +            try (ReadExecutionController executionController = 
query.executionController();
 +                 UnfilteredPartitionIterator iter = 
query.executeLocally(executionController);
 +                 DataOutputBuffer buffer = new DataOutputBuffer())
 +            {
 +                
UnfilteredPartitionIterators.serializerForIntraNode().serialize(iter,
 +                                                                              
  columnFilter,
 +                                                                              
  buffer,
 +                                                                              
  MessagingService.current_version);
 +                buffers.add(buffer.buffer());
 +            }
 +        }
 +
 +        // deserialize, merge and check the results are all there
 +        List<UnfilteredPartitionIterator> iterators = new ArrayList<>();
 +
 +        for (ByteBuffer buffer : buffers)
 +        {
 +            try (DataInputBuffer in = new DataInputBuffer(buffer, true))
 +            {
 +                
iterators.add(UnfilteredPartitionIterators.serializerForIntraNode().deserialize(in,
 +                                                                              
                  MessagingService.current_version,
 +                                                                              
                  cfs.metadata,
 +                                                                              
                  columnFilter,
 +                                                                              
                  SerializationHelper.Flag.LOCAL));
 +            }
 +        }
 +
-         try(PartitionIterator partitionIterator = 
UnfilteredPartitionIterators.mergeAndFilter(iterators,
-                                                                               
            nowInSeconds,
-                                                                               
            new UnfilteredPartitionIterators.MergeListener()
-         {
-             public UnfilteredRowIterators.MergeListener 
getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> 
versions)
++        UnfilteredPartitionIterators.MergeListener listener =
++            new UnfilteredPartitionIterators.MergeListener()
 +            {
-                 return null;
-             }
++                public UnfilteredRowIterators.MergeListener 
getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> 
versions)
++                {
++                    return null;
++                }
 +
-             public void close()
-             {
++                public void close()
++                {
 +
-             }
-         }))
++                }
++            };
++
++        try (PartitionIterator partitionIterator = 
UnfilteredPartitionIterators.filter(UnfilteredPartitionIterators.merge(iterators,
 nowInSeconds, listener), nowInSeconds))
 +        {
 +
 +            int i = 0;
 +            int numPartitions = 0;
 +            while (partitionIterator.hasNext())
 +            {
 +                numPartitions++;
 +                try(RowIterator rowIterator = partitionIterator.next())
 +                {
 +                    while (rowIterator.hasNext())
 +                    {
 +                        Row row = rowIterator.next();
 +                        assertEquals("col=" + expectedRows[i++], 
row.clustering().toString(cfs.metadata));
 +                        //System.out.print(row.toString(cfs.metadata, true));
 +                    }
 +                }
 +            }
 +
 +            assertEquals(5, numPartitions);
 +            assertEquals(expectedRows.length, i);
 +        }
 +    }
 +}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to