This is an automated email from the ASF dual-hosted git repository.
lqc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 1f8fd632c0 [Feature] Enable the capability to specify zstd and lz4
segment compression via config (#14008)
1f8fd632c0 is described below
commit 1f8fd632c09d6835f42d71675277694234938934
Author: Jack Luo <[email protected]>
AuthorDate: Tue Oct 22 04:13:32 2024 +0800
[Feature] Enable the capability to specify zstd and lz4 segment compression
via config (#14008)
* Enable the capability to specify zstd and lz4 segment compression codec
via config
* Reduce the scope of the change to server-only
* Add a blank line to trigger unit test again
* Addressed code review comments.
---
.../pinot/common/utils/TarCompressionUtils.java | 41 ++++++++++++++++++----
.../core/data/manager/BaseTableDataManager.java | 4 +--
.../realtime/RealtimeSegmentDataManager.java | 5 +--
.../data/manager/BaseTableDataManagerTest.java | 10 +++---
.../server/starter/helix/BaseServerStarter.java | 7 ++++
.../apache/pinot/spi/utils/CommonConstants.java | 2 ++
6 files changed, 54 insertions(+), 15 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/TarCompressionUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/TarCompressionUtils.java
index 089c0fae36..3a6f3170f0 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/TarCompressionUtils.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/TarCompressionUtils.java
@@ -69,6 +69,12 @@ public class TarCompressionUtils {
private TarCompressionUtils() {
}
+ /**
+ * This generic compressed tar file extension does not bind to a particular
compressor. Decompression determines the
+ * appropriate compressor at run-time based on the file's magic number
irrespective of the file extension.
+ * Compression uses the default compressor automatically if this generic
extension is used.
+ */
+ public static final String TAR_COMPRESSED_FILE_EXTENSION = ".tar.compressed";
public static final String TAR_GZ_FILE_EXTENSION = ".tar.gz";
public static final String TAR_LZ4_FILE_EXTENSION = ".tar.lz4";
public static final String TAR_ZST_FILE_EXTENSION = ".tar.zst";
@@ -77,6 +83,13 @@ public class TarCompressionUtils {
CompressorStreamFactory.LZ4_FRAMED, TAR_ZST_FILE_EXTENSION,
CompressorStreamFactory.ZSTANDARD);
private static final CompressorStreamFactory COMPRESSOR_STREAM_FACTORY =
CompressorStreamFactory.getSingleton();
private static final char ENTRY_NAME_SEPARATOR = '/';
+ private static String _defaultCompressorName = CompressorStreamFactory.GZIP;
+
+ public static void setDefaultCompressor(String compressorName) {
+ if (COMPRESSOR_NAME_BY_FILE_EXTENSIONS.containsKey(compressorName)) {
+ _defaultCompressorName = compressorName;
+ }
+ }
/**
* Creates a compressed tar file from the input file/directory to the output
file. The output file must have
@@ -93,15 +106,29 @@ public class TarCompressionUtils {
*/
public static void createCompressedTarFile(File[] inputFiles, File
outputFile)
throws IOException {
- String compressorName = null;
- for (String supportedCompressorExtension :
COMPRESSOR_NAME_BY_FILE_EXTENSIONS.keySet()) {
- if (outputFile.getName().endsWith(supportedCompressorExtension)) {
- compressorName =
COMPRESSOR_NAME_BY_FILE_EXTENSIONS.get(supportedCompressorExtension);
- break;
+ if (outputFile.getName().endsWith(TAR_COMPRESSED_FILE_EXTENSION)) {
+ createCompressedTarFile(inputFiles, outputFile, _defaultCompressorName);
+ } else {
+ String compressorName = null;
+ for (String supportedCompressorExtension :
COMPRESSOR_NAME_BY_FILE_EXTENSIONS.keySet()) {
+ if (outputFile.getName().endsWith(supportedCompressorExtension)) {
+ compressorName =
COMPRESSOR_NAME_BY_FILE_EXTENSIONS.get(supportedCompressorExtension);
+ createCompressedTarFile(inputFiles, outputFile, compressorName);
+ return;
+ }
}
+ Preconditions.checkState(null != compressorName,
+ "Output file: %s does not have a supported compressed tar file
extension", outputFile);
}
- Preconditions.checkState(null != compressorName,
- "Output file: %s does not have a supported compressed tar file
extension", outputFile);
+ }
+
+ public static void createCompressedTarFile(File inputFile, File outputFile,
String compressorName)
+ throws IOException {
+ createCompressedTarFile(new File[]{inputFile}, outputFile, compressorName);
+ }
+
+ public static void createCompressedTarFile(File[] inputFiles, File
outputFile, String compressorName)
+ throws IOException {
try (OutputStream fileOut = Files.newOutputStream(outputFile.toPath());
BufferedOutputStream bufferedOut = new BufferedOutputStream(fileOut);
OutputStream compressorOut =
COMPRESSOR_STREAM_FACTORY.createCompressorOutputStream(compressorName,
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 56d2cb35d6..b1d3647a54 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -793,7 +793,7 @@ public abstract class BaseTableDataManager implements
TableDataManager {
failedAttempts.get());
}
} else {
- File segmentTarFile = new File(tempRootDir, segmentName +
TarCompressionUtils.TAR_GZ_FILE_EXTENSION);
+ File segmentTarFile = new File(tempRootDir, segmentName +
TarCompressionUtils.TAR_COMPRESSED_FILE_EXTENSION);
SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(downloadUrl,
segmentTarFile, zkMetadata.getCrypterName());
_logger.info("Downloaded tarred segment: {} from: {} to: {}, file
length: {}", segmentName, downloadUrl,
segmentTarFile, segmentTarFile.length());
@@ -820,7 +820,7 @@ public abstract class BaseTableDataManager implements
TableDataManager {
_tableNameWithType);
_logger.info("Downloading segment: {} from peers", segmentName);
File tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "-" +
UUID.randomUUID());
- File segmentTarFile = new File(tempRootDir, segmentName +
TarCompressionUtils.TAR_GZ_FILE_EXTENSION);
+ File segmentTarFile = new File(tempRootDir, segmentName +
TarCompressionUtils.TAR_COMPRESSED_FILE_EXTENSION);
try {
SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(segmentName,
_peerDownloadScheme, () -> {
List<URI> peerServerURIs =
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index bed8f2a310..939e43d393 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -1016,7 +1016,8 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
_serverMetrics.addValueToGlobalGauge(ServerGauge.LLC_SIMULTANEOUS_SEGMENT_BUILDS,
1L);
final long lockAcquireTimeMillis = now();
- // Build a segment from in-memory rows.If buildTgz is true, then build
the tar.gz file as well
+ // Build a segment from in-memory rows.
+ // If build compressed archive is true, then build the tar.compressed
file as well
// TODO Use an auto-closeable object to delete temp resources.
File tempSegmentFolder = new File(_resourceTmpDir, "tmp-" +
_segmentNameStr + "-" + now());
@@ -1069,7 +1070,7 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
TimeUnit.MILLISECONDS.toSeconds(waitTimeMillis));
if (forCommit) {
- File segmentTarFile = new File(dataDir, _segmentNameStr +
TarCompressionUtils.TAR_GZ_FILE_EXTENSION);
+ File segmentTarFile = new File(dataDir, _segmentNameStr +
TarCompressionUtils.TAR_COMPRESSED_FILE_EXTENSION);
try {
TarCompressionUtils.createCompressedTarFile(indexDir,
segmentTarFile);
} catch (IOException e) {
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
index e40ef49131..7d351c486f 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
@@ -293,7 +293,8 @@ public class BaseTableDataManagerTest {
throws Exception {
File indexDir = createSegment(SegmentVersion.v3, 5);
SegmentZKMetadata zkMetadata =
- makeRawSegment(indexDir, new File(TEMP_DIR, SEGMENT_NAME +
TarCompressionUtils.TAR_GZ_FILE_EXTENSION), false);
+ makeRawSegment(indexDir, new File(TEMP_DIR, SEGMENT_NAME +
TarCompressionUtils.TAR_COMPRESSED_FILE_EXTENSION),
+ false);
// Same CRC but force to download.
BaseTableDataManager tableDataManager = createTableManager();
@@ -567,7 +568,7 @@ public class BaseTableDataManagerTest {
File tempDir = new File(TEMP_DIR, "testDownloadAndDecrypt");
String fileName = "tmp.txt";
FileUtils.write(new File(tempDir, fileName), "this is from somewhere
remote");
- String tarFileName = SEGMENT_NAME +
TarCompressionUtils.TAR_GZ_FILE_EXTENSION;
+ String tarFileName = SEGMENT_NAME +
TarCompressionUtils.TAR_COMPRESSED_FILE_EXTENSION;
File tempTarFile = new File(TEMP_DIR, tarFileName);
TarCompressionUtils.createCompressedTarFile(tempDir, tempTarFile);
@@ -607,7 +608,7 @@ public class BaseTableDataManagerTest {
File tempRootDir =
tableDataManager.getTmpSegmentDataDir("test-untar-move");
// All input and intermediate files are put in the tempRootDir.
- File tempTar = new File(tempRootDir, SEGMENT_NAME +
TarCompressionUtils.TAR_GZ_FILE_EXTENSION);
+ File tempTar = new File(tempRootDir, SEGMENT_NAME +
TarCompressionUtils.TAR_COMPRESSED_FILE_EXTENSION);
File tempInputDir = new File(tempRootDir, "input");
FileUtils.write(new File(tempInputDir, "tmp.txt"), "this is in segment
dir");
TarCompressionUtils.createCompressedTarFile(tempInputDir, tempTar);
@@ -687,7 +688,8 @@ public class BaseTableDataManagerTest {
private static SegmentZKMetadata createRawSegment(SegmentVersion
segmentVersion, int numRows)
throws Exception {
File indexDir = createSegment(segmentVersion, numRows);
- return makeRawSegment(indexDir, new File(TEMP_DIR, SEGMENT_NAME +
TarCompressionUtils.TAR_GZ_FILE_EXTENSION), true);
+ return makeRawSegment(indexDir,
+ new File(TEMP_DIR, SEGMENT_NAME +
TarCompressionUtils.TAR_COMPRESSED_FILE_EXTENSION), true);
}
private static SegmentZKMetadata makeRawSegment(File indexDir, File
rawSegmentFile, boolean deleteIndexDir)
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index dc4100eebc..98f700c277 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -59,6 +59,7 @@ import org.apache.pinot.common.utils.PinotAppConfigs;
import org.apache.pinot.common.utils.ServiceStartableUtils;
import org.apache.pinot.common.utils.ServiceStatus;
import org.apache.pinot.common.utils.ServiceStatus.Status;
+import org.apache.pinot.common.utils.TarCompressionUtils;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
import org.apache.pinot.common.utils.helix.HelixHelper;
@@ -161,6 +162,12 @@ public abstract class BaseServerStarter implements
ServiceStartable {
_serverConf.getProperty(CommonConstants.CONFIG_OF_PINOT_INSECURE_MODE,
CommonConstants.DEFAULT_PINOT_INSECURE_MODE)));
+ String tarCompressionCodecName =
+
_serverConf.getProperty(CommonConstants.CONFIG_OF_PINOT_TAR_COMPRESSION_CODEC_NAME);
+ if (null != tarCompressionCodecName) {
+ TarCompressionUtils.setDefaultCompressor(tarCompressionCodecName);
+ }
+
setupHelixSystemProperties();
_listenerConfigs = ListenerConfigUtil.buildServerAdminConfigs(_serverConf);
_hostname = _serverConf.getProperty(Helix.KEY_OF_SERVER_NETTY_HOST,
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index ff81f6bc4e..f62efb2062 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -65,6 +65,8 @@ public class CommonConstants {
public static final String CONFIG_OF_EXECUTORS_FIXED_NUM_THREADS =
"pinot.executors.fixed.default.numThreads";
public static final String DEFAULT_EXECUTORS_FIXED_NUM_THREADS = "-1";
+ public static final String CONFIG_OF_PINOT_TAR_COMPRESSION_CODEC_NAME =
"pinot.tar.compression.codec.name";
+
/**
* The state of the consumer for a given segment
*/
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]