This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 470f8ba2e0 Add GZIP Compression Codec (#11434) (#12668)
470f8ba2e0 is described below

commit 470f8ba2e07254df2312c8961c35baedf3db35ad
Author: marregui <miguel.arre...@gmail.com>
AuthorDate: Thu Mar 28 18:28:05 2024 +0100

    Add GZIP Compression Codec (#11434) (#12668)
---
 .../NoDictionaryCompressionQueriesTest.java        |  73 +++++-
 .../BenchmarkNoDictionaryIntegerCompression.java   |  44 +++-
 .../perf/BenchmarkNoDictionaryLongCompression.java |  41 +++-
 .../BenchmarkNoDictionaryStringCompression.java    | 186 +++++++--------
 .../io/compression/ChunkCompressorFactory.java     |   9 +-
 .../local/io/compression/GzipCompressor.java       |  66 ++++++
 .../local/io/compression/GzipDecompressor.java     |  63 +++++
 .../writer/impl/BaseChunkForwardIndexWriter.java   |   2 +-
 .../impl/VarByteChunkForwardIndexWriterV4.java     |   1 +
 .../forward/BaseChunkForwardIndexReader.java       |  10 +-
 .../forward/VarByteChunkForwardIndexReaderV4.java  |   1 +
 .../local/io/compression/TestCompression.java      | 127 ++++++++--
 .../forward/VarByteChunkSVForwardIndexTest.java    |  13 +
 .../index/loader/ForwardIndexHandlerTest.java      | 261 +++++++++++----------
 .../spi/compression/ChunkCompressionType.java      |   2 +-
 .../segment/spi/compression/ChunkCompressor.java   |   8 +-
 .../segment/spi/compression/ChunkDecompressor.java |   8 +-
 .../pinot/segment/spi/index/FieldIndexConfigs.java |   6 +-
 .../segment/spi/index/ForwardIndexConfig.java      |   4 +
 .../apache/pinot/spi/config/table/FieldConfig.java |  13 +-
 20 files changed, 659 insertions(+), 279 deletions(-)
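
For context: the new codec is selected per column through FieldConfig, exactly as the updated tests below wire it up. A minimal sketch (the column name "myRawColumn" is hypothetical; the constructor call is the one used throughout this commit):

    // Hypothetical raw column compressed with the new codec; mirrors the
    // FieldConfig calls added in NoDictionaryCompressionQueriesTest below.
    FieldConfig gzipColumn = new FieldConfig("myRawColumn", FieldConfig.EncodingType.RAW,
        Collections.emptyList(), FieldConfig.CompressionCodec.GZIP, null);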

diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/NoDictionaryCompressionQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/NoDictionaryCompressionQueriesTest.java
index 9fe631aa8f..7ae5dbe2cf 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/NoDictionaryCompressionQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/NoDictionaryCompressionQueriesTest.java
@@ -69,16 +69,18 @@ public class NoDictionaryCompressionQueriesTest extends BaseQueriesTest {
   private static final String PASS_THROUGH_STRING = "PASS_THROUGH_STRING";
   private static final String ZSTANDARD_STRING = "ZSTANDARD_STRING";
   private static final String LZ4_STRING = "LZ4_STRING";
+  private static final String GZIP_STRING = "GZIP_STRING";
 
   private static final String SNAPPY_LONG = "SNAPPY_LONG";
   private static final String PASS_THROUGH_LONG = "PASS_THROUGH_LONG";
   private static final String ZSTANDARD_LONG = "ZSTANDARD_LONG";
   private static final String LZ4_LONG = "LZ4_LONG";
-
+  private static final String GZIP_LONG = "GZIP_LONG";
   private static final String SNAPPY_INTEGER = "SNAPPY_INTEGER";
   private static final String PASS_THROUGH_INTEGER = "PASS_THROUGH_INTEGER";
   private static final String ZSTANDARD_INTEGER = "ZSTANDARD_INTEGER";
   private static final String LZ4_INTEGER = "LZ4_INTEGER";
+  private static final String GZIP_INTEGER = "GZIP_INTEGER";
 
   private static final List<String> RAW_SNAPPY_INDEX_COLUMNS =
       Arrays.asList(SNAPPY_STRING, SNAPPY_LONG, SNAPPY_INTEGER);
@@ -90,6 +92,7 @@ public class NoDictionaryCompressionQueriesTest extends BaseQueriesTest {
       Arrays.asList(PASS_THROUGH_STRING, PASS_THROUGH_LONG, PASS_THROUGH_INTEGER);
 
   private static final List<String> RAW_LZ4_INDEX_COLUMNS = Arrays.asList(LZ4_STRING, LZ4_LONG, LZ4_INTEGER);
+  private static final List<String> RAW_GZIP_INDEX_COLUMNS = Arrays.asList(GZIP_STRING, GZIP_LONG, GZIP_INTEGER);
 
   private IndexSegment _indexSegment;
   private List<IndexSegment> _indexSegments;
@@ -123,6 +126,7 @@ public class NoDictionaryCompressionQueriesTest extends BaseQueriesTest {
     indexColumns.addAll(RAW_PASS_THROUGH_INDEX_COLUMNS);
     indexColumns.addAll(RAW_ZSTANDARD_INDEX_COLUMNS);
     indexColumns.addAll(RAW_LZ4_INDEX_COLUMNS);
+    indexColumns.addAll(RAW_GZIP_INDEX_COLUMNS);
 
     indexLoadingConfig.addNoDictionaryColumns(indexColumns);
     ImmutableSegment immutableSegment =
@@ -143,7 +147,7 @@ public class NoDictionaryCompressionQueriesTest extends BaseQueriesTest {
 
     List<FieldConfig> fieldConfigs = new ArrayList<>(
         RAW_SNAPPY_INDEX_COLUMNS.size() + RAW_ZSTANDARD_INDEX_COLUMNS.size() + RAW_PASS_THROUGH_INDEX_COLUMNS.size()
-            + RAW_LZ4_INDEX_COLUMNS.size());
+            + RAW_LZ4_INDEX_COLUMNS.size() + RAW_GZIP_INDEX_COLUMNS.size());
 
     for (String indexColumn : RAW_SNAPPY_INDEX_COLUMNS) {
       fieldConfigs.add(new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW, Collections.emptyList(),
@@ -165,11 +169,17 @@ public class NoDictionaryCompressionQueriesTest extends BaseQueriesTest {
           FieldConfig.CompressionCodec.LZ4, null));
     }
 
+    for (String indexColumn : RAW_GZIP_INDEX_COLUMNS) {
+      fieldConfigs.add(new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW, Collections.emptyList(),
+          FieldConfig.CompressionCodec.GZIP, null));
+    }
+
     List<String> noDictionaryColumns = new ArrayList<>();
     noDictionaryColumns.addAll(RAW_SNAPPY_INDEX_COLUMNS);
     noDictionaryColumns.addAll(RAW_ZSTANDARD_INDEX_COLUMNS);
     noDictionaryColumns.addAll(RAW_PASS_THROUGH_INDEX_COLUMNS);
     noDictionaryColumns.addAll(RAW_LZ4_INDEX_COLUMNS);
+    noDictionaryColumns.addAll(RAW_GZIP_INDEX_COLUMNS);
 
     TableConfig tableConfig =
         new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setNoDictionaryColumns(noDictionaryColumns)
@@ -179,14 +189,17 @@ public class NoDictionaryCompressionQueriesTest extends BaseQueriesTest {
         .addSingleValueDimension(PASS_THROUGH_STRING, FieldSpec.DataType.STRING)
         .addSingleValueDimension(ZSTANDARD_STRING, FieldSpec.DataType.STRING)
         .addSingleValueDimension(LZ4_STRING, FieldSpec.DataType.STRING)
+        .addSingleValueDimension(GZIP_STRING, FieldSpec.DataType.STRING)
         .addSingleValueDimension(SNAPPY_INTEGER, FieldSpec.DataType.INT)
         .addSingleValueDimension(ZSTANDARD_INTEGER, FieldSpec.DataType.INT)
         .addSingleValueDimension(PASS_THROUGH_INTEGER, FieldSpec.DataType.INT)
         .addSingleValueDimension(LZ4_INTEGER, FieldSpec.DataType.INT)
+        .addSingleValueDimension(GZIP_INTEGER, FieldSpec.DataType.INT)
         .addSingleValueDimension(SNAPPY_LONG, FieldSpec.DataType.LONG)
         .addSingleValueDimension(ZSTANDARD_LONG, FieldSpec.DataType.LONG)
         .addSingleValueDimension(PASS_THROUGH_LONG, FieldSpec.DataType.LONG)
-        .addSingleValueDimension(LZ4_LONG, FieldSpec.DataType.LONG).build();
+        .addSingleValueDimension(LZ4_LONG, FieldSpec.DataType.LONG)
+        .addSingleValueDimension(GZIP_LONG, FieldSpec.DataType.LONG).build();
     SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema);
     config.setOutDir(INDEX_DIR.getPath());
     config.setTableName(TABLE_NAME);
@@ -227,14 +240,17 @@ public class NoDictionaryCompressionQueriesTest extends BaseQueriesTest {
       row.putValue(ZSTANDARD_STRING, tempStringRows[i]);
       row.putValue(PASS_THROUGH_STRING, tempStringRows[i]);
       row.putValue(LZ4_STRING, tempStringRows[i]);
+      row.putValue(GZIP_STRING, tempStringRows[i]);
       row.putValue(SNAPPY_INTEGER, tempIntRows[i]);
       row.putValue(ZSTANDARD_INTEGER, tempIntRows[i]);
       row.putValue(PASS_THROUGH_INTEGER, tempIntRows[i]);
       row.putValue(LZ4_INTEGER, tempIntRows[i]);
+      row.putValue(GZIP_INTEGER, tempIntRows[i]);
       row.putValue(SNAPPY_LONG, tempLongRows[i]);
       row.putValue(ZSTANDARD_LONG, tempLongRows[i]);
       row.putValue(PASS_THROUGH_LONG, tempLongRows[i]);
       row.putValue(LZ4_LONG, tempLongRows[i]);
+      row.putValue(GZIP_LONG, tempLongRows[i]);
       rows.add(row);
     }
     return rows;
@@ -246,18 +262,19 @@ public class NoDictionaryCompressionQueriesTest extends BaseQueriesTest {
   @Test
   public void testQueriesWithCompressionCodec() {
     String query = "SELECT SNAPPY_STRING, ZSTANDARD_STRING, 
PASS_THROUGH_STRING, LZ4_STRING, "
-        + "SNAPPY_INTEGER, ZSTANDARD_INTEGER, PASS_THROUGH_INTEGER, 
LZ4_INTEGER, "
-        + "SNAPPY_LONG, ZSTANDARD_LONG, PASS_THROUGH_LONG, LZ4_LONG FROM 
MyTable LIMIT 1000";
+        + "GZIP_STRING, SNAPPY_INTEGER, ZSTANDARD_INTEGER, 
PASS_THROUGH_INTEGER, LZ4_INTEGER, "
+        + "GZIP_INTEGER, SNAPPY_LONG, ZSTANDARD_LONG, PASS_THROUGH_LONG, 
LZ4_LONG, GZIP_LONG FROM MyTable LIMIT 1000";
     ArrayList<Serializable[]> expected = new ArrayList<>();
 
     for (GenericRow row : _rows) {
       expected.add(new Serializable[]{
-          String.valueOf(row.getValue(SNAPPY_STRING)), String.valueOf(row.getValue(ZSTANDARD_STRING)),
-          String.valueOf(row.getValue(PASS_THROUGH_STRING)), String.valueOf(row.getValue(LZ4_STRING)),
-          (Integer) row.getValue(SNAPPY_INTEGER), (Integer) row.getValue(ZSTANDARD_INTEGER),
-          (Integer) row.getValue(PASS_THROUGH_INTEGER), (Integer) row.getValue(LZ4_INTEGER),
-          (Long) row.getValue(SNAPPY_LONG), (Long) row.getValue(ZSTANDARD_LONG), (Long) row.getValue(PASS_THROUGH_LONG),
-          (Long) row.getValue(LZ4_LONG)
+          String.valueOf(row.getValue(SNAPPY_STRING)), String.valueOf(row.getValue(ZSTANDARD_STRING)), String.valueOf(
+          row.getValue(PASS_THROUGH_STRING)), String.valueOf(row.getValue(LZ4_STRING)), String.valueOf(
+          row.getValue(GZIP_STRING)), (Integer) row.getValue(SNAPPY_INTEGER), (Integer) row.getValue(
+          ZSTANDARD_INTEGER), (Integer) row.getValue(PASS_THROUGH_INTEGER), (Integer) row.getValue(
+          LZ4_INTEGER), (Integer) row.getValue(GZIP_INTEGER), (Long) row.getValue(SNAPPY_LONG), (Long) row.getValue(
+          ZSTANDARD_LONG), (Long) row.getValue(PASS_THROUGH_LONG), (Long) row.getValue(LZ4_LONG), (Long) row.getValue(
+          GZIP_LONG)
       });
     }
     testSelectQueryHelper(query, expected.size(), expected);
@@ -297,6 +314,23 @@ public class NoDictionaryCompressionQueriesTest extends BaseQueriesTest {
     testSelectQueryHelper(query, expected.size(), expected);
   }
 
+  /**
+   * Tests for filter over integer values GZIP compression codec queries.
+   */
+  @Test
+  public void testGZIPIntegerFilterQueriesWithCompressionCodec() {
+    String query = "SELECT GZIP_INTEGER FROM MyTable WHERE GZIP_INTEGER > 1000 
LIMIT 1000";
+    ArrayList<Serializable[]> expected = new ArrayList<>();
+
+    for (GenericRow row : _rows) {
+      int value = (Integer) row.getValue(GZIP_INTEGER);
+      if (value > 1000) {
+        expected.add(new Serializable[]{value});
+      }
+    }
+    testSelectQueryHelper(query, expected.size(), expected);
+  }
+
   /**
    * Tests for filter over integer values compression codec queries.
    */
@@ -365,6 +399,23 @@ public class NoDictionaryCompressionQueriesTest extends BaseQueriesTest {
     testSelectQueryHelper(query, expected.size(), expected);
   }
 
+  /**
+   * Tests for filter over string values GZIP compression codec queries.
+   */
+  @Test
+  public void testGZIPStringFilterQueriesWithCompressionCodec() {
+    String query = "SELECT GZIP_STRING FROM MyTable WHERE GZIP_STRING = 
'hello_world_123' LIMIT 1000";
+    ArrayList<Serializable[]> expected = new ArrayList<>();
+
+    for (GenericRow row : _rows) {
+      String value = String.valueOf(row.getValue(GZIP_STRING));
+      if (value.equals("hello_world_123")) {
+        expected.add(new Serializable[]{value});
+      }
+    }
+    testSelectQueryHelper(query, expected.size(), expected);
+  }
+
   /**
    * Tests for filter over string values snappy compression codec queries.
    */
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryIntegerCompression.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryIntegerCompression.java
index 6c1a0e3ae2..f51ff94f15 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryIntegerCompression.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryIntegerCompression.java
@@ -24,6 +24,10 @@ import java.nio.ByteBuffer;
 import java.util.concurrent.TimeUnit;
 import net.jpountz.lz4.LZ4Factory;
 import org.apache.commons.lang3.RandomUtils;
+import org.apache.pinot.segment.local.io.compression.ChunkCompressorFactory;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.compression.ChunkCompressor;
+import org.apache.pinot.segment.spi.compression.ChunkDecompressor;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
 import org.openjdk.jmh.annotations.Fork;
@@ -68,7 +72,13 @@ public class BenchmarkNoDictionaryIntegerCompression {
     private static ByteBuffer _lz4CompressedIntegerInput;
     private static ByteBuffer _lz4IntegerDecompressed;
 
+    private static ByteBuffer _gzipCompressedIntegerOutput;
+    private static ByteBuffer _gzipCompressedIntegerInput;
+    private static ByteBuffer _gzipIntegerDecompressed;
+
     private static LZ4Factory _factory;
+    private static ChunkCompressor _gzipCompressor;
+    private static ChunkDecompressor _gzipDecompressor;
 
     @Setup(Level.Invocation)
     public void setUp()
@@ -84,12 +94,14 @@ public class BenchmarkNoDictionaryIntegerCompression {
       // position for lz4 is required
       _uncompressedInt.flip();
       _factory.fastCompressor().compress(_uncompressedInt, _lz4CompressedIntegerInput);
+      _gzipCompressor.compress(_uncompressedInt, _gzipCompressedIntegerInput);
 
       _zstdIntegerDecompressed.rewind();
       _zstandardCompressedIntegerInput.flip();
       _uncompressedInt.flip();
       _snappyIntegerDecompressed.rewind();
       _lz4CompressedIntegerInput.flip();
+      _gzipCompressedIntegerInput.flip();
     }
 
     private void generateRandomIntegerBuffer() {
@@ -102,8 +114,10 @@ public class BenchmarkNoDictionaryIntegerCompression {
     }
 
     private void initializeCompressors() {
-      //Initialize compressors and decompressors for lz4
+      //Initialize compressors and decompressors for lz4 and gzip
       _factory = LZ4Factory.fastestInstance();
+      _gzipCompressor = ChunkCompressorFactory.getCompressor(ChunkCompressionType.GZIP);
+      _gzipDecompressor = ChunkCompressorFactory.getDecompressor(ChunkCompressionType.GZIP);
     }
 
     private void allocateBufferMemory() {
@@ -117,6 +131,9 @@ public class BenchmarkNoDictionaryIntegerCompression {
       _lz4CompressedIntegerOutput = ByteBuffer.allocateDirect(_uncompressedInt.capacity() * 2);
       _snappyCompressedIntegerOutput = ByteBuffer.allocateDirect(_uncompressedInt.capacity() * 2);
       _zstdCompressedIntegerOutput = ByteBuffer.allocateDirect(_uncompressedInt.capacity() * 2);
+      _gzipIntegerDecompressed = ByteBuffer.allocateDirect(_uncompressedInt.capacity() * 2);
+      _gzipCompressedIntegerOutput = ByteBuffer.allocateDirect(_uncompressedInt.capacity() * 2);
+      _gzipCompressedIntegerInput = ByteBuffer.allocateDirect(_uncompressedInt.capacity() * 2);
     }
 
     @TearDown(Level.Invocation)
@@ -128,10 +145,13 @@ public class BenchmarkNoDictionaryIntegerCompression {
       _zstdIntegerDecompressed.clear();
       _lz4CompressedIntegerOutput.clear();
       _lz4IntegerDecompressed.clear();
+      _gzipCompressedIntegerOutput.clear();
+      _gzipIntegerDecompressed.clear();
 
       _uncompressedInt.rewind();
       _zstandardCompressedIntegerInput.rewind();
       _lz4CompressedIntegerInput.rewind();
+      _gzipCompressedIntegerInput.rewind();
     }
   }
 
@@ -207,9 +227,27 @@ public class BenchmarkNoDictionaryIntegerCompression {
     return state._lz4IntegerDecompressed.position();
   }
 
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public int benchmarkGZIPIntegerCompression(BenchmarkNoDictionaryIntegerCompressionState state)
+      throws IOException {
+    state._gzipCompressor.compress(state._uncompressedInt, state._gzipCompressedIntegerOutput);
+    return state._gzipCompressedIntegerOutput.position();
+  }
+
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public int benchmarkGZIPIntegerDecompression(BenchmarkNoDictionaryIntegerCompressionState state)
+      throws IOException {
+    state._gzipDecompressor.decompress(state._gzipCompressedIntegerInput, state._gzipIntegerDecompressed);
+    return state._gzipIntegerDecompressed.position();
+  }
+
   public static void main(String[] args)
       throws Exception {
-    new Runner(new OptionsBuilder().include(BenchmarkNoDictionaryIntegerCompression.class.getSimpleName()).build())
-        .run();
+    new Runner(
+        new OptionsBuilder().include(BenchmarkNoDictionaryIntegerCompression.class.getSimpleName()).build()).run();
   }
 }
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryLongCompression.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryLongCompression.java
index b81d26a19d..1819278b28 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryLongCompression.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryLongCompression.java
@@ -24,6 +24,10 @@ import java.nio.ByteBuffer;
 import java.util.concurrent.TimeUnit;
 import net.jpountz.lz4.LZ4Factory;
 import org.apache.commons.lang3.RandomUtils;
+import org.apache.pinot.segment.local.io.compression.ChunkCompressorFactory;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.compression.ChunkCompressor;
+import org.apache.pinot.segment.spi.compression.ChunkDecompressor;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
 import org.openjdk.jmh.annotations.Fork;
@@ -67,8 +71,13 @@ public class BenchmarkNoDictionaryLongCompression {
     private static ByteBuffer _lz4CompressedLongOutput;
     private static ByteBuffer _lz4CompressedLongInput;
     private static ByteBuffer _lz4LongDecompressed;
+    private static ByteBuffer _gzipCompressedLongOutput;
+    private static ByteBuffer _gzipCompressedLongInput;
+    private static ByteBuffer _gzipLongDecompressed;
 
     private static LZ4Factory _factory;
+    private static ChunkCompressor _gzipCompressor;
+    private static ChunkDecompressor _gzipDecompressor;
 
     @Setup(Level.Invocation)
     public void setUp()
@@ -84,12 +93,14 @@ public class BenchmarkNoDictionaryLongCompression {
       // position for lz4 is required
       _uncompressedLong.flip();
       _factory.fastCompressor().compress(_uncompressedLong, _lz4CompressedLongInput);
+      _gzipCompressor.compress(_uncompressedLong, _gzipCompressedLongInput);
 
       _zstandardLongDecompressedOutput.rewind();
       _zstandardCompressedLongInput.flip();
       _uncompressedLong.flip();
       _snappyLongDecompressedOutput.flip();
       _lz4CompressedLongInput.flip();
+      _gzipCompressedLongInput.flip();
     }
 
     private void generateRandomLongBuffer() {
@@ -102,8 +113,10 @@ public class BenchmarkNoDictionaryLongCompression {
     }
 
     private void initializeCompressors() {
-      //Initialize compressors and decompressors for lz4
+      //Initialize compressors and decompressors for lz4 and gzip
       _factory = LZ4Factory.fastestInstance();
+      _gzipCompressor = ChunkCompressorFactory.getCompressor(ChunkCompressionType.GZIP);
+      _gzipDecompressor = ChunkCompressorFactory.getDecompressor(ChunkCompressionType.GZIP);
     }
 
     private void allocateBufferMemory() {
@@ -116,6 +129,9 @@ public class BenchmarkNoDictionaryLongCompression {
       _lz4LongDecompressed = ByteBuffer.allocateDirect(_uncompressedLong.capacity() * 2);
       _lz4CompressedLongOutput = ByteBuffer.allocateDirect(_uncompressedLong.capacity() * 2);
       _lz4CompressedLongInput = ByteBuffer.allocateDirect(_uncompressedLong.capacity() * 2);
+      _gzipLongDecompressed = ByteBuffer.allocateDirect(_uncompressedLong.capacity() * 2);
+      _gzipCompressedLongOutput = ByteBuffer.allocateDirect(_uncompressedLong.capacity() * 2);
+      _gzipCompressedLongInput = ByteBuffer.allocateDirect(_uncompressedLong.capacity() * 2);
     }
 
     @TearDown(Level.Invocation)
@@ -127,10 +143,13 @@ public class BenchmarkNoDictionaryLongCompression {
       _zstandardLongDecompressedOutput.clear();
       _lz4CompressedLongOutput.clear();
       _lz4LongDecompressed.clear();
+      _gzipCompressedLongOutput.clear();
+      _gzipLongDecompressed.clear();
 
       _uncompressedLong.rewind();
       _zstandardCompressedLongInput.rewind();
       _lz4CompressedLongInput.rewind();
+      _gzipCompressedLongInput.rewind();
     }
   }
 
@@ -210,6 +229,26 @@ public class BenchmarkNoDictionaryLongCompression {
     return state._lz4LongDecompressed.position();
   }
 
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public int benchmarkGZIPLongCompression(
+      BenchmarkNoDictionaryLongCompression.BenchmarkNoDictionaryLongCompressionState state)
+      throws IOException {
+    state._gzipCompressor.compress(state._uncompressedLong, state._gzipCompressedLongOutput);
+    return state._gzipCompressedLongOutput.position();
+  }
+
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public int benchmarkGZIPLongDecompression(
+      BenchmarkNoDictionaryLongCompression.BenchmarkNoDictionaryLongCompressionState state)
+      throws IOException {
+    state._gzipDecompressor.decompress(state._gzipCompressedLongInput, state._gzipLongDecompressed);
+    return state._gzipLongDecompressed.position();
+  }
+
   public static void main(String[] args)
       throws Exception {
     new Runner(new OptionsBuilder().include(BenchmarkNoDictionaryLongCompression.class.getSimpleName()).build()).run();
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryStringCompression.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryStringCompression.java
index 0cf5a3df7e..50f7687c9a 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryStringCompression.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryStringCompression.java
@@ -23,8 +23,11 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
-import net.jpountz.lz4.LZ4Factory;
 import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.pinot.segment.local.io.compression.ChunkCompressorFactory;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.compression.ChunkCompressor;
+import org.apache.pinot.segment.spi.compression.ChunkDecompressor;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
 import org.openjdk.jmh.annotations.Fork;
@@ -50,181 +53,172 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 @Warmup(iterations = 3)
 @Measurement(iterations = 5)
 @State(Scope.Benchmark)
-// Test to get memory statistics for snappy, zstandard and lz4 string compression techniques
+// Test to get memory statistics for snappy, zstandard, lz4 and gzip string compression techniques
 public class BenchmarkNoDictionaryStringCompression {
 
   @Param({"500000", "1000000", "2000000", "3000000", "4000000", "5000000"})
   public static int _rowLength;
 
-  public static Random _random = new Random();
+  private static final int MAX_CHARS_IN_LINE = 30;
+  private static final Random RANDOM = new Random();
+  private static final ChunkCompressor LZ4_COMPRESSOR = ChunkCompressorFactory.getCompressor(ChunkCompressionType.LZ4);
+  private static final ChunkDecompressor LZ4_DECOMPRESSOR =
+      ChunkCompressorFactory.getDecompressor(ChunkCompressionType.LZ4);
+  private static final ChunkCompressor GZIP_COMPRESSOR =
+      ChunkCompressorFactory.getCompressor(ChunkCompressionType.GZIP);
+  private static final ChunkDecompressor GZIP_DECOMPRESSOR =
+      ChunkCompressorFactory.getDecompressor(ChunkCompressionType.GZIP);
 
   @State(Scope.Thread)
-  public static class BenchmarkNoDictionaryStringCompressionState {
-    private static ByteBuffer _uncompressedString;
-    private static ByteBuffer _snappyCompressedStringInput;
-    private static ByteBuffer _zstandardCompressedStringInput;
-    private static ByteBuffer _snappyCompressedStringOutput;
-    private static ByteBuffer _zstandardCompressedStringOutput;
-    private static ByteBuffer _snappyStringDecompressed;
-    private static ByteBuffer _zstandardStringDecompressed;
-    private static ByteBuffer _lz4CompressedStringOutput;
-    private static ByteBuffer _lz4CompressedStringInput;
-    private static ByteBuffer _lz4StringDecompressed;
-
-    private static LZ4Factory _factory;
+  public static class CompressionBuffers {
+
+    private ByteBuffer _snappyCompressedStringInput;
+    private ByteBuffer _zstandardCompressedStringInput;
+    private ByteBuffer _lz4CompressedStringInput;
+    private ByteBuffer _gzipCompressedStringInput;
+    private ByteBuffer _uncompressedString;
+    private ByteBuffer _stringDecompressed;
+    private ByteBuffer _stringCompressed;
+
+    @Setup(Level.Trial)
+    public void setUp0() {
+      // generate random block of text alongside initialising memory buffers
+      byte[][] tempRows = new byte[_rowLength][];
+      int size = 0;
+      for (int i = 0; i < _rowLength; i++) {
+        String value = RandomStringUtils.random(RANDOM.nextInt(MAX_CHARS_IN_LINE), true, true);
+        byte[] bytes = value.getBytes(UTF_8);
+        tempRows[i] = bytes;
+        size += bytes.length;
+      }
+      _uncompressedString = ByteBuffer.allocateDirect(size);
+      for (int i = 0; i < _rowLength; i++) {
+        _uncompressedString.put(tempRows[i]);
+      }
+      _uncompressedString.flip();
+
+      int capacity = _uncompressedString.capacity() * 2;
+      _stringDecompressed = ByteBuffer.allocateDirect(capacity);
+      _stringCompressed = ByteBuffer.allocateDirect(capacity);
+      _snappyCompressedStringInput = ByteBuffer.allocateDirect(capacity);
+      _zstandardCompressedStringInput = ByteBuffer.allocateDirect(capacity);
+      _lz4CompressedStringInput = ByteBuffer.allocateDirect(capacity);
+      _gzipCompressedStringInput = ByteBuffer.allocateDirect(capacity);
+    }
 
     @Setup(Level.Invocation)
     public void setUp()
         throws Exception {
 
-      initializeCompressors();
-      generateRandomStringBuffer();
-      allocateMemory();
-
+      _uncompressedString.rewind();
+      _snappyCompressedStringInput.clear();
+      _zstandardCompressedStringInput.clear();
+      _lz4CompressedStringInput.clear();
+      _gzipCompressedStringInput.clear();
+      _stringDecompressed.clear();
+      _stringCompressed.clear();
+
+      // prepare compressed buffers
       Snappy.compress(_uncompressedString, _snappyCompressedStringInput);
       Zstd.compress(_zstandardCompressedStringInput, _uncompressedString);
       // ZSTD compressor with change the position of _uncompressedString, a flip() operation over input to reset
       // position for lz4 is required
       _uncompressedString.flip();
-      _factory.fastCompressor().compress(_uncompressedString, _lz4CompressedStringInput);
-
-      _zstandardStringDecompressed.rewind();
       _zstandardCompressedStringInput.flip();
-      _uncompressedString.flip();
-      _snappyStringDecompressed.flip();
-      _lz4CompressedStringInput.flip();
-    }
-
-    private void initializeCompressors() {
-      //Initialize compressors and decompressors for lz4
-      _factory = LZ4Factory.fastestInstance();
-    }
-
-    private void generateRandomStringBuffer() {
-      String[] tempRows = new String[_rowLength];
-      int maxStringLengthInBytes = 0;
-      int numChars = 100;
 
-      for (int i = 0; i < _rowLength; i++) {
-        String value = RandomStringUtils.random(_random.nextInt(numChars), true, true);
-        maxStringLengthInBytes = Math.max(maxStringLengthInBytes, value.getBytes(UTF_8).length);
-        tempRows[i] = value;
-      }
-
-      _uncompressedString = ByteBuffer.allocateDirect(_rowLength * maxStringLengthInBytes);
-      for (int i = 0; i < _rowLength; i++) {
-        _uncompressedString.put(tempRows[i].getBytes(UTF_8));
-      }
+      LZ4_COMPRESSOR.compress(_uncompressedString, _lz4CompressedStringInput);
       _uncompressedString.flip();
-    }
 
-    private void allocateMemory() {
-      _snappyCompressedStringOutput = ByteBuffer.allocateDirect(_uncompressedString.capacity() * 2);
-      _zstandardCompressedStringOutput = ByteBuffer.allocateDirect(_uncompressedString.capacity() * 2);
-      _snappyStringDecompressed = ByteBuffer.allocateDirect(_uncompressedString.capacity() * 2);
-      _zstandardStringDecompressed = ByteBuffer.allocateDirect(_uncompressedString.capacity() * 2);
-      _snappyCompressedStringInput = ByteBuffer.allocateDirect(_uncompressedString.capacity() * 2);
-      _zstandardCompressedStringInput = ByteBuffer.allocateDirect(_uncompressedString.capacity() * 2);
-      _lz4StringDecompressed = ByteBuffer.allocateDirect(_uncompressedString.capacity() * 2);
-      _lz4CompressedStringOutput = ByteBuffer.allocateDirect(_uncompressedString.capacity() * 2);
-      _lz4CompressedStringInput = ByteBuffer.allocateDirect(_uncompressedString.capacity() * 2);
+      GZIP_COMPRESSOR.compress(_uncompressedString, _gzipCompressedStringInput);
+      _uncompressedString.flip();
     }
 
     @TearDown(Level.Invocation)
     public void tearDown()
         throws Exception {
-      _snappyCompressedStringOutput.clear();
-      _snappyStringDecompressed.clear();
-      _zstandardCompressedStringOutput.clear();
-      _zstandardStringDecompressed.clear();
-      _lz4CompressedStringOutput.clear();
-      _lz4StringDecompressed.clear();
-
-      _uncompressedString.rewind();
-      _zstandardCompressedStringInput.rewind();
-      _lz4CompressedStringInput.rewind();
+      _snappyCompressedStringInput.clear();
+      _zstandardCompressedStringInput.clear();
+      _lz4CompressedStringInput.clear();
+      _gzipCompressedStringInput.clear();
+      _uncompressedString.clear();
+      _stringDecompressed.clear();
+      _stringCompressed.clear();
     }
   }
 
   @Benchmark
   @BenchmarkMode(Mode.AverageTime)
   @OutputTimeUnit(TimeUnit.MILLISECONDS)
-  public int benchmarkSnappyStringCompression(BenchmarkNoDictionaryStringCompressionState state)
+  public int benchmarkSnappyStringCompression(CompressionBuffers state)
       throws IOException {
-    int size = Snappy.compress(state._uncompressedString, state._snappyCompressedStringOutput);
+    int size = Snappy.compress(state._uncompressedString, state._stringCompressed);
     return size;
   }
 
   @Benchmark
   @BenchmarkMode(Mode.AverageTime)
   @OutputTimeUnit(TimeUnit.MILLISECONDS)
-  public int benchmarkSnappyStringDecompression(BenchmarkNoDictionaryStringCompressionState state)
+  public int benchmarkSnappyStringDecompression(CompressionBuffers state)
       throws IOException {
-    int size = Snappy.uncompress(state._snappyCompressedStringInput, state._snappyStringDecompressed);
+    int size = Snappy.uncompress(state._snappyCompressedStringInput, state._stringDecompressed);
     return size;
   }
 
   @Benchmark
   @BenchmarkMode(Mode.AverageTime)
   @OutputTimeUnit(TimeUnit.MILLISECONDS)
-  public int benchmarkZstandardStringCompression(BenchmarkNoDictionaryStringCompressionState state)
-      throws IOException {
-    int size = Zstd.compress(state._zstandardCompressedStringOutput, state._uncompressedString);
+  public int benchmarkZstandardStringCompression(CompressionBuffers state) {
+    int size = Zstd.compress(state._stringCompressed, state._uncompressedString);
     return size;
   }
 
   @Benchmark
   @BenchmarkMode(Mode.AverageTime)
   @OutputTimeUnit(TimeUnit.MILLISECONDS)
-  public int benchmarkZstandardStringDecompression(BenchmarkNoDictionaryStringCompressionState state)
-      throws IOException {
-    int size = Zstd.decompress(state._zstandardStringDecompressed, state._zstandardCompressedStringInput);
+  public int benchmarkZstandardStringDecompression(CompressionBuffers state) {
+    int size = Zstd.decompress(state._stringDecompressed, state._zstandardCompressedStringInput);
     return size;
   }
 
   @Benchmark
   @BenchmarkMode(Mode.AverageTime)
   @OutputTimeUnit(TimeUnit.MILLISECONDS)
-  public int benchmarkLZ4StringCompression(
-      BenchmarkNoDictionaryStringCompression.BenchmarkNoDictionaryStringCompressionState state)
+  public int benchmarkLZ4HCStringCompression(CompressionBuffers state)
       throws IOException {
-    state._factory.fastCompressor().compress(state._uncompressedString, state._lz4CompressedStringOutput);
-    return state._lz4CompressedStringOutput.position();
+    LZ4_COMPRESSOR.compress(state._uncompressedString, state._stringCompressed);
+    return state._stringCompressed.position();
   }
 
   @Benchmark
   @BenchmarkMode(Mode.AverageTime)
   @OutputTimeUnit(TimeUnit.MILLISECONDS)
-  public int benchmarkLZ4StringDecompression(
-      BenchmarkNoDictionaryStringCompression.BenchmarkNoDictionaryStringCompressionState state)
+  public int benchmarkLZ4HCStringDecompression(CompressionBuffers state)
       throws IOException {
-    state._factory.fastDecompressor().decompress(state._lz4CompressedStringInput, state._lz4StringDecompressed);
-    return state._lz4StringDecompressed.position();
+    LZ4_DECOMPRESSOR.decompress(state._lz4CompressedStringInput, state._stringDecompressed);
+    return state._stringDecompressed.position();
   }
 
   @Benchmark
   @BenchmarkMode(Mode.AverageTime)
   @OutputTimeUnit(TimeUnit.MILLISECONDS)
-  public int benchmarkLZ4HCStringCompression(
-      BenchmarkNoDictionaryStringCompression.BenchmarkNoDictionaryStringCompressionState state)
+  public int benchmarkGZIPStringCompression(CompressionBuffers state)
       throws IOException {
-    state._factory.highCompressor().compress(state._uncompressedString, state._lz4CompressedStringOutput);
-    return state._lz4CompressedStringOutput.position();
+    GZIP_COMPRESSOR.compress(state._uncompressedString, state._stringCompressed);
+    return state._stringCompressed.position();
   }
 
   @Benchmark
   @BenchmarkMode(Mode.AverageTime)
   @OutputTimeUnit(TimeUnit.MILLISECONDS)
-  public int benchmarkLZ4HCStringDecompression(
-      BenchmarkNoDictionaryStringCompression.BenchmarkNoDictionaryStringCompressionState state)
+  public int benchmarkGZIPStringDecompression(CompressionBuffers state)
       throws IOException {
-    state._factory.fastDecompressor().decompress(state._lz4CompressedStringInput, state._lz4StringDecompressed);
-    return state._lz4StringDecompressed.position();
+    GZIP_DECOMPRESSOR.decompress(state._gzipCompressedStringInput, state._stringDecompressed);
+    return state._stringDecompressed.position();
   }
 
   public static void main(String[] args)
       throws Exception {
-    new Runner(new OptionsBuilder().include(BenchmarkNoDictionaryStringCompression.class.getSimpleName()).build())
-        .run();
+    new Runner(
+        new OptionsBuilder().include(BenchmarkNoDictionaryStringCompression.class.getSimpleName()).build()).run();
   }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ChunkCompressorFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ChunkCompressorFactory.java
index b2d06b9718..15def2f733 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ChunkCompressorFactory.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ChunkCompressorFactory.java
@@ -50,8 +50,7 @@ public class ChunkCompressorFactory {
    *                                size. Most formats do this anyway, but LZ4 requires a length prefix.
    * @return Compressor for the specified type.
    */
-  public static ChunkCompressor getCompressor(ChunkCompressionType compressionType,
-      boolean upgradeToLengthPrefixed) {
+  public static ChunkCompressor getCompressor(ChunkCompressionType compressionType, boolean upgradeToLengthPrefixed) {
     switch (compressionType) {
 
       case PASS_THROUGH:
@@ -69,6 +68,9 @@ public class ChunkCompressorFactory {
       case LZ4_LENGTH_PREFIXED:
         return LZ4WithLengthCompressor.INSTANCE;
 
+      case GZIP:
+        return new GzipCompressor();
+
       default:
         throw new IllegalArgumentException("Illegal compressor name " + compressionType);
     }
@@ -97,6 +99,9 @@ public class ChunkCompressorFactory {
       case LZ4_LENGTH_PREFIXED:
         return LZ4WithLengthDecompressor.INSTANCE;
 
+      case GZIP:
+        return new GzipDecompressor();
+
       default:
         throw new IllegalArgumentException("Illegal compressor name " + compressionType);
     }
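
With both factory cases in place, a GZIP round trip can be driven entirely through the factory. A minimal, self-contained sketch (the class name and sample input are illustrative, not part of the commit):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import org.apache.pinot.segment.local.io.compression.ChunkCompressorFactory;
    import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
    import org.apache.pinot.segment.spi.compression.ChunkCompressor;
    import org.apache.pinot.segment.spi.compression.ChunkDecompressor;

    public class GzipRoundTripSketch {
      public static void main(String[] args) throws Exception {
        byte[] input = "hello gzip".getBytes(StandardCharsets.UTF_8);
        ByteBuffer raw = ByteBuffer.allocateDirect(input.length);
        raw.put(input).flip();
        try (ChunkCompressor compressor = ChunkCompressorFactory.getCompressor(ChunkCompressionType.GZIP);
            ChunkDecompressor decompressor = ChunkCompressorFactory.getDecompressor(ChunkCompressionType.GZIP)) {
          // compress() flips the output buffer, so it is readable afterwards
          ByteBuffer compressed = ByteBuffer.allocateDirect(compressor.maxCompressedSize(raw.limit()));
          compressor.compress(raw, compressed);
          // GZIP, like LZ4, cannot report the decompressed length up front, so
          // the caller sizes the output buffer from the known uncompressed size
          ByteBuffer decompressed = ByteBuffer.allocateDirect(input.length);
          decompressor.decompress(compressed, decompressed);
        }
      }
    }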
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/GzipCompressor.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/GzipCompressor.java
new file mode 100644
index 0000000000..3a83f7c8d2
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/GzipCompressor.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.io.compression;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.zip.Deflater;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.compression.ChunkCompressor;
+
+
+/**
+ * Implementation of {@link ChunkCompressor} using GZIP compression algorithm.
+ */
+class GzipCompressor implements ChunkCompressor {
+
+  private final Deflater _compressor;
+
+  public GzipCompressor() {
+    _compressor = new Deflater();
+  }
+
+  @Override
+  public int compress(ByteBuffer inUncompressed, ByteBuffer outCompressed)
+      throws IOException {
+    _compressor.reset();
+    _compressor.setInput(inUncompressed);
+    _compressor.finish();
+    _compressor.deflate(outCompressed);
+    outCompressed.flip();
+    return outCompressed.limit();
+  }
+
+  @Override
+  public int maxCompressedSize(int uncompressedSize) {
+    // https://github.com/luvit/zlib/blob/8de57bce969eb9dafc1f1f5c256ac608d0a73ec4/compress.c#L75
+    return uncompressedSize + (uncompressedSize >> 12) + (uncompressedSize >> 14) + (uncompressedSize >> 25) + 13;
+  }
+
+  @Override
+  public ChunkCompressionType compressionType() {
+    return ChunkCompressionType.GZIP;
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    _compressor.end();
+  }
+}
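
The maxCompressedSize() bound above is zlib's compressBound() ported to Java. As a worked example (my arithmetic, just evaluating the expression): for a 1 MiB chunk, 1048576 + (1048576 >> 12) + (1048576 >> 14) + (1048576 >> 25) + 13 = 1048576 + 256 + 64 + 0 + 13 = 1048909 bytes, i.e. roughly 0.03% headroom plus a small constant.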
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/GzipDecompressor.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/GzipDecompressor.java
new file mode 100644
index 0000000000..b07d8acdbc
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/GzipDecompressor.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.io.compression;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.zip.DataFormatException;
+import java.util.zip.Inflater;
+import org.apache.pinot.segment.spi.compression.ChunkDecompressor;
+
+
+/**
+ * Implementation of {@link ChunkDecompressor} using GZIP decompression algorithm.
+ */
+class GzipDecompressor implements ChunkDecompressor {
+
+  private final Inflater _decompressor;
+
+  public GzipDecompressor() {
+    _decompressor = new Inflater();
+  }
+
+  @Override
+  public int decompress(ByteBuffer compressedInput, ByteBuffer decompressedOutput)
+      throws IOException {
+    _decompressor.reset();
+    _decompressor.setInput(compressedInput);
+    try {
+      _decompressor.inflate(decompressedOutput);
+    } catch (DataFormatException e) {
+      throw new IOException(e);
+    }
+    decompressedOutput.flip();
+    return decompressedOutput.limit();
+  }
+
+  @Override
+  public int decompressedLength(ByteBuffer compressedInput) {
+    return -1;
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    _decompressor.end();
+  }
+}
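
Note that decompressedLength() returns -1 here: the deflate stream produced above does not record its uncompressed size, so callers must size the output buffer themselves, which is what the updated tests do for both LZ4 and GZIP. A sketch of the caller-side pattern (the method and parameter names are mine; in practice the size would come from chunk metadata):

    static ByteBuffer inflate(ByteBuffer compressed, int knownUncompressedLength) throws IOException {
      try (ChunkDecompressor decompressor = ChunkCompressorFactory.getDecompressor(ChunkCompressionType.GZIP)) {
        int size = decompressor.decompressedLength(compressed); // always -1 for GZIP
        ByteBuffer out = ByteBuffer.allocateDirect(size > 0 ? size : knownUncompressedLength);
        decompressor.decompress(compressed, out);
        return out;
      }
    }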
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/BaseChunkForwardIndexWriter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/BaseChunkForwardIndexWriter.java
index 70d8f38706..0cdff5ce61 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/BaseChunkForwardIndexWriter.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/BaseChunkForwardIndexWriter.java
@@ -114,6 +114,7 @@ public abstract class BaseChunkForwardIndexWriter implements Closeable {
     _header.flip();
     _dataFile.write(_header, 0);
     _dataFile.close();
+    _chunkCompressor.close();
   }
 
   /**
@@ -192,7 +193,6 @@ public abstract class BaseChunkForwardIndexWriter implements Closeable {
     }
 
     _dataOffset += sizeToWrite;
-
     _chunkBuffer.clear();
   }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriterV4.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriterV4.java
index 868511a437..440808a6b0 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriterV4.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriterV4.java
@@ -325,5 +325,6 @@ public class VarByteChunkForwardIndexWriterV4 implements VarByteChunkWriter {
     CleanerUtil.cleanQuietly(_compressionBuffer);
     CleanerUtil.cleanQuietly(_chunkBuffer);
     FileUtils.deleteQuietly(_dataBuffer);
+    _chunkCompressor.close();
   }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkForwardIndexReader.java
index c7855ee54f..745bd18fde 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkForwardIndexReader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkForwardIndexReader.java
@@ -241,12 +241,10 @@ public abstract class BaseChunkForwardIndexReader implements ForwardIndexReader<
 
   protected long getChunkPositionAndRecordRanges(int chunkId, List<ByteRange> ranges) {
     if (_headerEntryChunkOffsetSize == Integer.BYTES) {
-      ranges.add(
-          new ByteRange(_dataHeaderStart + chunkId * _headerEntryChunkOffsetSize, Integer.BYTES));
+      ranges.add(new ByteRange(_dataHeaderStart + chunkId * _headerEntryChunkOffsetSize, Integer.BYTES));
       return _dataHeader.getInt(chunkId * _headerEntryChunkOffsetSize);
     } else {
-      ranges.add(
-          new ByteRange(_dataHeaderStart + chunkId * _headerEntryChunkOffsetSize, Long.BYTES));
+      ranges.add(new ByteRange(_dataHeaderStart + chunkId * _headerEntryChunkOffsetSize, Long.BYTES));
       return _dataHeader.getLong(chunkId * _headerEntryChunkOffsetSize);
     }
   }
@@ -446,9 +444,11 @@ public abstract class BaseChunkForwardIndexReader implements ForwardIndexReader<
   }
 
   @Override
-  public void close() {
+  public void close()
+      throws IOException {
     // NOTE: DO NOT close the PinotDataBuffer here because it is tracked by the caller and might be reused later. The
     // caller is responsible of closing the PinotDataBuffer.
+    _chunkDecompressor.close();
   }
 
   private boolean isContiguousRange(int[] docIds, int length) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkForwardIndexReaderV4.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkForwardIndexReaderV4.java
index 47c30aec6b..f0a3658cb3 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkForwardIndexReaderV4.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkForwardIndexReaderV4.java
@@ -266,6 +266,7 @@ public class VarByteChunkForwardIndexReaderV4
   @Override
   public void close()
       throws IOException {
+    _chunkDecompressor.close();
   }
 
   @Override
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/compression/TestCompression.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/compression/TestCompression.java
index 9f711929e9..245803ec53 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/compression/TestCompression.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/compression/TestCompression.java
@@ -21,6 +21,10 @@ package org.apache.pinot.segment.local.io.compression;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
 import org.apache.pinot.segment.spi.compression.ChunkCompressor;
 import org.apache.pinot.segment.spi.compression.ChunkDecompressor;
@@ -41,45 +45,120 @@ public class TestCompression {
     buffer.put(input);
     buffer.flip();
     return new Object[][]{
-        {ChunkCompressionType.PASS_THROUGH, buffer.slice()},
-        {ChunkCompressionType.SNAPPY, buffer.slice()},
-        {ChunkCompressionType.LZ4, buffer.slice()},
-        {ChunkCompressionType.LZ4_LENGTH_PREFIXED, buffer.slice()},
-        {ChunkCompressionType.ZSTANDARD, buffer.slice()}
+        {ChunkCompressionType.PASS_THROUGH, buffer.slice()}, {ChunkCompressionType.SNAPPY, buffer.slice()},
+        {ChunkCompressionType.LZ4, buffer.slice()}, {ChunkCompressionType.LZ4_LENGTH_PREFIXED, buffer.slice()},
+        {ChunkCompressionType.ZSTANDARD, buffer.slice()}, {ChunkCompressionType.GZIP, buffer.slice()}
     };
   }
 
   @Test(dataProvider = "formats")
   public void testRoundtrip(ChunkCompressionType type, ByteBuffer rawInput)
       throws IOException {
-    ChunkCompressor compressor = ChunkCompressorFactory.getCompressor(type);
-    assertEquals(compressor.compressionType(), type, "upgrade is opt in");
-    roundtrip(compressor, rawInput);
+    try (ChunkCompressor compressor = ChunkCompressorFactory.getCompressor(type)) {
+      assertEquals(compressor.compressionType(), type, "upgrade is opt in");
+      roundtrip(compressor, rawInput);
+    }
   }
 
   @Test(dataProvider = "formats")
   public void testRoundtripWithUpgrade(ChunkCompressionType type, ByteBuffer 
rawInput)
       throws IOException {
-    ChunkCompressor compressor = ChunkCompressorFactory.getCompressor(type, true);
-    assertNotEquals(compressor.compressionType(), ChunkCompressionType.LZ4,
-        "LZ4 compression type does not support length prefix");
-    roundtrip(compressor, rawInput);
+    try (ChunkCompressor compressor = ChunkCompressorFactory.getCompressor(type, true)) {
+      assertNotEquals(compressor.compressionType(), ChunkCompressionType.LZ4,
+          "LZ4 compression type does not support length prefix");
+      roundtrip(compressor, rawInput);
+    }
   }
 
-  private void roundtrip(ChunkCompressor compressor, ByteBuffer rawInput)
+  @Test(dataProvider = "formats")
+  public void testConcurrent(ChunkCompressionType type, ByteBuffer ignore) {
+
+    String expected = "The gzip file format is:\n"
+        + "- a 10-byte header, containing a magic number (1f 8b), the 
compression method (08 for DEFLATE), "
+        + "1-byte of header flags, a 4-byte timestamp, compression flags and 
the operating system ID.\n"
+        + "- optional extra headers as allowed by the header flags, including 
the original filename, a "
+        + "comment field, an 'extra' field, and the lower half of a CRC-32 
checksum for the header section.\n"
+        + "- a body, containing a DEFLATE-compressed payload.\n"
+        + "- an 8-byte trailer, containing a CRC-32 checksum and the length of 
the original uncompressed "
+        + "data, modulo 232.[4]\n"
+        + "gzip is normally used to compress just single files. Compressed 
archives are typically created "
+        + "by assembling collections of files into a single tar archive and 
then compressing that archive "
+        + "with gzip.\n gzip is not to be confused with ZIP, which can hold 
collections of files without "
+        + "an external archiver, but is less compact than compressed tarballs 
holding the same data, because "
+        + "it compresses files individually and cannot take advantage of 
redundancy between files.\n\n";
+    byte[] input = expected.getBytes(StandardCharsets.UTF_8);
+    ByteBuffer rawInput = ByteBuffer.allocateDirect(input.length).put(input).flip();
+
+    Thread[] workers = new Thread[5];
+    ByteBuffer[] compressed = new ByteBuffer[workers.length];
+    ByteBuffer[] decompressed = new ByteBuffer[workers.length];
+    CountDownLatch done = new CountDownLatch(workers.length);
+    AtomicInteger errors = new AtomicInteger();
+    for (int i = 0; i < workers.length; i++) {
+      int idx = i;
+      workers[i] = new Thread(() -> {
+        try {
+          // compress
+          try (ChunkCompressor compressor = ChunkCompressorFactory.getCompressor(type)) {
+            compressed[idx] = ByteBuffer.allocateDirect(compressor.maxCompressedSize(rawInput.limit()));
+            compressor.compress(rawInput.slice(), compressed[idx]);
+          }
+
+          // small context switch
+          TimeUnit.MILLISECONDS.sleep(1L + (long) (ThreadLocalRandom.current().nextDouble() * 10.0));
+
+          // decompress
+          try (ChunkDecompressor decompressor = ChunkCompressorFactory.getDecompressor(type)) {
+            int size = decompressor.decompressedLength(compressed[idx]);
+            if (type == ChunkCompressionType.LZ4 || type == ChunkCompressionType.GZIP) {
+              size = rawInput.limit();
+            }
+            decompressed[idx] = ByteBuffer.allocateDirect(size);
+            decompressor.decompress(compressed[idx], decompressed[idx]);
+          }
+        } catch (Throwable e) {
+          e.printStackTrace();
+          errors.incrementAndGet();
+        } finally {
+          done.countDown();
+        }
+      });
+      workers[i].start();
+    }
+
+    try {
+      done.await(60L, TimeUnit.SECONDS); // it will not take this long
+    } catch (InterruptedException e) {
+      throw new AssertionError("timed-out");
+    }
+
+    // there are no errors
+    assertEquals(errors.get(), 0);
+
+    // all decompressed buffers contain the original text
+    for (int i = 0; i < workers.length; i++) {
+      assertEquals(StandardCharsets.UTF_8.decode(decompressed[i]).toString(), expected);
+      compressed[i].clear();
+      decompressed[i].clear();
+    }
+  }
+
+  private static void roundtrip(ChunkCompressor compressor, ByteBuffer rawInput)
       throws IOException {
     ByteBuffer compressedOutput = ByteBuffer.allocateDirect(compressor.maxCompressedSize(rawInput.limit()));
     compressor.compress(rawInput.slice(), compressedOutput);
-    ChunkDecompressor decompressor = ChunkCompressorFactory.getDecompressor(compressor.compressionType());
-    int decompressedLength = decompressor.decompressedLength(compressedOutput);
-    assertTrue(compressor.compressionType() == ChunkCompressionType.LZ4 || decompressedLength > 0);
-    ByteBuffer decompressedOutput = ByteBuffer.allocateDirect(
-        compressor.compressionType() == ChunkCompressionType.LZ4 ? rawInput.limit() : decompressedLength);
-    decompressor.decompress(compressedOutput, decompressedOutput);
-    byte[] expected = new byte[rawInput.limit()];
-    rawInput.get(expected);
-    byte[] actual = new byte[decompressedOutput.limit()];
-    decompressedOutput.get(actual);
-    assertEquals(actual, expected, "content differs after compression roundtrip");
+    try (ChunkDecompressor decompressor = ChunkCompressorFactory.getDecompressor(compressor.compressionType())) {
+      int decompressedLength = decompressor.decompressedLength(compressedOutput);
+      boolean isLz4OrGzip = compressor.compressionType() == ChunkCompressionType.LZ4
+          || compressor.compressionType() == ChunkCompressionType.GZIP;
+      assertTrue(isLz4OrGzip || decompressedLength > 0);
+      ByteBuffer decompressedOutput = ByteBuffer.allocateDirect(isLz4OrGzip ? rawInput.limit() : decompressedLength);
+      decompressor.decompress(compressedOutput, decompressedOutput);
+      byte[] expected = new byte[rawInput.limit()];
+      rawInput.get(expected);
+      byte[] actual = new byte[decompressedOutput.limit()];
+      decompressedOutput.get(actual);
+      assertEquals(actual, expected, "content differs after compression roundtrip");
+    }
   }
 }
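
The new testConcurrent case above pins down the threading contract: unlike the stateless LZ4 singletons returned by the factory, GzipCompressor and GzipDecompressor wrap a stateful Deflater/Inflater, so an instance must not be shared across threads; each worker takes its own from the factory (which returns a fresh object for GZIP) and closes it to release the native resources. A sketch of that per-thread pattern (thread count and body are illustrative):

    for (int i = 0; i < 4; i++) {
      new Thread(() -> {
        // one compressor per thread: the GZIP factory case returns a fresh instance
        try (ChunkCompressor compressor = ChunkCompressorFactory.getCompressor(ChunkCompressionType.GZIP)) {
          // ... compress this thread's chunks ...
        } catch (IOException e) {
          throw new java.io.UncheckedIOException(e);
        }
      }).start();
    }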
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/VarByteChunkSVForwardIndexTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/VarByteChunkSVForwardIndexTest.java
index 17f169081b..55551d9e93 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/VarByteChunkSVForwardIndexTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/VarByteChunkSVForwardIndexTest.java
@@ -75,6 +75,12 @@ public class VarByteChunkSVForwardIndexTest {
     test(ChunkCompressionType.LZ4);
   }
 
+  @Test
+  public void testWithGZIPCompression()
+      throws Exception {
+    test(ChunkCompressionType.GZIP);
+  }
+
   /**
    * This test writes {@link #NUM_ENTRIES} using {@link VarByteChunkForwardIndexWriter}. It then reads
    * the strings & bytes using {@link VarByteChunkSVForwardIndexReader}, and asserts that what was written is the
@@ -177,36 +183,43 @@ public class VarByteChunkSVForwardIndexTest {
     testLargeVarcharHelper(ChunkCompressionType.PASS_THROUGH, 10, 1000);
     testLargeVarcharHelper(ChunkCompressionType.ZSTANDARD, 10, 1000);
     testLargeVarcharHelper(ChunkCompressionType.LZ4, 10, 1000);
+    testLargeVarcharHelper(ChunkCompressionType.GZIP, 10, 1000);
 
     testLargeVarcharHelper(ChunkCompressionType.SNAPPY, 100, 1000);
     testLargeVarcharHelper(ChunkCompressionType.PASS_THROUGH, 100, 1000);
     testLargeVarcharHelper(ChunkCompressionType.ZSTANDARD, 100, 1000);
     testLargeVarcharHelper(ChunkCompressionType.LZ4, 100, 1000);
+    testLargeVarcharHelper(ChunkCompressionType.GZIP, 100, 1000);
 
     testLargeVarcharHelper(ChunkCompressionType.SNAPPY, 1000, 1000);
     testLargeVarcharHelper(ChunkCompressionType.PASS_THROUGH, 1000, 1000);
     testLargeVarcharHelper(ChunkCompressionType.ZSTANDARD, 1000, 1000);
     testLargeVarcharHelper(ChunkCompressionType.LZ4, 1000, 1000);
+    testLargeVarcharHelper(ChunkCompressionType.GZIP, 1000, 1000);
 
     testLargeVarcharHelper(ChunkCompressionType.SNAPPY, 10000, 100);
     testLargeVarcharHelper(ChunkCompressionType.PASS_THROUGH, 10000, 100);
     testLargeVarcharHelper(ChunkCompressionType.ZSTANDARD, 10000, 100);
     testLargeVarcharHelper(ChunkCompressionType.LZ4, 10000, 100);
+    testLargeVarcharHelper(ChunkCompressionType.GZIP, 10000, 100);
 
     testLargeVarcharHelper(ChunkCompressionType.SNAPPY, 100000, 10);
     testLargeVarcharHelper(ChunkCompressionType.PASS_THROUGH, 100000, 10);
     testLargeVarcharHelper(ChunkCompressionType.ZSTANDARD, 100000, 10);
     testLargeVarcharHelper(ChunkCompressionType.LZ4, 100000, 10);
+    testLargeVarcharHelper(ChunkCompressionType.GZIP, 100000, 10);
 
     testLargeVarcharHelper(ChunkCompressionType.SNAPPY, 1000000, 10);
     testLargeVarcharHelper(ChunkCompressionType.PASS_THROUGH, 1000000, 10);
     testLargeVarcharHelper(ChunkCompressionType.ZSTANDARD, 1000000, 10);
     testLargeVarcharHelper(ChunkCompressionType.LZ4, 1000000, 10);
+    testLargeVarcharHelper(ChunkCompressionType.GZIP, 1000000, 10);
 
     testLargeVarcharHelper(ChunkCompressionType.SNAPPY, 2000000, 10);
     testLargeVarcharHelper(ChunkCompressionType.PASS_THROUGH, 2000000, 10);
     testLargeVarcharHelper(ChunkCompressionType.ZSTANDARD, 2000000, 10);
     testLargeVarcharHelper(ChunkCompressionType.LZ4, 2000000, 10);
+    testLargeVarcharHelper(ChunkCompressionType.GZIP, 2000000, 10);
   }
 
   private void testLargeVarcharHelper(ChunkCompressionType compressionType, int numChars, int numDocs)
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java
index 53f6995a57..1df3e70364 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java
@@ -86,21 +86,23 @@ public class ForwardIndexHandlerTest {
   private static final String DIM_PASS_THROUGH_STRING = 
"DIM_PASS_THROUGH_STRING";
   private static final String DIM_ZSTANDARD_STRING = "DIM_ZSTANDARD_STRING";
   private static final String DIM_LZ4_STRING = "DIM_LZ4_STRING";
+  private static final String DIM_GZIP_STRING = "DIM_GZIP_STRING";
 
   private static final String DIM_SNAPPY_LONG = "DIM_SNAPPY_LONG";
   private static final String DIM_PASS_THROUGH_LONG = "DIM_PASS_THROUGH_LONG";
   private static final String DIM_ZSTANDARD_LONG = "DIM_ZSTANDARD_LONG";
   private static final String DIM_LZ4_LONG = "DIM_LZ4_LONG";
-
+  private static final String DIM_GZIP_LONG = "DIM_GZIP_LONG";
   private static final String DIM_SNAPPY_INTEGER = "DIM_SNAPPY_INTEGER";
   private static final String DIM_PASS_THROUGH_INTEGER = 
"DIM_PASS_THROUGH_INTEGER";
   private static final String DIM_ZSTANDARD_INTEGER = "DIM_ZSTANDARD_INTEGER";
   private static final String DIM_LZ4_INTEGER = "DIM_LZ4_INTEGER";
-
+  private static final String DIM_GZIP_INTEGER = "DIM_GZIP_INTEGER";
   private static final String DIM_SNAPPY_BYTES = "DIM_SNAPPY_BYTES";
   private static final String DIM_PASS_THROUGH_BYTES = 
"DIM_PASS_THROUGH_BYTES";
   private static final String DIM_ZSTANDARD_BYTES = "DIM_ZSTANDARD_BYTES";
   private static final String DIM_LZ4_BYTES = "DIM_LZ4_BYTES";
+  private static final String DIM_GZIP_BYTES = "DIM_GZIP_BYTES";
 
   // Sorted columns
   private static final String DIM_RAW_SORTED_INTEGER = 
"DIM_RAW_SORTED_INTEGER";
@@ -110,11 +112,13 @@ public class ForwardIndexHandlerTest {
   private static final String METRIC_SNAPPY_INTEGER = "METRIC_SNAPPY_INTEGER";
   private static final String METRIC_ZSTANDARD_INTEGER = 
"METRIC_ZSTANDARD_INTEGER";
   private static final String METRIC_LZ4_INTEGER = "METRIC_LZ4_INTEGER";
+  private static final String METRIC_GZIP_INTEGER = "METRIC_GZIP_INTEGER";
 
   private static final String METRIC_SNAPPY_BIG_DECIMAL = 
"METRIC_SNAPPY_BIG_DECIMAL";
   private static final String METRIC_PASS_THROUGH_BIG_DECIMAL = 
"METRIC_PASS_THROUGH_BIG_DECIMAL";
   private static final String METRIC_ZSTANDARD_BIG_DECIMAL = 
"METRIC_ZSTANDARD_BIG_DECIMAL";
   private static final String METRIC_LZ4_BIG_DECIMAL = 
"METRIC_LZ4_BIG_DECIMAL";
+  private static final String METRIC_GZIP_BIG_DECIMAL = 
"METRIC_GZIP_BIG_DECIMAL";
 
   // Multi-value columns
   private static final String DIM_MV_PASS_THROUGH_INTEGER = 
"DIM_MV_PASS_THROUGH_INTEGER";
@@ -187,16 +191,20 @@ public class ForwardIndexHandlerTest {
       Arrays.asList(DIM_LZ4_STRING, DIM_LZ4_LONG, DIM_LZ4_INTEGER, 
DIM_LZ4_BYTES, METRIC_LZ4_BIG_DECIMAL,
           METRIC_LZ4_INTEGER);
 
-  private static final List<String> DICT_ENABLED_COLUMNS_WITH_FORWARD_INDEX = 
Arrays.asList(DIM_DICT_INTEGER,
-      DIM_DICT_LONG, DIM_DICT_STRING, DIM_DICT_BYES, DIM_DICT_MV_BYTES, 
DIM_DICT_MV_STRING,
-      DIM_DICT_MV_INTEGER, DIM_DICT_MV_LONG);
+  private static final List<String> RAW_GZIP_INDEX_COLUMNS =
+      Arrays.asList(DIM_GZIP_STRING, DIM_GZIP_LONG, DIM_GZIP_INTEGER, 
DIM_GZIP_BYTES, METRIC_GZIP_BIG_DECIMAL,
+          METRIC_GZIP_INTEGER);
+
+  private static final List<String> DICT_ENABLED_COLUMNS_WITH_FORWARD_INDEX =
+      Arrays.asList(DIM_DICT_INTEGER, DIM_DICT_LONG, DIM_DICT_STRING, 
DIM_DICT_BYES, DIM_DICT_MV_BYTES,
+          DIM_DICT_MV_STRING, DIM_DICT_MV_INTEGER, DIM_DICT_MV_LONG);
 
   private static final List<String> DICT_ENABLED_MV_COLUMNS_WITH_FORWARD_INDEX 
=
       Arrays.asList(DIM_DICT_MV_INTEGER, DIM_DICT_MV_LONG, DIM_DICT_MV_STRING, 
DIM_DICT_MV_BYTES);
 
-  private static final List<String> SV_FORWARD_INDEX_DISABLED_COLUMNS = 
Arrays.asList(
-      DIM_SV_FORWARD_INDEX_DISABLED_INTEGER, 
DIM_SV_FORWARD_INDEX_DISABLED_LONG, DIM_SV_FORWARD_INDEX_DISABLED_STRING,
-      DIM_SV_FORWARD_INDEX_DISABLED_BYTES);
+  private static final List<String> SV_FORWARD_INDEX_DISABLED_COLUMNS =
+      Arrays.asList(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER, 
DIM_SV_FORWARD_INDEX_DISABLED_LONG,
+          DIM_SV_FORWARD_INDEX_DISABLED_STRING, 
DIM_SV_FORWARD_INDEX_DISABLED_BYTES);
 
   private static final List<String> MV_FORWARD_INDEX_DISABLED_COLUMNS =
       Arrays.asList(DIM_MV_FORWARD_INDEX_DISABLED_INTEGER, 
DIM_MV_FORWARD_INDEX_DISABLED_LONG,
@@ -241,13 +249,14 @@ public class ForwardIndexHandlerTest {
 
     List<FieldConfig> fieldConfigs = new ArrayList<>(
         RAW_SNAPPY_INDEX_COLUMNS.size() + RAW_SORTED_INDEX_COLUMNS.size() + 
RAW_ZSTANDARD_INDEX_COLUMNS.size()
-            + RAW_PASS_THROUGH_INDEX_COLUMNS.size() + 
RAW_LZ4_INDEX_COLUMNS.size()
+            + RAW_PASS_THROUGH_INDEX_COLUMNS.size() + 
RAW_LZ4_INDEX_COLUMNS.size() + RAW_GZIP_INDEX_COLUMNS.size()
             + SV_FORWARD_INDEX_DISABLED_COLUMNS.size() + 
MV_FORWARD_INDEX_DISABLED_COLUMNS.size()
             + MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS.size() + 
FORWARD_INDEX_DISABLED_RAW_COLUMNS.size() + 2);
 
     for (String indexColumn : RAW_SNAPPY_INDEX_COLUMNS) {
-      fieldConfigs.add(new FieldConfig(indexColumn, 
FieldConfig.EncodingType.RAW, Collections.emptyList(),
-          CompressionCodec.SNAPPY, null));
+      fieldConfigs.add(
+          new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW, 
Collections.emptyList(), CompressionCodec.SNAPPY,
+              null));
     }
 
     for (String indexColumn : RAW_SORTED_INDEX_COLUMNS) {
@@ -266,46 +275,56 @@ public class ForwardIndexHandlerTest {
     }
 
     for (String indexColumn : RAW_LZ4_INDEX_COLUMNS) {
-      fieldConfigs.add(new FieldConfig(indexColumn, 
FieldConfig.EncodingType.RAW, Collections.emptyList(),
-          CompressionCodec.LZ4, null));
+      fieldConfigs.add(
+          new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW, 
Collections.emptyList(), CompressionCodec.LZ4,
+              null));
+    }
+
+    for (String indexColumn : RAW_GZIP_INDEX_COLUMNS) {
+      fieldConfigs.add(
+          new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW, 
Collections.emptyList(), CompressionCodec.GZIP,
+              null));
     }
 
     for (String indexColumn : SV_FORWARD_INDEX_DISABLED_COLUMNS) {
-      fieldConfigs.add(new FieldConfig(indexColumn, 
FieldConfig.EncodingType.DICTIONARY, Collections.singletonList(
-          FieldConfig.IndexType.INVERTED), null,
+      fieldConfigs.add(new FieldConfig(indexColumn, 
FieldConfig.EncodingType.DICTIONARY,
+          Collections.singletonList(FieldConfig.IndexType.INVERTED), null,
           Collections.singletonMap(FieldConfig.FORWARD_INDEX_DISABLED, 
Boolean.TRUE.toString())));
     }
 
     for (String indexColumn : MV_FORWARD_INDEX_DISABLED_COLUMNS) {
-      fieldConfigs.add(new FieldConfig(indexColumn, 
FieldConfig.EncodingType.DICTIONARY, Collections.singletonList(
-          FieldConfig.IndexType.INVERTED), null,
+      fieldConfigs.add(new FieldConfig(indexColumn, 
FieldConfig.EncodingType.DICTIONARY,
+          Collections.singletonList(FieldConfig.IndexType.INVERTED), null,
           Collections.singletonMap(FieldConfig.FORWARD_INDEX_DISABLED, 
Boolean.TRUE.toString())));
     }
 
     for (String indexColumn : MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS) {
-      fieldConfigs.add(new FieldConfig(indexColumn, 
FieldConfig.EncodingType.DICTIONARY, Collections.singletonList(
-          FieldConfig.IndexType.INVERTED), null,
+      fieldConfigs.add(new FieldConfig(indexColumn, 
FieldConfig.EncodingType.DICTIONARY,
+          Collections.singletonList(FieldConfig.IndexType.INVERTED), null,
           Collections.singletonMap(FieldConfig.FORWARD_INDEX_DISABLED, 
Boolean.TRUE.toString())));
     }
 
     for (String indexColumn : FORWARD_INDEX_DISABLED_RAW_COLUMNS) {
       fieldConfigs.add(
           new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW, 
Collections.emptyList(), CompressionCodec.LZ4,
-          Collections.singletonMap(FieldConfig.FORWARD_INDEX_DISABLED, 
Boolean.TRUE.toString())));
+              Collections.singletonMap(FieldConfig.FORWARD_INDEX_DISABLED, 
Boolean.TRUE.toString())));
     }
 
-    fieldConfigs.add(new 
FieldConfig(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX,
-        FieldConfig.EncodingType.DICTIONARY, Collections.emptyList(), null,
-        Collections.singletonMap(FieldConfig.FORWARD_INDEX_DISABLED, 
Boolean.TRUE.toString())));
+    fieldConfigs.add(
+        new FieldConfig(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX, 
FieldConfig.EncodingType.DICTIONARY,
+            Collections.emptyList(), null,
+            Collections.singletonMap(FieldConfig.FORWARD_INDEX_DISABLED, 
Boolean.TRUE.toString())));
 
-    fieldConfigs.add(new 
FieldConfig(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITH_RANGE_INDEX,
-        FieldConfig.EncodingType.DICTIONARY, 
Collections.singletonList(FieldConfig.IndexType.RANGE), null,
-        Collections.singletonMap(FieldConfig.FORWARD_INDEX_DISABLED, 
Boolean.TRUE.toString())));
+    fieldConfigs.add(
+        new 
FieldConfig(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITH_RANGE_INDEX, 
FieldConfig.EncodingType.DICTIONARY,
+            Collections.singletonList(FieldConfig.IndexType.RANGE), null,
+            Collections.singletonMap(FieldConfig.FORWARD_INDEX_DISABLED, 
Boolean.TRUE.toString())));
 
     _noDictionaryColumns.addAll(RAW_SNAPPY_INDEX_COLUMNS);
     _noDictionaryColumns.addAll(RAW_ZSTANDARD_INDEX_COLUMNS);
     _noDictionaryColumns.addAll(RAW_PASS_THROUGH_INDEX_COLUMNS);
     _noDictionaryColumns.addAll(RAW_LZ4_INDEX_COLUMNS);
+    _noDictionaryColumns.addAll(RAW_GZIP_INDEX_COLUMNS);
     _noDictionaryColumns.addAll(FORWARD_INDEX_DISABLED_RAW_COLUMNS);
     _noDictionaryColumns.addAll(RAW_SORTED_INDEX_COLUMNS);
 
@@ -330,30 +349,35 @@ public class ForwardIndexHandlerTest {
         .addSingleValueDimension(DIM_PASS_THROUGH_STRING, 
FieldSpec.DataType.STRING)
         .addSingleValueDimension(DIM_ZSTANDARD_STRING, 
FieldSpec.DataType.STRING)
         .addSingleValueDimension(DIM_LZ4_STRING, FieldSpec.DataType.STRING)
+        .addSingleValueDimension(DIM_GZIP_STRING, FieldSpec.DataType.STRING)
         .addSingleValueDimension(DIM_SNAPPY_INTEGER, FieldSpec.DataType.INT)
         .addSingleValueDimension(DIM_RAW_SORTED_INTEGER, 
FieldSpec.DataType.INT)
         .addSingleValueDimension(DIM_ZSTANDARD_INTEGER, FieldSpec.DataType.INT)
         .addSingleValueDimension(DIM_PASS_THROUGH_INTEGER, 
FieldSpec.DataType.INT)
         .addSingleValueDimension(DIM_LZ4_INTEGER, FieldSpec.DataType.INT)
+        .addSingleValueDimension(DIM_GZIP_INTEGER, FieldSpec.DataType.INT)
         .addSingleValueDimension(DIM_SNAPPY_LONG, FieldSpec.DataType.LONG)
         .addSingleValueDimension(DIM_ZSTANDARD_LONG, FieldSpec.DataType.LONG)
         .addSingleValueDimension(DIM_PASS_THROUGH_LONG, 
FieldSpec.DataType.LONG)
         .addSingleValueDimension(DIM_LZ4_LONG, FieldSpec.DataType.LONG)
+        .addSingleValueDimension(DIM_GZIP_LONG, FieldSpec.DataType.LONG)
         .addSingleValueDimension(DIM_SNAPPY_BYTES, FieldSpec.DataType.BYTES)
         .addSingleValueDimension(DIM_PASS_THROUGH_BYTES, 
FieldSpec.DataType.BYTES)
         .addSingleValueDimension(DIM_ZSTANDARD_BYTES, FieldSpec.DataType.BYTES)
         .addSingleValueDimension(DIM_LZ4_BYTES, FieldSpec.DataType.BYTES)
+        .addSingleValueDimension(DIM_GZIP_BYTES, FieldSpec.DataType.BYTES)
         .addMetric(METRIC_SNAPPY_BIG_DECIMAL, FieldSpec.DataType.BIG_DECIMAL)
         .addMetric(METRIC_PASS_THROUGH_BIG_DECIMAL, 
FieldSpec.DataType.BIG_DECIMAL)
         .addMetric(METRIC_ZSTANDARD_BIG_DECIMAL, 
FieldSpec.DataType.BIG_DECIMAL)
         .addMetric(METRIC_LZ4_BIG_DECIMAL, FieldSpec.DataType.BIG_DECIMAL)
+        .addMetric(METRIC_GZIP_BIG_DECIMAL, FieldSpec.DataType.BIG_DECIMAL)
         .addSingleValueDimension(DIM_DICT_INTEGER, FieldSpec.DataType.INT)
         .addSingleValueDimension(DIM_DICT_LONG, FieldSpec.DataType.LONG)
         .addSingleValueDimension(DIM_DICT_STRING, FieldSpec.DataType.STRING)
         .addSingleValueDimension(DIM_DICT_BYES, FieldSpec.DataType.BYTES)
         .addMetric(METRIC_PASS_THROUGH_INTEGER, FieldSpec.DataType.INT)
-        .addMetric(METRIC_SNAPPY_INTEGER, FieldSpec.DataType.INT)
-        .addMetric(METRIC_LZ4_INTEGER, FieldSpec.DataType.INT)
+        .addMetric(METRIC_SNAPPY_INTEGER, 
FieldSpec.DataType.INT).addMetric(METRIC_LZ4_INTEGER, FieldSpec.DataType.INT)
+        .addMetric(METRIC_GZIP_INTEGER, FieldSpec.DataType.INT)
         .addMetric(METRIC_ZSTANDARD_INTEGER, FieldSpec.DataType.INT)
         .addMultiValueDimension(DIM_MV_PASS_THROUGH_INTEGER, 
FieldSpec.DataType.INT)
         .addMultiValueDimension(DIM_MV_PASS_THROUGH_LONG, 
FieldSpec.DataType.LONG)
@@ -480,13 +504,16 @@ public class ForwardIndexHandlerTest {
       row.putValue(DIM_ZSTANDARD_STRING, tempStringRows[i]);
       row.putValue(DIM_PASS_THROUGH_STRING, tempStringRows[i]);
       row.putValue(DIM_LZ4_STRING, tempStringRows[i]);
+      row.putValue(DIM_GZIP_STRING, tempStringRows[i]);
 
       // Raw integer columns
       row.putValue(DIM_SNAPPY_INTEGER, tempIntRows[i]);
       row.putValue(DIM_ZSTANDARD_INTEGER, tempIntRows[i]);
       row.putValue(DIM_PASS_THROUGH_INTEGER, tempIntRows[i]);
       row.putValue(DIM_LZ4_INTEGER, tempIntRows[i]);
+      row.putValue(DIM_GZIP_INTEGER, tempIntRows[i]);
       row.putValue(METRIC_LZ4_INTEGER, tempIntRows[i]);
+      row.putValue(METRIC_GZIP_INTEGER, tempIntRows[i]);
       row.putValue(METRIC_PASS_THROUGH_INTEGER, tempIntRows[i]);
       row.putValue(METRIC_ZSTANDARD_INTEGER, tempIntRows[i]);
       row.putValue(METRIC_SNAPPY_INTEGER, tempIntRows[i]);
@@ -497,18 +524,21 @@ public class ForwardIndexHandlerTest {
       row.putValue(DIM_ZSTANDARD_LONG, tempLongRows[i]);
       row.putValue(DIM_PASS_THROUGH_LONG, tempLongRows[i]);
       row.putValue(DIM_LZ4_LONG, tempLongRows[i]);
+      row.putValue(DIM_GZIP_LONG, tempLongRows[i]);
 
       // Raw Byte columns
       row.putValue(DIM_SNAPPY_BYTES, tempBytesRows[i]);
       row.putValue(DIM_ZSTANDARD_BYTES, tempBytesRows[i]);
       row.putValue(DIM_PASS_THROUGH_BYTES, tempBytesRows[i]);
       row.putValue(DIM_LZ4_BYTES, tempBytesRows[i]);
+      row.putValue(DIM_GZIP_BYTES, tempBytesRows[i]);
 
       // Raw BigDecimal column
       row.putValue(METRIC_SNAPPY_BIG_DECIMAL, tempBigDecimalRows[i]);
       row.putValue(METRIC_ZSTANDARD_BIG_DECIMAL, tempBigDecimalRows[i]);
       row.putValue(METRIC_PASS_THROUGH_BIG_DECIMAL, tempBigDecimalRows[i]);
       row.putValue(METRIC_LZ4_BIG_DECIMAL, tempBigDecimalRows[i]);
+      row.putValue(METRIC_GZIP_BIG_DECIMAL, tempBigDecimalRows[i]);
 
       // Dictionary SV columns
       row.putValue(DIM_DICT_INTEGER, tempIntRows[i]);
@@ -556,7 +586,8 @@ public class ForwardIndexHandlerTest {
   }
 
   @Test
-  public void testComputeOperationNoOp() throws Exception {
+  public void testComputeOperationNoOp()
+      throws Exception {
     // Setup
     SegmentMetadataImpl existingSegmentMetadata = new 
SegmentMetadataImpl(_segmentDirectory);
     SegmentDirectory segmentLocalFSDirectory =
@@ -574,7 +605,8 @@ public class ForwardIndexHandlerTest {
   }
 
   @Test
-  public void testComputeOperationEnableDictionary() throws Exception {
+  public void testComputeOperationEnableDictionary()
+      throws Exception {
     // Setup
     SegmentMetadataImpl existingSegmentMetadata = new 
SegmentMetadataImpl(_segmentDirectory);
     SegmentDirectory segmentLocalFSDirectory =
@@ -628,13 +660,13 @@ public class ForwardIndexHandlerTest {
     assertEquals(operationMap.get(DIM_RAW_SORTED_INTEGER),
         
Collections.singletonList(ForwardIndexHandler.Operation.ENABLE_DICTIONARY));
 
-
     // Tear down
     segmentLocalFSDirectory.close();
   }
 
   @Test
-  public void testComputeOperationDisableDictionary() throws Exception {
+  public void testComputeOperationDisableDictionary()
+      throws Exception {
     // Setup
     SegmentMetadataImpl existingSegmentMetadata = new 
SegmentMetadataImpl(_segmentDirectory);
     SegmentDirectory segmentLocalFSDirectory =
@@ -677,7 +709,8 @@ public class ForwardIndexHandlerTest {
   }
 
   @Test
-  public void testComputeOperationChangeCompression() throws Exception {
+  public void testComputeOperationChangeCompression()
+      throws Exception {
     // Setup
     SegmentMetadataImpl existingSegmentMetadata = new 
SegmentMetadataImpl(_segmentDirectory);
     SegmentDirectory segmentLocalFSDirectory =
@@ -696,9 +729,8 @@ public class ForwardIndexHandlerTest {
       randIdx = rand.nextInt(fieldConfigs.size());
       name = fieldConfigs.get(randIdx).getName();
     } while (SV_FORWARD_INDEX_DISABLED_COLUMNS.contains(name) || 
MV_FORWARD_INDEX_DISABLED_COLUMNS.contains(name)
-        || MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS.contains(name)
-        || FORWARD_INDEX_DISABLED_RAW_COLUMNS.contains(name)
-        || DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX.equals(name)
+        || MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS.contains(name) || 
FORWARD_INDEX_DISABLED_RAW_COLUMNS.contains(
+        name) || 
DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX.equals(name)
         || 
DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITH_RANGE_INDEX.equals(name));
     FieldConfig config = fieldConfigs.remove(randIdx);
     CompressionCodec newCompressionType = null;
@@ -794,8 +826,8 @@ public class ForwardIndexHandlerTest {
     assertEquals(operationMap.size(), 1);
     Set<ForwardIndexHandler.Operation> operations = new 
HashSet<>(operationMap.get(DIM_LZ4_INTEGER));
     assertEquals(operations.size(), 2);
-    Set<ForwardIndexHandler.Operation> expectedOperations =
-        new 
HashSet<>(Arrays.asList(ForwardIndexHandler.Operation.DISABLE_FORWARD_INDEX,
+    Set<ForwardIndexHandler.Operation> expectedOperations = new HashSet<>(
+        Arrays.asList(ForwardIndexHandler.Operation.DISABLE_FORWARD_INDEX,
             ForwardIndexHandler.Operation.ENABLE_DICTIONARY));
     assertEquals(expectedOperations, operations);
 
@@ -827,7 +859,7 @@ public class ForwardIndexHandlerTest {
     operations = new HashSet<>(operationMap.get(DIM_LZ4_LONG));
     assertEquals(operations.size(), 2);
     expectedOperations = new 
HashSet<>(Arrays.asList(ForwardIndexHandler.Operation.DISABLE_FORWARD_INDEX,
-            ForwardIndexHandler.Operation.ENABLE_DICTIONARY));
+        ForwardIndexHandler.Operation.ENABLE_DICTIONARY));
     assertEquals(expectedOperations, operations);
     operations = new HashSet<>(operationMap.get(DIM_SNAPPY_STRING));
     assertEquals(operations.size(), 2);
@@ -1108,8 +1140,7 @@ public class ForwardIndexHandlerTest {
         String columnName = config.getName();
 
         FieldConfig newConfig =
-            new FieldConfig(columnName, FieldConfig.EncodingType.RAW, 
Collections.emptyList(), compressionType,
-                null);
+            new FieldConfig(columnName, FieldConfig.EncodingType.RAW, 
Collections.emptyList(), compressionType, null);
         fieldConfigs.add(newConfig);
 
         TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
@@ -1237,9 +1268,8 @@ public class ForwardIndexHandlerTest {
       randomIdx = rand.nextInt(fieldConfigs.size());
       name = fieldConfigs.get(randomIdx).getName();
     } while (SV_FORWARD_INDEX_DISABLED_COLUMNS.contains(name) || 
MV_FORWARD_INDEX_DISABLED_COLUMNS.contains(name)
-        || MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS.contains(name)
-        || FORWARD_INDEX_DISABLED_RAW_COLUMNS.contains(name)
-        || DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX.equals(name)
+        || MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS.contains(name) || 
FORWARD_INDEX_DISABLED_RAW_COLUMNS.contains(
+        name) || 
DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX.equals(name)
         || 
DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITH_RANGE_INDEX.equals(name));
     FieldConfig config1 = fieldConfigs.remove(randomIdx);
     String column1 = config1.getName();
@@ -1253,9 +1283,8 @@ public class ForwardIndexHandlerTest {
       randomIdx = rand.nextInt(fieldConfigs.size());
       name = fieldConfigs.get(randomIdx).getName();
     } while (SV_FORWARD_INDEX_DISABLED_COLUMNS.contains(name) || 
MV_FORWARD_INDEX_DISABLED_COLUMNS.contains(name)
-        || MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS.contains(name)
-        || FORWARD_INDEX_DISABLED_RAW_COLUMNS.contains(name)
-        || DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX.equals(name)
+        || MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS.contains(name) || 
FORWARD_INDEX_DISABLED_RAW_COLUMNS.contains(
+        name) || 
DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX.equals(name)
         || 
DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITH_RANGE_INDEX.equals(name));
     FieldConfig config2 = fieldConfigs.remove(randomIdx);
     String column2 = config2.getName();
@@ -1369,7 +1398,8 @@ public class ForwardIndexHandlerTest {
   }
 
   @Test
-  public void testEnableDictionaryForSortedColumn() throws Exception {
+  public void testEnableDictionaryForSortedColumn()
+      throws Exception {
     IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, 
_tableConfig);
 
     for (int i = 0; i < RAW_SORTED_INDEX_COLUMNS.size(); i++) {
@@ -1467,8 +1497,8 @@ public class ForwardIndexHandlerTest {
     IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, 
_tableConfig);
 
     Random rand = new Random();
-    String col1 = DICT_ENABLED_COLUMNS_WITH_FORWARD_INDEX.get(
-        rand.nextInt(DICT_ENABLED_COLUMNS_WITH_FORWARD_INDEX.size()));
+    String col1 =
+        
DICT_ENABLED_COLUMNS_WITH_FORWARD_INDEX.get(rand.nextInt(DICT_ENABLED_COLUMNS_WITH_FORWARD_INDEX.size()));
     indexLoadingConfig.addForwardIndexDisabledColumns(col1);
     indexLoadingConfig.addInvertedIndexColumns(col1);
     String col2;
@@ -1648,8 +1678,7 @@ public class ForwardIndexHandlerTest {
     IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, 
_tableConfig);
 
     Random rand = new Random();
-    String col1 = RAW_LZ4_INDEX_COLUMNS.get(
-        rand.nextInt(RAW_LZ4_INDEX_COLUMNS.size()));
+    String col1 = 
RAW_LZ4_INDEX_COLUMNS.get(rand.nextInt(RAW_LZ4_INDEX_COLUMNS.size()));
     indexLoadingConfig.addForwardIndexDisabledColumns(col1);
     indexLoadingConfig.removeNoDictionaryColumns(col1);
     indexLoadingConfig.addInvertedIndexColumns(col1);
@@ -1678,10 +1707,10 @@ public class ForwardIndexHandlerTest {
     } else if (dataType == FieldSpec.DataType.BIG_DECIMAL) {
       dictionaryElementSize = 4;
     }
-    validateMetadataProperties(col1, true, dictionaryElementSize, 
metadata.getCardinality(),
-        metadata.getTotalDocs(), dataType, metadata.getFieldType(), 
metadata.isSorted(),
-        metadata.isSingleValue(), metadata.getMaxNumberOfMultiValues(), 
metadata.getTotalNumberOfEntries(),
-        metadata.isAutoGenerated(), metadata.getMinValue(), 
metadata.getMaxValue(), false);
+    validateMetadataProperties(col1, true, dictionaryElementSize, 
metadata.getCardinality(), metadata.getTotalDocs(),
+        dataType, metadata.getFieldType(), metadata.isSorted(), 
metadata.isSingleValue(),
+        metadata.getMaxNumberOfMultiValues(), 
metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(),
+        metadata.getMinValue(), metadata.getMaxValue(), false);
 
     // Col2 validation.
     validateIndexMap(col2, true, true);
@@ -1696,10 +1725,10 @@ public class ForwardIndexHandlerTest {
     } else if (dataType == FieldSpec.DataType.BIG_DECIMAL) {
       dictionaryElementSize = 4;
     }
-    validateMetadataProperties(col2, true, dictionaryElementSize, 
metadata.getCardinality(),
-        metadata.getTotalDocs(), dataType, metadata.getFieldType(), 
metadata.isSorted(),
-        metadata.isSingleValue(), metadata.getMaxNumberOfMultiValues(), 
metadata.getTotalNumberOfEntries(),
-        metadata.isAutoGenerated(), metadata.getMinValue(), 
metadata.getMaxValue(), false);
+    validateMetadataProperties(col2, true, dictionaryElementSize, 
metadata.getCardinality(), metadata.getTotalDocs(),
+        dataType, metadata.getFieldType(), metadata.isSorted(), 
metadata.isSingleValue(),
+        metadata.getMaxNumberOfMultiValues(), 
metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(),
+        metadata.getMinValue(), metadata.getMaxValue(), false);
   }
 
   @Test
@@ -1801,10 +1830,10 @@ public class ForwardIndexHandlerTest {
       // In column metadata, nothing other than hasDictionary and 
dictionaryElementSize should change.
       ColumnMetadata metadata = 
existingSegmentMetadata.getColumnMetadataFor(column);
       FieldSpec.DataType dataType = metadata.getDataType();
-      validateMetadataProperties(column, false, 0, metadata.getCardinality(),
-          metadata.getTotalDocs(), dataType, metadata.getFieldType(), 
metadata.isSorted(), metadata.isSingleValue(),
-          metadata.getMaxNumberOfMultiValues(), 
metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(),
-          metadata.getMinValue(), metadata.getMaxValue(), false);
+      validateMetadataProperties(column, false, 0, metadata.getCardinality(), 
metadata.getTotalDocs(), dataType,
+          metadata.getFieldType(), metadata.isSorted(), 
metadata.isSingleValue(), metadata.getMaxNumberOfMultiValues(),
+          metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(), 
metadata.getMinValue(),
+          metadata.getMaxValue(), false);
     }
   }
 
@@ -1923,8 +1952,8 @@ public class ForwardIndexHandlerTest {
 
     Random rand = new Random();
     // Remove from forward index list but keep the inverted index enabled
-    String column = MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS
-        
.get(rand.nextInt(MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS.size()));
+    String column = MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS.get(
+        rand.nextInt(MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS.size()));
     indexLoadingConfig.removeForwardIndexDisabledColumns(column);
 
     ForwardIndexHandler fwdIndexHandler = new 
ForwardIndexHandler(segmentLocalFSDirectory, indexLoadingConfig, _schema);
@@ -2020,20 +2049,20 @@ public class ForwardIndexHandlerTest {
     validateIndexMap(col1, false, false);
     validateForwardIndex(col1, CompressionCodec.LZ4, metadata.isSorted());
     // In column metadata, nothing should change.
-    validateMetadataProperties(col1, false, 0, metadata.getCardinality(),
-        metadata.getTotalDocs(), metadata.getDataType(), 
metadata.getFieldType(), metadata.isSorted(),
-        metadata.isSingleValue(), metadata.getMaxNumberOfMultiValues(), 
metadata.getTotalNumberOfEntries(),
-        metadata.isAutoGenerated(), metadata.getMinValue(), 
metadata.getMaxValue(), false);
+    validateMetadataProperties(col1, false, 0, metadata.getCardinality(), 
metadata.getTotalDocs(),
+        metadata.getDataType(), metadata.getFieldType(), metadata.isSorted(), 
metadata.isSingleValue(),
+        metadata.getMaxNumberOfMultiValues(), 
metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(),
+        metadata.getMinValue(), metadata.getMaxValue(), false);
 
     // Col2 validation.
     metadata = existingSegmentMetadata.getColumnMetadataFor(col2);
     validateIndexMap(col2, false, false);
     validateForwardIndex(col2, CompressionCodec.LZ4, metadata.isSorted());
     // In column metadata, nothing should change.
-    validateMetadataProperties(col2, false, 0, metadata.getCardinality(),
-        metadata.getTotalDocs(), metadata.getDataType(), 
metadata.getFieldType(), metadata.isSorted(),
-        metadata.isSingleValue(), metadata.getMaxNumberOfMultiValues(), 
metadata.getTotalNumberOfEntries(),
-        metadata.isAutoGenerated(), metadata.getMinValue(), 
metadata.getMaxValue(), false);
+    validateMetadataProperties(col2, false, 0, metadata.getCardinality(), 
metadata.getTotalDocs(),
+        metadata.getDataType(), metadata.getFieldType(), metadata.isSorted(), 
metadata.isSingleValue(),
+        metadata.getMaxNumberOfMultiValues(), 
metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(),
+        metadata.getMinValue(), metadata.getMaxValue(), false);
   }
 
   @Test
@@ -2047,8 +2076,8 @@ public class ForwardIndexHandlerTest {
 
     Random rand = new Random();
     // Remove from forward index list but keep the inverted index enabled
-    String column = MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS
-        
.get(rand.nextInt(MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS.size()));
+    String column = MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS.get(
+        rand.nextInt(MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS.size()));
     indexLoadingConfig.removeForwardIndexDisabledColumns(column);
     indexLoadingConfig.removeInvertedIndexColumns(column);
     indexLoadingConfig.addNoDictionaryColumns(column);
@@ -2066,10 +2095,10 @@ public class ForwardIndexHandlerTest {
     validateForwardIndex(column, CompressionCodec.LZ4, metadata.isSorted());
     // In column metadata, some values can change since MV columns with 
duplicates lose the duplicates on forward index
     // regeneration.
-    validateMetadataProperties(column, false, 0, metadata.getCardinality(),
-        metadata.getTotalDocs(), metadata.getDataType(), 
metadata.getFieldType(), metadata.isSorted(),
-        metadata.isSingleValue(), metadata.getMaxNumberOfMultiValues(), 
metadata.getTotalNumberOfEntries(),
-        metadata.isAutoGenerated(), metadata.getMinValue(), 
metadata.getMaxValue(), true);
+    validateMetadataProperties(column, false, 0, metadata.getCardinality(), 
metadata.getTotalDocs(),
+        metadata.getDataType(), metadata.getFieldType(), metadata.isSorted(), 
metadata.isSingleValue(),
+        metadata.getMaxNumberOfMultiValues(), 
metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(),
+        metadata.getMinValue(), metadata.getMaxValue(), true);
   }
 
   @Test
@@ -2111,11 +2140,10 @@ public class ForwardIndexHandlerTest {
       validateForwardIndex(column, CompressionCodec.LZ4, metadata.isSorted());
 
       // In column metadata, nothing should change.
-      validateMetadataProperties(column, false, 0,
-          metadata.getCardinality(), metadata.getTotalDocs(), 
metadata.getDataType(), metadata.getFieldType(),
-          metadata.isSorted(), metadata.isSingleValue(), 
metadata.getMaxNumberOfMultiValues(),
-          metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(), 
metadata.getMinValue(),
-          metadata.getMaxValue(), false);
+      validateMetadataProperties(column, false, 0, metadata.getCardinality(), 
metadata.getTotalDocs(),
+          metadata.getDataType(), metadata.getFieldType(), 
metadata.isSorted(), metadata.isSingleValue(),
+          metadata.getMaxNumberOfMultiValues(), 
metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(),
+          metadata.getMinValue(), metadata.getMaxValue(), false);
     }
   }
 
@@ -2146,8 +2174,7 @@ public class ForwardIndexHandlerTest {
     validateIndexMap(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX, 
true, true);
     
validateIndexesForForwardIndexDisabledColumns(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX);
 
-    ForwardIndexHandler fwdIndexHandler =
-        new ForwardIndexHandler(segmentLocalFSDirectory, indexLoadingConfig, 
_schema);
+    ForwardIndexHandler fwdIndexHandler = new 
ForwardIndexHandler(segmentLocalFSDirectory, indexLoadingConfig, _schema);
     fwdIndexHandler.updateIndices(writer);
     fwdIndexHandler.postUpdateIndicesCleanup(writer);
 
@@ -2164,8 +2191,8 @@ public class ForwardIndexHandlerTest {
     
validateMetadataProperties(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX,
 metadata.hasDictionary(),
         metadata.getColumnMaxLength(), metadata.getCardinality(), 
metadata.getTotalDocs(), metadata.getDataType(),
         metadata.getFieldType(), metadata.isSorted(), 
metadata.isSingleValue(), metadata.getMaxNumberOfMultiValues(),
-        metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(), 
metadata.getMinValue(),
-        metadata.getMaxValue(), false);
+        metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(), 
metadata.getMinValue(), metadata.getMaxValue(),
+        false);
   }
 
   @Test
@@ -2198,8 +2225,7 @@ public class ForwardIndexHandlerTest {
     validateIndexMap(DIM_RAW_SV_FORWARD_INDEX_DISABLED_INTEGER, false, true);
     
validateIndexesForForwardIndexDisabledColumns(DIM_RAW_MV_FORWARD_INDEX_DISABLED_INTEGER);
 
-    ForwardIndexHandler fwdIndexHandler =
-        new ForwardIndexHandler(segmentLocalFSDirectory, indexLoadingConfig, 
_schema);
+    ForwardIndexHandler fwdIndexHandler = new 
ForwardIndexHandler(segmentLocalFSDirectory, indexLoadingConfig, _schema);
     fwdIndexHandler.updateIndices(writer);
     fwdIndexHandler.postUpdateIndicesCleanup(writer);
 
@@ -2213,20 +2239,18 @@ public class ForwardIndexHandlerTest {
     
validateIndexesForForwardIndexDisabledColumns(DIM_RAW_MV_FORWARD_INDEX_DISABLED_INTEGER);
 
     // In column metadata, nothing should change.
-    ColumnMetadata metadata =
-        
existingSegmentMetadata.getColumnMetadataFor(DIM_RAW_SV_FORWARD_INDEX_DISABLED_INTEGER);
+    ColumnMetadata metadata = 
existingSegmentMetadata.getColumnMetadataFor(DIM_RAW_SV_FORWARD_INDEX_DISABLED_INTEGER);
     validateMetadataProperties(DIM_RAW_SV_FORWARD_INDEX_DISABLED_INTEGER, 
metadata.hasDictionary(),
         metadata.getColumnMaxLength(), metadata.getCardinality(), 
metadata.getTotalDocs(), metadata.getDataType(),
         metadata.getFieldType(), metadata.isSorted(), 
metadata.isSingleValue(), metadata.getMaxNumberOfMultiValues(),
-        metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(), 
metadata.getMinValue(),
-        metadata.getMaxValue(), false);
-    metadata =
-        
existingSegmentMetadata.getColumnMetadataFor(DIM_RAW_MV_FORWARD_INDEX_DISABLED_INTEGER);
+        metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(), 
metadata.getMinValue(), metadata.getMaxValue(),
+        false);
+    metadata = 
existingSegmentMetadata.getColumnMetadataFor(DIM_RAW_MV_FORWARD_INDEX_DISABLED_INTEGER);
     validateMetadataProperties(DIM_RAW_MV_FORWARD_INDEX_DISABLED_INTEGER, 
metadata.hasDictionary(),
         metadata.getColumnMaxLength(), metadata.getCardinality(), 
metadata.getTotalDocs(), metadata.getDataType(),
         metadata.getFieldType(), metadata.isSorted(), 
metadata.isSingleValue(), metadata.getMaxNumberOfMultiValues(),
-        metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(), 
metadata.getMinValue(),
-        metadata.getMaxValue(), false);
+        metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(), 
metadata.getMinValue(), metadata.getMaxValue(),
+        false);
   }
 
   @Test
@@ -2242,10 +2266,10 @@ public class ForwardIndexHandlerTest {
     // Add column to range index list. Must be a numerical type.
     String column;
     do {
-      column = MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS
-          
.get(rand.nextInt(MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS.size()));
-    } while (!column.equals(DIM_MV_FORWARD_INDEX_DISABLED_DUPLICATES_STRING)
-        && !column.equals(DIM_MV_FORWARD_INDEX_DISABLED_DUPLICATES_BYTES));
+      column = MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS.get(
+          rand.nextInt(MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS.size()));
+    } while (!column.equals(DIM_MV_FORWARD_INDEX_DISABLED_DUPLICATES_STRING) 
&& !column.equals(
+        DIM_MV_FORWARD_INDEX_DISABLED_DUPLICATES_BYTES));
     indexLoadingConfig.addRangeIndexColumns(column);
 
     RangeIndexHandler rangeIndexHandler = new 
RangeIndexHandler(segmentLocalFSDirectory, indexLoadingConfig);
@@ -2271,10 +2295,10 @@ public class ForwardIndexHandlerTest {
     // In column metadata, some values can change since MV columns with 
duplicates lose the duplicates on forward index
     // regeneration.
     ColumnMetadata metadata = 
existingSegmentMetadata.getColumnMetadataFor(column);
-    validateMetadataProperties(column, true, 7, metadata.getCardinality(),
-        metadata.getTotalDocs(), metadata.getDataType(), 
metadata.getFieldType(), metadata.isSorted(),
-        metadata.isSingleValue(), metadata.getMaxNumberOfMultiValues(), 
metadata.getTotalNumberOfEntries(),
-        metadata.isAutoGenerated(), metadata.getMinValue(), 
metadata.getMaxValue(), true);
+    validateMetadataProperties(column, true, 7, metadata.getCardinality(), 
metadata.getTotalDocs(),
+        metadata.getDataType(), metadata.getFieldType(), metadata.isSorted(), 
metadata.isSingleValue(),
+        metadata.getMaxNumberOfMultiValues(), 
metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(),
+        metadata.getMinValue(), metadata.getMaxValue(), true);
 
     // Validate that expected metadata properties don't match. 
totalNumberOfEntries will definitely not match since
     // duplicates will be removed, but maxNumberOfMultiValues may still match 
if the row with max multi-values didn't
@@ -2332,34 +2356,24 @@ public class ForwardIndexHandlerTest {
     IndexType index1 = Mockito.mock(IndexType.class);
     Mockito.when(index1.getId()).thenReturn("index1");
     IndexConfig indexConf = new IndexConfig(true);
-    FieldIndexConfigs fieldIndexConfigs = new FieldIndexConfigs.Builder()
-        .add(index1, indexConf)
-        .build();
+    FieldIndexConfigs fieldIndexConfigs = new 
FieldIndexConfigs.Builder().add(index1, indexConf).build();
     // No need to disable dictionary
-    boolean result = DictionaryIndexType.ignoreDictionaryOverride(false, true,
-        2, fieldSpec,
-        fieldIndexConfigs, 5, 20);
+    boolean result = DictionaryIndexType.ignoreDictionaryOverride(false, true, 
2, fieldSpec, fieldIndexConfigs, 5, 20);
     Assert.assertEquals(result, true);
 
     // Set a higher noDictionarySizeRatioThreshold
-    result = DictionaryIndexType.ignoreDictionaryOverride(false, true,
-        5, fieldSpec,
-        fieldIndexConfigs, 5, 20);
+    result = DictionaryIndexType.ignoreDictionaryOverride(false, true, 5, 
fieldSpec, fieldIndexConfigs, 5, 20);
     Assert.assertEquals(result, false);
 
     // optimizeDictionary and optimizeDictionaryForMetrics both turned on
-    result = DictionaryIndexType.ignoreDictionaryOverride(true, true,
-        5, fieldSpec,
-        fieldIndexConfigs, 5, 20);
+    result = DictionaryIndexType.ignoreDictionaryOverride(true, true, 5, 
fieldSpec, fieldIndexConfigs, 5, 20);
     Assert.assertEquals(result, false);
 
     // Don't ignore for Json. We want to disable dictionary for json.
     fieldSpec = new DimensionFieldSpec();
     fieldSpec.setName("test");
     fieldSpec.setDataType(FieldSpec.DataType.JSON);
-    result = DictionaryIndexType.ignoreDictionaryOverride(true, true,
-        5, fieldSpec,
-        fieldIndexConfigs, 5, 20);
+    result = DictionaryIndexType.ignoreDictionaryOverride(true, true, 5, 
fieldSpec, fieldIndexConfigs, 5, 20);
     Assert.assertEquals(result, true);
   }
 
@@ -2558,7 +2572,8 @@ public class ForwardIndexHandlerTest {
     }
   }
 
-  private void testIndexExists(String columnName, IndexType<?, ?, ?> 
indexType) throws Exception {
+  private void testIndexExists(String columnName, IndexType<?, ?, ?> indexType)
+      throws Exception {
     SegmentMetadataImpl existingSegmentMetadata = new 
SegmentMetadataImpl(_segmentDirectory);
     SegmentDirectory segmentLocalFSDirectory =
         new SegmentLocalFSDirectory(_segmentDirectory, 
existingSegmentMetadata, ReadMode.mmap);
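
All of the handler tests above follow the same rewrite path. A condensed sketch, assuming a segment directory, writer, and IndexLoadingConfig prepared as in the test setUp (variable names here are illustrative):

    // Point the handler at an existing segment whose FieldConfig now requests GZIP,
    // then rewrite the forward index and clean up temporary artifacts.
    ForwardIndexHandler fwdIndexHandler = new ForwardIndexHandler(segmentLocalFSDirectory, indexLoadingConfig, _schema);
    fwdIndexHandler.updateIndices(writer);
    fwdIndexHandler.postUpdateIndicesCleanup(writer);
    segmentLocalFSDirectory.close();
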
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressionType.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressionType.java
index 97d7057d03..79c678c260 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressionType.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressionType.java
@@ -19,7 +19,7 @@
 package org.apache.pinot.segment.spi.compression;
 
 public enum ChunkCompressionType {
-  PASS_THROUGH(0), SNAPPY(1), ZSTANDARD(2), LZ4(3), LZ4_LENGTH_PREFIXED(4);
+  PASS_THROUGH(0), SNAPPY(1), ZSTANDARD(2), LZ4(3), LZ4_LENGTH_PREFIXED(4), GZIP(5);
 
   private static final ChunkCompressionType[] VALUES = values();
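 
 GZIP is appended with id 5 because these ids are persisted in chunk headers; reordering the enum would silently re-map codecs in existing segments. A sketch of the id-based lookup the cached VALUES array enables (the accessor name in the actual class may differ):

    // Hypothetical helper: resolve a persisted codec id back to the enum constant.
    static ChunkCompressionType fromId(int id) {
      ChunkCompressionType[] values = ChunkCompressionType.values();
      return (id >= 0 && id < values.length) ? values[id] : null;
    }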
 
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressor.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressor.java
index a6ab78c4ea..4ce9ce82be 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressor.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressor.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.segment.spi.compression;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
@@ -25,7 +26,7 @@ import java.nio.ByteBuffer;
 /**
  * Interface to compress a chunk of data.
  */
-public interface ChunkCompressor {
+public interface ChunkCompressor extends Closeable {
 
   /**
    * This method compresses the given data. The output compressed ByteBuffer 
is returned ready for read.
@@ -51,4 +52,9 @@ public interface ChunkCompressor {
    * @return this compressor's type
    */
   ChunkCompressionType compressionType();
+
+  @Override
+  default void close() throws IOException {
+    // no-op
+  }
 }
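
Making ChunkCompressor Closeable (with a no-op default) lets stateful codecs release resources deterministically. A minimal sketch of why close() matters, assuming the new GzipCompressor wraps java.util.zip.Deflater as its name suggests:

    import java.util.zip.Deflater;

    // Sketch only: a Deflater holds off-heap zlib state that the GC does not
    // reclaim promptly, so it should be released explicitly via close().
    final class GzipCompressorSketch implements AutoCloseable {
      private final Deflater _deflater = new Deflater();

      @Override
      public void close() {
        _deflater.end();  // frees the native zlib buffers
      }
    }
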
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkDecompressor.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkDecompressor.java
index 2eeb33d6c9..b3f563bb44 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkDecompressor.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkDecompressor.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.segment.spi.compression;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
@@ -25,7 +26,7 @@ import java.nio.ByteBuffer;
 /**
  * Interface to decompress a chunk of data.
  */
-public interface ChunkDecompressor {
+public interface ChunkDecompressor extends Closeable {
 
   /**
    * This method decompresses chunk of data that was compressed using {@link
@@ -48,4 +49,9 @@ public interface ChunkDecompressor {
    */
   int decompressedLength(ByteBuffer compressedInput)
       throws IOException;
+
+  @Override
+  default void close() throws IOException {
+    // no-op
+  }
 }
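
On the read side the contract is unchanged: decompressedLength may not be recoverable from a GZIP or LZ4 frame, so callers fall back to a size they already know, as the updated roundtrip test does. A condensed sketch of that caller-side branch (helper name and parameters are ours, not part of this patch):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import org.apache.pinot.segment.spi.compression.ChunkDecompressor;

    // Size the output from decompressedLength() when the codec records it,
    // otherwise from a size the caller already knows.
    static ByteBuffer decompressChunk(ChunkDecompressor decompressor, ByteBuffer compressedInput, int knownOriginalSize)
        throws IOException {
      int reported = decompressor.decompressedLength(compressedInput);
      ByteBuffer out = ByteBuffer.allocateDirect(reported > 0 ? reported : knownOriginalSize);
      decompressor.decompress(compressedInput, out);
      return out;
    }
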
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/FieldIndexConfigs.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/FieldIndexConfigs.java
index 3d192aa6a1..1351b35d96 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/FieldIndexConfigs.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/FieldIndexConfigs.java
@@ -57,10 +57,8 @@ public class FieldIndexConfigs {
   }
 
   public Map<String, JsonNode> unwrapIndexes() {
-    Function<Map.Entry<IndexType, IndexConfig>, JsonNode> serializer =
-        entry -> entry.getValue().toJsonNode();
-    return _configMap.entrySet().stream()
-        .filter(e -> e.getValue() != null)
+    Function<Map.Entry<IndexType, IndexConfig>, JsonNode> serializer = entry 
-> entry.getValue().toJsonNode();
+    return _configMap.entrySet().stream().filter(e -> e.getValue() != null)
         .collect(Collectors.toMap(entry -> entry.getKey().getId(), 
serializer));
   }
 
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java
index 132705036b..70de007f8e 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java
@@ -74,6 +74,10 @@ public class ForwardIndexConfig extends IndexConfig {
           _chunkCompressionType = ChunkCompressionType.LZ4;
           _dictIdCompressionType = null;
           break;
+        case GZIP:
+          _chunkCompressionType = ChunkCompressionType.GZIP;
+          _dictIdCompressionType = null;
+          break;
         case MV_ENTRY_DICT:
           _dictIdCompressionType = DictIdCompressionType.MV_ENTRY_DICT;
           _chunkCompressionType = null;
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java
index 704cb2e01c..201edeb39a 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java
@@ -52,8 +52,8 @@ public class FieldConfig extends BaseJsonConfig {
   public static final String TEXT_INDEX_LUCENE_USE_COMPOUND_FILE = 
"luceneUseCompoundFile";
   public static final String TEXT_INDEX_LUCENE_MAX_BUFFER_SIZE_MB = 
"luceneMaxBufferSizeMB";
   public static final String TEXT_INDEX_LUCENE_ANALYZER_CLASS = 
"luceneAnalyzerClass";
-  public static final String TEXT_INDEX_DEFAULT_LUCENE_ANALYZER_CLASS
-          = "org.apache.lucene.analysis.standard.StandardAnalyzer";
+  public static final String TEXT_INDEX_DEFAULT_LUCENE_ANALYZER_CLASS =
+      "org.apache.lucene.analysis.standard.StandardAnalyzer";
   public static final String TEXT_INDEX_STOP_WORD_SEPERATOR = ",";
   // "native" for native, default is Lucene
   public static final String TEXT_FST_TYPE = "fstType";
@@ -102,8 +102,8 @@ public class FieldConfig extends BaseJsonConfig {
     Preconditions.checkArgument(name != null, "'name' must be configured");
     _name = name;
     _encodingType = encodingType == null ? EncodingType.DICTIONARY : 
encodingType;
-    _indexTypes = indexTypes != null ? indexTypes : (
-        indexType == null ? Lists.newArrayList() : 
Lists.newArrayList(indexType));
+    _indexTypes =
+        indexTypes != null ? indexTypes : (indexType == null ? 
Lists.newArrayList() : Lists.newArrayList(indexType));
     _compressionCodec = compressionCodec;
     _timestampConfig = timestampConfig;
     _properties = properties;
@@ -129,6 +129,7 @@ public class FieldConfig extends BaseJsonConfig {
     // CLP is a special type of compression codec that isn't generally 
applicable to all RAW columns and has a
     // special handling for log lines (see {@link CLPForwardIndexCreatorV1})
     CLP(false, false),
+    GZIP(true, false),
 
     // For MV dictionary encoded forward index, add a second level dictionary 
encoding for the multi-value entries
     MV_ENTRY_DICT(false, true);
@@ -258,8 +259,8 @@ public class FieldConfig extends BaseJsonConfig {
     }
 
     public FieldConfig build() {
-      return new FieldConfig(_name, _encodingType, null, _indexTypes, 
_compressionCodec, _timestampConfig,
-          _indexes, _properties, _tierOverwrites);
+      return new FieldConfig(_name, _encodingType, null, _indexTypes, 
_compressionCodec, _timestampConfig, _indexes,
+          _properties, _tierOverwrites);
     }
   }
 }
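
End to end, enabling the codec on a raw column only requires the new CompressionCodec value; the constructor call below mirrors the test setup earlier in this commit (the column name is illustrative), and ForwardIndexConfig maps the codec straight to ChunkCompressionType.GZIP:

    import java.util.Collections;
    import org.apache.pinot.spi.config.table.FieldConfig;

    // Raw (no-dictionary) column compressed with GZIP; null for the properties map.
    FieldConfig gzipColumn = new FieldConfig("myRawColumn", FieldConfig.EncodingType.RAW,
        Collections.emptyList(), FieldConfig.CompressionCodec.GZIP, null);
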

