Repository: phoenix Updated Branches: refs/heads/3.2 b805aa64c -> 43abdea7d refs/heads/4.2 e00763ee0 -> ab0bcb839 refs/heads/master bc89c9a51 -> 300edd0b5
PHOENIX-1455 Replace org.xerial.snappy with org.iq80.snappy pure Java snappy implementation Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/300edd0b Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/300edd0b Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/300edd0b Branch: refs/heads/master Commit: 300edd0b523638461e1dd09f191d666136ec715e Parents: bc89c9a Author: Andrew Purtell <apurt...@apache.org> Authored: Mon Nov 17 18:05:33 2014 -0800 Committer: Andrew Purtell <apurt...@apache.org> Committed: Mon Nov 17 18:05:33 2014 -0800 ---------------------------------------------------------------------- phoenix-core/pom.xml | 6 ++--- .../DistinctValueWithCountClientAggregator.java | 17 ++++++++++---- .../DistinctValueWithCountServerAggregator.java | 24 ++++++-------------- .../apache/phoenix/join/HashCacheClient.java | 3 ++- .../apache/phoenix/join/HashCacheFactory.java | 15 ++++++++---- pom.xml | 14 +++++++++--- 6 files changed, 45 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/300edd0b/phoenix-core/pom.xml ---------------------------------------------------------------------- diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml index 90a7142..b98e9b2 100644 --- a/phoenix-core/pom.xml +++ b/phoenix-core/pom.xml @@ -276,8 +276,8 @@ <version>${slf4j.version}</version> </dependency> <dependency> - <groupId>org.xerial.snappy</groupId> - <artifactId>snappy-java</artifactId> + <groupId>org.iq80.snappy</groupId> + <artifactId>snappy</artifactId> <version>${snappy.version}</version> </dependency> <dependency> @@ -399,4 +399,4 @@ <artifactId>hadoop-minicluster</artifactId> </dependency> </dependencies> -</project> \ No newline at end of file +</project> http://git-wip-us.apache.org/repos/asf/phoenix/blob/300edd0b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountClientAggregator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountClientAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountClientAggregator.java index f29f46a..56ca000 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountClientAggregator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountClientAggregator.java @@ -35,6 +35,7 @@ import org.apache.phoenix.schema.PDataType; import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.tuple.SingleKeyValueTuple; import org.apache.phoenix.schema.tuple.Tuple; +import org.iq80.snappy.Snappy; /** * Client side Aggregator which will aggregate data and find distinct values with number of occurrences for each. @@ -59,14 +60,20 @@ public abstract class DistinctValueWithCountClientAggregator extends BaseAggrega PDataType resultDataType = getResultDataType(); cachedResult = resultDataType.toObject(ptr, resultDataType, sortOrder); } else { - InputStream is = new ByteArrayInputStream(ptr.get(), ptr.getOffset() + 1, ptr.getLength() - 1); + InputStream is; try { if (Bytes.equals(ptr.get(), ptr.getOffset(), 1, DistinctValueWithCountServerAggregator.COMPRESS_MARKER, 0, 1)) { - InputStream decompressionStream = DistinctValueWithCountServerAggregator.COMPRESS_ALGO - .createDecompressionStream(is, - DistinctValueWithCountServerAggregator.COMPRESS_ALGO.getDecompressor(), 0); - is = decompressionStream; + // This reads the uncompressed length from the front of the compressed input + int uncompressedLength = Snappy.getUncompressedLength(ptr.get(), ptr.getOffset() + 1); + byte[] uncompressed = new byte[uncompressedLength]; + // This will throw CorruptionException, a RuntimeException if the snappy data is invalid. + // We're making a RuntimeException out of a checked IOException below so assume it's ok + // to let any CorruptionException escape. + Snappy.uncompress(ptr.get(), ptr.getOffset() + 1, ptr.getLength() - 1, uncompressed, 0); + is = new ByteArrayInputStream(uncompressed, 0, uncompressedLength); + } else { + is = new ByteArrayInputStream(ptr.get(), ptr.getOffset() + 1, ptr.getLength() - 1); } DataInputStream in = new DataInputStream(is); int mapSize = WritableUtils.readVInt(in); http://git-wip-us.apache.org/repos/asf/phoenix/blob/300edd0b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java index 281879e..a3141b1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java @@ -17,16 +17,12 @@ */ package org.apache.phoenix.expression.aggregator; -import java.io.ByteArrayOutputStream; -import java.io.OutputStream; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.io.compress.Compression; -import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; import org.apache.hadoop.hbase.util.Bytes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,6 +36,8 @@ import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.SizedUtil; +import org.iq80.snappy.Snappy; + /** * Server side Aggregator which will aggregate data and find distinct values with number of occurrences for each. * @@ -50,7 +48,6 @@ public class DistinctValueWithCountServerAggregator extends BaseAggregator { private static final Logger LOG = LoggerFactory.getLogger(DistinctValueWithCountServerAggregator.class); public static final int DEFAULT_ESTIMATED_DISTINCT_VALUES = 10000; public static final byte[] COMPRESS_MARKER = new byte[] { (byte)1 }; - public static final Algorithm COMPRESS_ALGO = Compression.Algorithm.SNAPPY; private int compressThreshold; private byte[] buffer = null; @@ -101,18 +98,11 @@ public class DistinctValueWithCountServerAggregator extends BaseAggregator { } if (serializationSize > compressThreshold) { // The size for the map serialization is above the threshold. We will do the Snappy compression here. - ByteArrayOutputStream compressedByteStream = new ByteArrayOutputStream(); - try { - compressedByteStream.write(COMPRESS_MARKER); - OutputStream compressionStream = COMPRESS_ALGO.createCompressionStream(compressedByteStream, - COMPRESS_ALGO.getCompressor(), 0); - compressionStream.write(buffer, 1, buffer.length - 1); - compressionStream.flush(); - ptr.set(compressedByteStream.toByteArray(), 0, compressedByteStream.size()); - return true; - } catch (Exception e) { - LOG.error("Exception while Snappy compression of data.", e); - } + byte[] compressed = new byte[COMPRESS_MARKER.length + Snappy.maxCompressedLength(buffer.length)]; + System.arraycopy(COMPRESS_MARKER, 0, compressed, 0, COMPRESS_MARKER.length); + int compressedLen = Snappy.compress(buffer, 1, buffer.length - 1, compressed, COMPRESS_MARKER.length); + ptr.set(compressed, 0, compressedLen + 1); + return true; } ptr.set(buffer, 0, offset); return true; http://git-wip-us.apache.org/repos/asf/phoenix/blob/300edd0b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java index e2f57df..6494603 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java @@ -39,7 +39,8 @@ import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.util.ServerUtil; import org.apache.phoenix.util.TrustedByteArrayOutputStream; import org.apache.phoenix.util.TupleUtil; -import org.xerial.snappy.Snappy; + +import org.iq80.snappy.Snappy; /** * http://git-wip-us.apache.org/repos/asf/phoenix/blob/300edd0b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java index 0b60a84..8cae51a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java @@ -26,7 +26,7 @@ import net.jcip.annotations.Immutable; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.WritableUtils; -import org.xerial.snappy.Snappy; + import org.apache.phoenix.cache.HashCache; import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory; import org.apache.phoenix.exception.SQLExceptionCode; @@ -39,6 +39,9 @@ import org.apache.phoenix.schema.tuple.ResultTuple; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.util.*; +import org.iq80.snappy.CorruptionException; +import org.iq80.snappy.Snappy; + public class HashCacheFactory implements ServerCacheFactory { public HashCacheFactory() { @@ -55,11 +58,13 @@ public class HashCacheFactory implements ServerCacheFactory { @Override public Closeable newCache(ImmutableBytesWritable cachePtr, MemoryChunk chunk) throws SQLException { try { - int size = Snappy.uncompressedLength(cachePtr.get()); - byte[] uncompressed = new byte[size]; - Snappy.uncompress(cachePtr.get(), 0, cachePtr.getLength(), uncompressed, 0); + // This reads the uncompressed length from the front of the compressed input + int uncompressedLen = Snappy.getUncompressedLength(cachePtr.get(), cachePtr.getOffset()); + byte[] uncompressed = new byte[uncompressedLen]; + Snappy.uncompress(cachePtr.get(), cachePtr.getOffset(), cachePtr.getLength(), + uncompressed, 0); return new HashCacheImpl(uncompressed, chunk); - } catch (IOException e) { + } catch (CorruptionException e) { throw ServerUtil.parseServerException(e); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/300edd0b/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 1750a69..746ad09 100644 --- a/pom.xml +++ b/pom.xml @@ -45,6 +45,14 @@ <enabled>true</enabled> </snapshots> </repository> + <repository> + <id>sonatype-nexus-snapshots</id> + <name>Sonatype Nexus Snapshots</name> + <url>https://oss.sonatype.org/content/repositories/snapshots</url> + <snapshots> + <enabled>true</enabled> + </snapshots> + </repository> </repositories> <parent> @@ -89,7 +97,7 @@ <flume.version>1.4.0</flume.version> <findbugs.version>1.3.2</findbugs.version> <jline.version>2.11</jline.version> - <snappy.version>1.1.0.1</snappy.version> + <snappy.version>0.3</snappy.version> <netty.version>3.6.6.Final</netty.version> <commons-codec.version>1.7</commons-codec.version> <htrace.version>2.04</htrace.version> @@ -532,8 +540,8 @@ <version>${findbugs.version}</version> </dependency> <dependency> - <groupId>org.xerial.snappy</groupId> - <artifactId>snappy-java</artifactId> + <groupId>org.iq80.snappy</groupId> + <artifactId>snappy</artifactId> <version>${snappy.version}</version> </dependency> <dependency>