Merge branch 'cassandra-2.2' into cassandra-3.0

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/62ef8617
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/62ef8617
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/62ef8617

Branch: refs/heads/cassandra-3.8
Commit: 62ef8617cdaa07fa37b1b2121ad5923da64e74a3
Parents: 676b6a8 76e3100
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Aug 9 16:45:52 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Aug 9 16:45:52 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/cassandra/config/Config.java     |  4 ++
 .../cassandra/config/DatabaseDescriptor.java    |  5 --
 .../cassandra/streaming/StreamReader.java       | 26 +---------
 .../cassandra/streaming/StreamSession.java      | 36 +-------------
 .../compress/CompressedInputStream.java         | 21 +++++++-
 .../compress/CompressedStreamReader.java        | 11 ++---
 .../streaming/messages/IncomingFileMessage.java | 22 ++-------
 .../streaming/messages/RetryMessage.java        |  4 ++
 .../org/apache/cassandra/utils/Throwables.java  | 14 ++++++
 .../compression/CompressedInputStreamTest.java  | 52 +++++++++++++++++---
 11 files changed, 100 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ef8617/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 78bd32d,232203e..f613c5f
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,34 -1,6 +1,35 @@@
 -2.2.8
 +3.0.9
 + * Change commitlog and sstables to track dirty and clean intervals (CASSANDRA-11828)
 + * NullPointerException during compaction on table with static columns (CASSANDRA-12336)
 + * Fixed ConcurrentModificationException when reading metrics in GraphiteReporter (CASSANDRA-11823)
 + * Fix upgrade of super columns on thrift (CASSANDRA-12335)
 + * Fixed flacky BlacklistingCompactionsTest, switched to fixed size types and increased corruption size (CASSANDRA-12359)
 + * Rerun ReplicationAwareTokenAllocatorTest on failure to avoid flakiness (CASSANDRA-12277)
 + * Exception when computing read-repair for range tombstones (CASSANDRA-12263)
 + * Lost counter writes in compact table and static columns (CASSANDRA-12219)
 + * AssertionError with MVs on updating a row that isn't indexed due to a null value (CASSANDRA-12247)
 + * Disable RR and speculative retry with EACH_QUORUM reads (CASSANDRA-11980)
 + * Add option to override compaction space check (CASSANDRA-12180)
 + * Faster startup by only scanning each directory for temporary files once (CASSANDRA-12114)
 + * Respond with v1/v2 protocol header when responding to driver that attempts
 +   to connect with too low of a protocol version (CASSANDRA-11464)
 + * NullPointerExpception when reading/compacting table (CASSANDRA-11988)
 + * Fix problem with undeleteable rows on upgrade to new sstable format (CASSANDRA-12144)
 + * Fix paging logic for deleted partitions with static columns (CASSANDRA-12107)
 + * Wait until the message is being send to decide which serializer must be used (CASSANDRA-11393)
 + * Fix migration of static thrift column names with non-text comparators (CASSANDRA-12147)
 + * Fix upgrading sparse tables that are incorrectly marked as dense (CASSANDRA-11315)
 + * Fix reverse queries ignoring range tombstones (CASSANDRA-11733)
 + * Avoid potential race when rebuilding CFMetaData (CASSANDRA-12098)
 + * Avoid missing sstables when getting the canonical sstables (CASSANDRA-11996)
 + * Always select the live sstables when getting sstables in bounds (CASSANDRA-11944)
 + * Fix column ordering of results with static columns for Thrift requests in
 +   a mixed 2.x/3.x cluster, also fix potential non-resolved duplication of
 +   those static columns in query results (CASSANDRA-12123)
 + * Avoid digest mismatch with empty but static rows (CASSANDRA-12090)
 + * Fix EOF exception when altering column type (CASSANDRA-11820)
 +Merged from 2.2:
+  * Fix hanging stream session (CASSANDRA-10992)
 - * Add byteman support for testing (CASSANDRA-12377)
   * Fix INSERT JSON, fromJson() support of smallint, tinyint types (CASSANDRA-12371)
   * Restore JVM metric export for metric reporters (CASSANDRA-12312)
   * Release sstables of failed stream sessions only when outgoing transfers are finished (CASSANDRA-11345)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ef8617/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/Config.java
index e6c56cb,60daee6..86f1016
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@@ -171,8 -170,11 +171,12 @@@ public class Confi
      public Integer concurrent_compactors;
      public volatile Integer compaction_throughput_mb_per_sec = 16;
     public volatile Integer compaction_large_partition_warning_threshold_mb = 100;
 +    public Integer min_free_space_per_drive_in_mb = 50;
  
+     /**
+      * @deprecated retry support removed on CASSANDRA-10992
+      */
+     @Deprecated
      public Integer max_streaming_retries = 3;
  
      public volatile Integer stream_throughput_outbound_megabits_per_sec = 200;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ef8617/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ef8617/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReader.java
index f8db26b,c96ea22..4ca7937
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@@ -31,24 -33,19 +31,25 @@@ import org.slf4j.LoggerFactory
  
  import com.ning.compress.lzf.LZFInputStream;
  
 +import org.apache.cassandra.config.CFMetaData;
  import org.apache.cassandra.config.Schema;
 -import org.apache.cassandra.db.ColumnFamilyStore;
 -import org.apache.cassandra.db.DecoratedKey;
 -import org.apache.cassandra.db.Directories;
 -import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.rows.*;
  import org.apache.cassandra.io.sstable.Descriptor;
 -import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 +import org.apache.cassandra.io.sstable.SSTableSimpleIterator;
 +import org.apache.cassandra.io.sstable.format.SSTableFormat;
 +import org.apache.cassandra.io.sstable.format.Version;
 +import org.apache.cassandra.io.util.RewindableDataInputStreamPlus;
 +import org.apache.cassandra.io.util.DataInputPlus;
 +import org.apache.cassandra.net.MessagingService;
  import org.apache.cassandra.streaming.messages.FileMessageHeader;
  import org.apache.cassandra.utils.ByteBufferUtil;
 -import org.apache.cassandra.utils.BytesReadTracker;
 +import org.apache.cassandra.io.util.TrackedInputStream;
 +import org.apache.cassandra.utils.FBUtilities;
  import org.apache.cassandra.utils.Pair;
  
+ import static org.apache.cassandra.utils.Throwables.extractIOExceptionCause;
  
  /**
   * StreamReader reads from stream and writes to SSTable.
@@@ -124,63 -121,37 +125,40 @@@ public class StreamReade
             logger.debug("[Stream #{}] Finished receiving file #{} from {} readBytes = {}, totalSize = {}",
                          session.planId(), fileSeqNum, session.peer, in.getBytesRead(), totalSize);
              return writer;
 -        } catch (Throwable e)
 +        }
 +        catch (Throwable e)
          {
 -            if (key != null)
 +            if (deserializer != null)
                 logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.",
 -                            session.planId(), key, cfs.keyspace.getName(), cfs.getColumnFamilyName());
 +                            session.planId(), deserializer.partitionKey(), cfs.keyspace.getName(), cfs.getColumnFamilyName());
              if (writer != null)
              {
 -                try
 -                {
 -                    writer.abort();
 -                }
 -                catch (Throwable e2)
 -                {
 -                    // add abort error to original and continue so we can drain unread stream
 -                    e.addSuppressed(e2);
 -                }
 +                writer.abort(e);
              }
-             drain(in, in.getBytesRead());
-             if (e instanceof IOException)
-                 throw (IOException) e;
-             else
-                 throw Throwables.propagate(e);
+             throw Throwables.propagate(e);
          }
 +        finally
 +        {
 +            if (deserializer != null)
 +                deserializer.cleanup();
 +        }
 +    }
 +
 +    protected SerializationHeader getHeader(CFMetaData metadata)
 +    {
 +        return header != null? header.toHeader(metadata) : null; //pre-3.0 sstable have no SerializationHeader
      }
  
 -    protected SSTableWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repairedAt, SSTableFormat.Type format) throws IOException
 +    protected SSTableMultiWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repairedAt, SSTableFormat.Type format) throws IOException
      {
 -        Directories.DataDirectory localDir = cfs.directories.getWriteableLocation(totalSize);
 +        Directories.DataDirectory localDir = cfs.getDirectories().getWriteableLocation(totalSize);
          if (localDir == null)
             throw new IOException("Insufficient disk space to store " + totalSize + " bytes");
 -        desc = Descriptor.fromFilename(cfs.getTempSSTablePath(cfs.directories.getLocationForDisk(localDir), format));
 +        desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(localDir), format));
  
 -        return SSTableWriter.create(desc, estimatedKeys, repairedAt, sstableLevel);
 +        return cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, getHeader(cfs.metadata), session.getTransaction(cfId));
      }
  
-     protected void drain(InputStream dis, long bytesRead) throws IOException
-     {
-         long toSkip = totalSize() - bytesRead;
- 
-         // InputStream.skip can return -1 if dis is inaccessible.
-         long skipped = dis.skip(toSkip);
-         if (skipped == -1)
-             return;
- 
-         toSkip = toSkip - skipped;
-         while (toSkip > 0)
-         {
-             skipped = dis.skip(toSkip);
-             if (skipped == -1)
-                 break;
-             toSkip = toSkip - skipped;
-         }
-     }
- 
      protected long totalSize()
      {
          long size = 0;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ef8617/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ef8617/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ef8617/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 9719587,fa1022d..bc87c8f
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@@ -17,7 -17,8 +17,6 @@@
   */
  package org.apache.cassandra.streaming.compress;
  
--import java.io.DataInputStream;
 -
  import java.io.IOException;
  import java.nio.channels.Channels;
  import java.nio.channels.ReadableByteChannel;
@@@ -37,9 -40,12 +36,11 @@@ import org.apache.cassandra.streaming.P
  import org.apache.cassandra.streaming.StreamReader;
  import org.apache.cassandra.streaming.StreamSession;
  import org.apache.cassandra.streaming.messages.FileMessageHeader;
 -import org.apache.cassandra.utils.ByteBufferUtil;
 -import org.apache.cassandra.utils.BytesReadTracker;
 +import org.apache.cassandra.io.util.TrackedInputStream;
  import org.apache.cassandra.utils.Pair;
  
+ import static org.apache.cassandra.utils.Throwables.extractIOExceptionCause;
+ 
  /**
   * StreamReader that reads from streamed compressed SSTable
   */
@@@ -114,24 -119,25 +115,22 @@@ public class CompressedStreamReader ext
          }
          catch (Throwable e)
          {
 -            if (key != null)
 +            if (deserializer != null)
                 logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.",
 -                            session.planId(), key, cfs.keyspace.getName(), cfs.getColumnFamilyName());
 +                            session.planId(), deserializer.partitionKey(), cfs.keyspace.getName(), cfs.getTableName());
              if (writer != null)
              {
 -                try
 -                {
 -                    writer.abort();
 -                }
 -                catch (Throwable e2)
 -                {
 -                    // add abort error to original and continue so we can drain unread stream
 -                    e.addSuppressed(e2);
 -                }
 +                writer.abort(e);
              }
-             drain(in, in.getBytesRead());
-             if (e instanceof IOException)
-                 throw (IOException) e;
-             else
-                 throw Throwables.propagate(e);
+             if (extractIOExceptionCause(e).isPresent())
+                 throw e;
+             throw Throwables.propagate(e);
          }
 +        finally
 +        {
 +            if (deserializer != null)
 +                deserializer.cleanup();
 +        }
      }
  
      @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ef8617/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
index d881d43,2870c03..438cb0b
--- a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
@@@ -20,10 -20,11 +20,12 @@@ package org.apache.cassandra.streaming.
  import java.io.IOException;
  import java.nio.channels.Channels;
  import java.nio.channels.ReadableByteChannel;
++import java.util.Optional;
  
 -import com.google.common.base.Optional;
 +import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 +import org.apache.cassandra.io.util.DataInputPlus;
 +import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
+ 
 -import org.apache.cassandra.io.sstable.format.SSTableWriter;
  import org.apache.cassandra.io.util.DataOutputStreamPlus;
  import org.apache.cassandra.streaming.StreamReader;
  import org.apache.cassandra.streaming.StreamSession;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ef8617/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ef8617/src/java/org/apache/cassandra/utils/Throwables.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/utils/Throwables.java
index 8ef6a63,877f388..5ad9686
--- a/src/java/org/apache/cassandra/utils/Throwables.java
+++ b/src/java/org/apache/cassandra/utils/Throwables.java
@@@ -18,25 -18,14 +18,26 @@@
  */
  package org.apache.cassandra.utils;
  
 +import java.io.File;
  import java.io.IOException;
 +import java.util.Arrays;
 +import java.util.Iterator;
++import java.util.Optional;
 +import java.util.stream.Stream;
  
 -import com.google.common.base.Optional;
 +import org.apache.cassandra.io.FSReadError;
 +import org.apache.cassandra.io.FSWriteError;
  
 -public class Throwables
 +public final class Throwables
  {
 +    public enum FileOpType { READ, WRITE }
  
 -    public static Throwable merge(Throwable existingFail, Throwable newFail)
 +    public interface DiscreteAction<E extends Exception>
 +    {
 +        void perform() throws E;
 +    }
 +
 +    public static <T extends Throwable> T merge(T existingFail, T newFail)
      {
          if (existingFail == null)
              return newFail;
@@@ -167,4 -54,17 +168,17 @@@
          }
          return accumulate;
      }
+ 
+     public static Optional<IOException> extractIOExceptionCause(Throwable t)
+     {
+         if (t instanceof IOException)
+             return Optional.of((IOException) t);
+         Throwable cause = t;
+         while ((cause = cause.getCause()) != null)
+         {
+             if (cause instanceof IOException)
+                 return Optional.of((IOException) cause);
+         }
 -        return Optional.absent();
++        return Optional.empty();
+     }
  }
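
For context, a minimal usage sketch (not part of this commit) of the new extractIOExceptionCause helper, mirroring the catch blocks in the stream readers above. The class and method names below are hypothetical; only the two Throwables calls are the real APIs being illustrated:

    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Optional;

    public class StreamErrorHandlingSketch
    {
        // If an IOException is buried anywhere in the cause chain, surface it as the
        // checked exception; otherwise wrap the failure unchecked (Guava propagate).
        public static int readFirstByte(InputStream in) throws IOException
        {
            try
            {
                return in.read();
            }
            catch (Throwable t)
            {
                Optional<IOException> io = org.apache.cassandra.utils.Throwables.extractIOExceptionCause(t);
                if (io.isPresent())
                    throw io.get();
                throw com.google.common.base.Throwables.propagate(t);
            }
        }
    }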

http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ef8617/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
index a3300ac,0000000..8512d8f
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
@@@ -1,137 -1,0 +1,173 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.streaming.compression;
 +
 +import java.io.*;
 +import java.util.*;
 +import java.util.concurrent.SynchronousQueue;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.junit.Test;
 +import org.apache.cassandra.db.ClusteringComparator;
 +import org.apache.cassandra.db.marshal.BytesType;
 +import org.apache.cassandra.io.compress.CompressedSequentialWriter;
 +import org.apache.cassandra.io.compress.CompressionMetadata;
 +import org.apache.cassandra.schema.CompressionParams;
 +import org.apache.cassandra.io.sstable.Component;
 +import org.apache.cassandra.io.sstable.Descriptor;
 +import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 +import org.apache.cassandra.streaming.compress.CompressedInputStream;
 +import org.apache.cassandra.streaming.compress.CompressionInfo;
 +import org.apache.cassandra.utils.ChecksumType;
 +import org.apache.cassandra.utils.Pair;
 +
 +import static org.junit.Assert.assertEquals;
++import static org.junit.Assert.fail;
 +
 +/**
 + */
 +public class CompressedInputStreamTest
 +{
 +    @Test
 +    public void testCompressedRead() throws Exception
 +    {
-         testCompressedReadWith(new long[]{0L}, false);
-         testCompressedReadWith(new long[]{1L}, false);
-         testCompressedReadWith(new long[]{100L}, false);
++        testCompressedReadWith(new long[]{0L}, false, false);
++        testCompressedReadWith(new long[]{1L}, false, false);
++        testCompressedReadWith(new long[]{100L}, false, false);
 +
-         testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, false);
++        testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, false, false);
 +    }
 +
 +    @Test(expected = EOFException.class)
 +    public void testTruncatedRead() throws Exception
 +    {
-         testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, true);
++        testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, true, false);
++    }
++
++    /**
++     * Test that CompressedInputStream does not block if there's an exception while reading stream
++     */
++    @Test(timeout = 30000)
++    public void testException() throws Exception
++    {
++        testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, false, true);
 +    }
 +
 +    /**
 +     * @param valuesToCheck array of longs of range(0-999)
 +     * @throws Exception
 +     */
-     private void testCompressedReadWith(long[] valuesToCheck, boolean testTruncate) throws Exception
++    private void testCompressedReadWith(long[] valuesToCheck, boolean testTruncate, boolean testException) throws Exception
 +    {
 +        assert valuesToCheck != null && valuesToCheck.length > 0;
 +
 +        // write compressed data file of longs
 +        File tmp = new File(File.createTempFile("cassandra", "unittest").getParent(), "ks-cf-ib-1-Data.db");
 +        Descriptor desc = Descriptor.fromFilename(tmp.getAbsolutePath());
 +        MetadataCollector collector = new MetadataCollector(new ClusteringComparator(BytesType.instance));
 +        CompressionParams param = CompressionParams.snappy(32);
 +        Map<Long, Long> index = new HashMap<Long, Long>();
 +        try (CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector))
 +        {
 +            for (long l = 0L; l < 1000; l++)
 +            {
 +                index.put(l, writer.position());
 +                writer.writeLong(l);
 +            }
 +            writer.finish();
 +        }
 +
 +        CompressionMetadata comp = CompressionMetadata.create(tmp.getAbsolutePath());
 +        List<Pair<Long, Long>> sections = new ArrayList<>();
 +        for (long l : valuesToCheck)
 +        {
 +            long position = index.get(l);
 +            sections.add(Pair.create(position, position + 8));
 +        }
 +        CompressionMetadata.Chunk[] chunks = comp.getChunksForSections(sections);
 +        long totalSize = comp.getTotalSizeForSections(sections);
 +        long expectedSize = 0;
 +        for (CompressionMetadata.Chunk c : chunks)
 +            expectedSize += c.length + 4;
 +        assertEquals(expectedSize, totalSize);
 +
 +        // buffer up only relevant parts of file
 +        int size = 0;
 +        for (CompressionMetadata.Chunk c : chunks)
 +            size += (c.length + 4); // 4bytes CRC
 +        byte[] toRead = new byte[size];
 +
 +        try (RandomAccessFile f = new RandomAccessFile(tmp, "r"))
 +        {
 +            int pos = 0;
 +            for (CompressionMetadata.Chunk c : chunks)
 +            {
 +                f.seek(c.offset);
 +                pos += f.read(toRead, pos, c.length + 4);
 +            }
 +        }
 +
 +        if (testTruncate)
 +        {
 +            byte [] actuallyRead = new byte[50];
 +            System.arraycopy(toRead, 0, actuallyRead, 0, 50);
 +            toRead = actuallyRead;
 +        }
 +
 +        // read buffer using CompressedInputStream
 +        CompressionInfo info = new CompressionInfo(chunks, param);
-         CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info,
-                                                                  ChecksumType.CRC32, () -> 1.0);
++
++        if (testException)
++        {
++            testException(sections, info);
++            return;
++        }
++        CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info, ChecksumType.CRC32, () -> 1.0);
 +
 +        try (DataInputStream in = new DataInputStream(input))
 +        {
 +            for (int i = 0; i < sections.size(); i++)
 +            {
 +                input.position(sections.get(i).left);
 +                long readValue = in.readLong();
 +                assertEquals("expected " + valuesToCheck[i] + " but was " + readValue, valuesToCheck[i], readValue);
 +            }
 +        }
 +    }
++
++    private static void testException(List<Pair<Long, Long>> sections, CompressionInfo info) throws IOException
++    {
++        CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(new byte[0]), info, ChecksumType.CRC32, () -> 1.0);
++
++        try (DataInputStream in = new DataInputStream(input))
++        {
++            for (int i = 0; i < sections.size(); i++)
++            {
++                input.position(sections.get(i).left);
++                try {
++                    in.readLong();
++                    fail("Should have thrown IOException");
++                }
++                catch (IOException e)
++                {
++                    continue;
++                }
++            }
++        }
++    }
 +}
