http://git-wip-us.apache.org/repos/asf/cassandra/blob/25de92e3/src/java/org/apache/cassandra/io/util/ByteBufferDataInput.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/util/ByteBufferDataInput.java
index bf926e9,0000000..2f00687
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/util/ByteBufferDataInput.java
+++ b/src/java/org/apache/cassandra/io/util/ByteBufferDataInput.java
@@@ -1,171 -1,0 +1,177 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.util;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class ByteBufferDataInput extends AbstractDataInput implements FileDataInput, DataInput
+{
+    private final ByteBuffer buffer;
+    private final String filename;
+    private final long segmentOffset;
+    private int position;
+
+    public ByteBufferDataInput(ByteBuffer buffer, String filename, long segmentOffset, int position)
+    {
+        assert buffer != null;
+        this.buffer = buffer;
+        this.filename = filename;
+        this.segmentOffset = segmentOffset;
+        this.position = position;
+    }
+
+    // Only use when we know the seek is within the mapped segment. Throws an
+    // IOException otherwise.
+    public void seek(long pos) throws IOException
+    {
+        long inSegmentPos = pos - segmentOffset;
-         if (inSegmentPos < 0 || inSegmentPos > buffer.capacity())
++        if (!contains(pos))
+            throw new IOException(String.format("Seek position %d is not within mmap segment (seg offs: %d, length: %d)", pos, segmentOffset, buffer.capacity()));
+
+        position = (int) inSegmentPos;
+    }
+
++    public boolean contains(long pos)
++    {
++        long inSegmentPos = pos - segmentOffset;
++        return inSegmentPos >= 0 && inSegmentPos < buffer.capacity();
++    }
++
+    public long getFilePointer()
+    {
+        return segmentOffset + position;
+    }
+
+    public long getPosition()
+    {
+        return segmentOffset + position;
+    }
+
+    public long getPositionLimit()
+    {
+        return segmentOffset + buffer.capacity();
+    }
+
+    @Override
+    public boolean markSupported()
+    {
+        return false;
+    }
+
+    public void reset(FileMark mark) throws IOException
+    {
+        assert mark instanceof MappedFileDataInputMark;
+        position = ((MappedFileDataInputMark) mark).position;
+    }
+
+    public FileMark mark()
+    {
+        return new MappedFileDataInputMark(position);
+    }
+
+    public long bytesPastMark(FileMark mark)
+    {
+        assert mark instanceof MappedFileDataInputMark;
+        assert position >= ((MappedFileDataInputMark) mark).position;
+        return position - ((MappedFileDataInputMark) mark).position;
+    }
+
+    public boolean isEOF() throws IOException
+    {
+        return position == buffer.capacity();
+    }
+
+    public long bytesRemaining() throws IOException
+    {
+        return buffer.capacity() - position;
+    }
+
+    public String getPath()
+    {
+        return filename;
+    }
+
+    public int read() throws IOException
+    {
+        if (isEOF())
+            return -1;
+        return buffer.get(position++) & 0xFF;
+    }
+
+    /**
+     * Does the same thing as <code>readFully</code> does, but without modifying the shared underlying buffer (thread safe)
+     * @param length length of the bytes to read
+     * @return buffer with portion of file content
+     * @throws IOException on any failure of an I/O operation
+     */
+    public ByteBuffer readBytes(int length) throws IOException
+    {
+        int remaining = buffer.remaining() - position;
+        if (length > remaining)
+            throw new IOException(String.format("mmap segment underflow; remaining is %d but %d requested",
+                                                remaining, length));
+
+        if (length == 0)
+            return ByteBufferUtil.EMPTY_BYTE_BUFFER;
+
+        ByteBuffer bytes = buffer.duplicate();
+        bytes.position(buffer.position() + position).limit(buffer.position() + position + length);
+        position += length;
+
+        // we have to copy the data in case we unreference the underlying sstable.  See CASSANDRA-3179
+        ByteBuffer clone = ByteBuffer.allocate(bytes.remaining());
+        clone.put(bytes);
+        clone.flip();
+        return clone;
+    }
+
+    @Override
+    public final void readFully(byte[] bytes) throws IOException
+    {
+        ByteBufferUtil.arrayCopy(buffer, buffer.position() + position, bytes, 0, bytes.length);
+        position += bytes.length;
+    }
+
+    @Override
+    public final void readFully(byte[] bytes, int offset, int count) throws IOException
+    {
+        ByteBufferUtil.arrayCopy(buffer, buffer.position() + position, bytes, offset, count);
+        position += count;
+    }
+
+    private static class MappedFileDataInputMark implements FileMark
+    {
+        int position;
+
+        MappedFileDataInputMark(int position)
+        {
+            this.position = position;
+        }
+    }
+
+    @Override
+    public String toString() {
+        return getClass().getSimpleName() + "(" +
+               "filename='" + filename + "'" +
+               ", position=" + position +
+               ")";
+    }
+}
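
The seek()/contains() pair above works in segment-relative coordinates: an absolute file position is valid only if it falls inside [segmentOffset, segmentOffset + capacity). For illustration, the bounds check amounts to the following standalone sketch (class name and values are invented, not part of the patch):

    import java.nio.ByteBuffer;

    public class SegmentAddressingDemo
    {
        public static void main(String[] args)
        {
            long segmentOffset = 1L << 31;                 // segment starts 2GiB into the file
            ByteBuffer buffer = ByteBuffer.allocate(4096); // stands in for the mapped region

            long pos = segmentOffset + 1000;               // absolute file position to read
            long inSegmentPos = pos - segmentOffset;       // position relative to this segment

            // same bounds check as contains(): the last valid offset is capacity() - 1
            boolean contained = inSegmentPos >= 0 && inSegmentPos < buffer.capacity();
            System.out.println("contained = " + contained + ", inSegmentPos = " + inSegmentPos);
        }
    }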
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25de92e3/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
index a8fae9f,623f65a..808b5ad
--- a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
@@@ -27,6 -28,13 +28,12 @@@ import com.google.common.annotations.Vi
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
+ import org.apache.cassandra.config.CFMetaData;
+ import org.apache.cassandra.db.RowIndexEntry;
 -import org.apache.cassandra.io.FSReadError;
+ import org.apache.cassandra.io.sstable.Component;
+ import org.apache.cassandra.io.sstable.Descriptor;
+ import org.apache.cassandra.io.sstable.IndexSummary;
+ import org.apache.cassandra.utils.ByteBufferUtil;
  import org.apache.cassandra.utils.JVMStabilityInspector;
  
  public class MmappedSegmentedFile extends SegmentedFile
@@@ -135,6 -141,73 +142,74 @@@
          }
      }
  
+     // see CASSANDRA-10357
+     public static boolean maybeRepair(CFMetaData metadata, Descriptor descriptor, IndexSummary indexSummary, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
+     {
+         boolean mayNeedRepair = false;
+         if (ibuilder instanceof Builder)
+             mayNeedRepair = ((Builder) ibuilder).mayNeedRepair(descriptor.filenameFor(Component.PRIMARY_INDEX));
+         if (dbuilder instanceof Builder)
+             mayNeedRepair |= ((Builder) dbuilder).mayNeedRepair(descriptor.filenameFor(Component.DATA));
+ 
+         if (mayNeedRepair)
+             forceRepair(metadata, descriptor, indexSummary, ibuilder, dbuilder);
+         return mayNeedRepair;
+     }
+ 
+     // if either the index or data file has boundaries larger than we can mmap, and was written by a version
+     // that did not guarantee correct boundaries were saved, rebuild the boundaries and save them again
+     private static void forceRepair(CFMetaData metadata, Descriptor descriptor, IndexSummary indexSummary, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
+     {
+         if (ibuilder instanceof Builder)
+             ((Builder) ibuilder).boundaries.clear();
+         if (dbuilder instanceof Builder)
+             ((Builder) dbuilder).boundaries.clear();
+ 
++        RowIndexEntry.IndexSerializer rowIndexEntrySerializer = descriptor.version.getSSTableFormat().getIndexSerializer(metadata);
+         try (RandomAccessFile raf = new RandomAccessFile(descriptor.filenameFor(Component.PRIMARY_INDEX), "r");)
+         {
+             long iprev = 0, dprev = 0;
+             for (int i = 0; i < indexSummary.size(); i++)
+             {
+                 // first read the position in the summary, and read the corresponding position in the data file
+                 long icur = indexSummary.getPosition(i);
+                 raf.seek(icur);
+                 ByteBufferUtil.readWithShortLength(raf);
-                 RowIndexEntry rie = metadata.comparator.rowIndexEntrySerializer().deserialize(raf, descriptor.version);
++                RowIndexEntry rie = rowIndexEntrySerializer.deserialize(raf, descriptor.version);
+                 long dcur = rie.position;
+ 
+                 // if these positions are small enough to map out a segment from the prior version (i.e. less than 2Gb),
+                 // just add these as a boundary and proceed to the next index summary record; most scenarios will be
+                 // served by this, keeping the cost of rebuild to a minimum.
+ 
+                 if (Math.max(icur - iprev, dcur - dprev) > MAX_SEGMENT_SIZE)
+                 {
+                     // otherwise, loop over its index block, providing each RIE as a potential boundary for both files
+                     raf.seek(iprev);
+                     while (raf.getFilePointer() < icur)
+                     {
+                         // add the position of this record in the index file as an index file boundary
+                         ibuilder.addPotentialBoundary(raf.getFilePointer());
+                         // then read the RIE, and add its data file position as a boundary for the data file
+                         ByteBufferUtil.readWithShortLength(raf);
-                         rie = metadata.comparator.rowIndexEntrySerializer().deserialize(raf, descriptor.version);
++                        rie = rowIndexEntrySerializer.deserialize(raf, descriptor.version);
+                         dbuilder.addPotentialBoundary(rie.position);
+                     }
+                 }
+ 
+                 ibuilder.addPotentialBoundary(icur);
+                 dbuilder.addPotentialBoundary(dcur);
+ 
+                 iprev = icur;
+                 dprev = dcur;
+             }
+         }
+         catch (IOException e)
+         {
+             logger.error("Failed to recalculate boundaries for {}; mmap access may degrade to buffered for this file", descriptor);
+         }
+     }
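
Both repair paths above only feed candidate offsets to the builders; the Builder below (via its Boundaries helper) decides which candidates become real segment boundaries. A rough, self-contained sketch of the selection invariant — every kept span must fit in one mmap segment — with hypothetical names, not the patch's actual Boundaries implementation:

    import java.util.ArrayList;
    import java.util.List;

    public class BoundarySelectionSketch
    {
        static final long MAX_SEGMENT_SIZE = Integer.MAX_VALUE; // largest span one mmap segment may cover

        // Greedily keep the furthest candidate that still fits in the current segment;
        // a span no candidate can split stays > MAX_SEGMENT_SIZE and will simply not
        // be mmap'd (the reader falls back to buffered I/O for that segment).
        static long[] selectBoundaries(long[] sortedCandidates, long length)
        {
            List<Long> out = new ArrayList<>();
            out.add(0L);
            long start = 0, best = 0;
            for (long c : sortedCandidates)
            {
                if (c - start <= MAX_SEGMENT_SIZE)
                {
                    best = c; // still fits: remember the furthest usable candidate
                    continue;
                }
                if (best > start) // close the current segment at the furthest fit
                {
                    out.add(best);
                    start = best;
                }
                if (c - start > MAX_SEGMENT_SIZE) // still too far: candidate starts its own segment
                {
                    out.add(c);
                    start = c;
                }
                best = c;
            }
            if (out.get(out.size() - 1) != length)
                out.add(length); // sentinel: the final boundary is the file length
            long[] result = new long[out.size()];
            for (int i = 0; i < result.length; i++)
                result[i] = out.get(i);
            return result;
        }

        public static void main(String[] args)
        {
            long[] candidates = { 1L << 29, 3L << 30, 5L << 30 }; // 512MiB, 3GiB, 5GiB
            for (long b : selectBoundaries(candidates, 6L << 30))
                System.out.println(b);
            // prints 0, 512MiB, 3GiB, 5GiB, 6GiB; the 512MiB..3GiB span exceeds
            // MAX_SEGMENT_SIZE, so that segment would be read buffered, not mmap'd
        }
    }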
  
      /**
       * Overrides the default behaviour to create segments of a maximum size.
       */
@@@ -153,68 -326,83 +328,58 @@@
          public Builder()
          {
              super();
-             boundaries = new ArrayList<>();
-             boundaries.add(0L);
          }
  
-         public void addPotentialBoundary(long boundary)
+         public long[] boundaries()
          {
-             if (boundary - currentStart <= MAX_SEGMENT_SIZE)
-             {
-                 // boundary fits into current segment: expand it
-                 currentSize = boundary - currentStart;
-                 return;
-             }
+             return boundaries.truncate();
+         }
- 
-             // close the current segment to try and make room for the boundary
-             if (currentSize > 0)
-             {
-                 currentStart += currentSize;
-                 boundaries.add(currentStart);
-             }
-             currentSize = boundary - currentStart;
+ 
+         // indicates if we may need to repair the mmapped file boundaries. this is a cheap check to see if there
+         // are any spans larger than an mmap segment size, which should be rare to occur in practice.
+         boolean mayNeedRepair(String path)
+         {
+             // old boundaries were created without the length, so add it as a candidate
+             long length = new File(path).length();
+             boundaries.addCandidate(length);
+             long[] boundaries = this.boundaries.truncate();
  
-             // if we couldn't make room, the boundary needs its own segment
-             if (currentSize > MAX_SEGMENT_SIZE)
+             long prev = 0;
+             for (long boundary : boundaries)
              {
-                 currentStart = boundary;
-                 boundaries.add(currentStart);
-                 currentSize = 0;
+                 if (boundary - prev > MAX_SEGMENT_SIZE)
+                     return true;
+                 prev = boundary;
              }
+             return false;
+         }
+ 
+         public void addPotentialBoundary(long boundary)
+         {
+             boundaries.addCandidate(boundary);
          }
  
-         public SegmentedFile complete(String path, long overrideLength, boolean isFinal)
+         public SegmentedFile complete(ChannelProxy channel, long overrideLength)
          {
-             assert !isFinal || overrideLength <= 0;
-             long length = overrideLength > 0 ? overrideLength : new File(path).length();
+             long length = overrideLength > 0 ? overrideLength : channel.size();
              // create the segments
-             return new MmappedSegmentedFile(channel, length, createSegments(channel, length));
-             return new MmappedSegmentedFile(path, length, createSegments(path, length, isFinal));
--        }
- 
-         private Segment[] createSegments(String path, long length, boolean isFinal)
-         {
-             RandomAccessFile raf;
-             try
-             {
-                 raf = new RandomAccessFile(path, "r");
-             }
-             catch (IOException e)
-             {
-                 throw new RuntimeException(e);
-             }
- 
-         private Segment[] createSegments(ChannelProxy channel, long length)
-         {
-             // if we're early finishing a range that doesn't span multiple segments, but the finished file now does,
-             // we remove these from the end (we loop incase somehow this spans multiple segments, but that would
-             // be a loco dataset
-             while (length < boundaries.get(boundaries.size() - 1))
-                 boundaries.remove(boundaries.size() -1);
- 
-             // add a sentinel value == length
-             List<Long> boundaries = new ArrayList<>(this.boundaries);
-             if (length != boundaries.get(boundaries.size() - 1))
-                 boundaries.add(length);
- 
-             int segcount = boundaries.size() - 1;
-             long[] boundaries = this.boundaries.finish(length, isFinal);
++            long[] boundaries = this.boundaries.finish(length, overrideLength <= 0);
+ 
+             int segcount = boundaries.length - 1;
              Segment[] segments = new Segment[segcount];
+ 
-             try
-             {
-                 for (int i = 0; i < segcount; i++)
-                 {
-                     long start = boundaries[i];
-                     long size = boundaries[i + 1] - start;
-                     MappedByteBuffer segment = size <= MAX_SEGMENT_SIZE
-                                                ? raf.getChannel().map(FileChannel.MapMode.READ_ONLY, start, size)
-                                                : null;
-                     segments[i] = new Segment(start, segment);
-                 }
-             }
-             catch (IOException e)
-             {
-                 throw new FSReadError(e, path);
-             }
-             finally
+             for (int i = 0; i < segcount; i++)
              {
-                 long start = boundaries.get(i);
-                 long size = boundaries.get(i + 1) - start;
-                 FileUtils.closeQuietly(raf);
++                long start = boundaries[i];
++                long size = boundaries[i + 1] - start;
+                 MappedByteBuffer segment = size <= MAX_SEGMENT_SIZE
+                                            ? channel.map(FileChannel.MapMode.READ_ONLY, start, size)
+                                            : null;
+                 segments[i] = new Segment(start, segment);
              }
--            return segments;
++ 
++            return new MmappedSegmentedFile(channel, length, segments);
          }
  
          @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25de92e3/src/java/org/apache/cassandra/io/util/SegmentedFile.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/util/SegmentedFile.java
index 66898c6,23454bc..30707d8
--- a/src/java/org/apache/cassandra/io/util/SegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
@@@ -23,9 -23,9 +23,10 @@@ import java.io.File
  import java.io.IOException;
  import java.nio.MappedByteBuffer;
  import java.util.Iterator;
+ import java.util.List;
  import java.util.NoSuchElementException;
  
 +import com.google.common.base.Throwables;
  import com.google.common.util.concurrent.RateLimiter;
  
  import org.apache.cassandra.config.Config;
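
The segment-creation loop in MmappedSegmentedFile.complete() above is the consumer of those boundaries: each span of at most MAX_SEGMENT_SIZE bytes is mmap'd, and anything larger is left null so readers fall back to buffered I/O. A self-contained sketch of that mapping step (file name and toy boundaries are invented, not from the patch):

    import java.io.RandomAccessFile;
    import java.nio.MappedByteBuffer;
    import java.nio.channels.FileChannel;

    public class SegmentMapDemo
    {
        static final long MAX_SEGMENT_SIZE = Integer.MAX_VALUE;

        public static void main(String[] args) throws Exception
        {
            long[] boundaries = { 0L, 4096L, 8192L }; // toy boundaries; last == file length
            try (RandomAccessFile raf = new RandomAccessFile("/tmp/demo.bin", "rw"))
            {
                raf.setLength(boundaries[boundaries.length - 1]);
                FileChannel channel = raf.getChannel();
                MappedByteBuffer[] segments = new MappedByteBuffer[boundaries.length - 1];
                for (int i = 0; i < segments.length; i++)
                {
                    long start = boundaries[i];
                    long size = boundaries[i + 1] - start;
                    segments[i] = size <= MAX_SEGMENT_SIZE
                                ? channel.map(FileChannel.MapMode.READ_ONLY, start, size)
                                : null; // too large to mmap: reader must use buffered access
                }
                System.out.println("mapped " + segments.length + " segments");
            }
        }
    }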
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25de92e3/test/burn/org/apache/cassandra/io/sstable/LongSegmentedFileBoundaryTest.java
----------------------------------------------------------------------
diff --cc test/burn/org/apache/cassandra/io/sstable/LongSegmentedFileBoundaryTest.java
index 0000000,e17c6a7..4913b32
mode 000000,100644..100644
--- a/test/burn/org/apache/cassandra/io/sstable/LongSegmentedFileBoundaryTest.java
+++ b/test/burn/org/apache/cassandra/io/sstable/LongSegmentedFileBoundaryTest.java
@@@ -1,0 -1,322 +1,324 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.cassandra.io.sstable;
+ 
+ import java.io.*;
+ import java.nio.ByteBuffer;
+ import java.util.Arrays;
+ import java.util.Random;
+ 
+ import com.google.common.io.Files;
+ import org.junit.AfterClass;
+ import org.junit.BeforeClass;
+ import org.junit.Test;
+ 
+ import junit.framework.Assert;
+ import org.apache.cassandra.SchemaLoader;
+ import org.apache.cassandra.config.CFMetaData;
+ import org.apache.cassandra.config.Config;
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.dht.ByteOrderedPartitioner;
+ import org.apache.cassandra.exceptions.InvalidRequestException;
++import org.apache.cassandra.io.sstable.format.SSTableReader;
+ import org.apache.cassandra.io.util.DataOutputStreamPlus;
+ import org.apache.cassandra.io.util.FileUtils;
+ import org.apache.cassandra.io.util.MmappedSegmentedFile;
+ import org.apache.cassandra.io.util.MmappedSegmentedFile.Builder.Boundaries;
++import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;
+ import org.apache.cassandra.service.StorageService;
+ import org.apache.cassandra.utils.ByteBufferUtil;
+ 
+ public class LongSegmentedFileBoundaryTest
+ {
+     @BeforeClass
+     public static void setup() throws Exception
+     {
+         SchemaLoader.cleanupAndLeaveDirs();
+         Keyspace.setInitialized();
+         StorageService.instance.initServer();
+     }
+ 
+     @AfterClass
+     public static void tearDown()
+     {
+         Config.setClientMode(false);
+     }
+ 
+     @Test
+     public void testRandomBoundaries()
+     {
+         long[] candidates = new long[1 + (1 << 16)];
+         int[] indexesToCheck = new int[1 << 8];
+         Random random = new Random();
+ 
+         for (int run = 0; run < 100; run++)
+         {
+             long seed = random.nextLong();
+             random.setSeed(seed);
+             System.out.println("Seed: " + seed);
+ 
+             // at least 1Ki, and as many as 256Ki, boundaries
+             int candidateCount = (1 + random.nextInt(candidates.length >> 10)) << 10;
+             generateBoundaries(random, candidateCount, candidates, indexesToCheck);
+ 
+             Boundaries builder = new Boundaries();
+             int nextIndexToCheck = indexesToCheck[0];
+             int checkCount = 0;
+             System.out.printf("[0..%d)", candidateCount);
+             for (int i = 1; i < candidateCount - 1; i++)
+             {
+                 if (i == nextIndexToCheck)
+                 {
+                     if (checkCount % 20 == 0)
+                         System.out.printf(" %d", i);
+                     // grow the number of samples logarithmically; work will still increase superlinearly, as the size of the dataset grows linearly
+                     int sampleCount = 1 << (31 - Integer.numberOfLeadingZeros(++checkCount));
+                     checkBoundarySample(random, candidates, i, sampleCount, builder);
+                     // select the next index to check (there may be dups, so skip them)
+                     while ((nextIndexToCheck = checkCount == indexesToCheck.length ? candidateCount : indexesToCheck[checkCount]) == i)
+                         checkCount++;
+                 }
+ 
+                 builder.addCandidate(candidates[i]);
+             }
+             System.out.println();
+             checkBoundaries(candidates, candidateCount - 1, builder, candidates[candidateCount - 1]);
+             Assert.assertEquals(candidateCount, nextIndexToCheck);
+         }
+     }
+ 
+     private static void generateBoundaries(Random random, int candidateCount, long[] candidates, int[] indexesToCheck)
+     {
+         // the average boundary size is 4MiB, max 4GiB, min 4KiB
+         long averageBoundarySize = (4L << 10) * random.nextInt(1 << 20);
+         long prev = 0;
+         for (int i = 1 ; i < candidateCount ; i++)
+             candidates[i] = prev += Math.max(1, averageBoundarySize + (random.nextGaussian() * averageBoundarySize));
+ 
+         // generate indexes we will corroborate our behaviour on
+         for (int i = 0 ; i < indexesToCheck.length ; i++)
+             indexesToCheck[i] = 1 + random.nextInt(candidateCount - 2);
+         Arrays.sort(indexesToCheck);
+     }
+ 
+     private static void checkBoundarySample(Random random, long[] candidates, int candidateCount, int sampleCount, Boundaries builder)
+     {
+         for (int i = 0 ; i < sampleCount ; i++)
+         {
+             // pick a number exponentially less likely to be near the beginning, since we test that area earlier
+             int position = 0;
+             while (position <= 0)
+                 position = candidateCount / (Integer.lowestOneBit(random.nextInt()));
+             long upperBound = candidates[position];
+             long lowerBound = random.nextBoolean() ? (rand(random, 0, upperBound) / (Integer.lowestOneBit(random.nextInt())))
+                                                    : candidates[Math.max(0, position - random.nextInt(64))];
+             long length = rand(random, lowerBound, upperBound);
+             checkBoundaries(candidates, candidateCount, builder, length);
+         }
+         checkBoundaries(candidates, candidateCount, builder, candidates[candidateCount]);
+     }
+ 
+     private static long rand(Random random, long lowerBound, long upperBound)
+     {
+         if (upperBound == lowerBound)
+             return upperBound;
+         return lowerBound + ((random.nextLong() & Long.MAX_VALUE) % (upperBound - lowerBound));
+     }
+ 
+     private static void checkBoundaries(long[] candidates, int candidateCount, Boundaries builder, long length)
+     {
+         if (length == 0)
+             return;
+ 
+         long[] boundaries = new long[(int) (10 + 2 * (length / Integer.MAX_VALUE))];
+         int count = 1;
+         int prev = 0;
+         while (true)
+         {
+             int p = candidates[prev + 1] - boundaries[count - 1] >= Integer.MAX_VALUE
+                     ? prev + 1
+                     : Arrays.binarySearch(candidates, prev, candidateCount, boundaries[count - 1] + Integer.MAX_VALUE);
+             if (p < 0) p = -2 - p;
+             if (p >= candidateCount - 1 || candidates[p] >= length)
+                 break;
+             boundaries[count++] = candidates[p];
+             if (candidates[p + 1] >= length)
+                 break;
+             prev = p;
+         }
+         if (candidates[candidateCount - 1] < length && length - boundaries[count - 1] >= Integer.MAX_VALUE)
+             boundaries[count++] = candidates[candidateCount - 1];
+         boundaries[count++] = length;
+         final long[] canon = Arrays.copyOf(boundaries, count);
+         final long[] check = builder.finish(length, false);
+         if (!Arrays.equals(canon, check))
+             Assert.assertTrue("\n" + Arrays.toString(canon) + "\n" + Arrays.toString(check), Arrays.equals(canon, check));
+     }
+ 
+     @Test
+     public void testBoundariesAndRepairSmall() throws InvalidRequestException, IOException
+     {
+         testBoundariesAndRepair(1, 1 << 16);
+     }
+ 
+     @Test
+     public void testBoundariesAndRepairMedium() throws InvalidRequestException, IOException
+     {
+         testBoundariesAndRepair(1, 1 << 20);
+     }
+ 
+     @Test
+     public void testBoundariesAndRepairLarge() throws InvalidRequestException, IOException
+     {
+         testBoundariesAndRepair(1, 100 << 20);
+     }
+ 
+     @Test
+     public void testBoundariesAndRepairHuge() throws InvalidRequestException, IOException
+     {
+         testBoundariesAndRepair(1, Integer.MAX_VALUE - 1024);
+     }
+ 
+     @Test
+     public void testBoundariesAndRepairTooHuge() throws InvalidRequestException, IOException
+     {
+         testBoundariesAndRepair(1, Integer.MAX_VALUE);
+     }
+ 
+     @Test
+     public void testBoundariesAndRepairHugeIndex() throws InvalidRequestException, IOException
+     {
+         testBoundariesAndRepair(1 << 7, 1 << 15);
+     }
+ 
+     @Test
+     public void testBoundariesAndRepairReallyHugeIndex() throws InvalidRequestException, IOException
+     {
+         testBoundariesAndRepair(1 << 14, 1 << 15);
+     }
+ 
+     private void testBoundariesAndRepair(int rows, int rowSize) throws InvalidRequestException, IOException
+     {
+         String KS = "cql_keyspace";
+         String TABLE = "table1";
+ 
+         File tempdir = Files.createTempDir();
+         try
+         {
+             Assert.assertTrue(DatabaseDescriptor.getColumnIndexSize() < rowSize);
+             Assert.assertTrue(DatabaseDescriptor.getDiskAccessMode() == Config.DiskAccessMode.mmap);
+             Assert.assertTrue(DatabaseDescriptor.getIndexAccessMode() == Config.DiskAccessMode.mmap);
+             Assert.assertTrue(StorageService.getPartitioner() instanceof ByteOrderedPartitioner);
+             File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator + TABLE);
+             Assert.assertTrue(dataDir.mkdirs());
+ 
+             String schema = "CREATE TABLE cql_keyspace.table" + (rows > 1 ? "2" : "1") + " (k bigint, v1 blob, v2 blob, v3 blob, v4 blob, v5 blob, PRIMARY KEY (k" + (rows > 1 ? ", v1" : "") + ")) WITH compression = { 'sstable_compression':'' };";
+             String insert = "INSERT INTO cql_keyspace.table" + (rows > 1 ? "2" : "1") + " (k, v1, v2, v3, v4, v5) VALUES (?, ?, ?, ?, ?, ?)";
+ 
+             CQLSSTableWriter.Builder builder = CQLSSTableWriter.builder()
+                                                               .inDirectory(dataDir)
+                                                               .forTable(schema)
+                                                               .withPartitioner(StorageService.getPartitioner())
+                                                               .using(insert)
+                                                               .sorted();
+             CQLSSTableWriter writer = builder.build();
+ 
+             // write 8GiB of decorated keys
+             ByteBuffer[] value = new ByteBuffer[rows];
+             for (int row = 0 ; row < rows ; row++)
+             {
+                 // if we're using clustering columns, the clustering key is replicated across every other column
+                 value[row] = ByteBuffer.allocate(rowSize / (rows > 1 ? 8 : 5));
+                 value[row].putInt(0, row);
+             }
+             long targetSize = 8L << 30;
+             long dk = 0;
+             long size = 0;
+             long dkSize = rowSize * rows;
+             while (size < targetSize)
+             {
+                 for (int row = 0 ; row < rows ; row++)
+                     writer.addRow(dk, value[row], value[row], value[row], value[row], value[row]);
+                 size += dkSize;
+                 dk++;
+             }
+ 
+             Descriptor descriptor = writer.getCurrentDescriptor().asType(Descriptor.Type.FINAL);
+             writer.close();
+ 
+             // open (and close) the reader so that the summary file is created
+             SSTableReader reader = SSTableReader.open(descriptor);
+             reader.selfRef().release();
+ 
+             // then check the boundaries are reasonable, and corrupt them
+             checkThenCorruptBoundaries(descriptor, rows * rowSize < Integer.MAX_VALUE);
+ 
+             // then check that reopening corrects the corruption
+             reader = SSTableReader.open(descriptor);
+             reader.selfRef().release();
+             checkThenCorruptBoundaries(descriptor, rows * rowSize < Integer.MAX_VALUE);
+         }
+         finally
+         {
+             FileUtils.deleteRecursive(tempdir);
+         }
+     }
+ 
+     private static void checkThenCorruptBoundaries(Descriptor descriptor, boolean expectDataMmappable) throws IOException
+     {
+         File summaryFile = new File(descriptor.filenameFor(Component.SUMMARY));
+         DataInputStream iStream = new DataInputStream(new FileInputStream(summaryFile));
+         IndexSummary indexSummary = IndexSummary.serializer.deserialize(iStream, StorageService.getPartitioner(), true, CFMetaData.DEFAULT_MIN_INDEX_INTERVAL, CFMetaData.DEFAULT_MAX_INDEX_INTERVAL);
+         ByteBuffer first = ByteBufferUtil.readWithLength(iStream);
+         ByteBuffer last = ByteBufferUtil.readWithLength(iStream);
+         MmappedSegmentedFile.Builder ibuilder = new MmappedSegmentedFile.Builder();
+         MmappedSegmentedFile.Builder dbuilder = new MmappedSegmentedFile.Builder();
+         ibuilder.deserializeBounds(iStream);
+         dbuilder.deserializeBounds(iStream);
+         iStream.close();
+ 
+         // the index file cannot generally be non-mmappable, as index entries cannot be larger than MAX_SEGMENT_SIZE (due to promotedSize being encoded as an int)
+         assertBoundaries(descriptor.filenameFor(Component.PRIMARY_INDEX), true, ibuilder.boundaries());
+         assertBoundaries(descriptor.filenameFor(Component.DATA), expectDataMmappable, dbuilder.boundaries());
+ 
-         DataOutputStreamPlus oStream = new DataOutputStreamPlus(new FileOutputStream(summaryFile));
++        DataOutputStreamPlus oStream = new WrappedDataOutputStreamPlus(new FileOutputStream(summaryFile));
+         IndexSummary.serializer.serialize(indexSummary, oStream, true);
+         ByteBufferUtil.writeWithLength(first, oStream);
+         ByteBufferUtil.writeWithLength(last, oStream);
+         oStream.writeInt(1);
+         oStream.writeLong(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)).length());
+         oStream.writeLong(new File(descriptor.filenameFor(Component.DATA)).length());
+         oStream.close();
+     }
+ 
+     private static void assertBoundaries(String path, boolean expectMmappable, long[] boundaries)
+     {
+         long length = new File(path).length();
+         long prev = boundaries[0];
+         for (int i = 1 ; i <= boundaries.length && prev < length ; i++)
+         {
+             long boundary = i == boundaries.length ? length : boundaries[i];
+             Assert.assertEquals(String.format("[%d, %d), %d of %d", prev, boundary, i, boundaries.length),
+                                 expectMmappable, boundary - prev <= Integer.MAX_VALUE);
+             prev = boundary;
+         }
+     }
+ }
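
One idiom in testRandomBoundaries above may not be obvious: 1 << (31 - Integer.numberOfLeadingZeros(n)) rounds n down to a power of two, so the sample count only doubles at power-of-two checkpoints. A tiny demo of that expression (class name invented, not from the patch):

    public class HighestPowerOfTwoDemo
    {
        public static void main(String[] args)
        {
            for (int n : new int[]{ 1, 2, 3, 5, 17, 1000 })
            {
                // highest power of two <= n
                int sampleCount = 1 << (31 - Integer.numberOfLeadingZeros(n));
                System.out.println(n + " -> " + sampleCount); // 1, 2, 2, 4, 16, 512
            }
        }
    }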
