This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 7291c92f4f Adding zstandard compression library (#12408)
7291c92f4f is described below

commit 7291c92f4f013f0495dbe405d4d9a32017c9dfdd
Author: Dr. Sizzles <[email protected]>
AuthorDate: Sat May 28 17:01:44 2022 -0700

    Adding zstandard compression library (#12408)
    
    * Adding zstandard compression library
    
    * 1. Took @clintropolis's advice to have ZStandard decompressor use the 
byte array when the buffers are not direct.
    2. Cleaned up checkstyle issues.
    
    * Fixing zstandard version to latest stable version in pom's and updating 
license files
    
    * Removing zstd from benchmarks and adding to processing (poms)
    
    * fix the intellij inspection issue
    
    * Removing the prefix v for the version in the license check for zstd
    
    * Fixing license checks
    
    Co-authored-by: Rahul Gidwani <[email protected]>
---
 .../compression/BaseColumnarLongsBenchmark.java    |  24 ++++
 distribution/bin/check-licenses.py                 |   3 +-
 .../extensions-core/kafka-supervisor-reference.md  |   4 +-
 licenses.yaml                                      |   4 +-
 pom.xml                                            |   2 +-
 processing/pom.xml                                 |   4 +
 .../druid/segment/data/CompressionStrategy.java    |  97 ++++++++++++-
 .../segment/data/CompressionStrategyTest.java      |  55 +++-----
 .../druid/segment/data/TestColumnCompression.java  | 156 +++++++++++++++++++++
 9 files changed, 306 insertions(+), 43 deletions(-)

diff --git 
a/benchmarks/src/test/java/org/apache/druid/benchmark/compression/BaseColumnarLongsBenchmark.java
 
b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/BaseColumnarLongsBenchmark.java
index 69d4aa1029..b26e1dc589 100644
--- 
a/benchmarks/src/test/java/org/apache/druid/benchmark/compression/BaseColumnarLongsBenchmark.java
+++ 
b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/BaseColumnarLongsBenchmark.java
@@ -61,7 +61,9 @@ public class BaseColumnarLongsBenchmark
    * encoding of values within the block.
    */
   @Param({
+      "zstd-longs",
       "lz4-longs",
+      "zstd-auto",
       "lz4-auto"
   })
   String encoding;
@@ -271,6 +273,26 @@ public class BaseColumnarLongsBenchmark
             CompressionStrategy.NONE
         );
         break;
+      case "zstd-longs":
+        serializer = CompressionFactory.getLongSerializer(
+                encoding,
+                writeOutMedium,
+                "zstd-longs",
+                ByteOrder.LITTLE_ENDIAN,
+                CompressionFactory.LongEncodingStrategy.LONGS,
+                CompressionStrategy.ZSTD
+        );
+        break;
+      case "zstd-auto":
+        serializer = CompressionFactory.getLongSerializer(
+                encoding,
+                writeOutMedium,
+                "zstd-auto",
+                ByteOrder.LITTLE_ENDIAN,
+                CompressionFactory.LongEncodingStrategy.AUTO,
+                CompressionStrategy.ZSTD
+        );
+        break;
       default:
         throw new RuntimeException("unknown encoding");
     }
@@ -290,6 +312,8 @@ public class BaseColumnarLongsBenchmark
       case "lz4-auto":
       case "none-auto":
       case "none-longs":
+      case "zstd-auto":
+      case "zstd-longs":
         return CompressedColumnarLongsSupplier.fromByteBuffer(buffer, 
ByteOrder.LITTLE_ENDIAN).get();
     }
 
diff --git a/distribution/bin/check-licenses.py 
b/distribution/bin/check-licenses.py
index 8af9e59226..1de9b1e22e 100755
--- a/distribution/bin/check-licenses.py
+++ b/distribution/bin/check-licenses.py
@@ -233,7 +233,8 @@ def build_compatible_license_names():
 
     compatible_licenses['BSD-2-Clause License'] = 'BSD-2-Clause License'
     compatible_licenses['BSD-2-Clause'] = 'BSD-2-Clause License'
-    compatible_licenses['BSD 2-Clause license'] = 'BSD 2-Clause License'
+    compatible_licenses['BSD 2-Clause license'] = 'BSD-2-Clause License'
+    compatible_licenses['BSD 2-Clause License'] = 'BSD-2-Clause License'
 
     compatible_licenses['BSD-3-Clause License'] = 'BSD-3-Clause License'
     compatible_licenses['New BSD license'] = 'BSD-3-Clause License'
