This is an automated email from the ASF dual-hosted git repository.
lqc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new e645a50210 Improve the unit test dataset used by
CLPMutableForwardIndexV2Test and CLPForwardIndexCreatorV2Test (#14632)
e645a50210 is described below
commit e645a50210bf7b951177f9e553eb9ffabdf524ca
Author: Jack Luo <[email protected]>
AuthorDate: Sat Dec 14 04:48:22 2024 +0800
Improve the unit test dataset used by CLPMutableForwardIndexV2Test and
CLPForwardIndexCreatorV2Test (#14632)
* Improve the unit test dataset used by CLPMutableForwardIndexV2Test and
CLPForwardIndexCreatorV2Test
* Add compressed log data.
* Fix linting issue.
* Improve unit test code quality.
---
.../creator/CLPForwardIndexCreatorV2Test.java | 127 ++++++++++++++++-----
.../mutable/CLPMutableForwardIndexV2Test.java | 43 +++----
.../src/test/resources/data/log.jsonl.gz | Bin 0 -> 6148486 bytes
3 files changed, 124 insertions(+), 46 deletions(-)
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/CLPForwardIndexCreatorV2Test.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/CLPForwardIndexCreatorV2Test.java
index c66ea2f3ae..32732e4cad 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/CLPForwardIndexCreatorV2Test.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/CLPForwardIndexCreatorV2Test.java
@@ -18,75 +18,150 @@
*/
package org.apache.pinot.segment.local.segment.index.creator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
+import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
+import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.segment.local.io.writer.impl.DirectMemoryManager;
import
org.apache.pinot.segment.local.realtime.impl.forward.CLPMutableForwardIndexV2;
import
org.apache.pinot.segment.local.segment.creator.impl.fwd.CLPForwardIndexCreatorV2;
+import
org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueVarByteRawIndexCreator;
import
org.apache.pinot.segment.local.segment.index.forward.mutable.VarByteSVMutableForwardIndexTest;
import
org.apache.pinot.segment.local.segment.index.readers.forward.CLPForwardIndexReaderV2;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import org.apache.pinot.segment.spi.memory.PinotDataBufferMemoryManager;
+import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
+import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
public class CLPForwardIndexCreatorV2Test {
+ private static final String COLUMN_NAME = "column1";
private static final File TEMP_DIR =
new File(FileUtils.getTempDirectory(),
CLPForwardIndexCreatorV2Test.class.getSimpleName());
private PinotDataBufferMemoryManager _memoryManager;
+ private List<String> _logMessages = new ArrayList<>();
@BeforeClass
public void setUp()
throws Exception {
TestUtils.ensureDirectoriesExistAndEmpty(TEMP_DIR);
_memoryManager = new
DirectMemoryManager(VarByteSVMutableForwardIndexTest.class.getName());
+
+ ObjectMapper objectMapper = new ObjectMapper();
+ try (GzipCompressorInputStream gzipInputStream = new
GzipCompressorInputStream(
+ getClass().getClassLoader().getResourceAsStream("data/log.jsonl.gz"));
+ BufferedReader bufferedReader = new BufferedReader(new
InputStreamReader(gzipInputStream))) {
+ String line;
+ while ((line = bufferedReader.readLine()) != null) {
+ JsonNode jsonNode = objectMapper.readTree(line);
+ _logMessages.add(jsonNode.get("message").asText());
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @AfterClass
+ public void tearDown()
+ throws IOException {
+ TestUtils.ensureDirectoriesExistAndEmpty(TEMP_DIR);
}
@Test
public void testCLPWriter()
throws IOException {
- List<String> logLines = new ArrayList<>();
- logLines.add("INFO [PropertyCache]
[HelixController-pipeline-default-pinot-(4a02a32c_DEFAULT)] "
- + "Event pinot::DEFAULT::4a02a32c_DEFAULT : Refreshed 35 property
LiveInstance took 5 ms. Selective: true");
- logLines.add("INFO [PropertyCache]
[HelixController-pipeline-default-pinot-(4a02a32d_DEFAULT)] "
- + "Event pinot::DEFAULT::4a02a32d_DEFAULT : Refreshed 81 property
LiveInstance took 4 ms. Selective: true");
- logLines.add("INFO [ControllerResponseFilter] [grizzly-http-server-2]
Handled request from 0.0"
- + ".0.0 GET https://0.0.0.0:8443/health?checkType=liveness,
content-type null status code 200 OK");
- logLines.add("INFO [ControllerResponseFilter] [grizzly-http-server-6]
Handled request from 0.0"
- + ".0.0 GET
https://pinot-pinot-broker-headless.managed.svc.cluster.local:8093/tables,
content-type "
- + "application/json status code 200 OK");
- logLines.add("null");
-
// Create and ingest into a clp mutable forward indexes
- CLPMutableForwardIndexV2 clpMutableForwardIndexV2 = new
CLPMutableForwardIndexV2("column1", _memoryManager);
- for (int i = 0; i < logLines.size(); i++) {
- clpMutableForwardIndexV2.setString(i, logLines.get(i));
+ CLPMutableForwardIndexV2 clpMutableForwardIndexV2 = new
CLPMutableForwardIndexV2(COLUMN_NAME, _memoryManager);
+ int rawSizeBytes = 0;
+ int maxLength = 0;
+ for (int i = 0; i < _logMessages.size(); i++) {
+ String logMessage = _logMessages.get(i);
+ clpMutableForwardIndexV2.setString(i, logMessage);
+ rawSizeBytes += logMessage.length();
+ maxLength = Math.max(maxLength, logMessage.length());
}
- // Create a immutable forward index from mutable forward index
- CLPForwardIndexCreatorV2 clpForwardIndexCreatorV2 =
- new CLPForwardIndexCreatorV2(TEMP_DIR, clpMutableForwardIndexV2,
ChunkCompressionType.ZSTANDARD);
- for (int i = 0; i < logLines.size(); i++) {
-
clpForwardIndexCreatorV2.putString(clpMutableForwardIndexV2.getString(i));
+ // LZ4 compression type
+ long rawStringFwdIndexSizeLZ4 =
createStringRawForwardIndex(ChunkCompressionType.LZ4, maxLength);
+ long clpFwdIndexSizeLZ4 =
+ createAndValidateClpImmutableForwardIndex(clpMutableForwardIndexV2,
ChunkCompressionType.LZ4);
+ // For LZ4 compression:
+ // 1. CLP raw forward index should achieve at least 40x compression
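+ // 2. at least 25% smaller file size compared to standard raw forward index with LZ4 compression
(NOTE(review): the assertion below only requires raw / CLP size >= 0.25, a weaker bound - confirm the threshold)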
+ Assert.assertTrue((float) rawSizeBytes / clpFwdIndexSizeLZ4 >= 40);
+ Assert.assertTrue((float) rawStringFwdIndexSizeLZ4 / clpFwdIndexSizeLZ4 >=
0.25);
+
+ // ZSTD compression type
+ long rawStringFwdIndexSizeZSTD =
createStringRawForwardIndex(ChunkCompressionType.ZSTANDARD, maxLength);
+ long clpFwdIndexSizeZSTD =
+ createAndValidateClpImmutableForwardIndex(clpMutableForwardIndexV2,
ChunkCompressionType.ZSTANDARD);
+ // For ZSTD compression
+ // 1. CLP raw forward index should achieve at least 66x compression
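+ // 2. at least 19% smaller file size compared to standard raw forward index with ZSTD compression
(NOTE(review): the assertion below only requires raw / CLP size >= 0.19, a weaker bound - confirm the threshold)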
+ Assert.assertTrue((float) rawSizeBytes / clpFwdIndexSizeZSTD >= 66);
+ Assert.assertTrue((float) rawStringFwdIndexSizeZSTD / clpFwdIndexSizeZSTD
>= 0.19);
+ }
+
+ private long createStringRawForwardIndex(ChunkCompressionType
compressionType, int maxLength)
+ throws IOException {
+ // Create a raw string immutable forward index
+ TestUtils.ensureDirectoriesExistAndEmpty(TEMP_DIR);
+ SingleValueVarByteRawIndexCreator index =
+ new SingleValueVarByteRawIndexCreator(TEMP_DIR, compressionType,
COLUMN_NAME, _logMessages.size(),
+ FieldSpec.DataType.STRING, maxLength);
+ for (String logMessage : _logMessages) {
+ index.putString(logMessage);
}
- clpForwardIndexCreatorV2.seal();
- clpForwardIndexCreatorV2.close();
+ index.seal();
+ index.close();
+
+ File indexFile = new File(TEMP_DIR, COLUMN_NAME +
V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
+ return indexFile.length();
+ }
+
+ private long
createAndValidateClpImmutableForwardIndex(CLPMutableForwardIndexV2
clpMutableForwardIndexV2,
+ ChunkCompressionType compressionType)
+ throws IOException {
+ long indexSize = createClpImmutableForwardIndex(clpMutableForwardIndexV2,
compressionType);
// Read from immutable forward index and validate the content
- File indexFile = new File(TEMP_DIR, "column1" +
V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
+ File indexFile = new File(TEMP_DIR, COLUMN_NAME +
V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
PinotDataBuffer pinotDataBuffer =
PinotDataBuffer.mapReadOnlyBigEndianFile(indexFile);
- CLPForwardIndexReaderV2 clpForwardIndexReaderV2 = new
CLPForwardIndexReaderV2(pinotDataBuffer, logLines.size());
+ CLPForwardIndexReaderV2 clpForwardIndexReaderV2 = new
CLPForwardIndexReaderV2(pinotDataBuffer, _logMessages.size());
CLPForwardIndexReaderV2.CLPReaderContext clpForwardIndexReaderV2Context =
clpForwardIndexReaderV2.createContext();
- for (int i = 0; i < logLines.size(); i++) {
- Assert.assertEquals(clpForwardIndexReaderV2.getString(i,
clpForwardIndexReaderV2Context), logLines.get(i));
+ for (int i = 0; i < _logMessages.size(); i++) {
+ Assert.assertEquals(clpForwardIndexReaderV2.getString(i,
clpForwardIndexReaderV2Context), _logMessages.get(i));
}
+
+ return indexSize;
+ }
+
+ private long createClpImmutableForwardIndex(CLPMutableForwardIndexV2
clpMutableForwardIndexV2,
+ ChunkCompressionType compressionType)
+ throws IOException {
+ // Create a CLP immutable forward index from mutable forward index
+ TestUtils.ensureDirectoriesExistAndEmpty(TEMP_DIR);
+ CLPForwardIndexCreatorV2 clpForwardIndexCreatorV2 =
+ new CLPForwardIndexCreatorV2(TEMP_DIR, clpMutableForwardIndexV2,
compressionType);
+ for (int i = 0; i < _logMessages.size(); i++) {
+
clpForwardIndexCreatorV2.putString(clpMutableForwardIndexV2.getString(i));
+ }
+ clpForwardIndexCreatorV2.seal();
+ clpForwardIndexCreatorV2.close();
+
+ File indexFile = new File(TEMP_DIR, COLUMN_NAME +
V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
+ return indexFile.length();
}
}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/mutable/CLPMutableForwardIndexV2Test.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/mutable/CLPMutableForwardIndexV2Test.java
index b1824570dc..6179a4b8bf 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/mutable/CLPMutableForwardIndexV2Test.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/mutable/CLPMutableForwardIndexV2Test.java
@@ -18,10 +18,15 @@
*/
package org.apache.pinot.segment.local.segment.index.forward.mutable;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.BufferedReader;
import java.io.IOException;
+import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
+import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.pinot.segment.local.io.writer.impl.DirectMemoryManager;
import
org.apache.pinot.segment.local.realtime.impl.forward.CLPMutableForwardIndexV2;
import org.apache.pinot.segment.spi.memory.PinotDataBufferMemoryManager;
@@ -33,10 +38,24 @@ import org.testng.annotations.Test;
public class CLPMutableForwardIndexV2Test {
private PinotDataBufferMemoryManager _memoryManager;
+ private List<String> _logMessages = new ArrayList<>();
@BeforeClass
public void setUp() {
_memoryManager = new
DirectMemoryManager(VarByteSVMutableForwardIndexTest.class.getName());
+
+ ObjectMapper objectMapper = new ObjectMapper();
+ try (GzipCompressorInputStream gzipInputStream = new
GzipCompressorInputStream(
+ getClass().getClassLoader().getResourceAsStream("data/log.jsonl.gz"));
+ BufferedReader bufferedReader = new BufferedReader(new
InputStreamReader(gzipInputStream))) {
+ String line;
+ while ((line = bufferedReader.readLine()) != null) {
+ JsonNode jsonNode = objectMapper.readTree(line);
+ _logMessages.add(jsonNode.get("message").asText());
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
@AfterClass
@@ -52,33 +71,17 @@ public class CLPMutableForwardIndexV2Test {
public void testReadWriteOnLogMessages()
throws IOException {
try (CLPMutableForwardIndexV2 readerWriter = new
CLPMutableForwardIndexV2("col1", _memoryManager)) {
- List<String> logLines = new ArrayList<>();
- for (int i = 0; i < 10000; i++) {
- logLines.add("INFO [PropertyCache]
[HelixController-pipeline-default-pinot-(4a02a32c_DEFAULT)] "
- + "Event pinot::DEFAULT::4a02a32c_DEFAULT : Refreshed 35 property
LiveInstance took 5 ms. Selective:"
- + " true");
- logLines.add("INFO [PropertyCache]
[HelixController-pipeline-default-pinot-(4a02a32d_DEFAULT)] "
- + "Event pinot::DEFAULT::4a02a32d_DEFAULT : Refreshed 81 property
LiveInstance took 4 ms. Selective:"
- + " true");
- logLines.add("INFO [ControllerResponseFilter] [grizzly-http-server-2]
Handled request from 0.0"
- + ".0.0 GET https://0.0.0.0:8443/health?checkType=liveness,
content-type null status code 200 OK");
- logLines.add("INFO [ControllerResponseFilter] [grizzly-http-server-6]
Handled request from 0.0"
- + ".0.0 GET
https://pinot-pinot-broker-headless.managed.svc.cluster.local:8093/tables,
content-type "
- + "application/json status code 200 OK");
- logLines.add("null");
- }
-
// Typically, log messages should be clp encoded due to low logtype and
dictionary variable cardinality
Assert.assertTrue(readerWriter.isClpEncoded());
// Write
- for (int i = 0; i < logLines.size(); i++) {
- readerWriter.setString(i, logLines.get(i));
+ for (int i = 0; i < _logMessages.size(); i++) {
+ readerWriter.setString(i, _logMessages.get(i));
}
// Read
- for (int i = 0; i < logLines.size(); i++) {
- Assert.assertEquals(readerWriter.getString(i), logLines.get(i));
+ for (int i = 0; i < _logMessages.size(); i++) {
+ Assert.assertEquals(readerWriter.getString(i), _logMessages.get(i));
}
}
}
diff --git a/pinot-segment-local/src/test/resources/data/log.jsonl.gz
b/pinot-segment-local/src/test/resources/data/log.jsonl.gz
new file mode 100644
index 0000000000..01f483fb08
Binary files /dev/null and
b/pinot-segment-local/src/test/resources/data/log.jsonl.gz differ
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]