Merge branch 'cassandra-2.1' into cassandra-2.2
Conflicts:
CHANGES.txt
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8b9c91eb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8b9c91eb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8b9c91eb
Branch: refs/heads/trunk
Commit: 8b9c91ebabc3d43cec6c7a6b0f04c0c4158fca8f
Parents: a384faa d6c37bd
Author: Sam Tunnicliffe <[email protected]>
Authored: Thu Jun 25 09:59:45 2015 +0100
Committer: Sam Tunnicliffe <[email protected]>
Committed: Thu Jun 25 09:59:45 2015 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../format/big/SSTableNamesIterator.java | 3 --
.../cassandra/cql3/LargeCompactValueTest.java | 50 ++++++++++++++++++++
3 files changed, 51 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b9c91eb/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 7a32693,e8ac3e7..59f1c18
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,20 -1,5 +1,21 @@@
-2.1.7
+2.2
+ * Update cqlsh for UDFs (CASSANDRA-7556)
+ * Change Windows kernel default timer resolution (CASSANDRA-9634)
+ * Deprecated sstable2json and json2sstable (CASSANDRA-9618)
+ * Allow native functions in user-defined aggregates (CASSANDRA-9542)
+ * Don't repair system_distributed by default (CASSANDRA-9621)
+ * Fix mixing min, max, and count aggregates for blob type (CASSANDRA-9622)
+ * Rename class for DATE type in Java driver (CASSANDRA-9563)
+ * Duplicate compilation of UDFs on coordinator (CASSANDRA-9475)
+ * Fix connection leak in CqlRecordWriter (CASSANDRA-9576)
+ * Mlockall before opening system sstables & remove boot_without_jna option (CASSANDRA-9573)
+ * Add functions to convert timeuuid to date or time, deprecate dateOf and unixTimestampOf (CASSANDRA-9229)
+ * Make sure we cancel non-compacting sstables from LifecycleTransaction (CASSANDRA-9566)
+ * Fix deprecated repair JMX API (CASSANDRA-9570)
+ * Add logback metrics (CASSANDRA-9378)
+ * Update and refactor ant test/test-compression to run the tests in parallel (CASSANDRA-9583)
+Merged from 2.1:
+ * Fix incorrect result for IN queries where column not found (CASSANDRA-9540)
 * ColumnFamilyStore.selectAndReference may block during compaction (CASSANDRA-9637)
* Fix bug in cardinality check when compacting (CASSANDRA-9580)
 * Fix memory leak in Ref due to ConcurrentLinkedQueue.remove() behaviour (CASSANDRA-9549)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b9c91eb/src/java/org/apache/cassandra/io/sstable/format/big/SSTableNamesIterator.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/big/SSTableNamesIterator.java
index 7c9a344,0000000..b8910c7
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/SSTableNamesIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/SSTableNamesIterator.java
@@@ -1,267 -1,0 +1,264 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.big;
+
+import java.io.IOException;
+import java.util.*;
+
+import com.google.common.collect.AbstractIterator;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
+import org.apache.cassandra.io.sstable.IndexHelper;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.FileMark;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
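+ // Reads the atoms for a fixed set of named columns from a single partition
+ // of one sstable; matching atoms are read eagerly into memory, then iterated.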
+class SSTableNamesIterator extends AbstractIterator<OnDiskAtom> implements OnDiskAtomIterator
+{
+ private ColumnFamily cf;
+ private final SSTableReader sstable;
+ private FileDataInput fileToClose;
+ private Iterator<OnDiskAtom> iter;
+ public final SortedSet<CellName> columns;
+ public final DecoratedKey key;
+
+ public SSTableNamesIterator(SSTableReader sstable, DecoratedKey key, SortedSet<CellName> columns)
+ {
+ assert columns != null;
+ this.sstable = sstable;
+ this.columns = columns;
+ this.key = key;
+
+ RowIndexEntry indexEntry = sstable.getPosition(key, SSTableReader.Operator.EQ);
+ if (indexEntry == null)
+ return;
+
+ try
+ {
+ read(sstable, null, indexEntry);
+ }
+ catch (IOException e)
+ {
+ sstable.markSuspect();
+ throw new CorruptSSTableException(e, sstable.getFilename());
+ }
+ finally
+ {
+ if (fileToClose != null)
+ FileUtils.closeQuietly(fileToClose);
+ }
+ }
+
+ public SSTableNamesIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, SortedSet<CellName> columns, RowIndexEntry indexEntry)
+ {
+ assert columns != null;
+ this.sstable = sstable;
+ this.columns = columns;
+ this.key = key;
+
+ try
+ {
+ read(sstable, file, indexEntry);
+ }
+ catch (IOException e)
+ {
+ sstable.markSuspect();
+ throw new CorruptSSTableException(e, sstable.getFilename());
+ }
+ }
+
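+ // Files opened here are tracked in fileToClose so the by-key constructor
+ // can close them once the columns have been read; a caller-supplied
+ // FileDataInput is left open for the caller to manage.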
+ private FileDataInput createFileDataInput(long position)
+ {
+ fileToClose = sstable.getFileDataInput(position);
+ return fileToClose;
+ }
+
+ @SuppressWarnings("resource")
+ private void read(SSTableReader sstable, FileDataInput file, RowIndexEntry indexEntry)
+ throws IOException
+ {
+ List<IndexHelper.IndexInfo> indexList;
+
+ // If the entry is not indexed or the index is not promoted, read from the row start
+ if (!indexEntry.isIndexed())
+ {
+ if (file == null)
+ file = createFileDataInput(indexEntry.position);
+ else
+ file.seek(indexEntry.position);
+
+ DecoratedKey keyInDisk = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(file));
+ assert keyInDisk.equals(key) : String.format("%s != %s in %s", keyInDisk, key, file.getPath());
+ }
+
+ indexList = indexEntry.columnsIndex();
+
+ if (!indexEntry.isIndexed())
+ {
+ ColumnFamilySerializer serializer = ColumnFamily.serializer;
+ try
+ {
+ cf = ArrayBackedSortedColumns.factory.create(sstable.metadata);
+ cf.delete(DeletionTime.serializer.deserialize(file));
+ }
+ catch (Exception e)
+ {
+ throw new IOException(serializer + " failed to deserialize " + sstable.getColumnFamilyName() + " with " + sstable.metadata + " from " + file, e);
+ }
+ }
+ else
+ {
+ cf = ArrayBackedSortedColumns.factory.create(sstable.metadata);
+ cf.delete(indexEntry.deletionTime());
+ }
+
+ List<OnDiskAtom> result = new ArrayList<OnDiskAtom>();
+ if (indexList.isEmpty())
+ {
+ readSimpleColumns(file, columns, result);
+ }
+ else
+ {
+ readIndexedColumns(sstable.metadata, file, columns, indexList, indexEntry.position, result);
+ }
+
+ // create an iterator view of the columns we read
+ iter = result.iterator();
+ }
+
+ private void readSimpleColumns(FileDataInput file, SortedSet<CellName> columnNames, List<OnDiskAtom> result)
+ {
+ Iterator<OnDiskAtom> atomIterator = cf.metadata().getOnDiskIterator(file, sstable.descriptor.version);
+ int n = 0;
+ while (atomIterator.hasNext())
+ {
+ OnDiskAtom column = atomIterator.next();
+ if (column instanceof Cell)
+ {
+ if (columnNames.contains(column.name()))
+ {
+ result.add(column);
+ if (++n >= columns.size())
+ break;
+ }
+ }
+ else
+ {
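+ // non-Cell atoms (range tombstones) are always included in the result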
+ result.add(column);
+ }
+ }
+ }
+
+ @SuppressWarnings("resource")
+ private void readIndexedColumns(CFMetaData metadata,
+ FileDataInput file,
+ SortedSet<CellName> columnNames,
+ List<IndexHelper.IndexInfo> indexList,
+ long basePosition,
+ List<OnDiskAtom> result)
+ throws IOException
+ {
+ /* get the various column ranges we have to read */
+ CellNameType comparator = metadata.comparator;
+ List<IndexHelper.IndexInfo> ranges = new ArrayList<IndexHelper.IndexInfo>();
+ int lastIndexIdx = -1;
+ for (CellName name : columnNames)
+ {
+ int index = IndexHelper.indexFor(name, indexList, comparator, false, lastIndexIdx);
+ if (index < 0 || index == indexList.size())
+ continue;
+ IndexHelper.IndexInfo indexInfo = indexList.get(index);
+ // Check the index block does contain the column names and that we haven't inserted this block yet.
+ if (comparator.compare(name, indexInfo.firstName) < 0 || index == lastIndexIdx)
+ continue;
+
+ ranges.add(indexInfo);
+ lastIndexIdx = index;
+ }
+
+ if (ranges.isEmpty())
+ return;
+
+ Iterator<CellName> toFetch = columnNames.iterator();
+ CellName nextToFetch = toFetch.next();
+ for (IndexHelper.IndexInfo indexInfo : ranges)
+ {
+ long positionToSeek = basePosition + indexInfo.offset;
+
+ // With new promoted indexes, our first seek in the data file will happen at that point.
+ if (file == null)
+ file = createFileDataInput(positionToSeek);
+
+ AtomDeserializer deserializer = cf.metadata().getOnDiskDeserializer(file, sstable.descriptor.version);
+ file.seek(positionToSeek);
+ FileMark mark = file.mark();
+ while (file.bytesPastMark(mark) < indexInfo.width && nextToFetch != null)
+ {
+ int cmp = deserializer.compareNextTo(nextToFetch);
+ if (cmp < 0)
+ {
+ // If it's a rangeTombstone, then we need to read it and include
+ // it if it includes our target. Otherwise, we can skip it.
+ if (deserializer.nextIsRangeTombstone())
+ {
+ RangeTombstone rt = (RangeTombstone)deserializer.readNext();
+ if (comparator.compare(rt.max, nextToFetch) >= 0)
+ result.add(rt);
+ }
+ else
+ {
+ deserializer.skipNext();
+ }
+ }
+ else if (cmp == 0)
+ {
+ nextToFetch = toFetch.hasNext() ? toFetch.next() : null;
+ result.add(deserializer.readNext());
+ }
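+ // cmp > 0: the next atom on disk sorts after the requested name, so
+ // that name is absent from this block; advance to the next requested
+ // name without consuming the atom (the CASSANDRA-9540 fix merged here)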
+ else
- {
- deserializer.skipNext();
+ nextToFetch = toFetch.hasNext() ? toFetch.next() : null;
- }
+ }
+ }
+ }
+
+ public DecoratedKey getKey()
+ {
+ return key;
+ }
+
+ public ColumnFamily getColumnFamily()
+ {
+ return cf;
+ }
+
+ protected OnDiskAtom computeNext()
+ {
+ if (iter == null || !iter.hasNext())
+ return endOfData();
+ return iter.next();
+ }
+
+ public void close() throws IOException { }
+}