Merge branch 'cassandra-2.1' into cassandra-2.2

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/056055fe
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/056055fe
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/056055fe

Branch: refs/heads/cassandra-3.0
Commit: 056055febd55e1c19a6216627b8568e60141b9fa
Parents: 2aa8342 8b9a916
Author: Yuki Morishita <yu...@apache.org>
Authored: Mon Nov 23 13:17:39 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Nov 23 13:17:39 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../compress/CompressedInputStream.java         | 46 +++++++--
 .../compress/CompressedStreamReader.java        |  4 +
 .../compress/CompressedInputStreamTest.java     | 98 +++++++++++++++-----
 4 files changed, 117 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/056055fe/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 146a0ce,c4dd54e..d11be26
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,17 -1,5 +1,18 @@@
 -2.1.12
 +2.2.4
 + * Don't do anticompaction after subrange repair (CASSANDRA-10422)
 + * Fix SimpleDateType type compatibility (CASSANDRA-10027)
 + * (Hadoop) fix splits calculation (CASSANDRA-10640)
 + * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
 + * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
 + * Use most up-to-date version of schema for system tables (CASSANDRA-10652)
 + * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)
 + * Expose phi values from failure detector via JMX and tweak debug
 +   and trace logging (CASSANDRA-9526)
 + * Fix RangeNamesQueryPager (CASSANDRA-10509)
 + * Deprecate Pig support (CASSANDRA-10542)
 + * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
 +Merged from 2.1:
+  * Fix CompressedInputStream for proper cleanup (CASSANDRA-10012)
   * (cqlsh) Support counters in COPY commands (CASSANDRA-9043)
   * Try next replica if not possible to connect to primary replica on
     ColumnFamilyRecordReader (CASSANDRA-2388)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/056055fe/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index 872afcd,b4a3065..daa339a
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@@ -60,20 -62,25 +60,23 @@@ public class CompressedInputStream exte
      private static final byte[] POISON_PILL = new byte[0];
  
      private long totalCompressedBytesRead;
 -    private final boolean hasPostCompressionAdlerChecksums;
  
+     private Thread readerThread;
+ 
      /**
       * @param source Input source to read compressed data from
       * @param info Compression info
       */
 -    public CompressedInputStream(InputStream source, CompressionInfo info, boolean hasPostCompressionAdlerChecksums)
 +    public CompressedInputStream(InputStream source, CompressionInfo info)
      {
          this.info = info;
 -        this.checksum = hasPostCompressionAdlerChecksums ? new Adler32() : new CRC32();
 -        this.hasPostCompressionAdlerChecksums = hasPostCompressionAdlerChecksums;
 +        this.checksum =  new Adler32();
          this.buffer = new byte[info.parameters.chunkLength()];
          // buffer is limited to store up to 1024 chunks
-         this.dataBuffer = new ArrayBlockingQueue<byte[]>(Math.min(info.chunks.length, 1024));
+         this.dataBuffer = new ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
  
-         new Thread(new Reader(source, info, dataBuffer)).start();
+         readerThread = new Thread(new Reader(source, info, dataBuffer));
+         readerThread.start();
      }
  
      public int read() throws IOException
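
The hunk above keeps a reference to the reader thread instead of the previous fire-and-forget new Thread(...).start(), which is what lets the stream shut the reader down when it is closed (the cleanup fix tracked as CASSANDRA-10012). The close() path itself is not part of this hunk; the following is only a minimal sketch of how such a shutdown could look inside CompressedInputStream, assuming the interrupted Reader enqueues POISON_PILL so that a blocked read() fails with EOFException (the committed implementation may differ in detail).

    // Sketch only, not the committed code. Uses the fields shown in the hunk
    // above: readerThread, dataBuffer and POISON_PILL.
    @Override
    public void close() throws IOException
    {
        if (readerThread != null)
        {
            // Unblock the Reader if it is stuck waiting on the network source.
            readerThread.interrupt();
            readerThread = null;
        }
        // Assumption: the interrupted Reader puts POISON_PILL on dataBuffer,
        // so a consumer blocked in read() wakes up and throws EOFException,
        // which is what the new testClose() further down expects.
    }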

http://git-wip-us.apache.org/repos/asf/cassandra/blob/056055fe/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/056055fe/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
index 0214c76,87e0003..e692441
--- a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
@@@ -58,6 -56,53 +56,53 @@@ public class CompressedInputStreamTes
      {
          testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, true);
      }
+ 
+     /**
+      * Test CompressedInputStream not hang when closed while reading
 -     * @throws Exception
++     * @throws IOException
+      */
+     @Test(expected = EOFException.class)
 -    public void testClose() throws Exception
++    public void testClose() throws IOException
+     {
+         CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap());
+         CompressionMetadata.Chunk[] chunks = {new CompressionMetadata.Chunk(0, 100)};
+         final SynchronousQueue<Integer> blocker = new SynchronousQueue<>();
+         InputStream blockingInput = new InputStream()
+         {
+             @Override
+             public int read() throws IOException
+             {
+                 try
+                 {
+                     // 10 second cut off not to stop other test in case
+                     return Objects.requireNonNull(blocker.poll(10, TimeUnit.SECONDS));
+                 }
+                 catch (InterruptedException e)
+                 {
+                     throw new IOException("Interrupted as expected", e);
+                 }
+             }
+         };
+         CompressionInfo info = new CompressionInfo(chunks, param);
 -        try (CompressedInputStream cis = new CompressedInputStream(blockingInput, info, true))
++        try (CompressedInputStream cis = new CompressedInputStream(blockingInput, info))
+         {
+             new Thread(new Runnable()
+             {
+                 @Override
+                 public void run()
+                 {
+                     try
+                     {
+                         cis.close();
+                     }
+                     catch (Exception ignore) {}
+                 }
+             }).start();
+             // block here
+             cis.read();
+         }
+     }
+ 
      /**
       * @param valuesToCheck array of longs of range(0-999)
       * @throws Exception
@@@ -70,18 -115,20 +115,20 @@@
          File tmp = new File(File.createTempFile("cassandra", "unittest").getParent(), "ks-cf-ib-1-Data.db");
          Descriptor desc = Descriptor.fromFilename(tmp.getAbsolutePath());
          MetadataCollector collector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance));
-         CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.EMPTY_MAP);
-         CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector);
-         Map<Long, Long> index = new HashMap<Long, Long>();
-         for (long l = 0L; l < 1000; l++)
+         CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap());
+         Map<Long, Long> index = new HashMap<>();
+         try (CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector))
          {
-             index.put(l, writer.getFilePointer());
-             writer.stream.writeLong(l);
+             for (long l = 0L; l < 1000; l++)
+             {
+                 index.put(l, writer.getFilePointer());
+                 writer.stream.writeLong(l);
+             }
 -            writer.close();
++            writer.finish();
          }
-         writer.finish();
  
          CompressionMetadata comp = CompressionMetadata.create(tmp.getAbsolutePath());
-         List<Pair<Long, Long>> sections = new ArrayList<Pair<Long, Long>>();
+         List<Pair<Long, Long>> sections = new ArrayList<>();
          for (long l : valuesToCheck)
          {
              long position = index.get(l);
@@@ -118,14 -166,16 +166,16 @@@
  
          // read buffer using CompressedInputStream
          CompressionInfo info = new CompressionInfo(chunks, param);
 -        CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info, true);
 +        CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info);
-         DataInputStream in = new DataInputStream(input);
  
-         for (int i = 0; i < sections.size(); i++)
+         try (DataInputStream in = new DataInputStream(input))
          {
-             input.position(sections.get(i).left);
-             long readValue = in.readLong();
-             assert readValue == valuesToCheck[i] : "expected " + valuesToCheck[i] + " but was " + readValue;
+             for (int i = 0; i < sections.size(); i++)
+             {
+                 input.position(sections.get(i).left);
+                 long readValue = in.readLong();
+                 assertEquals("expected " + valuesToCheck[i] + " but was " + readValue, valuesToCheck[i], readValue);
+             }
          }
      }
  }