diff --git a/docs/development/extensions-core/kafka-supervisor-reference.md 
b/docs/development/extensions-core/kafka-supervisor-reference.md
index a59177aa31..1e8d1b40ca 100644
--- a/docs/development/extensions-core/kafka-supervisor-reference.md
+++ b/docs/development/extensions-core/kafka-supervisor-reference.md
@@ -208,8 +208,8 @@ The `tuningConfig` is optional and default parameters will 
be used if no `tuning
 |Field|Type|Description|Required|
 |-----|----|-----------|--------|
 |bitmap|Object|Compression format for bitmap indexes. Should be a JSON object. 
See [Bitmap types](#bitmap-types) below for options.|no (defaults to Roaring)|
-|dimensionCompression|String|Compression format for dimension columns. Choose 
from `LZ4`, `LZF`, or `uncompressed`.|no (default == `LZ4`)|
-|metricCompression|String|Compression format for primitive type metric 
columns. Choose from `LZ4`, `LZF`, `uncompressed`, or `none`.|no (default == 
`LZ4`)|
+|dimensionCompression|String|Compression format for dimension columns. Choose 
from `LZ4`, `LZF`, `ZSTD` or `uncompressed`.|no (default == `LZ4`)|
+|metricCompression|String|Compression format for primitive type metric 
columns. Choose from `LZ4`, `LZF`, `ZSTD`, `uncompressed` or `none`.|no 
(default == `LZ4`)|
 |longEncoding|String|Encoding format for metric and dimension columns with 
type long. Choose from `auto` or `longs`. `auto` encodes the values using 
offset or lookup table depending on column cardinality, and store them with 
variable size. `longs` stores the value as is with 8 bytes each.|no (default == 
`longs`)|
 
 ##### Bitmap types
diff --git a/licenses.yaml b/licenses.yaml
index 68ef74ca8a..372d452fda 100644
--- a/licenses.yaml
+++ b/licenses.yaml
@@ -3964,7 +3964,7 @@ name: JNI binding for Zstd
 license_category: binary
 module: java-core
 license_name: BSD-2-Clause License
-version: 1.3.3-1
+version: 1.5.2-3
 copyright: Luben Karavelov
 license_file_path: licenses/bin/zstd-jni.BSD2
 libraries:
@@ -5028,7 +5028,7 @@ libraries:
 name: DNS Java
 license_category: binary
 module: java-core
-license_name: BSD 2-Clause license
+license_name: BSD-2-Clause License
 version: 2.1.7
 libraries:
   - dnsjava: dnsjava
diff --git a/pom.xml b/pom.xml
index 621e0f8d74..1053739519 100644
--- a/pom.xml
+++ b/pom.xml
@@ -449,7 +449,7 @@
             <dependency>
                 <groupId>com.github.luben</groupId>
                 <artifactId>zstd-jni</artifactId>
-                <version>1.3.3-1</version>
+                <version>1.5.2-3</version>
             </dependency>
             <dependency>
                 <groupId>com.fasterxml.jackson</groupId>
diff --git a/processing/pom.xml b/processing/pom.xml
index fdcdaaffff..4bd7a2c3c7 100644
--- a/processing/pom.xml
+++ b/processing/pom.xml
@@ -169,6 +169,10 @@
             <groupId>io.netty</groupId>
             <artifactId>netty-common</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.github.luben</groupId>
+            <artifactId>zstd-jni</artifactId>
+        </dependency>
 
         <!-- Tests -->
         <dependency>
diff --git 
a/processing/src/main/java/org/apache/druid/segment/data/CompressionStrategy.java
 
b/processing/src/main/java/org/apache/druid/segment/data/CompressionStrategy.java
index 1d229b51c8..478983f941 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/data/CompressionStrategy.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/data/CompressionStrategy.java
@@ -21,6 +21,7 @@ package org.apache.druid.segment.data;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonValue;
+import com.github.luben.zstd.Zstd;
 import com.ning.compress.BufferRecycler;
 import com.ning.compress.lzf.LZFDecoder;
 import com.ning.compress.lzf.LZFEncoder;
@@ -75,6 +76,21 @@ public enum CompressionStrategy
       return LZ4Compressor.DEFAULT_COMPRESSOR;
     }
   },
+
+  ZSTD((byte) 0x2) {
+    @Override
+    public Decompressor getDecompressor()
+    {
+      return ZstdDecompressor.DEFAULT_COMPRESSOR;
+    }
+
+    @Override
+    public Compressor getCompressor()
+    {
+      return ZstdCompressor.DEFAULT_COMPRESSOR;
+    }
+  },
+
   UNCOMPRESSED((byte) 0xFF) {
     @Override
     public Decompressor getDecompressor()
@@ -171,7 +187,7 @@ public enum CompressionStrategy
     /**
      * Allocates a buffer that should be passed to {@link #compress} method as 
input buffer. Different Compressors
      * require (or work more efficiently with) different kinds of buffers.
-     *
+     * <p>
      * If the allocated buffer is a direct buffer, it should be registered to 
be freed with the given Closer.
      */
     ByteBuffer allocateInBuffer(int inputSize, Closer closer)
@@ -182,9 +198,9 @@ public enum CompressionStrategy
     /**
      * Allocates a buffer that should be passed to {@link #compress} method as 
output buffer. Different Compressors
      * require (or work more efficiently with) different kinds of buffers.
-     *
+     * <p>
      * Allocates a buffer that is always enough to compress a byte sequence of 
the given size.
-     *
+     * <p>
      * If the allocated buffer is a direct buffer, it should be registered to 
be freed with the given Closer.
      */
     abstract ByteBuffer allocateOutBuffer(int inputSize, Closer closer);
@@ -344,6 +360,81 @@ public enum CompressionStrategy
     }
   }
 
+  public static class ZstdCompressor extends Compressor
+  {
+    private static final ZstdCompressor DEFAULT_COMPRESSOR = new 
ZstdCompressor();
+
+    @Override
+    ByteBuffer allocateInBuffer(int inputSize, Closer closer)
+    {
+      ByteBuffer inBuffer = ByteBuffer.allocateDirect(inputSize);
+      closer.register(() -> ByteBufferUtils.free(inBuffer));
+      return inBuffer;
+    }
+
+    @Override
+    ByteBuffer allocateOutBuffer(int inputSize, Closer closer)
+    {
+      ByteBuffer outBuffer = ByteBuffer.allocateDirect((int) 
Zstd.compressBound(inputSize));
+      closer.register(() -> ByteBufferUtils.free(outBuffer));
+      return outBuffer;
+    }
+
+    @Override
+    public ByteBuffer compress(ByteBuffer in, ByteBuffer out)
+    {
+      int position = in.position();
+      out.clear();
+      long sizeNeeded = Zstd.compressBound(in.remaining());
+      if (out.remaining() < sizeNeeded) {
+        throw new RuntimeException("Output buffer too small, please allocate 
more space. " + sizeNeeded + " required.");
+      }
+      Zstd.compress(out, in, Zstd.maxCompressionLevel());
+      in.position(position);
+      out.flip();
+      return out;
+    }
+  }
+
+  public static class ZstdDecompressor implements Decompressor
+  {
+    private static final ZstdDecompressor DEFAULT_COMPRESSOR = new 
ZstdDecompressor();
+
+    @Override
+    public void decompress(ByteBuffer in, int numBytes, ByteBuffer out)
+    {
+      out.clear();
+      if (!in.isDirect() || !out.isDirect()) {
+        // fall back to heap byte arrays unless both buffers are direct
+        final byte[] inputBytes = new byte[numBytes];
+        in.get(inputBytes);
+        try (final ResourceHolder<byte[]> outputBytesHolder = 
CompressedPools.getOutputBytes()) {
+          final byte[] outputBytes = outputBytesHolder.get();
+          int decompressedBytes = (int) Zstd.decompressByteArray(
+              outputBytes,
+              0,
+              outputBytes.length,
+              inputBytes,
+              0,
+              numBytes
+          );
+          out.put(outputBytes, 0, decompressedBytes);
+          out.flip();
+        }
+      } else {
+        int decompressedBytes = (int) Zstd.decompressDirectByteBuffer(
+            out,
+            out.position(),
+            out.remaining(),
+            in,
+            in.position(),
+            numBytes
+        );
+        out.limit(out.position() + decompressedBytes);
+      }
+    }
+  }
+
   /**
    * Logs info relating to whether LZ4 is using native or pure Java 
implementations
    */
diff --git 
a/processing/src/test/java/org/apache/druid/segment/data/CompressionStrategyTest.java
 
b/processing/src/test/java/org/apache/druid/segment/data/CompressionStrategyTest.java
index 44a97d01e6..23c0d64e5b 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/data/CompressionStrategyTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/data/CompressionStrategyTest.java
@@ -19,7 +19,6 @@
 
 package org.apache.druid.segment.data;
 
-import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
 import org.apache.druid.collections.ResourceHolder;
 import org.apache.druid.java.util.common.ByteBufferUtils;
@@ -40,7 +39,6 @@ import java.util.Collection;
 import java.util.Random;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -53,14 +51,7 @@ public class CompressionStrategyTest
   {
     return Iterables.transform(
         Arrays.asList(CompressionStrategy.noNoneValues()),
-        new Function<CompressionStrategy, Object[]>()
-        {
-          @Override
-          public Object[] apply(CompressionStrategy compressionStrategy)
-          {
-            return new Object[]{compressionStrategy};
-          }
-        }
+        compressionStrategy -> new Object[]{compressionStrategy}
     );
   }
 
@@ -101,22 +92,12 @@ public class CompressionStrategyTest
   public void testBasicOperations()
   {
     ByteBuffer compressionOut = 
compressionStrategy.getCompressor().allocateOutBuffer(originalData.length, 
closer);
-    ByteBuffer compressed = 
compressionStrategy.getCompressor().compress(ByteBuffer.wrap(originalData), 
compressionOut);
-    ByteBuffer output = ByteBuffer.allocate(originalData.length);
-    compressionStrategy.getDecompressor().decompress(compressed, 
compressed.remaining(), output);
-    byte[] checkArray = new byte[DATA_SIZER];
-    output.get(checkArray);
-    Assert.assertArrayEquals("Uncompressed data does not match", originalData, 
checkArray);
-  }
-
-  @Test
-  public void testDirectMemoryOperations()
-  {
-    ByteBuffer compressionOut = 
compressionStrategy.getCompressor().allocateOutBuffer(originalData.length, 
closer);
-    ByteBuffer compressed = 
compressionStrategy.getCompressor().compress(ByteBuffer.wrap(originalData), 
compressionOut);
-
+    ByteBuffer compressionIn = 
compressionStrategy.getCompressor().allocateInBuffer(originalData.length, 
closer);
     try (final ResourceHolder<ByteBuffer> holder = 
ByteBufferUtils.allocateDirect(originalData.length)) {
       final ByteBuffer output = holder.get();
+      compressionIn.put(originalData);
+      compressionIn.rewind();
+      ByteBuffer compressed = 
compressionStrategy.getCompressor().compress(compressionIn, compressionOut);
       compressionStrategy.getDecompressor().decompress(compressed, 
compressed.remaining(), output);
       byte[] checkArray = new byte[DATA_SIZER];
       output.get(checkArray);
@@ -140,27 +121,33 @@ public class CompressionStrategyTest
     for (int i = 0; i < numThreads; ++i) {
       results.add(
           threadPoolExecutor.submit(
-              new Callable<Boolean>()
-              {
-                @Override
-                public Boolean call()
-                {
-                  ByteBuffer compressionOut = 
compressionStrategy.getCompressor().allocateOutBuffer(originalData.length, 
closer);
-                  ByteBuffer compressed = 
compressionStrategy.getCompressor().compress(ByteBuffer.wrap(originalData), 
compressionOut);
-                  ByteBuffer output = ByteBuffer.allocate(originalData.length);
+              () -> {
+                ByteBuffer compressionOut = compressionStrategy.getCompressor()
+                                                               
.allocateOutBuffer(originalData.length, closer);
+                ByteBuffer compressionIn = compressionStrategy.getCompressor()
+                                                              
.allocateInBuffer(originalData.length, closer);
+                try {
+                  compressionIn.put(originalData);
+                  compressionIn.position(0);
+                  ByteBuffer compressed = 
compressionStrategy.getCompressor().compress(compressionIn, compressionOut);
+                  ByteBuffer output = 
compressionStrategy.getCompressor().allocateOutBuffer(originalData.length, 
closer);
                   compressionStrategy.getDecompressor().decompress(compressed, 
compressed.remaining(), output);
                   byte[] checkArray = new byte[DATA_SIZER];
                   output.get(checkArray);
                   Assert.assertArrayEquals("Uncompressed data does not match", 
originalData, checkArray);
                   return true;
                 }
+                finally {
+                  ByteBufferUtils.free(compressionIn);
+                  ByteBufferUtils.free(compressionOut);
+                }
               }
           )
       );
     }
     threadPoolExecutor.shutdown();
-    for (Future result : results) {
-      Assert.assertTrue((Boolean) result.get());
+    for (Future<Boolean> result : results) {
+      Assert.assertTrue(result.get());
     }
   }
 }
diff --git 
a/processing/src/test/java/org/apache/druid/segment/data/TestColumnCompression.java
 
b/processing/src/test/java/org/apache/druid/segment/data/TestColumnCompression.java
new file mode 100644
index 0000000000..5833bdd538
--- /dev/null
+++ 
b/processing/src/test/java/org/apache/druid/segment/data/TestColumnCompression.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.data;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import org.apache.druid.java.util.common.ByteBufferUtils;
+import org.apache.druid.java.util.common.io.Closer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.WritableByteChannel;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+
+@RunWith(Parameterized.class)
+public class TestColumnCompression
+{
+
+  private final CompressionStrategy compressionType;
+  private ColumnarMultiInts compressed;
+  int bytes = 1;
+  int valuesPerRowBound = 5;
+  int filteredRowCount = 1000;
+  private BitSet filter;
+  private ByteBuffer buffer;
+
+  public TestColumnCompression(CompressionStrategy strategy)
+  {
+    compressionType = strategy;
+  }
+
+  @Parameterized.Parameters
+  public static Iterable<Object[]> compressionStrategies()
+  {
+    return Arrays.stream(CompressionStrategy.values())
+                 .filter(x -> !CompressionStrategy.NONE.equals(x))
+                 .map(strategy -> new 
Object[]{strategy}).collect(Collectors.toList());
+  }
+
+  @Before
+  public void setUp() throws Exception
+  {
+    Random rand = ThreadLocalRandom.current();
+    List<int[]> rows = new ArrayList<>();
+    final int bound = 1 << bytes;
+    for (int i = 0; i < 0x100000; i++) {
+      int count = rand.nextInt(valuesPerRowBound) + 1;
+      int[] row = new int[rand.nextInt(count)];
+      for (int j = 0; j < row.length; j++) {
+        row[j] = rand.nextInt(bound);
+      }
+      rows.add(row);
+    }
+
+    buffer = serialize(
+        CompressedVSizeColumnarMultiIntsSupplier.fromIterable(
+            Iterables.transform(rows, (Function<int[], ColumnarInts>) input -> 
VSizeColumnarInts.fromArray(input, 20)),
+            bound - 1,
+            ByteOrder.nativeOrder(),
+            compressionType,
+            Closer.create()
+        )
+    );
+    this.compressed = CompressedVSizeColumnarMultiIntsSupplier.fromByteBuffer(
+        buffer,
+        ByteOrder.nativeOrder()
+    ).get();
+
+    filter = new BitSet();
+    for (int i = 0; i < filteredRowCount; i++) {
+      int rowToAccess = rand.nextInt(rows.size());
+      // Skip already selected rows if any
+      while (filter.get(rowToAccess)) {
+        rowToAccess = (rowToAccess + 1) % rows.size();
+      }
+      filter.set(rowToAccess);
+    }
+  }
+
+  @After
+  public void tearDown() 
+  {
+    ByteBufferUtils.free(buffer);
+  }
+
+  private static ByteBuffer serialize(WritableSupplier<ColumnarMultiInts> 
writableSupplier) throws IOException
+  {
+    final ByteBuffer buffer = ByteBuffer.allocateDirect((int) 
writableSupplier.getSerializedSize());
+    WritableByteChannel channel = new WritableByteChannel()
+    {
+      @Override
+      public int write(ByteBuffer src)
+      {
+        int size = src.remaining();
+        buffer.put(src);
+        return size;
+      }
+
+      @Override
+      public boolean isOpen()
+      {
+        return true;
+      }
+
+      @Override
+      public void close()
+      {
+
+      }
+    };
+
+    writableSupplier.writeTo(channel, null);
+    buffer.rewind();
+    return buffer;
+  }
+
+  @Test
+  public void testCompressed()
+  {
+    for (int i = filter.nextSetBit(0); i >= 0; i = filter.nextSetBit(i + 1)) {
+      IndexedInts row = compressed.get(i);
+      for (int j = 0, rowSize = row != null ? row.size() : 0; j < rowSize; 
j++) {
+        row.get(j);
+      }
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to