Merge branch 'cassandra-2.2' into cassandra-3.0

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5cc02dd9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5cc02dd9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5cc02dd9

Branch: refs/heads/cassandra-3.1
Commit: 5cc02dd9ac4e9f081540586769e82e1544532e1e
Parents: 63a9f9b ae64cc0
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Nov 17 19:31:38 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Nov 17 19:31:38 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../io/compress/CompressionMetadata.java        | 30 +++++++++++
 .../streaming/messages/FileMessageHeader.java   | 55 ++++++++++++++++++--
 .../streaming/messages/IncomingFileMessage.java |  2 +-
 .../streaming/messages/OutgoingFileMessage.java | 14 ++---
 .../compression/CompressedInputStreamTest.java  |  7 +++
 6 files changed, 93 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5cc02dd9/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index b68cf0d,572afc2..4510462
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -9,7 -3,17 +9,8 @@@ Merged from 2.2
   * Fix SimpleDateType type compatibility (CASSANDRA-10027)
   * (Hadoop) fix splits calculation (CASSANDRA-10640)
   * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
 - * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
 - * Use most up-to-date version of schema for system tables (CASSANDRA-10652)
 - * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)
 - * Expose phi values from failure detector via JMX and tweak debug
 -   and trace logging (CASSANDRA-9526)
 - * Fix RangeNamesQueryPager (CASSANDRA-10509)
 - * Deprecate Pig support (CASSANDRA-10542)
 - * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
  Merged from 2.1:
+  * Create compression chunk for sending file only (CASSANDRA-10680)
 - * Make buffered read size configurable (CASSANDRA-10249)
   * Forbid compact clustering column type changes in ALTER TABLE (CASSANDRA-8879)
   * Reject incremental repair with subrange repair (CASSANDRA-10422)
   * Add a nodetool command to refresh size_estimates (CASSANDRA-9579)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5cc02dd9/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5cc02dd9/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
index 123b983,e9a727f..e1e13b7
--- a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
+++ b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
@@@ -52,10 -49,15 +52,16 @@@ public class FileMessageHeade
      public final SSTableFormat.Type format;
      public final long estimatedKeys;
      public final List<Pair<Long, Long>> sections;
+     /**
+      * Compression info for SSTable to send. Can be null if SSTable is not compressed.
+      * On sender, this field is always null to avoid holding large number of Chunks.
+      * Use compressionMetadata instead.
+      */
      public final CompressionInfo compressionInfo;
+     private final CompressionMetadata compressionMetadata;
      public final long repairedAt;
      public final int sstableLevel;
 +    public final SerializationHeader.Component header;
  
      public FileMessageHeader(UUID cfId,
                               int sequenceNumber,
@@@ -75,11 -76,38 +81,41 @@@
          this.estimatedKeys = estimatedKeys;
          this.sections = sections;
          this.compressionInfo = compressionInfo;
+         this.compressionMetadata = null;
+         this.repairedAt = repairedAt;
+         this.sstableLevel = sstableLevel;
++        this.header = header;
+     }
+ 
+     public FileMessageHeader(UUID cfId,
+                              int sequenceNumber,
 -                             String version,
++                             Version version,
+                              SSTableFormat.Type format,
+                              long estimatedKeys,
+                              List<Pair<Long, Long>> sections,
+                              CompressionMetadata compressionMetadata,
+                              long repairedAt,
 -                             int sstableLevel)
++                             int sstableLevel,
++                             SerializationHeader.Component header)
+     {
+         this.cfId = cfId;
+         this.sequenceNumber = sequenceNumber;
+         this.version = version;
+         this.format = format;
+         this.estimatedKeys = estimatedKeys;
+         this.sections = sections;
+         this.compressionInfo = null;
+         this.compressionMetadata = compressionMetadata;
          this.repairedAt = repairedAt;
          this.sstableLevel = sstableLevel;
 +        this.header = header;
      }
  
+     public boolean isCompressed()
+     {
+         return compressionInfo != null || compressionMetadata != null;
+     }
+ 
      /**
       * @return total file size to transfer in bytes
       */
@@@ -156,15 -188,17 +196,20 @@@
                  out.writeLong(section.left);
                  out.writeLong(section.right);
              }
-             CompressionInfo.serializer.serialize(header.compressionInfo, out, version);
+             // construct CompressionInfo here to avoid holding large number of Chunks on heap.
+             CompressionInfo compressionInfo = null;
+             if (header.compressionMetadata != null)
+                 compressionInfo = new CompressionInfo(header.compressionMetadata.getChunksForSections(header.sections), header.compressionMetadata.parameters);
+             CompressionInfo.serializer.serialize(compressionInfo, out, version);
              out.writeLong(header.repairedAt);
              out.writeInt(header.sstableLevel);
 +
 +            if (version >= StreamMessage.VERSION_30)
 +                SerializationHeader.serializer.serialize(header.version, header.header, out);
+             return compressionInfo;
          }
  
 -        public FileMessageHeader deserialize(DataInput in, int version) throws IOException
 +        public FileMessageHeader deserialize(DataInputPlus in, int version) throws IOException
          {
              UUID cfId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version);
              int sequenceNumber = in.readInt();
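
The hunk above is the heart of CASSANDRA-10680: on the sending side the header no longer carries a pre-built CompressionInfo (and its array of Chunks); it keeps a reference to the SSTable's existing CompressionMetadata and materializes the chunk descriptors only inside serialize(). Below is a minimal sketch of that pattern, using simplified, hypothetical types (HeaderSketch, ChunkSource) rather than the real Cassandra classes:

    import java.io.DataOutput;
    import java.io.IOException;
    import java.util.List;

    final class HeaderSketch
    {
        interface ChunkSource                 // stands in for CompressionMetadata
        {
            int[] chunkLengthsFor(List<long[]> sections);
        }

        final ChunkSource source;             // sender side; null on the receiver
        final int[] receivedChunks;           // receiver side; null on the sender
        final List<long[]> sections;

        HeaderSketch(ChunkSource source, int[] receivedChunks, List<long[]> sections)
        {
            this.source = source;
            this.receivedChunks = receivedChunks;
            this.sections = sections;
        }

        boolean isCompressed()
        {
            return source != null || receivedChunks != null;
        }

        void serialize(DataOutput out) throws IOException
        {
            // Chunk descriptors are built here and become garbage as soon as the
            // call returns, instead of living for the lifetime of the message.
            int[] chunks = source != null ? source.chunkLengthsFor(sections) : receivedChunks;
            out.writeInt(chunks == null ? -1 : chunks.length);
            if (chunks != null)
                for (int length : chunks)
                    out.writeInt(length);
        }
    }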

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5cc02dd9/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
index 19f9e12,31ab2a8..d881d43
--- a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
@@@ -40,9 -39,9 +40,9 @@@ public class IncomingFileMessage extend
          @SuppressWarnings("resource")
         public IncomingFileMessage deserialize(ReadableByteChannel in, int version, StreamSession session) throws IOException
          {
  -            DataInputStream input = new DataInputStream(Channels.newInputStream(in));
  +            DataInputPlus input = new DataInputStreamPlus(Channels.newInputStream(in));
              FileMessageHeader header = FileMessageHeader.serializer.deserialize(input, version);
-             StreamReader reader = header.compressionInfo == null ? new StreamReader(header, session)
+             StreamReader reader = !header.isCompressed() ? new StreamReader(header, session)
                      : new CompressedStreamReader(header, session);
  
              try
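
On the receiving side, the choice between the plain and the compressed stream reader now goes through header.isCompressed() instead of a null check on compressionInfo, so the decision covers both representations of the header. A small hypothetical sketch of that selection (simplified names, not the real StreamReader/CompressedStreamReader classes, reusing the HeaderSketch type from the sketch above):

    final class ReaderSelection
    {
        interface Reader { void read(); }

        // HeaderSketch is the simplified header type from the earlier sketch.
        static Reader readerFor(HeaderSketch header)
        {
            return header.isCompressed()
                 ? () -> { /* decompress chunk by chunk, verifying each per-chunk checksum */ }
                 : () -> { /* plain sequential copy of the incoming stream */ };
        }
    }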

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5cc02dd9/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
index 82e6620,c8175ea..f10b42e
--- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
@@@ -62,22 -62,15 +62,16 @@@ public class OutgoingFileMessage extend
  
          SSTableReader sstable = ref.get();
          filename = sstable.getFilename();
-         CompressionInfo compressionInfo = null;
-         if (sstable.compression)
-         {
-             CompressionMetadata meta = sstable.getCompressionMetadata();
-             compressionInfo = new CompressionInfo(meta.getChunksForSections(sections), meta.parameters);
-         }
          this.header = new FileMessageHeader(sstable.metadata.cfId,
                                              sequenceNumber,
 -                                            sstable.descriptor.version.toString(),
 +                                            sstable.descriptor.version,
                                              sstable.descriptor.formatType,
                                              estimatedKeys,
                                              sections,
-                                             compressionInfo,
+                                             sstable.compression ? sstable.getCompressionMetadata() : null,
                                              repairedAt,
 -                                            keepSSTableLevel ? sstable.getSSTableLevel() : 0);
 +                                            keepSSTableLevel ? sstable.getSSTableLevel() : 0,
 +                                            sstable.header == null ? null : sstable.header.toComponent());
      }
  
      public synchronized void serialize(DataOutputStreamPlus out, int version, StreamSession session) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5cc02dd9/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
index db05a3e,0000000..5646592
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
@@@ -1,122 -1,0 +1,129 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.streaming.compression;
 +
 +import java.io.*;
 +import java.util.*;
 +
 +import org.junit.Test;
 +import org.apache.cassandra.db.ClusteringComparator;
 +import org.apache.cassandra.db.marshal.BytesType;
 +import org.apache.cassandra.io.compress.CompressedSequentialWriter;
 +import org.apache.cassandra.io.compress.CompressionMetadata;
 +import org.apache.cassandra.schema.CompressionParams;
 +import org.apache.cassandra.io.sstable.Component;
 +import org.apache.cassandra.io.sstable.Descriptor;
 +import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 +import org.apache.cassandra.streaming.compress.CompressedInputStream;
 +import org.apache.cassandra.streaming.compress.CompressionInfo;
 +import org.apache.cassandra.utils.ChecksumType;
 +import org.apache.cassandra.utils.Pair;
 +
++import static org.junit.Assert.assertEquals;
++
 +/**
 + */
 +public class CompressedInputStreamTest
 +{
 +    @Test
 +    public void testCompressedRead() throws Exception
 +    {
 +        testCompressedReadWith(new long[]{0L}, false);
 +        testCompressedReadWith(new long[]{1L}, false);
 +        testCompressedReadWith(new long[]{100L}, false);
 +
 +        testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, false);
 +    }
 +
 +    @Test(expected = EOFException.class)
 +    public void testTruncatedRead() throws Exception
 +    {
 +        testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, true);
 +    }
 +    /**
 +     * @param valuesToCheck array of longs of range(0-999)
 +     * @throws Exception
 +     */
 +    private void testCompressedReadWith(long[] valuesToCheck, boolean testTruncate) throws Exception
 +    {
 +        assert valuesToCheck != null && valuesToCheck.length > 0;
 +
 +        // write compressed data file of longs
 +        File tmp = new File(File.createTempFile("cassandra", "unittest").getParent(), "ks-cf-ib-1-Data.db");
 +        Descriptor desc = Descriptor.fromFilename(tmp.getAbsolutePath());
 +        MetadataCollector collector = new MetadataCollector(new ClusteringComparator(BytesType.instance));
 +        CompressionParams param = CompressionParams.snappy(32);
 +        CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector);
 +        Map<Long, Long> index = new HashMap<Long, Long>();
 +        for (long l = 0L; l < 1000; l++)
 +        {
 +            index.put(l, writer.position());
 +            writer.writeLong(l);
 +        }
 +        writer.finish();
 +
 +        CompressionMetadata comp = CompressionMetadata.create(tmp.getAbsolutePath());
 +        List<Pair<Long, Long>> sections = new ArrayList<Pair<Long, Long>>();
 +        for (long l : valuesToCheck)
 +        {
 +            long position = index.get(l);
 +            sections.add(Pair.create(position, position + 8));
 +        }
 +        CompressionMetadata.Chunk[] chunks = comp.getChunksForSections(sections);
++        long totalSize = comp.getTotalSizeForSections(sections);
++        long expectedSize = 0;
++        for (CompressionMetadata.Chunk c : chunks)
++            expectedSize += c.length + 4;
++        assertEquals(expectedSize, totalSize);
 +
 +        // buffer up only relevant parts of file
 +        int size = 0;
 +        for (CompressionMetadata.Chunk c : chunks)
 +            size += (c.length + 4); // 4bytes CRC
 +        byte[] toRead = new byte[size];
 +
 +        RandomAccessFile f = new RandomAccessFile(tmp, "r");
 +        int pos = 0;
 +        for (CompressionMetadata.Chunk c : chunks)
 +        {
 +            f.seek(c.offset);
 +            pos += f.read(toRead, pos, c.length + 4);
 +        }
 +        f.close();
 +
 +        if (testTruncate)
 +        {
 +            byte [] actuallyRead = new byte[50];
 +            System.arraycopy(toRead, 0, actuallyRead, 0, 50);
 +            toRead = actuallyRead;
 +        }
 +
 +        // read buffer using CompressedInputStream
 +        CompressionInfo info = new CompressionInfo(chunks, param);
 +        CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info,
 +                                                                 ChecksumType.CRC32, () -> 1.0);
 +        DataInputStream in = new DataInputStream(input);
 +
 +        for (int i = 0; i < sections.size(); i++)
 +        {
 +            input.position(sections.get(i).left);
 +            long readValue = in.readLong();
 +            assert readValue == valuesToCheck[i] : "expected " + valuesToCheck[i] + " but was " + readValue;
 +        }
 +    }
 +}
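
The added assertions tie CompressionMetadata.getTotalSizeForSections() to the bytes the test actually buffers: each transferred chunk contributes its compressed length plus a 4-byte checksum. A tiny worked example of that accounting, with made-up chunk lengths:

    // Hypothetical chunk lengths; only the "+ 4 bytes per chunk" rule comes from the test above.
    final class ChunkSizeExample
    {
        public static void main(String[] args)
        {
            int[] chunkLengths = { 23, 31, 27 };   // compressed bytes per chunk (made up)
            long total = 0;
            for (int length : chunkLengths)
                total += length + 4;               // + 4-byte per-chunk checksum
            System.out.println(total);             // 81 data bytes + 12 checksum bytes = 93
        }
    }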
