This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
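In short: this push adds GZIP as a chunk compression codec for raw (no-dictionary) forward indexes. A raw column opts in through FieldConfig.CompressionCodec.GZIP, exactly as the new test columns in the diff below do. A minimal sketch of that wiring, assuming the usual TableConfigBuilder API; the table and column names are illustrative only:

import java.util.Collections;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;

public class GzipFieldConfigSketch {
  public static void main(String[] args) {
    // Raw encoding + GZIP codec for a hypothetical string column, mirroring the test setup below
    FieldConfig gzipColumn = new FieldConfig("GZIP_STRING", FieldConfig.EncodingType.RAW,
        Collections.emptyList(), FieldConfig.CompressionCodec.GZIP, null);
    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
        .setTableName("MyTable")
        .setNoDictionaryColumns(Collections.singletonList("GZIP_STRING"))
        .setFieldConfigList(Collections.singletonList(gzipColumn))
        .build();
    System.out.println(tableConfig);
  }
}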
The following commit(s) were added to refs/heads/master by this push: new 470f8ba2e0 Add GZIP Compression Codec (#11434) (#12668) 470f8ba2e0 is described below commit 470f8ba2e07254df2312c8961c35baedf3db35ad Author: marregui <miguel.arre...@gmail.com> AuthorDate: Thu Mar 28 18:28:05 2024 +0100 Add GZIP Compression Codec (#11434) (#12668) --- .../NoDictionaryCompressionQueriesTest.java | 73 +++++- .../BenchmarkNoDictionaryIntegerCompression.java | 44 +++- .../perf/BenchmarkNoDictionaryLongCompression.java | 41 +++- .../BenchmarkNoDictionaryStringCompression.java | 186 +++++++-------- .../io/compression/ChunkCompressorFactory.java | 9 +- .../local/io/compression/GzipCompressor.java | 66 ++++++ .../local/io/compression/GzipDecompressor.java | 63 +++++ .../writer/impl/BaseChunkForwardIndexWriter.java | 2 +- .../impl/VarByteChunkForwardIndexWriterV4.java | 1 + .../forward/BaseChunkForwardIndexReader.java | 10 +- .../forward/VarByteChunkForwardIndexReaderV4.java | 1 + .../local/io/compression/TestCompression.java | 127 ++++++++-- .../forward/VarByteChunkSVForwardIndexTest.java | 13 + .../index/loader/ForwardIndexHandlerTest.java | 261 +++++++++++---------- .../spi/compression/ChunkCompressionType.java | 2 +- .../segment/spi/compression/ChunkCompressor.java | 8 +- .../segment/spi/compression/ChunkDecompressor.java | 8 +- .../pinot/segment/spi/index/FieldIndexConfigs.java | 6 +- .../segment/spi/index/ForwardIndexConfig.java | 4 + .../apache/pinot/spi/config/table/FieldConfig.java | 13 +- 20 files changed, 659 insertions(+), 279 deletions(-) diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/NoDictionaryCompressionQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/NoDictionaryCompressionQueriesTest.java index 9fe631aa8f..7ae5dbe2cf 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/NoDictionaryCompressionQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/NoDictionaryCompressionQueriesTest.java @@ -69,16 +69,18 @@ public class NoDictionaryCompressionQueriesTest extends BaseQueriesTest { private static final String PASS_THROUGH_STRING = "PASS_THROUGH_STRING"; private static final String ZSTANDARD_STRING = "ZSTANDARD_STRING"; private static final String LZ4_STRING = "LZ4_STRING"; + private static final String GZIP_STRING = "GZIP_STRING"; private static final String SNAPPY_LONG = "SNAPPY_LONG"; private static final String PASS_THROUGH_LONG = "PASS_THROUGH_LONG"; private static final String ZSTANDARD_LONG = "ZSTANDARD_LONG"; private static final String LZ4_LONG = "LZ4_LONG"; - + private static final String GZIP_LONG = "GZIP_LONG"; private static final String SNAPPY_INTEGER = "SNAPPY_INTEGER"; private static final String PASS_THROUGH_INTEGER = "PASS_THROUGH_INTEGER"; private static final String ZSTANDARD_INTEGER = "ZSTANDARD_INTEGER"; private static final String LZ4_INTEGER = "LZ4_INTEGER"; + private static final String GZIP_INTEGER = "GZIP_INTEGER"; private static final List<String> RAW_SNAPPY_INDEX_COLUMNS = Arrays.asList(SNAPPY_STRING, SNAPPY_LONG, SNAPPY_INTEGER); @@ -90,6 +92,7 @@ public class NoDictionaryCompressionQueriesTest extends BaseQueriesTest { Arrays.asList(PASS_THROUGH_STRING, PASS_THROUGH_LONG, PASS_THROUGH_INTEGER); private static final List<String> RAW_LZ4_INDEX_COLUMNS = Arrays.asList(LZ4_STRING, LZ4_LONG, LZ4_INTEGER); + private static final List<String> RAW_GZIP_INDEX_COLUMNS = Arrays.asList(GZIP_STRING, GZIP_LONG, GZIP_INTEGER); private IndexSegment _indexSegment; private List<IndexSegment> _indexSegments; @@ -123,6 
+126,7 @@ public class NoDictionaryCompressionQueriesTest extends BaseQueriesTest { indexColumns.addAll(RAW_PASS_THROUGH_INDEX_COLUMNS); indexColumns.addAll(RAW_ZSTANDARD_INDEX_COLUMNS); indexColumns.addAll(RAW_LZ4_INDEX_COLUMNS); + indexColumns.addAll(RAW_GZIP_INDEX_COLUMNS); indexLoadingConfig.addNoDictionaryColumns(indexColumns); ImmutableSegment immutableSegment = @@ -143,7 +147,7 @@ public class NoDictionaryCompressionQueriesTest extends BaseQueriesTest { List<FieldConfig> fieldConfigs = new ArrayList<>( RAW_SNAPPY_INDEX_COLUMNS.size() + RAW_ZSTANDARD_INDEX_COLUMNS.size() + RAW_PASS_THROUGH_INDEX_COLUMNS.size() - + RAW_LZ4_INDEX_COLUMNS.size()); + + RAW_LZ4_INDEX_COLUMNS.size() + RAW_GZIP_INDEX_COLUMNS.size()); for (String indexColumn : RAW_SNAPPY_INDEX_COLUMNS) { fieldConfigs.add(new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW, Collections.emptyList(), @@ -165,11 +169,17 @@ public class NoDictionaryCompressionQueriesTest extends BaseQueriesTest { FieldConfig.CompressionCodec.LZ4, null)); } + for (String indexColumn : RAW_GZIP_INDEX_COLUMNS) { + fieldConfigs.add(new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW, Collections.emptyList(), + FieldConfig.CompressionCodec.GZIP, null)); + } + List<String> noDictionaryColumns = new ArrayList<>(); noDictionaryColumns.addAll(RAW_SNAPPY_INDEX_COLUMNS); noDictionaryColumns.addAll(RAW_ZSTANDARD_INDEX_COLUMNS); noDictionaryColumns.addAll(RAW_PASS_THROUGH_INDEX_COLUMNS); noDictionaryColumns.addAll(RAW_LZ4_INDEX_COLUMNS); + noDictionaryColumns.addAll(RAW_GZIP_INDEX_COLUMNS); TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setNoDictionaryColumns(noDictionaryColumns) @@ -179,14 +189,17 @@ public class NoDictionaryCompressionQueriesTest extends BaseQueriesTest { .addSingleValueDimension(PASS_THROUGH_STRING, FieldSpec.DataType.STRING) .addSingleValueDimension(ZSTANDARD_STRING, FieldSpec.DataType.STRING) .addSingleValueDimension(LZ4_STRING, FieldSpec.DataType.STRING) + .addSingleValueDimension(GZIP_STRING, FieldSpec.DataType.STRING) .addSingleValueDimension(SNAPPY_INTEGER, FieldSpec.DataType.INT) .addSingleValueDimension(ZSTANDARD_INTEGER, FieldSpec.DataType.INT) .addSingleValueDimension(PASS_THROUGH_INTEGER, FieldSpec.DataType.INT) .addSingleValueDimension(LZ4_INTEGER, FieldSpec.DataType.INT) + .addSingleValueDimension(GZIP_INTEGER, FieldSpec.DataType.INT) .addSingleValueDimension(SNAPPY_LONG, FieldSpec.DataType.LONG) .addSingleValueDimension(ZSTANDARD_LONG, FieldSpec.DataType.LONG) .addSingleValueDimension(PASS_THROUGH_LONG, FieldSpec.DataType.LONG) - .addSingleValueDimension(LZ4_LONG, FieldSpec.DataType.LONG).build(); + .addSingleValueDimension(LZ4_LONG, FieldSpec.DataType.LONG) + .addSingleValueDimension(GZIP_LONG, FieldSpec.DataType.LONG).build(); SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema); config.setOutDir(INDEX_DIR.getPath()); config.setTableName(TABLE_NAME); @@ -227,14 +240,17 @@ public class NoDictionaryCompressionQueriesTest extends BaseQueriesTest { row.putValue(ZSTANDARD_STRING, tempStringRows[i]); row.putValue(PASS_THROUGH_STRING, tempStringRows[i]); row.putValue(LZ4_STRING, tempStringRows[i]); + row.putValue(GZIP_STRING, tempStringRows[i]); row.putValue(SNAPPY_INTEGER, tempIntRows[i]); row.putValue(ZSTANDARD_INTEGER, tempIntRows[i]); row.putValue(PASS_THROUGH_INTEGER, tempIntRows[i]); row.putValue(LZ4_INTEGER, tempIntRows[i]); + row.putValue(GZIP_INTEGER, tempIntRows[i]); row.putValue(SNAPPY_LONG, tempLongRows[i]); 
row.putValue(ZSTANDARD_LONG, tempLongRows[i]); row.putValue(PASS_THROUGH_LONG, tempLongRows[i]); row.putValue(LZ4_LONG, tempLongRows[i]); + row.putValue(GZIP_LONG, tempLongRows[i]); rows.add(row); } return rows; @@ -246,18 +262,19 @@ public class NoDictionaryCompressionQueriesTest extends BaseQueriesTest { @Test public void testQueriesWithCompressionCodec() { String query = "SELECT SNAPPY_STRING, ZSTANDARD_STRING, PASS_THROUGH_STRING, LZ4_STRING, " - + "SNAPPY_INTEGER, ZSTANDARD_INTEGER, PASS_THROUGH_INTEGER, LZ4_INTEGER, " - + "SNAPPY_LONG, ZSTANDARD_LONG, PASS_THROUGH_LONG, LZ4_LONG FROM MyTable LIMIT 1000"; + + "GZIP_STRING, SNAPPY_INTEGER, ZSTANDARD_INTEGER, PASS_THROUGH_INTEGER, LZ4_INTEGER, " + + "GZIP_INTEGER, SNAPPY_LONG, ZSTANDARD_LONG, PASS_THROUGH_LONG, LZ4_LONG, GZIP_LONG FROM MyTable LIMIT 1000"; ArrayList<Serializable[]> expected = new ArrayList<>(); for (GenericRow row : _rows) { expected.add(new Serializable[]{ - String.valueOf(row.getValue(SNAPPY_STRING)), String.valueOf(row.getValue(ZSTANDARD_STRING)), - String.valueOf(row.getValue(PASS_THROUGH_STRING)), String.valueOf(row.getValue(LZ4_STRING)), - (Integer) row.getValue(SNAPPY_INTEGER), (Integer) row.getValue(ZSTANDARD_INTEGER), - (Integer) row.getValue(PASS_THROUGH_INTEGER), (Integer) row.getValue(LZ4_INTEGER), - (Long) row.getValue(SNAPPY_LONG), (Long) row.getValue(ZSTANDARD_LONG), (Long) row.getValue(PASS_THROUGH_LONG), - (Long) row.getValue(LZ4_LONG) + String.valueOf(row.getValue(SNAPPY_STRING)), String.valueOf(row.getValue(ZSTANDARD_STRING)), String.valueOf( + row.getValue(PASS_THROUGH_STRING)), String.valueOf(row.getValue(LZ4_STRING)), String.valueOf( + row.getValue(GZIP_STRING)), (Integer) row.getValue(SNAPPY_INTEGER), (Integer) row.getValue( + ZSTANDARD_INTEGER), (Integer) row.getValue(PASS_THROUGH_INTEGER), (Integer) row.getValue( + LZ4_INTEGER), (Integer) row.getValue(GZIP_INTEGER), (Long) row.getValue(SNAPPY_LONG), (Long) row.getValue( + ZSTANDARD_LONG), (Long) row.getValue(PASS_THROUGH_LONG), (Long) row.getValue(LZ4_LONG), (Long) row.getValue( + GZIP_LONG) }); } testSelectQueryHelper(query, expected.size(), expected); @@ -297,6 +314,23 @@ public class NoDictionaryCompressionQueriesTest extends BaseQueriesTest { testSelectQueryHelper(query, expected.size(), expected); } + /** + * Tests for filter over integer values GZIP compression codec queries. + */ + @Test + public void testGZIPIntegerFilterQueriesWithCompressionCodec() { + String query = "SELECT GZIP_INTEGER FROM MyTable WHERE GZIP_INTEGER > 1000 LIMIT 1000"; + ArrayList<Serializable[]> expected = new ArrayList<>(); + + for (GenericRow row : _rows) { + int value = (Integer) row.getValue(GZIP_INTEGER); + if (value > 1000) { + expected.add(new Serializable[]{value}); + } + } + testSelectQueryHelper(query, expected.size(), expected); + } + /** * Tests for filter over integer values compression codec queries. */ @@ -365,6 +399,23 @@ public class NoDictionaryCompressionQueriesTest extends BaseQueriesTest { testSelectQueryHelper(query, expected.size(), expected); } + /** + * Tests for filter over string values GZIP compression codec queries. 
+ */ + @Test + public void testGZIPStringFilterQueriesWithCompressionCodec() { + String query = "SELECT GZIP_STRING FROM MyTable WHERE GZIP_STRING = 'hello_world_123' LIMIT 1000"; + ArrayList<Serializable[]> expected = new ArrayList<>(); + + for (GenericRow row : _rows) { + String value = String.valueOf(row.getValue(GZIP_STRING)); + if (value.equals("hello_world_123")) { + expected.add(new Serializable[]{value}); + } + } + testSelectQueryHelper(query, expected.size(), expected); + } + /** * Tests for filter over string values snappy compression codec queries. */ diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryIntegerCompression.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryIntegerCompression.java index 6c1a0e3ae2..f51ff94f15 100644 --- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryIntegerCompression.java +++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryIntegerCompression.java @@ -24,6 +24,10 @@ import java.nio.ByteBuffer; import java.util.concurrent.TimeUnit; import net.jpountz.lz4.LZ4Factory; import org.apache.commons.lang3.RandomUtils; +import org.apache.pinot.segment.local.io.compression.ChunkCompressorFactory; +import org.apache.pinot.segment.spi.compression.ChunkCompressionType; +import org.apache.pinot.segment.spi.compression.ChunkCompressor; +import org.apache.pinot.segment.spi.compression.ChunkDecompressor; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Fork; @@ -68,7 +72,13 @@ public class BenchmarkNoDictionaryIntegerCompression { private static ByteBuffer _lz4CompressedIntegerInput; private static ByteBuffer _lz4IntegerDecompressed; + private static ByteBuffer _gzipCompressedIntegerOutput; + private static ByteBuffer _gzipCompressedIntegerInput; + private static ByteBuffer _gzipIntegerDecompressed; + private static LZ4Factory _factory; + private static ChunkCompressor _gzipCompressor; + private static ChunkDecompressor _gzipDecompressor; @Setup(Level.Invocation) public void setUp() @@ -84,12 +94,14 @@ public class BenchmarkNoDictionaryIntegerCompression { // position for lz4 is required _uncompressedInt.flip(); _factory.fastCompressor().compress(_uncompressedInt, _lz4CompressedIntegerInput); + _gzipCompressor.compress(_uncompressedInt, _gzipCompressedIntegerInput); _zstdIntegerDecompressed.rewind(); _zstandardCompressedIntegerInput.flip(); _uncompressedInt.flip(); _snappyIntegerDecompressed.rewind(); _lz4CompressedIntegerInput.flip(); + _gzipCompressedIntegerInput.flip(); } private void generateRandomIntegerBuffer() { @@ -102,8 +114,10 @@ public class BenchmarkNoDictionaryIntegerCompression { } private void initializeCompressors() { - //Initialize compressors and decompressors for lz4 + //Initialize compressors and decompressors for lz4 and gzip _factory = LZ4Factory.fastestInstance(); + _gzipCompressor = ChunkCompressorFactory.getCompressor(ChunkCompressionType.GZIP); + _gzipDecompressor = ChunkCompressorFactory.getDecompressor(ChunkCompressionType.GZIP); } private void allocateBufferMemory() { @@ -117,6 +131,9 @@ public class BenchmarkNoDictionaryIntegerCompression { _lz4CompressedIntegerOutput = ByteBuffer.allocateDirect(_uncompressedInt.capacity() * 2); _snappyCompressedIntegerOutput = ByteBuffer.allocateDirect(_uncompressedInt.capacity() * 2); _zstdCompressedIntegerOutput = ByteBuffer.allocateDirect(_uncompressedInt.capacity() * 2); + _gzipIntegerDecompressed = 
ByteBuffer.allocateDirect(_uncompressedInt.capacity() * 2); + _gzipCompressedIntegerOutput = ByteBuffer.allocateDirect(_uncompressedInt.capacity() * 2); + _gzipCompressedIntegerInput = ByteBuffer.allocateDirect(_uncompressedInt.capacity() * 2); } @TearDown(Level.Invocation) @@ -128,10 +145,13 @@ public class BenchmarkNoDictionaryIntegerCompression { _zstdIntegerDecompressed.clear(); _lz4CompressedIntegerOutput.clear(); _lz4IntegerDecompressed.clear(); + _gzipCompressedIntegerOutput.clear(); + _gzipIntegerDecompressed.clear(); _uncompressedInt.rewind(); _zstandardCompressedIntegerInput.rewind(); _lz4CompressedIntegerInput.rewind(); + _gzipCompressedIntegerInput.rewind(); } } @@ -207,9 +227,27 @@ public class BenchmarkNoDictionaryIntegerCompression { return state._lz4IntegerDecompressed.position(); } + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + public int benchmarkGZIPIntegerCompression(BenchmarkNoDictionaryIntegerCompressionState state) + throws IOException { + state._gzipCompressor.compress(state._uncompressedInt, state._gzipCompressedIntegerOutput); + return state._gzipCompressedIntegerOutput.position(); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + public int benchmarkGZIPIntegerDecompression(BenchmarkNoDictionaryIntegerCompressionState state) + throws IOException { + state._gzipDecompressor.decompress(state._gzipCompressedIntegerInput, state._gzipIntegerDecompressed); + return state._gzipIntegerDecompressed.position(); + } + public static void main(String[] args) throws Exception { - new Runner(new OptionsBuilder().include(BenchmarkNoDictionaryIntegerCompression.class.getSimpleName()).build()) - .run(); + new Runner( + new OptionsBuilder().include(BenchmarkNoDictionaryIntegerCompression.class.getSimpleName()).build()).run(); } } diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryLongCompression.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryLongCompression.java index b81d26a19d..1819278b28 100644 --- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryLongCompression.java +++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryLongCompression.java @@ -24,6 +24,10 @@ import java.nio.ByteBuffer; import java.util.concurrent.TimeUnit; import net.jpountz.lz4.LZ4Factory; import org.apache.commons.lang3.RandomUtils; +import org.apache.pinot.segment.local.io.compression.ChunkCompressorFactory; +import org.apache.pinot.segment.spi.compression.ChunkCompressionType; +import org.apache.pinot.segment.spi.compression.ChunkCompressor; +import org.apache.pinot.segment.spi.compression.ChunkDecompressor; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Fork; @@ -67,8 +71,13 @@ public class BenchmarkNoDictionaryLongCompression { private static ByteBuffer _lz4CompressedLongOutput; private static ByteBuffer _lz4CompressedLongInput; private static ByteBuffer _lz4LongDecompressed; + private static ByteBuffer _gzipCompressedLongOutput; + private static ByteBuffer _gzipCompressedLongInput; + private static ByteBuffer _gzipLongDecompressed; private static LZ4Factory _factory; + private static ChunkCompressor _gzipCompressor; + private static ChunkDecompressor _gzipDecompressor; @Setup(Level.Invocation) public void setUp() @@ -84,12 +93,14 @@ public class BenchmarkNoDictionaryLongCompression { // position for lz4 is required 
_uncompressedLong.flip(); _factory.fastCompressor().compress(_uncompressedLong, _lz4CompressedLongInput); + _gzipCompressor.compress(_uncompressedLong, _gzipCompressedLongInput); _zstandardLongDecompressedOutput.rewind(); _zstandardCompressedLongInput.flip(); _uncompressedLong.flip(); _snappyLongDecompressedOutput.flip(); _lz4CompressedLongInput.flip(); + _gzipCompressedLongInput.flip(); } private void generateRandomLongBuffer() { @@ -102,8 +113,10 @@ public class BenchmarkNoDictionaryLongCompression { } private void initializeCompressors() { - //Initialize compressors and decompressors for lz4 + //Initialize compressors and decompressors for lz4 and gzip _factory = LZ4Factory.fastestInstance(); + _gzipCompressor = ChunkCompressorFactory.getCompressor(ChunkCompressionType.GZIP); + _gzipDecompressor = ChunkCompressorFactory.getDecompressor(ChunkCompressionType.GZIP); } private void allocateBufferMemory() { @@ -116,6 +129,9 @@ public class BenchmarkNoDictionaryLongCompression { _lz4LongDecompressed = ByteBuffer.allocateDirect(_uncompressedLong.capacity() * 2); _lz4CompressedLongOutput = ByteBuffer.allocateDirect(_uncompressedLong.capacity() * 2); _lz4CompressedLongInput = ByteBuffer.allocateDirect(_uncompressedLong.capacity() * 2); + _gzipLongDecompressed = ByteBuffer.allocateDirect(_uncompressedLong.capacity() * 2); + _gzipCompressedLongOutput = ByteBuffer.allocateDirect(_uncompressedLong.capacity() * 2); + _gzipCompressedLongInput = ByteBuffer.allocateDirect(_uncompressedLong.capacity() * 2); } @TearDown(Level.Invocation) @@ -127,10 +143,13 @@ public class BenchmarkNoDictionaryLongCompression { _zstandardLongDecompressedOutput.clear(); _lz4CompressedLongOutput.clear(); _lz4LongDecompressed.clear(); + _gzipCompressedLongOutput.clear(); + _gzipLongDecompressed.clear(); _uncompressedLong.rewind(); _zstandardCompressedLongInput.rewind(); _lz4CompressedLongInput.rewind(); + _gzipCompressedLongInput.rewind(); } } @@ -210,6 +229,26 @@ public class BenchmarkNoDictionaryLongCompression { return state._lz4LongDecompressed.position(); } + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + public int benchmarkGZIPLongCompression( + BenchmarkNoDictionaryLongCompression.BenchmarkNoDictionaryLongCompressionState state) + throws IOException { + state._gzipCompressor.compress(state._uncompressedLong, state._gzipCompressedLongOutput); + return state._gzipCompressedLongOutput.position(); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + public int benchmarkGZIPLongDecompression( + BenchmarkNoDictionaryLongCompression.BenchmarkNoDictionaryLongCompressionState state) + throws IOException { + state._gzipDecompressor.decompress(state._gzipCompressedLongInput, state._gzipLongDecompressed); + return state._gzipLongDecompressed.position(); + } + public static void main(String[] args) throws Exception { new Runner(new OptionsBuilder().include(BenchmarkNoDictionaryLongCompression.class.getSimpleName()).build()).run(); diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryStringCompression.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryStringCompression.java index 0cf5a3df7e..50f7687c9a 100644 --- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryStringCompression.java +++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryStringCompression.java @@ -23,8 +23,11 @@ import java.io.IOException; import java.nio.ByteBuffer; import 
java.util.Random; import java.util.concurrent.TimeUnit; -import net.jpountz.lz4.LZ4Factory; import org.apache.commons.lang3.RandomStringUtils; +import org.apache.pinot.segment.local.io.compression.ChunkCompressorFactory; +import org.apache.pinot.segment.spi.compression.ChunkCompressionType; +import org.apache.pinot.segment.spi.compression.ChunkCompressor; +import org.apache.pinot.segment.spi.compression.ChunkDecompressor; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Fork; @@ -50,181 +53,172 @@ import static java.nio.charset.StandardCharsets.UTF_8; @Warmup(iterations = 3) @Measurement(iterations = 5) @State(Scope.Benchmark) -// Test to get memory statistics for snappy, zstandard and lz4 string compression techniques +// Test to get memory statistics for snappy, zstandard, lz4 and gzip string compression techniques public class BenchmarkNoDictionaryStringCompression { @Param({"500000", "1000000", "2000000", "3000000", "4000000", "5000000"}) public static int _rowLength; - public static Random _random = new Random(); + private static final int MAX_CHARS_IN_LINE = 30; + private static final Random RANDOM = new Random(); + private static final ChunkCompressor LZ4_COMPRESSOR = ChunkCompressorFactory.getCompressor(ChunkCompressionType.LZ4); + private static final ChunkDecompressor LZ4_DECOMPRESSOR = + ChunkCompressorFactory.getDecompressor(ChunkCompressionType.LZ4); + private static final ChunkCompressor GZIP_COMPRESSOR = + ChunkCompressorFactory.getCompressor(ChunkCompressionType.GZIP); + private static final ChunkDecompressor GZIP_DECOMPRESSOR = + ChunkCompressorFactory.getDecompressor(ChunkCompressionType.GZIP); @State(Scope.Thread) - public static class BenchmarkNoDictionaryStringCompressionState { - private static ByteBuffer _uncompressedString; - private static ByteBuffer _snappyCompressedStringInput; - private static ByteBuffer _zstandardCompressedStringInput; - private static ByteBuffer _snappyCompressedStringOutput; - private static ByteBuffer _zstandardCompressedStringOutput; - private static ByteBuffer _snappyStringDecompressed; - private static ByteBuffer _zstandardStringDecompressed; - private static ByteBuffer _lz4CompressedStringOutput; - private static ByteBuffer _lz4CompressedStringInput; - private static ByteBuffer _lz4StringDecompressed; - - private static LZ4Factory _factory; + public static class CompressionBuffers { + + private ByteBuffer _snappyCompressedStringInput; + private ByteBuffer _zstandardCompressedStringInput; + private ByteBuffer _lz4CompressedStringInput; + private ByteBuffer _gzipCompressedStringInput; + private ByteBuffer _uncompressedString; + private ByteBuffer _stringDecompressed; + private ByteBuffer _stringCompressed; + + @Setup(Level.Trial) + public void setUp0() { + // generate random block of text alongside initialising memory buffers + byte[][] tempRows = new byte[_rowLength][]; + int size = 0; + for (int i = 0; i < _rowLength; i++) { + String value = RandomStringUtils.random(RANDOM.nextInt(MAX_CHARS_IN_LINE), true, true); + byte[] bytes = value.getBytes(UTF_8); + tempRows[i] = bytes; + size += bytes.length; + } + _uncompressedString = ByteBuffer.allocateDirect(size); + for (int i = 0; i < _rowLength; i++) { + _uncompressedString.put(tempRows[i]); + } + _uncompressedString.flip(); + + int capacity = _uncompressedString.capacity() * 2; + _stringDecompressed = ByteBuffer.allocateDirect(capacity); + _stringCompressed = ByteBuffer.allocateDirect(capacity); + 
_snappyCompressedStringInput = ByteBuffer.allocateDirect(capacity); + _zstandardCompressedStringInput = ByteBuffer.allocateDirect(capacity); + _lz4CompressedStringInput = ByteBuffer.allocateDirect(capacity); + _gzipCompressedStringInput = ByteBuffer.allocateDirect(capacity); + } @Setup(Level.Invocation) public void setUp() throws Exception { - initializeCompressors(); - generateRandomStringBuffer(); - allocateMemory(); - + _uncompressedString.rewind(); + _snappyCompressedStringInput.clear(); + _zstandardCompressedStringInput.clear(); + _lz4CompressedStringInput.clear(); + _gzipCompressedStringInput.clear(); + _stringDecompressed.clear(); + _stringCompressed.clear(); + + // prepare compressed buffers Snappy.compress(_uncompressedString, _snappyCompressedStringInput); Zstd.compress(_zstandardCompressedStringInput, _uncompressedString); // ZSTD compressor with change the position of _uncompressedString, a flip() operation over input to reset // position for lz4 is required _uncompressedString.flip(); - _factory.fastCompressor().compress(_uncompressedString, _lz4CompressedStringInput); - - _zstandardStringDecompressed.rewind(); _zstandardCompressedStringInput.flip(); - _uncompressedString.flip(); - _snappyStringDecompressed.flip(); - _lz4CompressedStringInput.flip(); - } - - private void initializeCompressors() { - //Initialize compressors and decompressors for lz4 - _factory = LZ4Factory.fastestInstance(); - } - - private void generateRandomStringBuffer() { - String[] tempRows = new String[_rowLength]; - int maxStringLengthInBytes = 0; - int numChars = 100; - for (int i = 0; i < _rowLength; i++) { - String value = RandomStringUtils.random(_random.nextInt(numChars), true, true); - maxStringLengthInBytes = Math.max(maxStringLengthInBytes, value.getBytes(UTF_8).length); - tempRows[i] = value; - } - - _uncompressedString = ByteBuffer.allocateDirect(_rowLength * maxStringLengthInBytes); - for (int i = 0; i < _rowLength; i++) { - _uncompressedString.put(tempRows[i].getBytes(UTF_8)); - } + LZ4_COMPRESSOR.compress(_uncompressedString, _lz4CompressedStringInput); _uncompressedString.flip(); - } - private void allocateMemory() { - _snappyCompressedStringOutput = ByteBuffer.allocateDirect(_uncompressedString.capacity() * 2); - _zstandardCompressedStringOutput = ByteBuffer.allocateDirect(_uncompressedString.capacity() * 2); - _snappyStringDecompressed = ByteBuffer.allocateDirect(_uncompressedString.capacity() * 2); - _zstandardStringDecompressed = ByteBuffer.allocateDirect(_uncompressedString.capacity() * 2); - _snappyCompressedStringInput = ByteBuffer.allocateDirect(_uncompressedString.capacity() * 2); - _zstandardCompressedStringInput = ByteBuffer.allocateDirect(_uncompressedString.capacity() * 2); - _lz4StringDecompressed = ByteBuffer.allocateDirect(_uncompressedString.capacity() * 2); - _lz4CompressedStringOutput = ByteBuffer.allocateDirect(_uncompressedString.capacity() * 2); - _lz4CompressedStringInput = ByteBuffer.allocateDirect(_uncompressedString.capacity() * 2); + GZIP_COMPRESSOR.compress(_uncompressedString, _gzipCompressedStringInput); + _uncompressedString.flip(); } @TearDown(Level.Invocation) public void tearDown() throws Exception { - _snappyCompressedStringOutput.clear(); - _snappyStringDecompressed.clear(); - _zstandardCompressedStringOutput.clear(); - _zstandardStringDecompressed.clear(); - _lz4CompressedStringOutput.clear(); - _lz4StringDecompressed.clear(); - - _uncompressedString.rewind(); - _zstandardCompressedStringInput.rewind(); - _lz4CompressedStringInput.rewind(); + 
_snappyCompressedStringInput.clear(); + _zstandardCompressedStringInput.clear(); + _lz4CompressedStringInput.clear(); + _gzipCompressedStringInput.clear(); + _uncompressedString.clear(); + _stringDecompressed.clear(); + _stringCompressed.clear(); } } @Benchmark @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.MILLISECONDS) - public int benchmarkSnappyStringCompression(BenchmarkNoDictionaryStringCompressionState state) + public int benchmarkSnappyStringCompression(CompressionBuffers state) throws IOException { - int size = Snappy.compress(state._uncompressedString, state._snappyCompressedStringOutput); + int size = Snappy.compress(state._uncompressedString, state._stringCompressed); return size; } @Benchmark @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.MILLISECONDS) - public int benchmarkSnappyStringDecompression(BenchmarkNoDictionaryStringCompressionState state) + public int benchmarkSnappyStringDecompression(CompressionBuffers state) throws IOException { - int size = Snappy.uncompress(state._snappyCompressedStringInput, state._snappyStringDecompressed); + int size = Snappy.uncompress(state._snappyCompressedStringInput, state._stringDecompressed); return size; } @Benchmark @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.MILLISECONDS) - public int benchmarkZstandardStringCompression(BenchmarkNoDictionaryStringCompressionState state) - throws IOException { - int size = Zstd.compress(state._zstandardCompressedStringOutput, state._uncompressedString); + public int benchmarkZstandardStringCompression(CompressionBuffers state) { + int size = Zstd.compress(state._stringCompressed, state._uncompressedString); return size; } @Benchmark @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.MILLISECONDS) - public int benchmarkZstandardStringDecompression(BenchmarkNoDictionaryStringCompressionState state) - throws IOException { - int size = Zstd.decompress(state._zstandardStringDecompressed, state._zstandardCompressedStringInput); + public int benchmarkZstandardStringDecompression(CompressionBuffers state) { + int size = Zstd.decompress(state._stringDecompressed, state._zstandardCompressedStringInput); return size; } @Benchmark @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.MILLISECONDS) - public int benchmarkLZ4StringCompression( - BenchmarkNoDictionaryStringCompression.BenchmarkNoDictionaryStringCompressionState state) + public int benchmarkLZ4HCStringCompression(CompressionBuffers state) throws IOException { - state._factory.fastCompressor().compress(state._uncompressedString, state._lz4CompressedStringOutput); - return state._lz4CompressedStringOutput.position(); + LZ4_COMPRESSOR.compress(state._uncompressedString, state._stringCompressed); + return state._stringCompressed.position(); } @Benchmark @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.MILLISECONDS) - public int benchmarkLZ4StringDecompression( - BenchmarkNoDictionaryStringCompression.BenchmarkNoDictionaryStringCompressionState state) + public int benchmarkLZ4HCStringDecompression(CompressionBuffers state) throws IOException { - state._factory.fastDecompressor().decompress(state._lz4CompressedStringInput, state._lz4StringDecompressed); - return state._lz4StringDecompressed.position(); + LZ4_DECOMPRESSOR.decompress(state._lz4CompressedStringInput, state._stringDecompressed); + return state._stringDecompressed.position(); } @Benchmark @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.MILLISECONDS) - public int benchmarkLZ4HCStringCompression( - 
BenchmarkNoDictionaryStringCompression.BenchmarkNoDictionaryStringCompressionState state) + public int benchmarkGZIPStringCompression(CompressionBuffers state) throws IOException { - state._factory.highCompressor().compress(state._uncompressedString, state._lz4CompressedStringOutput); - return state._lz4CompressedStringOutput.position(); + GZIP_COMPRESSOR.compress(state._uncompressedString, state._stringCompressed); + return state._stringCompressed.position(); } @Benchmark @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.MILLISECONDS) - public int benchmarkLZ4HCStringDecompression( - BenchmarkNoDictionaryStringCompression.BenchmarkNoDictionaryStringCompressionState state) + public int benchmarkGZIPStringDecompression(CompressionBuffers state) throws IOException { - state._factory.fastDecompressor().decompress(state._lz4CompressedStringInput, state._lz4StringDecompressed); - return state._lz4StringDecompressed.position(); + GZIP_DECOMPRESSOR.decompress(state._gzipCompressedStringInput, state._stringDecompressed); + return state._stringDecompressed.position(); } public static void main(String[] args) throws Exception { - new Runner(new OptionsBuilder().include(BenchmarkNoDictionaryStringCompression.class.getSimpleName()).build()) - .run(); + new Runner( + new OptionsBuilder().include(BenchmarkNoDictionaryStringCompression.class.getSimpleName()).build()).run(); } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ChunkCompressorFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ChunkCompressorFactory.java index b2d06b9718..15def2f733 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ChunkCompressorFactory.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ChunkCompressorFactory.java @@ -50,8 +50,7 @@ public class ChunkCompressorFactory { * size. Most formats do this anyway, but LZ4 requires a length prefix. * @return Compressor for the specified type. */ - public static ChunkCompressor getCompressor(ChunkCompressionType compressionType, - boolean upgradeToLengthPrefixed) { + public static ChunkCompressor getCompressor(ChunkCompressionType compressionType, boolean upgradeToLengthPrefixed) { switch (compressionType) { case PASS_THROUGH: @@ -69,6 +68,9 @@ public class ChunkCompressorFactory { case LZ4_LENGTH_PREFIXED: return LZ4WithLengthCompressor.INSTANCE; + case GZIP: + return new GzipCompressor(); + default: throw new IllegalArgumentException("Illegal compressor name " + compressionType); } @@ -97,6 +99,9 @@ public class ChunkCompressorFactory { case LZ4_LENGTH_PREFIXED: return LZ4WithLengthDecompressor.INSTANCE; + case GZIP: + return new GzipDecompressor(); + default: throw new IllegalArgumentException("Illegal compressor name " + compressionType); } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/GzipCompressor.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/GzipCompressor.java new file mode 100644 index 0000000000..3a83f7c8d2 --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/GzipCompressor.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.io.compression; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.zip.Deflater; +import org.apache.pinot.segment.spi.compression.ChunkCompressionType; +import org.apache.pinot.segment.spi.compression.ChunkCompressor; + + +/** + * Implementation of {@link ChunkCompressor} using GZIP compression algorithm. + */ +class GzipCompressor implements ChunkCompressor { + + private final Deflater _compressor; + + public GzipCompressor() { + _compressor = new Deflater(); + } + + @Override + public int compress(ByteBuffer inUncompressed, ByteBuffer outCompressed) + throws IOException { + _compressor.reset(); + _compressor.setInput(inUncompressed); + _compressor.finish(); + _compressor.deflate(outCompressed); + outCompressed.flip(); + return outCompressed.limit(); + } + + @Override + public int maxCompressedSize(int uncompressedSize) { + // https://github.com/luvit/zlib/blob/8de57bce969eb9dafc1f1f5c256ac608d0a73ec4/compress.c#L75 + return uncompressedSize + (uncompressedSize >> 12) + (uncompressedSize >> 14) + (uncompressedSize >> 25) + 13; + } + + @Override + public ChunkCompressionType compressionType() { + return ChunkCompressionType.GZIP; + } + + @Override + public void close() + throws IOException { + _compressor.end(); + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/GzipDecompressor.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/GzipDecompressor.java new file mode 100644 index 0000000000..b07d8acdbc --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/GzipDecompressor.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.io.compression; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.zip.DataFormatException; +import java.util.zip.Inflater; +import org.apache.pinot.segment.spi.compression.ChunkDecompressor; + + +/** + * Implementation of {@link ChunkDecompressor} using GZIP decompression algorithm. 
+ */ +class GzipDecompressor implements ChunkDecompressor { + + private final Inflater _decompressor; + + public GzipDecompressor() { + _decompressor = new Inflater(); + } + + @Override + public int decompress(ByteBuffer compressedInput, ByteBuffer decompressedOutput) + throws IOException { + _decompressor.reset(); + _decompressor.setInput(compressedInput); + try { + _decompressor.inflate(decompressedOutput); + } catch (DataFormatException e) { + throw new IOException(e); + } + decompressedOutput.flip(); + return decompressedOutput.limit(); + } + + @Override + public int decompressedLength(ByteBuffer compressedInput) { + return -1; + } + + @Override + public void close() + throws IOException { + _decompressor.end(); + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/BaseChunkForwardIndexWriter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/BaseChunkForwardIndexWriter.java index 70d8f38706..0cdff5ce61 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/BaseChunkForwardIndexWriter.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/BaseChunkForwardIndexWriter.java @@ -114,6 +114,7 @@ public abstract class BaseChunkForwardIndexWriter implements Closeable { _header.flip(); _dataFile.write(_header, 0); _dataFile.close(); + _chunkCompressor.close(); } /** @@ -192,7 +193,6 @@ public abstract class BaseChunkForwardIndexWriter implements Closeable { } _dataOffset += sizeToWrite; - _chunkBuffer.clear(); } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriterV4.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriterV4.java index 868511a437..440808a6b0 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriterV4.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriterV4.java @@ -325,5 +325,6 @@ public class VarByteChunkForwardIndexWriterV4 implements VarByteChunkWriter { CleanerUtil.cleanQuietly(_compressionBuffer); CleanerUtil.cleanQuietly(_chunkBuffer); FileUtils.deleteQuietly(_dataBuffer); + _chunkCompressor.close(); } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkForwardIndexReader.java index c7855ee54f..745bd18fde 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkForwardIndexReader.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkForwardIndexReader.java @@ -241,12 +241,10 @@ public abstract class BaseChunkForwardIndexReader implements ForwardIndexReader< protected long getChunkPositionAndRecordRanges(int chunkId, List<ByteRange> ranges) { if (_headerEntryChunkOffsetSize == Integer.BYTES) { - ranges.add( - new ByteRange(_dataHeaderStart + chunkId * _headerEntryChunkOffsetSize, Integer.BYTES)); + ranges.add(new ByteRange(_dataHeaderStart + chunkId * _headerEntryChunkOffsetSize, Integer.BYTES)); return _dataHeader.getInt(chunkId * _headerEntryChunkOffsetSize); } else { - ranges.add( - new ByteRange(_dataHeaderStart + chunkId * 
_headerEntryChunkOffsetSize, Long.BYTES)); + ranges.add(new ByteRange(_dataHeaderStart + chunkId * _headerEntryChunkOffsetSize, Long.BYTES)); return _dataHeader.getLong(chunkId * _headerEntryChunkOffsetSize); } } @@ -446,9 +444,11 @@ public abstract class BaseChunkForwardIndexReader implements ForwardIndexReader< } @Override - public void close() { + public void close() + throws IOException { // NOTE: DO NOT close the PinotDataBuffer here because it is tracked by the caller and might be reused later. The // caller is responsible of closing the PinotDataBuffer. + _chunkDecompressor.close(); } private boolean isContiguousRange(int[] docIds, int length) { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkForwardIndexReaderV4.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkForwardIndexReaderV4.java index 47c30aec6b..f0a3658cb3 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkForwardIndexReaderV4.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkForwardIndexReaderV4.java @@ -266,6 +266,7 @@ public class VarByteChunkForwardIndexReaderV4 @Override public void close() throws IOException { + _chunkDecompressor.close(); } @Override diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/compression/TestCompression.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/compression/TestCompression.java index 9f711929e9..245803ec53 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/compression/TestCompression.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/compression/TestCompression.java @@ -21,6 +21,10 @@ package org.apache.pinot.segment.local.io.compression; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.pinot.segment.spi.compression.ChunkCompressionType; import org.apache.pinot.segment.spi.compression.ChunkCompressor; import org.apache.pinot.segment.spi.compression.ChunkDecompressor; @@ -41,45 +45,120 @@ public class TestCompression { buffer.put(input); buffer.flip(); return new Object[][]{ - {ChunkCompressionType.PASS_THROUGH, buffer.slice()}, - {ChunkCompressionType.SNAPPY, buffer.slice()}, - {ChunkCompressionType.LZ4, buffer.slice()}, - {ChunkCompressionType.LZ4_LENGTH_PREFIXED, buffer.slice()}, - {ChunkCompressionType.ZSTANDARD, buffer.slice()} + {ChunkCompressionType.PASS_THROUGH, buffer.slice()}, {ChunkCompressionType.SNAPPY, buffer.slice()}, + {ChunkCompressionType.LZ4, buffer.slice()}, {ChunkCompressionType.LZ4_LENGTH_PREFIXED, buffer.slice()}, + {ChunkCompressionType.ZSTANDARD, buffer.slice()}, {ChunkCompressionType.GZIP, buffer.slice()} }; } @Test(dataProvider = "formats") public void testRoundtrip(ChunkCompressionType type, ByteBuffer rawInput) throws IOException { - ChunkCompressor compressor = ChunkCompressorFactory.getCompressor(type); - assertEquals(compressor.compressionType(), type, "upgrade is opt in"); - roundtrip(compressor, rawInput); + try (ChunkCompressor compressor = ChunkCompressorFactory.getCompressor(type)) { + 
assertEquals(compressor.compressionType(), type, "upgrade is opt in"); + roundtrip(compressor, rawInput); + } } @Test(dataProvider = "formats") public void testRoundtripWithUpgrade(ChunkCompressionType type, ByteBuffer rawInput) throws IOException { - ChunkCompressor compressor = ChunkCompressorFactory.getCompressor(type, true); - assertNotEquals(compressor.compressionType(), ChunkCompressionType.LZ4, - "LZ4 compression type does not support length prefix"); - roundtrip(compressor, rawInput); + try (ChunkCompressor compressor = ChunkCompressorFactory.getCompressor(type, true)) { + assertNotEquals(compressor.compressionType(), ChunkCompressionType.LZ4, + "LZ4 compression type does not support length prefix"); + roundtrip(compressor, rawInput); + } } - private void roundtrip(ChunkCompressor compressor, ByteBuffer rawInput) + @Test(dataProvider = "formats") + public void testConcurrent(ChunkCompressionType type, ByteBuffer ignore) { + + String expected = "The gzip file format is:\n" + + "- a 10-byte header, containing a magic number (1f 8b), the compression method (08 for DEFLATE), " + + "1-byte of header flags, a 4-byte timestamp, compression flags and the operating system ID.\n" + + "- optional extra headers as allowed by the header flags, including the original filename, a " + + "comment field, an 'extra' field, and the lower half of a CRC-32 checksum for the header section.\n" + + "- a body, containing a DEFLATE-compressed payload.\n" + + "- an 8-byte trailer, containing a CRC-32 checksum and the length of the original uncompressed " + + "data, modulo 232.[4]\n" + + "gzip is normally used to compress just single files. Compressed archives are typically created " + + "by assembling collections of files into a single tar archive and then compressing that archive " + + "with gzip.\n gzip is not to be confused with ZIP, which can hold collections of files without " + + "an external archiver, but is less compact than compressed tarballs holding the same data, because " + + "it compresses files individually and cannot take advantage of redundancy between files.\n\n"; + byte[] input = expected.getBytes(StandardCharsets.UTF_8); + ByteBuffer rawInput = ByteBuffer.allocateDirect(input.length).put(input).flip(); + + Thread[] workers = new Thread[5]; + ByteBuffer[] compressed = new ByteBuffer[workers.length]; + ByteBuffer[] decompressed = new ByteBuffer[workers.length]; + CountDownLatch done = new CountDownLatch(workers.length); + AtomicInteger errors = new AtomicInteger(); + for (int i = 0; i < workers.length; i++) { + int idx = i; + workers[i] = new Thread(() -> { + try { + // compress + try (ChunkCompressor compressor = ChunkCompressorFactory.getCompressor(type)) { + compressed[idx] = ByteBuffer.allocateDirect(compressor.maxCompressedSize(rawInput.limit())); + compressor.compress(rawInput.slice(), compressed[idx]); + } + + // small context switch + TimeUnit.MILLISECONDS.sleep(1L + (long) (ThreadLocalRandom.current().nextDouble() * 10.0)); + + // decompress + try (ChunkDecompressor decompressor = ChunkCompressorFactory.getDecompressor(type)) { + int size = decompressor.decompressedLength(compressed[idx]); + if (type == ChunkCompressionType.LZ4 || type == ChunkCompressionType.GZIP) { + size = rawInput.limit(); + } + decompressed[idx] = ByteBuffer.allocateDirect(size); + decompressor.decompress(compressed[idx], decompressed[idx]); + } + } catch (Throwable e) { + e.printStackTrace(); + errors.incrementAndGet(); + } finally { + done.countDown(); + } + }); + workers[i].start(); + } + + try { + 
done.await(60L, TimeUnit.SECONDS); // it will not take this long + } catch (InterruptedException e) { + throw new AssertionError("timed-out"); + } + + // there are no errors + assertEquals(errors.get(), 0); + + // all decompressed buffers contain the original text + for (int i = 0; i < workers.length; i++) { + assertEquals(StandardCharsets.UTF_8.decode(decompressed[i]).toString(), expected); + compressed[i].clear(); + decompressed[i].clear(); + } + } + + private static void roundtrip(ChunkCompressor compressor, ByteBuffer rawInput) throws IOException { ByteBuffer compressedOutput = ByteBuffer.allocateDirect(compressor.maxCompressedSize(rawInput.limit())); compressor.compress(rawInput.slice(), compressedOutput); - ChunkDecompressor decompressor = ChunkCompressorFactory.getDecompressor(compressor.compressionType()); - int decompressedLength = decompressor.decompressedLength(compressedOutput); - assertTrue(compressor.compressionType() == ChunkCompressionType.LZ4 || decompressedLength > 0); - ByteBuffer decompressedOutput = ByteBuffer.allocateDirect( - compressor.compressionType() == ChunkCompressionType.LZ4 ? rawInput.limit() : decompressedLength); - decompressor.decompress(compressedOutput, decompressedOutput); - byte[] expected = new byte[rawInput.limit()]; - rawInput.get(expected); - byte[] actual = new byte[decompressedOutput.limit()]; - decompressedOutput.get(actual); - assertEquals(actual, expected, "content differs after compression roundt rip"); + try (ChunkDecompressor decompressor = ChunkCompressorFactory.getDecompressor(compressor.compressionType())) { + int decompressedLength = decompressor.decompressedLength(compressedOutput); + boolean isLz4OrGzip = compressor.compressionType() == ChunkCompressionType.LZ4 + || compressor.compressionType() == ChunkCompressionType.GZIP; + assertTrue(isLz4OrGzip || decompressedLength > 0); + ByteBuffer decompressedOutput = ByteBuffer.allocateDirect(isLz4OrGzip ? rawInput.limit() : decompressedLength); + decompressor.decompress(compressedOutput, decompressedOutput); + byte[] expected = new byte[rawInput.limit()]; + rawInput.get(expected); + byte[] actual = new byte[decompressedOutput.limit()]; + decompressedOutput.get(actual); + assertEquals(actual, expected, "content differs after compression roundt rip"); + } } } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/VarByteChunkSVForwardIndexTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/VarByteChunkSVForwardIndexTest.java index 17f169081b..55551d9e93 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/VarByteChunkSVForwardIndexTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/VarByteChunkSVForwardIndexTest.java @@ -75,6 +75,12 @@ public class VarByteChunkSVForwardIndexTest { test(ChunkCompressionType.LZ4); } + @Test + public void testWithGZIPCompression() + throws Exception { + test(ChunkCompressionType.GZIP); + } + /** * This test writes {@link #NUM_ENTRIES} using {@link VarByteChunkForwardIndexWriter}. 
It then reads * the strings & bytes using {@link VarByteChunkSVForwardIndexReader}, and asserts that what was written is the @@ -177,36 +183,43 @@ public class VarByteChunkSVForwardIndexTest { testLargeVarcharHelper(ChunkCompressionType.PASS_THROUGH, 10, 1000); testLargeVarcharHelper(ChunkCompressionType.ZSTANDARD, 10, 1000); testLargeVarcharHelper(ChunkCompressionType.LZ4, 10, 1000); + testLargeVarcharHelper(ChunkCompressionType.GZIP, 10, 1000); testLargeVarcharHelper(ChunkCompressionType.SNAPPY, 100, 1000); testLargeVarcharHelper(ChunkCompressionType.PASS_THROUGH, 100, 1000); testLargeVarcharHelper(ChunkCompressionType.ZSTANDARD, 100, 1000); testLargeVarcharHelper(ChunkCompressionType.LZ4, 100, 1000); + testLargeVarcharHelper(ChunkCompressionType.GZIP, 100, 1000); testLargeVarcharHelper(ChunkCompressionType.SNAPPY, 1000, 1000); testLargeVarcharHelper(ChunkCompressionType.PASS_THROUGH, 1000, 1000); testLargeVarcharHelper(ChunkCompressionType.ZSTANDARD, 1000, 1000); testLargeVarcharHelper(ChunkCompressionType.LZ4, 1000, 1000); + testLargeVarcharHelper(ChunkCompressionType.GZIP, 1000, 1000); testLargeVarcharHelper(ChunkCompressionType.SNAPPY, 10000, 100); testLargeVarcharHelper(ChunkCompressionType.PASS_THROUGH, 10000, 100); testLargeVarcharHelper(ChunkCompressionType.ZSTANDARD, 10000, 100); testLargeVarcharHelper(ChunkCompressionType.LZ4, 10000, 100); + testLargeVarcharHelper(ChunkCompressionType.GZIP, 10000, 100); testLargeVarcharHelper(ChunkCompressionType.SNAPPY, 100000, 10); testLargeVarcharHelper(ChunkCompressionType.PASS_THROUGH, 100000, 10); testLargeVarcharHelper(ChunkCompressionType.ZSTANDARD, 100000, 10); testLargeVarcharHelper(ChunkCompressionType.LZ4, 100000, 10); + testLargeVarcharHelper(ChunkCompressionType.GZIP, 100000, 10); testLargeVarcharHelper(ChunkCompressionType.SNAPPY, 1000000, 10); testLargeVarcharHelper(ChunkCompressionType.PASS_THROUGH, 1000000, 10); testLargeVarcharHelper(ChunkCompressionType.ZSTANDARD, 1000000, 10); testLargeVarcharHelper(ChunkCompressionType.LZ4, 1000000, 10); + testLargeVarcharHelper(ChunkCompressionType.GZIP, 1000000, 10); testLargeVarcharHelper(ChunkCompressionType.SNAPPY, 2000000, 10); testLargeVarcharHelper(ChunkCompressionType.PASS_THROUGH, 2000000, 10); testLargeVarcharHelper(ChunkCompressionType.ZSTANDARD, 2000000, 10); testLargeVarcharHelper(ChunkCompressionType.LZ4, 2000000, 10); + testLargeVarcharHelper(ChunkCompressionType.GZIP, 2000000, 10); } private void testLargeVarcharHelper(ChunkCompressionType compressionType, int numChars, int numDocs) diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java index 53f6995a57..1df3e70364 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java @@ -86,21 +86,23 @@ public class ForwardIndexHandlerTest { private static final String DIM_PASS_THROUGH_STRING = "DIM_PASS_THROUGH_STRING"; private static final String DIM_ZSTANDARD_STRING = "DIM_ZSTANDARD_STRING"; private static final String DIM_LZ4_STRING = "DIM_LZ4_STRING"; + private static final String DIM_GZIP_STRING = "DIM_GZIP_STRING"; private static final String DIM_SNAPPY_LONG = "DIM_SNAPPY_LONG"; private static final String DIM_PASS_THROUGH_LONG = "DIM_PASS_THROUGH_LONG"; 
private static final String DIM_ZSTANDARD_LONG = "DIM_ZSTANDARD_LONG"; private static final String DIM_LZ4_LONG = "DIM_LZ4_LONG"; - + private static final String DIM_GZIP_LONG = "DIM_GZIP_LONG"; private static final String DIM_SNAPPY_INTEGER = "DIM_SNAPPY_INTEGER"; private static final String DIM_PASS_THROUGH_INTEGER = "DIM_PASS_THROUGH_INTEGER"; private static final String DIM_ZSTANDARD_INTEGER = "DIM_ZSTANDARD_INTEGER"; private static final String DIM_LZ4_INTEGER = "DIM_LZ4_INTEGER"; - + private static final String DIM_GZIP_INTEGER = "DIM_GZIP_INTEGER"; private static final String DIM_SNAPPY_BYTES = "DIM_SNAPPY_BYTES"; private static final String DIM_PASS_THROUGH_BYTES = "DIM_PASS_THROUGH_BYTES"; private static final String DIM_ZSTANDARD_BYTES = "DIM_ZSTANDARD_BYTES"; private static final String DIM_LZ4_BYTES = "DIM_LZ4_BYTES"; + private static final String DIM_GZIP_BYTES = "DIM_GZIP_BYTES"; // Sorted columns private static final String DIM_RAW_SORTED_INTEGER = "DIM_RAW_SORTED_INTEGER"; @@ -110,11 +112,13 @@ public class ForwardIndexHandlerTest { private static final String METRIC_SNAPPY_INTEGER = "METRIC_SNAPPY_INTEGER"; private static final String METRIC_ZSTANDARD_INTEGER = "METRIC_ZSTANDARD_INTEGER"; private static final String METRIC_LZ4_INTEGER = "METRIC_LZ4_INTEGER"; + private static final String METRIC_GZIP_INTEGER = "METRIC_GZIP_INTEGER"; private static final String METRIC_SNAPPY_BIG_DECIMAL = "METRIC_SNAPPY_BIG_DECIMAL"; private static final String METRIC_PASS_THROUGH_BIG_DECIMAL = "METRIC_PASS_THROUGH_BIG_DECIMAL"; private static final String METRIC_ZSTANDARD_BIG_DECIMAL = "METRIC_ZSTANDARD_BIG_DECIMAL"; private static final String METRIC_LZ4_BIG_DECIMAL = "METRIC_LZ4_BIG_DECIMAL"; + private static final String METRIC_GZIP_BIG_DECIMAL = "METRIC_GZIP_BIG_DECIMAL"; // Multi-value columns private static final String DIM_MV_PASS_THROUGH_INTEGER = "DIM_MV_PASS_THROUGH_INTEGER"; @@ -187,16 +191,20 @@ public class ForwardIndexHandlerTest { Arrays.asList(DIM_LZ4_STRING, DIM_LZ4_LONG, DIM_LZ4_INTEGER, DIM_LZ4_BYTES, METRIC_LZ4_BIG_DECIMAL, METRIC_LZ4_INTEGER); - private static final List<String> DICT_ENABLED_COLUMNS_WITH_FORWARD_INDEX = Arrays.asList(DIM_DICT_INTEGER, - DIM_DICT_LONG, DIM_DICT_STRING, DIM_DICT_BYES, DIM_DICT_MV_BYTES, DIM_DICT_MV_STRING, - DIM_DICT_MV_INTEGER, DIM_DICT_MV_LONG); + private static final List<String> RAW_GZIP_INDEX_COLUMNS = + Arrays.asList(DIM_GZIP_STRING, DIM_GZIP_LONG, DIM_GZIP_INTEGER, DIM_GZIP_BYTES, METRIC_GZIP_BIG_DECIMAL, + METRIC_GZIP_INTEGER); + + private static final List<String> DICT_ENABLED_COLUMNS_WITH_FORWARD_INDEX = + Arrays.asList(DIM_DICT_INTEGER, DIM_DICT_LONG, DIM_DICT_STRING, DIM_DICT_BYES, DIM_DICT_MV_BYTES, + DIM_DICT_MV_STRING, DIM_DICT_MV_INTEGER, DIM_DICT_MV_LONG); private static final List<String> DICT_ENABLED_MV_COLUMNS_WITH_FORWARD_INDEX = Arrays.asList(DIM_DICT_MV_INTEGER, DIM_DICT_MV_LONG, DIM_DICT_MV_STRING, DIM_DICT_MV_BYTES); - private static final List<String> SV_FORWARD_INDEX_DISABLED_COLUMNS = Arrays.asList( - DIM_SV_FORWARD_INDEX_DISABLED_INTEGER, DIM_SV_FORWARD_INDEX_DISABLED_LONG, DIM_SV_FORWARD_INDEX_DISABLED_STRING, - DIM_SV_FORWARD_INDEX_DISABLED_BYTES); + private static final List<String> SV_FORWARD_INDEX_DISABLED_COLUMNS = + Arrays.asList(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER, DIM_SV_FORWARD_INDEX_DISABLED_LONG, + DIM_SV_FORWARD_INDEX_DISABLED_STRING, DIM_SV_FORWARD_INDEX_DISABLED_BYTES); private static final List<String> MV_FORWARD_INDEX_DISABLED_COLUMNS = Arrays.asList(DIM_MV_FORWARD_INDEX_DISABLED_INTEGER, 
DIM_MV_FORWARD_INDEX_DISABLED_LONG, @@ -241,13 +249,14 @@ public class ForwardIndexHandlerTest { List<FieldConfig> fieldConfigs = new ArrayList<>( RAW_SNAPPY_INDEX_COLUMNS.size() + RAW_SORTED_INDEX_COLUMNS.size() + RAW_ZSTANDARD_INDEX_COLUMNS.size() - + RAW_PASS_THROUGH_INDEX_COLUMNS.size() + RAW_LZ4_INDEX_COLUMNS.size() + + RAW_PASS_THROUGH_INDEX_COLUMNS.size() + RAW_LZ4_INDEX_COLUMNS.size() + RAW_GZIP_INDEX_COLUMNS.size() + SV_FORWARD_INDEX_DISABLED_COLUMNS.size() + MV_FORWARD_INDEX_DISABLED_COLUMNS.size() + MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS.size() + FORWARD_INDEX_DISABLED_RAW_COLUMNS.size() + 2); for (String indexColumn : RAW_SNAPPY_INDEX_COLUMNS) { - fieldConfigs.add(new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW, Collections.emptyList(), - CompressionCodec.SNAPPY, null)); + fieldConfigs.add( + new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW, Collections.emptyList(), CompressionCodec.SNAPPY, + null)); } for (String indexColumn : RAW_SORTED_INDEX_COLUMNS) { @@ -266,46 +275,56 @@ public class ForwardIndexHandlerTest { } for (String indexColumn : RAW_LZ4_INDEX_COLUMNS) { - fieldConfigs.add(new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW, Collections.emptyList(), - CompressionCodec.LZ4, null)); + fieldConfigs.add( + new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW, Collections.emptyList(), CompressionCodec.LZ4, + null)); + } + + for (String indexColumn : RAW_GZIP_INDEX_COLUMNS) { + fieldConfigs.add( + new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW, Collections.emptyList(), CompressionCodec.GZIP, + null)); } for (String indexColumn : SV_FORWARD_INDEX_DISABLED_COLUMNS) { - fieldConfigs.add(new FieldConfig(indexColumn, FieldConfig.EncodingType.DICTIONARY, Collections.singletonList( - FieldConfig.IndexType.INVERTED), null, + fieldConfigs.add(new FieldConfig(indexColumn, FieldConfig.EncodingType.DICTIONARY, + Collections.singletonList(FieldConfig.IndexType.INVERTED), null, Collections.singletonMap(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString()))); } for (String indexColumn : MV_FORWARD_INDEX_DISABLED_COLUMNS) { - fieldConfigs.add(new FieldConfig(indexColumn, FieldConfig.EncodingType.DICTIONARY, Collections.singletonList( - FieldConfig.IndexType.INVERTED), null, + fieldConfigs.add(new FieldConfig(indexColumn, FieldConfig.EncodingType.DICTIONARY, + Collections.singletonList(FieldConfig.IndexType.INVERTED), null, Collections.singletonMap(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString()))); } for (String indexColumn : MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS) { - fieldConfigs.add(new FieldConfig(indexColumn, FieldConfig.EncodingType.DICTIONARY, Collections.singletonList( - FieldConfig.IndexType.INVERTED), null, + fieldConfigs.add(new FieldConfig(indexColumn, FieldConfig.EncodingType.DICTIONARY, + Collections.singletonList(FieldConfig.IndexType.INVERTED), null, Collections.singletonMap(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString()))); } for (String indexColumn : FORWARD_INDEX_DISABLED_RAW_COLUMNS) { fieldConfigs.add( new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW, Collections.emptyList(), CompressionCodec.LZ4, - Collections.singletonMap(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString()))); + Collections.singletonMap(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString()))); } - fieldConfigs.add(new FieldConfig(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX, - FieldConfig.EncodingType.DICTIONARY, Collections.emptyList(), null, - 
Collections.singletonMap(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString()))); + fieldConfigs.add( + new FieldConfig(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX, FieldConfig.EncodingType.DICTIONARY, + Collections.emptyList(), null, + Collections.singletonMap(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString()))); - fieldConfigs.add(new FieldConfig(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITH_RANGE_INDEX, - FieldConfig.EncodingType.DICTIONARY, Collections.singletonList(FieldConfig.IndexType.RANGE), null, - Collections.singletonMap(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString()))); + fieldConfigs.add( + new FieldConfig(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITH_RANGE_INDEX, FieldConfig.EncodingType.DICTIONARY, + Collections.singletonList(FieldConfig.IndexType.RANGE), null, + Collections.singletonMap(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString()))); _noDictionaryColumns.addAll(RAW_SNAPPY_INDEX_COLUMNS); _noDictionaryColumns.addAll(RAW_ZSTANDARD_INDEX_COLUMNS); _noDictionaryColumns.addAll(RAW_PASS_THROUGH_INDEX_COLUMNS); _noDictionaryColumns.addAll(RAW_LZ4_INDEX_COLUMNS); + _noDictionaryColumns.addAll(RAW_GZIP_INDEX_COLUMNS); _noDictionaryColumns.addAll(FORWARD_INDEX_DISABLED_RAW_COLUMNS); _noDictionaryColumns.addAll(RAW_SORTED_INDEX_COLUMNS); @@ -330,30 +349,35 @@ public class ForwardIndexHandlerTest { .addSingleValueDimension(DIM_PASS_THROUGH_STRING, FieldSpec.DataType.STRING) .addSingleValueDimension(DIM_ZSTANDARD_STRING, FieldSpec.DataType.STRING) .addSingleValueDimension(DIM_LZ4_STRING, FieldSpec.DataType.STRING) + .addSingleValueDimension(DIM_GZIP_STRING, FieldSpec.DataType.STRING) .addSingleValueDimension(DIM_SNAPPY_INTEGER, FieldSpec.DataType.INT) .addSingleValueDimension(DIM_RAW_SORTED_INTEGER, FieldSpec.DataType.INT) .addSingleValueDimension(DIM_ZSTANDARD_INTEGER, FieldSpec.DataType.INT) .addSingleValueDimension(DIM_PASS_THROUGH_INTEGER, FieldSpec.DataType.INT) .addSingleValueDimension(DIM_LZ4_INTEGER, FieldSpec.DataType.INT) + .addSingleValueDimension(DIM_GZIP_INTEGER, FieldSpec.DataType.INT) .addSingleValueDimension(DIM_SNAPPY_LONG, FieldSpec.DataType.LONG) .addSingleValueDimension(DIM_ZSTANDARD_LONG, FieldSpec.DataType.LONG) .addSingleValueDimension(DIM_PASS_THROUGH_LONG, FieldSpec.DataType.LONG) .addSingleValueDimension(DIM_LZ4_LONG, FieldSpec.DataType.LONG) + .addSingleValueDimension(DIM_GZIP_LONG, FieldSpec.DataType.LONG) .addSingleValueDimension(DIM_SNAPPY_BYTES, FieldSpec.DataType.BYTES) .addSingleValueDimension(DIM_PASS_THROUGH_BYTES, FieldSpec.DataType.BYTES) .addSingleValueDimension(DIM_ZSTANDARD_BYTES, FieldSpec.DataType.BYTES) .addSingleValueDimension(DIM_LZ4_BYTES, FieldSpec.DataType.BYTES) + .addSingleValueDimension(DIM_GZIP_BYTES, FieldSpec.DataType.BYTES) .addMetric(METRIC_SNAPPY_BIG_DECIMAL, FieldSpec.DataType.BIG_DECIMAL) .addMetric(METRIC_PASS_THROUGH_BIG_DECIMAL, FieldSpec.DataType.BIG_DECIMAL) .addMetric(METRIC_ZSTANDARD_BIG_DECIMAL, FieldSpec.DataType.BIG_DECIMAL) .addMetric(METRIC_LZ4_BIG_DECIMAL, FieldSpec.DataType.BIG_DECIMAL) + .addMetric(METRIC_GZIP_BIG_DECIMAL, FieldSpec.DataType.BIG_DECIMAL) .addSingleValueDimension(DIM_DICT_INTEGER, FieldSpec.DataType.INT) .addSingleValueDimension(DIM_DICT_LONG, FieldSpec.DataType.LONG) .addSingleValueDimension(DIM_DICT_STRING, FieldSpec.DataType.STRING) .addSingleValueDimension(DIM_DICT_BYES, FieldSpec.DataType.BYTES) .addMetric(METRIC_PASS_THROUGH_INTEGER, FieldSpec.DataType.INT) - .addMetric(METRIC_SNAPPY_INTEGER, FieldSpec.DataType.INT) - 
.addMetric(METRIC_LZ4_INTEGER, FieldSpec.DataType.INT) + .addMetric(METRIC_SNAPPY_INTEGER, FieldSpec.DataType.INT).addMetric(METRIC_LZ4_INTEGER, FieldSpec.DataType.INT) + .addMetric(METRIC_GZIP_INTEGER, FieldSpec.DataType.INT) .addMetric(METRIC_ZSTANDARD_INTEGER, FieldSpec.DataType.INT) .addMultiValueDimension(DIM_MV_PASS_THROUGH_INTEGER, FieldSpec.DataType.INT) .addMultiValueDimension(DIM_MV_PASS_THROUGH_LONG, FieldSpec.DataType.LONG) @@ -480,13 +504,16 @@ public class ForwardIndexHandlerTest { row.putValue(DIM_ZSTANDARD_STRING, tempStringRows[i]); row.putValue(DIM_PASS_THROUGH_STRING, tempStringRows[i]); row.putValue(DIM_LZ4_STRING, tempStringRows[i]); + row.putValue(DIM_GZIP_STRING, tempStringRows[i]); // Raw integer columns row.putValue(DIM_SNAPPY_INTEGER, tempIntRows[i]); row.putValue(DIM_ZSTANDARD_INTEGER, tempIntRows[i]); row.putValue(DIM_PASS_THROUGH_INTEGER, tempIntRows[i]); row.putValue(DIM_LZ4_INTEGER, tempIntRows[i]); + row.putValue(DIM_GZIP_INTEGER, tempIntRows[i]); row.putValue(METRIC_LZ4_INTEGER, tempIntRows[i]); + row.putValue(METRIC_GZIP_INTEGER, tempIntRows[i]); row.putValue(METRIC_PASS_THROUGH_INTEGER, tempIntRows[i]); row.putValue(METRIC_ZSTANDARD_INTEGER, tempIntRows[i]); row.putValue(METRIC_SNAPPY_INTEGER, tempIntRows[i]); @@ -497,18 +524,21 @@ public class ForwardIndexHandlerTest { row.putValue(DIM_ZSTANDARD_LONG, tempLongRows[i]); row.putValue(DIM_PASS_THROUGH_LONG, tempLongRows[i]); row.putValue(DIM_LZ4_LONG, tempLongRows[i]); + row.putValue(DIM_GZIP_LONG, tempLongRows[i]); // Raw Byte columns row.putValue(DIM_SNAPPY_BYTES, tempBytesRows[i]); row.putValue(DIM_ZSTANDARD_BYTES, tempBytesRows[i]); row.putValue(DIM_PASS_THROUGH_BYTES, tempBytesRows[i]); row.putValue(DIM_LZ4_BYTES, tempBytesRows[i]); + row.putValue(DIM_GZIP_BYTES, tempBytesRows[i]); // Raw BigDecimal column row.putValue(METRIC_SNAPPY_BIG_DECIMAL, tempBigDecimalRows[i]); row.putValue(METRIC_ZSTANDARD_BIG_DECIMAL, tempBigDecimalRows[i]); row.putValue(METRIC_PASS_THROUGH_BIG_DECIMAL, tempBigDecimalRows[i]); row.putValue(METRIC_LZ4_BIG_DECIMAL, tempBigDecimalRows[i]); + row.putValue(METRIC_GZIP_BIG_DECIMAL, tempBigDecimalRows[i]); // Dictionary SV columns row.putValue(DIM_DICT_INTEGER, tempIntRows[i]); @@ -556,7 +586,8 @@ public class ForwardIndexHandlerTest { } @Test - public void testComputeOperationNoOp() throws Exception { + public void testComputeOperationNoOp() + throws Exception { // Setup SegmentMetadataImpl existingSegmentMetadata = new SegmentMetadataImpl(_segmentDirectory); SegmentDirectory segmentLocalFSDirectory = @@ -574,7 +605,8 @@ public class ForwardIndexHandlerTest { } @Test - public void testComputeOperationEnableDictionary() throws Exception { + public void testComputeOperationEnableDictionary() + throws Exception { // Setup SegmentMetadataImpl existingSegmentMetadata = new SegmentMetadataImpl(_segmentDirectory); SegmentDirectory segmentLocalFSDirectory = @@ -628,13 +660,13 @@ public class ForwardIndexHandlerTest { assertEquals(operationMap.get(DIM_RAW_SORTED_INTEGER), Collections.singletonList(ForwardIndexHandler.Operation.ENABLE_DICTIONARY)); - // Tear down segmentLocalFSDirectory.close(); } @Test - public void testComputeOperationDisableDictionary() throws Exception { + public void testComputeOperationDisableDictionary() + throws Exception { // Setup SegmentMetadataImpl existingSegmentMetadata = new SegmentMetadataImpl(_segmentDirectory); SegmentDirectory segmentLocalFSDirectory = @@ -677,7 +709,8 @@ public class ForwardIndexHandlerTest { } @Test - public void 
testComputeOperationChangeCompression() throws Exception { + public void testComputeOperationChangeCompression() + throws Exception { // Setup SegmentMetadataImpl existingSegmentMetadata = new SegmentMetadataImpl(_segmentDirectory); SegmentDirectory segmentLocalFSDirectory = @@ -696,9 +729,8 @@ public class ForwardIndexHandlerTest { randIdx = rand.nextInt(fieldConfigs.size()); name = fieldConfigs.get(randIdx).getName(); } while (SV_FORWARD_INDEX_DISABLED_COLUMNS.contains(name) || MV_FORWARD_INDEX_DISABLED_COLUMNS.contains(name) - || MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS.contains(name) - || FORWARD_INDEX_DISABLED_RAW_COLUMNS.contains(name) - || DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX.equals(name) + || MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS.contains(name) || FORWARD_INDEX_DISABLED_RAW_COLUMNS.contains( + name) || DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX.equals(name) || DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITH_RANGE_INDEX.equals(name)); FieldConfig config = fieldConfigs.remove(randIdx); CompressionCodec newCompressionType = null; @@ -794,8 +826,8 @@ public class ForwardIndexHandlerTest { assertEquals(operationMap.size(), 1); Set<ForwardIndexHandler.Operation> operations = new HashSet<>(operationMap.get(DIM_LZ4_INTEGER)); assertEquals(operations.size(), 2); - Set<ForwardIndexHandler.Operation> expectedOperations = - new HashSet<>(Arrays.asList(ForwardIndexHandler.Operation.DISABLE_FORWARD_INDEX, + Set<ForwardIndexHandler.Operation> expectedOperations = new HashSet<>( + Arrays.asList(ForwardIndexHandler.Operation.DISABLE_FORWARD_INDEX, ForwardIndexHandler.Operation.ENABLE_DICTIONARY)); assertEquals(expectedOperations, operations); @@ -827,7 +859,7 @@ public class ForwardIndexHandlerTest { operations = new HashSet<>(operationMap.get(DIM_LZ4_LONG)); assertEquals(operations.size(), 2); expectedOperations = new HashSet<>(Arrays.asList(ForwardIndexHandler.Operation.DISABLE_FORWARD_INDEX, - ForwardIndexHandler.Operation.ENABLE_DICTIONARY)); + ForwardIndexHandler.Operation.ENABLE_DICTIONARY)); assertEquals(expectedOperations, operations); operations = new HashSet<>(operationMap.get(DIM_SNAPPY_STRING)); assertEquals(operations.size(), 2); @@ -1108,8 +1140,7 @@ public class ForwardIndexHandlerTest { String columnName = config.getName(); FieldConfig newConfig = - new FieldConfig(columnName, FieldConfig.EncodingType.RAW, Collections.emptyList(), compressionType, - null); + new FieldConfig(columnName, FieldConfig.EncodingType.RAW, Collections.emptyList(), compressionType, null); fieldConfigs.add(newConfig); TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME) @@ -1237,9 +1268,8 @@ public class ForwardIndexHandlerTest { randomIdx = rand.nextInt(fieldConfigs.size()); name = fieldConfigs.get(randomIdx).getName(); } while (SV_FORWARD_INDEX_DISABLED_COLUMNS.contains(name) || MV_FORWARD_INDEX_DISABLED_COLUMNS.contains(name) - || MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS.contains(name) - || FORWARD_INDEX_DISABLED_RAW_COLUMNS.contains(name) - || DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX.equals(name) + || MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS.contains(name) || FORWARD_INDEX_DISABLED_RAW_COLUMNS.contains( + name) || DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX.equals(name) || DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITH_RANGE_INDEX.equals(name)); FieldConfig config1 = fieldConfigs.remove(randomIdx); String column1 = config1.getName(); @@ -1253,9 +1283,8 @@ public class ForwardIndexHandlerTest { randomIdx = 
rand.nextInt(fieldConfigs.size()); name = fieldConfigs.get(randomIdx).getName(); } while (SV_FORWARD_INDEX_DISABLED_COLUMNS.contains(name) || MV_FORWARD_INDEX_DISABLED_COLUMNS.contains(name) - || MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS.contains(name) - || FORWARD_INDEX_DISABLED_RAW_COLUMNS.contains(name) - || DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX.equals(name) + || MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS.contains(name) || FORWARD_INDEX_DISABLED_RAW_COLUMNS.contains( + name) || DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX.equals(name) || DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITH_RANGE_INDEX.equals(name)); FieldConfig config2 = fieldConfigs.remove(randomIdx); String column2 = config2.getName(); @@ -1369,7 +1398,8 @@ public class ForwardIndexHandlerTest { } @Test - public void testEnableDictionaryForSortedColumn() throws Exception { + public void testEnableDictionaryForSortedColumn() + throws Exception { IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig); for (int i = 0; i < RAW_SORTED_INDEX_COLUMNS.size(); i++) { @@ -1467,8 +1497,8 @@ public class ForwardIndexHandlerTest { IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig); Random rand = new Random(); - String col1 = DICT_ENABLED_COLUMNS_WITH_FORWARD_INDEX.get( - rand.nextInt(DICT_ENABLED_COLUMNS_WITH_FORWARD_INDEX.size())); + String col1 = + DICT_ENABLED_COLUMNS_WITH_FORWARD_INDEX.get(rand.nextInt(DICT_ENABLED_COLUMNS_WITH_FORWARD_INDEX.size())); indexLoadingConfig.addForwardIndexDisabledColumns(col1); indexLoadingConfig.addInvertedIndexColumns(col1); String col2; @@ -1648,8 +1678,7 @@ public class ForwardIndexHandlerTest { IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig); Random rand = new Random(); - String col1 = RAW_LZ4_INDEX_COLUMNS.get( - rand.nextInt(RAW_LZ4_INDEX_COLUMNS.size())); + String col1 = RAW_LZ4_INDEX_COLUMNS.get(rand.nextInt(RAW_LZ4_INDEX_COLUMNS.size())); indexLoadingConfig.addForwardIndexDisabledColumns(col1); indexLoadingConfig.removeNoDictionaryColumns(col1); indexLoadingConfig.addInvertedIndexColumns(col1); @@ -1678,10 +1707,10 @@ public class ForwardIndexHandlerTest { } else if (dataType == FieldSpec.DataType.BIG_DECIMAL) { dictionaryElementSize = 4; } - validateMetadataProperties(col1, true, dictionaryElementSize, metadata.getCardinality(), - metadata.getTotalDocs(), dataType, metadata.getFieldType(), metadata.isSorted(), - metadata.isSingleValue(), metadata.getMaxNumberOfMultiValues(), metadata.getTotalNumberOfEntries(), - metadata.isAutoGenerated(), metadata.getMinValue(), metadata.getMaxValue(), false); + validateMetadataProperties(col1, true, dictionaryElementSize, metadata.getCardinality(), metadata.getTotalDocs(), + dataType, metadata.getFieldType(), metadata.isSorted(), metadata.isSingleValue(), + metadata.getMaxNumberOfMultiValues(), metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(), + metadata.getMinValue(), metadata.getMaxValue(), false); // Col2 validation. 
validateIndexMap(col2, true, true); @@ -1696,10 +1725,10 @@ public class ForwardIndexHandlerTest { } else if (dataType == FieldSpec.DataType.BIG_DECIMAL) { dictionaryElementSize = 4; } - validateMetadataProperties(col2, true, dictionaryElementSize, metadata.getCardinality(), - metadata.getTotalDocs(), dataType, metadata.getFieldType(), metadata.isSorted(), - metadata.isSingleValue(), metadata.getMaxNumberOfMultiValues(), metadata.getTotalNumberOfEntries(), - metadata.isAutoGenerated(), metadata.getMinValue(), metadata.getMaxValue(), false); + validateMetadataProperties(col2, true, dictionaryElementSize, metadata.getCardinality(), metadata.getTotalDocs(), + dataType, metadata.getFieldType(), metadata.isSorted(), metadata.isSingleValue(), + metadata.getMaxNumberOfMultiValues(), metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(), + metadata.getMinValue(), metadata.getMaxValue(), false); } @Test @@ -1801,10 +1830,10 @@ public class ForwardIndexHandlerTest { // In column metadata, nothing other than hasDictionary and dictionaryElementSize should change. ColumnMetadata metadata = existingSegmentMetadata.getColumnMetadataFor(column); FieldSpec.DataType dataType = metadata.getDataType(); - validateMetadataProperties(column, false, 0, metadata.getCardinality(), - metadata.getTotalDocs(), dataType, metadata.getFieldType(), metadata.isSorted(), metadata.isSingleValue(), - metadata.getMaxNumberOfMultiValues(), metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(), - metadata.getMinValue(), metadata.getMaxValue(), false); + validateMetadataProperties(column, false, 0, metadata.getCardinality(), metadata.getTotalDocs(), dataType, + metadata.getFieldType(), metadata.isSorted(), metadata.isSingleValue(), metadata.getMaxNumberOfMultiValues(), + metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(), metadata.getMinValue(), + metadata.getMaxValue(), false); } } @@ -1923,8 +1952,8 @@ public class ForwardIndexHandlerTest { Random rand = new Random(); // Remove from forward index list but keep the inverted index enabled - String column = MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS - .get(rand.nextInt(MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS.size())); + String column = MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS.get( + rand.nextInt(MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS.size())); indexLoadingConfig.removeForwardIndexDisabledColumns(column); ForwardIndexHandler fwdIndexHandler = new ForwardIndexHandler(segmentLocalFSDirectory, indexLoadingConfig, _schema); @@ -2020,20 +2049,20 @@ public class ForwardIndexHandlerTest { validateIndexMap(col1, false, false); validateForwardIndex(col1, CompressionCodec.LZ4, metadata.isSorted()); // In column metadata, nothing should change. - validateMetadataProperties(col1, false, 0, metadata.getCardinality(), - metadata.getTotalDocs(), metadata.getDataType(), metadata.getFieldType(), metadata.isSorted(), - metadata.isSingleValue(), metadata.getMaxNumberOfMultiValues(), metadata.getTotalNumberOfEntries(), - metadata.isAutoGenerated(), metadata.getMinValue(), metadata.getMaxValue(), false); + validateMetadataProperties(col1, false, 0, metadata.getCardinality(), metadata.getTotalDocs(), + metadata.getDataType(), metadata.getFieldType(), metadata.isSorted(), metadata.isSingleValue(), + metadata.getMaxNumberOfMultiValues(), metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(), + metadata.getMinValue(), metadata.getMaxValue(), false); // Col2 validation. 
metadata = existingSegmentMetadata.getColumnMetadataFor(col2); validateIndexMap(col2, false, false); validateForwardIndex(col2, CompressionCodec.LZ4, metadata.isSorted()); // In column metadata, nothing should change. - validateMetadataProperties(col2, false, 0, metadata.getCardinality(), - metadata.getTotalDocs(), metadata.getDataType(), metadata.getFieldType(), metadata.isSorted(), - metadata.isSingleValue(), metadata.getMaxNumberOfMultiValues(), metadata.getTotalNumberOfEntries(), - metadata.isAutoGenerated(), metadata.getMinValue(), metadata.getMaxValue(), false); + validateMetadataProperties(col2, false, 0, metadata.getCardinality(), metadata.getTotalDocs(), + metadata.getDataType(), metadata.getFieldType(), metadata.isSorted(), metadata.isSingleValue(), + metadata.getMaxNumberOfMultiValues(), metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(), + metadata.getMinValue(), metadata.getMaxValue(), false); } @Test @@ -2047,8 +2076,8 @@ public class ForwardIndexHandlerTest { Random rand = new Random(); // Remove from forward index list but keep the inverted index enabled - String column = MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS - .get(rand.nextInt(MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS.size())); + String column = MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS.get( + rand.nextInt(MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS.size())); indexLoadingConfig.removeForwardIndexDisabledColumns(column); indexLoadingConfig.removeInvertedIndexColumns(column); indexLoadingConfig.addNoDictionaryColumns(column); @@ -2066,10 +2095,10 @@ public class ForwardIndexHandlerTest { validateForwardIndex(column, CompressionCodec.LZ4, metadata.isSorted()); // In column metadata, some values can change since MV columns with duplicates lose the duplicates on forward index // regeneration. - validateMetadataProperties(column, false, 0, metadata.getCardinality(), - metadata.getTotalDocs(), metadata.getDataType(), metadata.getFieldType(), metadata.isSorted(), - metadata.isSingleValue(), metadata.getMaxNumberOfMultiValues(), metadata.getTotalNumberOfEntries(), - metadata.isAutoGenerated(), metadata.getMinValue(), metadata.getMaxValue(), true); + validateMetadataProperties(column, false, 0, metadata.getCardinality(), metadata.getTotalDocs(), + metadata.getDataType(), metadata.getFieldType(), metadata.isSorted(), metadata.isSingleValue(), + metadata.getMaxNumberOfMultiValues(), metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(), + metadata.getMinValue(), metadata.getMaxValue(), true); } @Test @@ -2111,11 +2140,10 @@ public class ForwardIndexHandlerTest { validateForwardIndex(column, CompressionCodec.LZ4, metadata.isSorted()); // In column metadata, nothing should change. 
- validateMetadataProperties(column, false, 0, - metadata.getCardinality(), metadata.getTotalDocs(), metadata.getDataType(), metadata.getFieldType(), - metadata.isSorted(), metadata.isSingleValue(), metadata.getMaxNumberOfMultiValues(), - metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(), metadata.getMinValue(), - metadata.getMaxValue(), false); + validateMetadataProperties(column, false, 0, metadata.getCardinality(), metadata.getTotalDocs(), + metadata.getDataType(), metadata.getFieldType(), metadata.isSorted(), metadata.isSingleValue(), + metadata.getMaxNumberOfMultiValues(), metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(), + metadata.getMinValue(), metadata.getMaxValue(), false); } } @@ -2146,8 +2174,7 @@ public class ForwardIndexHandlerTest { validateIndexMap(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX, true, true); validateIndexesForForwardIndexDisabledColumns(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX); - ForwardIndexHandler fwdIndexHandler = - new ForwardIndexHandler(segmentLocalFSDirectory, indexLoadingConfig, _schema); + ForwardIndexHandler fwdIndexHandler = new ForwardIndexHandler(segmentLocalFSDirectory, indexLoadingConfig, _schema); fwdIndexHandler.updateIndices(writer); fwdIndexHandler.postUpdateIndicesCleanup(writer); @@ -2164,8 +2191,8 @@ public class ForwardIndexHandlerTest { validateMetadataProperties(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX, metadata.hasDictionary(), metadata.getColumnMaxLength(), metadata.getCardinality(), metadata.getTotalDocs(), metadata.getDataType(), metadata.getFieldType(), metadata.isSorted(), metadata.isSingleValue(), metadata.getMaxNumberOfMultiValues(), - metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(), metadata.getMinValue(), - metadata.getMaxValue(), false); + metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(), metadata.getMinValue(), metadata.getMaxValue(), + false); } @Test @@ -2198,8 +2225,7 @@ public class ForwardIndexHandlerTest { validateIndexMap(DIM_RAW_SV_FORWARD_INDEX_DISABLED_INTEGER, false, true); validateIndexesForForwardIndexDisabledColumns(DIM_RAW_MV_FORWARD_INDEX_DISABLED_INTEGER); - ForwardIndexHandler fwdIndexHandler = - new ForwardIndexHandler(segmentLocalFSDirectory, indexLoadingConfig, _schema); + ForwardIndexHandler fwdIndexHandler = new ForwardIndexHandler(segmentLocalFSDirectory, indexLoadingConfig, _schema); fwdIndexHandler.updateIndices(writer); fwdIndexHandler.postUpdateIndicesCleanup(writer); @@ -2213,20 +2239,18 @@ public class ForwardIndexHandlerTest { validateIndexesForForwardIndexDisabledColumns(DIM_RAW_MV_FORWARD_INDEX_DISABLED_INTEGER); // In column metadata, nothing should change. 
- ColumnMetadata metadata = - existingSegmentMetadata.getColumnMetadataFor(DIM_RAW_SV_FORWARD_INDEX_DISABLED_INTEGER); + ColumnMetadata metadata = existingSegmentMetadata.getColumnMetadataFor(DIM_RAW_SV_FORWARD_INDEX_DISABLED_INTEGER); validateMetadataProperties(DIM_RAW_SV_FORWARD_INDEX_DISABLED_INTEGER, metadata.hasDictionary(), metadata.getColumnMaxLength(), metadata.getCardinality(), metadata.getTotalDocs(), metadata.getDataType(), metadata.getFieldType(), metadata.isSorted(), metadata.isSingleValue(), metadata.getMaxNumberOfMultiValues(), - metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(), metadata.getMinValue(), - metadata.getMaxValue(), false); - metadata = - existingSegmentMetadata.getColumnMetadataFor(DIM_RAW_MV_FORWARD_INDEX_DISABLED_INTEGER); + metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(), metadata.getMinValue(), metadata.getMaxValue(), + false); + metadata = existingSegmentMetadata.getColumnMetadataFor(DIM_RAW_MV_FORWARD_INDEX_DISABLED_INTEGER); validateMetadataProperties(DIM_RAW_MV_FORWARD_INDEX_DISABLED_INTEGER, metadata.hasDictionary(), metadata.getColumnMaxLength(), metadata.getCardinality(), metadata.getTotalDocs(), metadata.getDataType(), metadata.getFieldType(), metadata.isSorted(), metadata.isSingleValue(), metadata.getMaxNumberOfMultiValues(), - metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(), metadata.getMinValue(), - metadata.getMaxValue(), false); + metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(), metadata.getMinValue(), metadata.getMaxValue(), + false); } @Test @@ -2242,10 +2266,10 @@ public class ForwardIndexHandlerTest { // Add column to range index list. Must be a numerical type. String column; do { - column = MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS - .get(rand.nextInt(MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS.size())); - } while (!column.equals(DIM_MV_FORWARD_INDEX_DISABLED_DUPLICATES_STRING) - && !column.equals(DIM_MV_FORWARD_INDEX_DISABLED_DUPLICATES_BYTES)); + column = MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS.get( + rand.nextInt(MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS.size())); + } while (!column.equals(DIM_MV_FORWARD_INDEX_DISABLED_DUPLICATES_STRING) && !column.equals( + DIM_MV_FORWARD_INDEX_DISABLED_DUPLICATES_BYTES)); indexLoadingConfig.addRangeIndexColumns(column); RangeIndexHandler rangeIndexHandler = new RangeIndexHandler(segmentLocalFSDirectory, indexLoadingConfig); @@ -2271,10 +2295,10 @@ public class ForwardIndexHandlerTest { // In column metadata, some values can change since MV columns with duplicates lose the duplicates on forward index // regeneration. ColumnMetadata metadata = existingSegmentMetadata.getColumnMetadataFor(column); - validateMetadataProperties(column, true, 7, metadata.getCardinality(), - metadata.getTotalDocs(), metadata.getDataType(), metadata.getFieldType(), metadata.isSorted(), - metadata.isSingleValue(), metadata.getMaxNumberOfMultiValues(), metadata.getTotalNumberOfEntries(), - metadata.isAutoGenerated(), metadata.getMinValue(), metadata.getMaxValue(), true); + validateMetadataProperties(column, true, 7, metadata.getCardinality(), metadata.getTotalDocs(), + metadata.getDataType(), metadata.getFieldType(), metadata.isSorted(), metadata.isSingleValue(), + metadata.getMaxNumberOfMultiValues(), metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(), + metadata.getMinValue(), metadata.getMaxValue(), true); // Validate that expected metadata properties don't match. 
totalNumberOfEntries will definitely not match since // duplicates will be removed, but maxNumberOfMultiValues may still match if the row with max multi-values didn't @@ -2332,34 +2356,24 @@ public class ForwardIndexHandlerTest { IndexType index1 = Mockito.mock(IndexType.class); Mockito.when(index1.getId()).thenReturn("index1"); IndexConfig indexConf = new IndexConfig(true); - FieldIndexConfigs fieldIndexConfigs = new FieldIndexConfigs.Builder() - .add(index1, indexConf) - .build(); + FieldIndexConfigs fieldIndexConfigs = new FieldIndexConfigs.Builder().add(index1, indexConf).build(); // No need to disable dictionary - boolean result = DictionaryIndexType.ignoreDictionaryOverride(false, true, - 2, fieldSpec, - fieldIndexConfigs, 5, 20); + boolean result = DictionaryIndexType.ignoreDictionaryOverride(false, true, 2, fieldSpec, fieldIndexConfigs, 5, 20); Assert.assertEquals(result, true); // Set a higher noDictionarySizeRatioThreshold - result = DictionaryIndexType.ignoreDictionaryOverride(false, true, - 5, fieldSpec, - fieldIndexConfigs, 5, 20); + result = DictionaryIndexType.ignoreDictionaryOverride(false, true, 5, fieldSpec, fieldIndexConfigs, 5, 20); Assert.assertEquals(result, false); // optimizeDictionary and optimizeDictionaryForMetrics both turned on - result = DictionaryIndexType.ignoreDictionaryOverride(true, true, - 5, fieldSpec, - fieldIndexConfigs, 5, 20); + result = DictionaryIndexType.ignoreDictionaryOverride(true, true, 5, fieldSpec, fieldIndexConfigs, 5, 20); Assert.assertEquals(result, false); // Don't ignore for Json. We want to disable dictionary for json. fieldSpec = new DimensionFieldSpec(); fieldSpec.setName("test"); fieldSpec.setDataType(FieldSpec.DataType.JSON); - result = DictionaryIndexType.ignoreDictionaryOverride(true, true, - 5, fieldSpec, - fieldIndexConfigs, 5, 20); + result = DictionaryIndexType.ignoreDictionaryOverride(true, true, 5, fieldSpec, fieldIndexConfigs, 5, 20); Assert.assertEquals(result, true); } @@ -2558,7 +2572,8 @@ public class ForwardIndexHandlerTest { } } - private void testIndexExists(String columnName, IndexType<?, ?, ?> indexType) throws Exception { + private void testIndexExists(String columnName, IndexType<?, ?, ?> indexType) + throws Exception { SegmentMetadataImpl existingSegmentMetadata = new SegmentMetadataImpl(_segmentDirectory); SegmentDirectory segmentLocalFSDirectory = new SegmentLocalFSDirectory(_segmentDirectory, existingSegmentMetadata, ReadMode.mmap); diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressionType.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressionType.java index 97d7057d03..79c678c260 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressionType.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressionType.java @@ -19,7 +19,7 @@ package org.apache.pinot.segment.spi.compression; public enum ChunkCompressionType { - PASS_THROUGH(0), SNAPPY(1), ZSTANDARD(2), LZ4(3), LZ4_LENGTH_PREFIXED(4); + PASS_THROUGH(0), SNAPPY(1), ZSTANDARD(2), LZ4(3), LZ4_LENGTH_PREFIXED(4), GZIP(5); private static final ChunkCompressionType[] VALUES = values(); diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressor.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressor.java index a6ab78c4ea..4ce9ce82be 100644 --- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressor.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressor.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.segment.spi.compression; +import java.io.Closeable; import java.io.IOException; import java.nio.ByteBuffer; @@ -25,7 +26,7 @@ import java.nio.ByteBuffer; /** * Interface to compress a chunk of data. */ -public interface ChunkCompressor { +public interface ChunkCompressor extends Closeable { /** * This method compresses the given data. The output compressed ByteBuffer is returned ready for read. @@ -51,4 +52,9 @@ public interface ChunkCompressor { * @return this compressor's type */ ChunkCompressionType compressionType(); + + @Override + default void close() throws IOException { + // no-op + } } diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkDecompressor.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkDecompressor.java index 2eeb33d6c9..b3f563bb44 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkDecompressor.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkDecompressor.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.segment.spi.compression; +import java.io.Closeable; import java.io.IOException; import java.nio.ByteBuffer; @@ -25,7 +26,7 @@ import java.nio.ByteBuffer; /** * Interface to decompress a chunk of data. */ -public interface ChunkDecompressor { +public interface ChunkDecompressor extends Closeable { /** * This method decompresses chunk of data that was compressed using {@link @@ -48,4 +49,9 @@ public interface ChunkDecompressor { */ int decompressedLength(ByteBuffer compressedInput) throws IOException; + + @Override + default void close() throws IOException { + // no-op + } } diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/FieldIndexConfigs.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/FieldIndexConfigs.java index 3d192aa6a1..1351b35d96 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/FieldIndexConfigs.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/FieldIndexConfigs.java @@ -57,10 +57,8 @@ public class FieldIndexConfigs { } public Map<String, JsonNode> unwrapIndexes() { - Function<Map.Entry<IndexType, IndexConfig>, JsonNode> serializer = - entry -> entry.getValue().toJsonNode(); - return _configMap.entrySet().stream() - .filter(e -> e.getValue() != null) + Function<Map.Entry<IndexType, IndexConfig>, JsonNode> serializer = entry -> entry.getValue().toJsonNode(); + return _configMap.entrySet().stream().filter(e -> e.getValue() != null) .collect(Collectors.toMap(entry -> entry.getKey().getId(), serializer)); } diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java index 132705036b..70de007f8e 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java @@ -74,6 +74,10 @@ public class ForwardIndexConfig extends IndexConfig { _chunkCompressionType = ChunkCompressionType.LZ4; _dictIdCompressionType = null; break; + case GZIP: + _chunkCompressionType = ChunkCompressionType.GZIP; + 
_dictIdCompressionType = null; + break; case MV_ENTRY_DICT: _dictIdCompressionType = DictIdCompressionType.MV_ENTRY_DICT; _chunkCompressionType = null; diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java index 704cb2e01c..201edeb39a 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java @@ -52,8 +52,8 @@ public class FieldConfig extends BaseJsonConfig { public static final String TEXT_INDEX_LUCENE_USE_COMPOUND_FILE = "luceneUseCompoundFile"; public static final String TEXT_INDEX_LUCENE_MAX_BUFFER_SIZE_MB = "luceneMaxBufferSizeMB"; public static final String TEXT_INDEX_LUCENE_ANALYZER_CLASS = "luceneAnalyzerClass"; - public static final String TEXT_INDEX_DEFAULT_LUCENE_ANALYZER_CLASS - = "org.apache.lucene.analysis.standard.StandardAnalyzer"; + public static final String TEXT_INDEX_DEFAULT_LUCENE_ANALYZER_CLASS = + "org.apache.lucene.analysis.standard.StandardAnalyzer"; public static final String TEXT_INDEX_STOP_WORD_SEPERATOR = ","; // "native" for native, default is Lucene public static final String TEXT_FST_TYPE = "fstType"; @@ -102,8 +102,8 @@ public class FieldConfig extends BaseJsonConfig { Preconditions.checkArgument(name != null, "'name' must be configured"); _name = name; _encodingType = encodingType == null ? EncodingType.DICTIONARY : encodingType; - _indexTypes = indexTypes != null ? indexTypes : ( indexType == null ? Lists.newArrayList() : Lists.newArrayList(indexType)); + _indexTypes = + indexTypes != null ? indexTypes : (indexType == null ? Lists.newArrayList() : Lists.newArrayList(indexType)); _compressionCodec = compressionCodec; _timestampConfig = timestampConfig; _properties = properties; @@ -129,6 +129,7 @@ public class FieldConfig extends BaseJsonConfig { // CLP is a special type of compression codec that isn't generally applicable to all RAW columns and has a // special handling for log lines (see {@link CLPForwardIndexCreatorV1}) CLP(false, false), + GZIP(true, false), // For MV dictionary encoded forward index, add a second level dictionary encoding for the multi-value entries MV_ENTRY_DICT(false, true); @@ -258,8 +259,8 @@ public class FieldConfig extends BaseJsonConfig { } public FieldConfig build() { - return new FieldConfig(_name, _encodingType, null, _indexTypes, _compressionCodec, _timestampConfig, - _indexes, _properties, _tierOverwrites); + return new FieldConfig(_name, _encodingType, null, _indexTypes, _compressionCodec, _timestampConfig, _indexes, + _properties, _tierOverwrites); } } }
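
For readers following the change: a column opts into the new codec through its FieldConfig, exactly as the test setup above does for the RAW_GZIP_INDEX_COLUMNS. A minimal sketch using the same five-argument constructor the tests call; the column name "myRawColumn" is a placeholder, not part of the commit:

    import java.util.Collections;
    import org.apache.pinot.spi.config.table.FieldConfig;
    import org.apache.pinot.spi.config.table.FieldConfig.CompressionCodec;

    public class GzipFieldConfigExample {
      public static void main(String[] args) {
        // RAW encoding with GZIP chunk compression, mirroring the
        // FieldConfig instances built in ForwardIndexHandlerTest above.
        FieldConfig gzipColumn =
            new FieldConfig("myRawColumn", FieldConfig.EncodingType.RAW, Collections.emptyList(),
                CompressionCodec.GZIP, null);
        System.out.println(gzipColumn.getCompressionCodec()); // GZIP
      }
    }

Judging from the neighboring enum constants, the (true, false) flags on GZIP mark it applicable to raw forward indexes but not to dictionary-encoded ones; ForwardIndexConfig then maps CompressionCodec.GZIP to ChunkCompressionType.GZIP as shown in the hunk above.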
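
The Closeable additions to ChunkCompressor and ChunkDecompressor are also worth a note: a GZIP codec is naturally backed by java.util.zip's Deflater/Inflater, which hold native zlib state that must be released explicitly, while the pre-existing codecs need no cleanup (hence the default no-op close()). Below is a standalone illustrative sketch of a Deflater-backed chunk compressor; it does not reproduce the committed GzipCompressor, whose body is not shown in this diff, and the compress(in, out) shape is assumed from the interface javadoc quoted above ("the output compressed ByteBuffer is returned ready for read"):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.util.zip.Deflater;

    // Illustrative sketch only; not the committed GzipCompressor.
    final class SketchGzipChunkCompressor implements AutoCloseable {
      private final Deflater _deflater = new Deflater();

      // Compresses 'in' into 'out' and returns 'out' ready for read.
      ByteBuffer compress(ByteBuffer in, ByteBuffer out) throws IOException {
        _deflater.reset();
        _deflater.setInput(in);
        _deflater.finish();
        _deflater.deflate(out);
        if (!_deflater.finished()) {
          throw new IOException("Output buffer too small for compressed chunk");
        }
        out.flip();
        return out;
      }

      @Override
      public void close() {
        _deflater.end(); // releases native zlib memory; this is why Closeable matters
      }
    }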
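
And a hypothetical round-trip check in the spirit of the TestCompression changes, showing the try-with-resources pattern the new Closeable contract enables (Inflater mirrors Deflater on the read side):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.zip.Inflater;

    public class GzipRoundTripExample {
      public static void main(String[] args) throws Exception {
        byte[] payload = "hello pinot gzip".getBytes(StandardCharsets.UTF_8);
        ByteBuffer compressed = ByteBuffer.allocate(payload.length + 64);

        // Compressor state is released deterministically on scope exit.
        try (SketchGzipChunkCompressor compressor = new SketchGzipChunkCompressor()) {
          compressor.compress(ByteBuffer.wrap(payload), compressed);
        }

        Inflater inflater = new Inflater();
        try {
          inflater.setInput(compressed);
          ByteBuffer restored = ByteBuffer.allocate(payload.length);
          inflater.inflate(restored);
          restored.flip();
          if (!restored.equals(ByteBuffer.wrap(payload))) {
            throw new AssertionError("Round trip failed");
          }
        } finally {
          inflater.end();
        }
      }
    }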