This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new df366cc1cc [Feature] Add the ability to compress and decompress tar
archive file with ZStandard and LZ4 compressors (#13782)
df366cc1cc is described below
commit df366cc1cc0f3e6c8a6aad968538c7dd445e3ee6
Author: Jack Luo <[email protected]>
AuthorDate: Thu Aug 15 06:44:40 2024 +0800
[Feature] Add the ability to compress and decompress tar archive file with
ZStandard and LZ4 compressors (#13782)
---
...pressionUtils.java => TarCompressionUtils.java} | 111 ++++++++++-------
.../apache/pinot/common/utils/http/HttpClient.java | 4 +-
...UtilsTest.java => TarCompressionUtilsTest.java} | 134 ++++++++++++++-------
.../java/org/apache/pinot/compat/SegmentOp.java | 6 +-
.../connector/flink/sink/FlinkSegmentWriter.java | 4 +-
.../resources/LLCSegmentCompletionHandlers.java | 6 +-
.../pinot/controller/util/FileIngestionHelper.java | 4 +-
.../controller/api/upload/ZKOperatorTest.java | 8 +-
.../core/data/manager/BaseTableDataManager.java | 8 +-
.../realtime/RealtimeSegmentDataManager.java | 6 +-
.../core/metadata/DefaultMetadataExtractor.java | 4 +-
.../data/manager/BaseTableDataManagerTest.java | 17 ++-
.../impl/fakestream/FakeStreamConfigUtils.java | 4 +-
.../tests/BaseClusterIntegrationTest.java | 4 +-
.../tests/ClusterIntegrationTestUtils.java | 6 +-
.../MergeRollupMinionClusterIntegrationTest.java | 6 +-
.../tests/startree/SegmentInfoProvider.java | 4 +-
.../pinot/perf/BenchmarkOfflineIndexReader.java | 4 +-
.../batch/common/SegmentGenerationJobUtils.java | 4 +-
.../batch/hadoop/HadoopSegmentCreationMapper.java | 6 +-
.../hadoop/HadoopSegmentGenerationJobRunner.java | 4 +-
.../spark/SparkSegmentGenerationJobRunner.java | 8 +-
.../spark3/SparkSegmentGenerationJobRunner.java | 8 +-
.../standalone/SegmentGenerationJobRunner.java | 4 +-
.../BaseMultipleSegmentsConversionExecutor.java | 8 +-
.../tasks/BaseSingleSegmentConversionExecutor.java | 9 +-
.../SegmentGenerationAndPushTaskExecutor.java | 4 +-
.../filebased/FileBasedSegmentWriter.java | 4 +-
.../filebased/FileBasedSegmentWriterTest.java | 18 +--
.../segment/local/utils/SegmentPushUtils.java | 14 +--
.../pinot/server/api/resources/TablesResource.java | 12 +-
.../helix/HelixInstanceDataManagerConfig.java | 4 +-
.../pinot/server/api/TablesResourceTest.java | 6 +-
.../command/SegmentProcessorFrameworkCommand.java | 4 +-
.../tools/admin/command/UploadSegmentCommand.java | 6 +-
.../converter/DictionaryToRawIndexConverter.java | 8 +-
.../converter/PinotSegmentConvertCommand.java | 4 +-
37 files changed, 271 insertions(+), 204 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/TarGzCompressionUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/TarCompressionUtils.java
similarity index 65%
rename from
pinot-common/src/main/java/org/apache/pinot/common/utils/TarGzCompressionUtils.java
rename to
pinot-common/src/main/java/org/apache/pinot/common/utils/TarCompressionUtils.java
index c094b74a49..089c0fae36 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/TarGzCompressionUtils.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/TarCompressionUtils.java
@@ -31,22 +31,23 @@ import java.io.OutputStream;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import org.apache.commons.compress.archivers.ArchiveEntry;
import org.apache.commons.compress.archivers.ArchiveInputStream;
import org.apache.commons.compress.archivers.ArchiveOutputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
-import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
-import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
+import org.apache.commons.compress.compressors.CompressorException;
+import org.apache.commons.compress.compressors.CompressorStreamFactory;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
/**
- * Utility class to compress/de-compress tar.gz files.
+ * Utility class to compress/de-compress tar files compressed using various
compressors.
*/
-public class TarGzCompressionUtils {
+public class TarCompressionUtils {
public static final long NO_DISK_WRITE_RATE_LIMIT = -1;
/* Don't limit write rate to disk. The OS will buffer multiple writes and
can write up to several GBs
* at a time, which saturates disk bandwidth.
@@ -65,71 +66,85 @@ public class TarGzCompressionUtils {
* It is also sufficient for HDDs
*/
-
- private TarGzCompressionUtils() {
+ private TarCompressionUtils() {
}
public static final String TAR_GZ_FILE_EXTENSION = ".tar.gz";
+ public static final String TAR_LZ4_FILE_EXTENSION = ".tar.lz4";
+ public static final String TAR_ZST_FILE_EXTENSION = ".tar.zst";
+ public static final Map<String, String> COMPRESSOR_NAME_BY_FILE_EXTENSIONS =
+ Map.of(TAR_GZ_FILE_EXTENSION, CompressorStreamFactory.GZIP,
TAR_LZ4_FILE_EXTENSION,
+ CompressorStreamFactory.LZ4_FRAMED, TAR_ZST_FILE_EXTENSION,
CompressorStreamFactory.ZSTANDARD);
+ private static final CompressorStreamFactory COMPRESSOR_STREAM_FACTORY =
CompressorStreamFactory.getSingleton();
private static final char ENTRY_NAME_SEPARATOR = '/';
/**
- * Creates a tar.gz file from the input file/directory to the output file.
The output file must have ".tar.gz" as the
- * file extension.
+ * Creates a compressed tar file from the input file/directory to the output
file. The output file must have
+ * a supported compressed tar file extension as the file extension such as
".tar.gz" or ".tar.zst"
*/
- public static void createTarGzFile(File inputFile, File outputFile)
+ public static void createCompressedTarFile(File inputFile, File outputFile)
throws IOException {
- createTarGzFile(new File[]{inputFile}, outputFile);
+ createCompressedTarFile(new File[]{inputFile}, outputFile);
}
/**
- * Creates a tar.gz file from a list of input file/directories to the output
file. The output file must have
- * ".tar.gz" as the file extension.
+ * Creates a compressed tar file from a list of input file/directories to
the output file. The output file must have
+ * a supported file extension such as ".tar.gz" or ".tar.zst"
*/
- public static void createTarGzFile(File[] inputFiles, File outputFile)
+ public static void createCompressedTarFile(File[] inputFiles, File
outputFile)
throws IOException {
-
Preconditions.checkArgument(outputFile.getName().endsWith(TAR_GZ_FILE_EXTENSION),
- "Output file: %s does not have '.tar.gz' file extension", outputFile);
+ String compressorName = null;
+ for (String supportedCompressorExtension :
COMPRESSOR_NAME_BY_FILE_EXTENSIONS.keySet()) {
+ if (outputFile.getName().endsWith(supportedCompressorExtension)) {
+ compressorName =
COMPRESSOR_NAME_BY_FILE_EXTENSIONS.get(supportedCompressorExtension);
+ break;
+ }
+ }
+ Preconditions.checkState(null != compressorName,
+ "Output file: %s does not have a supported compressed tar file
extension", outputFile);
try (OutputStream fileOut = Files.newOutputStream(outputFile.toPath());
BufferedOutputStream bufferedOut = new BufferedOutputStream(fileOut);
- OutputStream gzipOut = new GzipCompressorOutputStream(bufferedOut);
- TarArchiveOutputStream tarGzOut = new TarArchiveOutputStream(gzipOut))
{
- tarGzOut.setBigNumberMode(TarArchiveOutputStream.BIGNUMBER_STAR);
- tarGzOut.setLongFileMode(TarArchiveOutputStream.LONGFILE_GNU);
+ OutputStream compressorOut =
COMPRESSOR_STREAM_FACTORY.createCompressorOutputStream(compressorName,
+ bufferedOut); TarArchiveOutputStream tarOut = new
TarArchiveOutputStream(compressorOut)) {
+ tarOut.setBigNumberMode(TarArchiveOutputStream.BIGNUMBER_STAR);
+ tarOut.setLongFileMode(TarArchiveOutputStream.LONGFILE_GNU);
for (File inputFile : inputFiles) {
- addFileToTarGz(tarGzOut, inputFile, "");
+ addFileToCompressedTar(tarOut, inputFile, "");
}
+ } catch (CompressorException e) {
+ throw new IOException(e);
}
}
/**
- * Helper method to write a file into the tar.gz file output stream. The
base entry name is the relative path of the
- * file to the root directory.
+ * Helper method to write a file into the compressed tar file output stream.
The base entry name is the relative
+ * path of the file to the root directory.
*/
- private static void addFileToTarGz(ArchiveOutputStream tarGzOut, File file,
String baseEntryName)
+ private static void addFileToCompressedTar(ArchiveOutputStream tarOut, File
file, String baseEntryName)
throws IOException {
String entryName = baseEntryName + file.getName();
TarArchiveEntry entry = new TarArchiveEntry(file, entryName);
- tarGzOut.putArchiveEntry(entry);
+ tarOut.putArchiveEntry(entry);
if (file.isFile()) {
try (InputStream in = Files.newInputStream(file.toPath())) {
- IOUtils.copy(in, tarGzOut);
+ IOUtils.copy(in, tarOut);
}
- tarGzOut.closeArchiveEntry();
+ tarOut.closeArchiveEntry();
} else {
- tarGzOut.closeArchiveEntry();
+ tarOut.closeArchiveEntry();
File[] children = file.listFiles();
assert children != null;
String baseEntryNameForChildren = entryName + ENTRY_NAME_SEPARATOR;
for (File child : children) {
- addFileToTarGz(tarGzOut, child, baseEntryNameForChildren);
+ addFileToCompressedTar(tarOut, child, baseEntryNameForChildren);
}
}
}
/**
- * Un-tars a tar.gz file into a directory, returns all the untarred
files/directories.
+ * Un-tars a compressed tar file into a directory, returns all the untarred
files/directories.
* <p>For security reason, the untarred files must reside in the output
directory.
*/
public static List<File> untar(File inputFile, File outputDir)
@@ -140,7 +155,7 @@ public class TarGzCompressionUtils {
}
/**
- * Un-tars an inputstream of a tar.gz file into a directory, returns all the
untarred files/directories.
+ * Un-tars an inputstream of a compressed tar file into a directory, returns
all the untarred files/directories.
* <p>For security reason, the untarred files must reside in the output
directory.
*/
public static List<File> untar(InputStream inputStream, File outputDir)
@@ -149,7 +164,7 @@ public class TarGzCompressionUtils {
}
/**
- * Un-tars an inputstream of a tar.gz file into a directory, returns all the
untarred files/directories.
+ * Un-tars an inputstream of a compressed tar file into a directory, returns
all the untarred files/directories.
* RateLimit limits the untar rate
* <p>For security reason, the untarred files must reside in the output
directory.
*/
@@ -162,10 +177,10 @@ public class TarGzCompressionUtils {
}
List<File> untarredFiles = new ArrayList<>();
try (InputStream bufferedIn = new BufferedInputStream(inputStream);
- InputStream gzipIn = new GzipCompressorInputStream(bufferedIn);
- ArchiveInputStream tarGzIn = new TarArchiveInputStream(gzipIn)) {
+ InputStream compressorIn =
COMPRESSOR_STREAM_FACTORY.createCompressorInputStream(bufferedIn);
+ ArchiveInputStream tarIn = new TarArchiveInputStream(compressorIn)) {
ArchiveEntry entry;
- while ((entry = tarGzIn.getNextEntry()) != null) {
+ while ((entry = tarIn.getNextEntry()) != null) {
String entryName = entry.getName();
String[] parts = StringUtils.split(entryName, ENTRY_NAME_SEPARATOR);
File outputFile = outputDir;
@@ -174,8 +189,9 @@ public class TarGzCompressionUtils {
}
if (entry.isDirectory()) {
if
(!outputFile.getCanonicalPath().startsWith(outputDirCanonicalPath)) {
- throw new IOException(String
- .format("Trying to create directory: %s outside of the output
directory: %s", outputFile, outputDir));
+ throw new IOException(
+ String.format("Trying to create directory: %s outside of the
output directory: %s", outputFile,
+ outputDir));
}
if (!outputFile.isDirectory() && !outputFile.mkdirs()) {
throw new IOException(String.format("Failed to create directory:
%s", outputFile));
@@ -189,49 +205,54 @@ public class TarGzCompressionUtils {
parentFileCanonicalPath += File.separator;
}
if (!parentFileCanonicalPath.startsWith(outputDirCanonicalPath)) {
- throw new IOException(String
- .format("Trying to create directory: %s outside of the output
directory: %s", parentFile, outputDir));
+ throw new IOException(
+ String.format("Trying to create directory: %s outside of the
output directory: %s", parentFile,
+ outputDir));
}
if (!parentFile.isDirectory() && !parentFile.mkdirs()) {
throw new IOException(String.format("Failed to create directory:
%s", parentFile));
}
try (FileOutputStream out = new
FileOutputStream(outputFile.toPath().toString())) {
if (maxStreamRateInByte != NO_DISK_WRITE_RATE_LIMIT) {
- copyWithRateLimiter(tarGzIn, out, maxStreamRateInByte);
+ copyWithRateLimiter(tarIn, out, maxStreamRateInByte);
} else {
- IOUtils.copy(tarGzIn, out);
+ IOUtils.copy(tarIn, out);
}
}
}
untarredFiles.add(outputFile);
}
+ } catch (CompressorException e) {
+ throw new IOException(e);
}
return untarredFiles;
}
/**
- * Un-tars one single file with the given file name from a tar.gz file.
+ * Un-tars one single file with the given file name from a compressed tar
file.
*/
public static void untarOneFile(File inputFile, String fileName, File
outputFile)
throws IOException {
try (InputStream fileIn = Files.newInputStream(inputFile.toPath());
InputStream bufferedIn = new BufferedInputStream(fileIn);
- InputStream gzipIn = new GzipCompressorInputStream(bufferedIn);
- ArchiveInputStream tarGzIn = new TarArchiveInputStream(gzipIn)) {
+ InputStream compressorIn =
COMPRESSOR_STREAM_FACTORY.createCompressorInputStream(bufferedIn);
+ ArchiveInputStream tarIn = new TarArchiveInputStream(compressorIn)) {
ArchiveEntry entry;
- while ((entry = tarGzIn.getNextEntry()) != null) {
+ while ((entry = tarIn.getNextEntry()) != null) {
if (!entry.isDirectory()) {
String entryName = entry.getName();
String[] parts = StringUtils.split(entryName, ENTRY_NAME_SEPARATOR);
if (parts.length > 0 && parts[parts.length - 1].equals(fileName)) {
try (OutputStream out =
Files.newOutputStream(outputFile.toPath())) {
- IOUtils.copy(tarGzIn, out);
+ IOUtils.copy(tarIn, out);
}
return;
}
}
}
throw new IOException(String.format("Failed to find file: %s in: %s",
fileName, inputFile));
+ } catch (CompressorException e) {
+ throw new IOException(e);
}
}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/http/HttpClient.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/http/HttpClient.java
index a58515582d..23c8d4ac1d 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/http/HttpClient.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/http/HttpClient.java
@@ -64,7 +64,7 @@ import org.apache.pinot.common.auth.AuthProviderUtils;
import org.apache.pinot.common.exception.HttpErrorStatusException;
import org.apache.pinot.common.utils.SimpleHttpErrorInfo;
import org.apache.pinot.common.utils.SimpleHttpResponse;
-import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.common.utils.TarCompressionUtils;
import org.apache.pinot.common.utils.tls.TlsUtils;
import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.utils.CommonConstants;
@@ -457,7 +457,7 @@ public class HttpClient implements AutoCloseable {
}
try (InputStream inputStream = response.getEntity().getContent()) {
- ret = TarGzCompressionUtils.untarWithRateLimiter(inputStream, dest,
maxStreamRateInByte).get(0);
+ ret = TarCompressionUtils.untarWithRateLimiter(inputStream, dest,
maxStreamRateInByte).get(0);
}
LOGGER.info("Downloaded from: {} to: {} with rate limiter; Response
status code: {}", uri, dest, statusCode);
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/utils/TarGzCompressionUtilsTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/utils/TarCompressionUtilsTest.java
similarity index 60%
rename from
pinot-common/src/test/java/org/apache/pinot/common/utils/TarGzCompressionUtilsTest.java
rename to
pinot-common/src/test/java/org/apache/pinot/common/utils/TarCompressionUtilsTest.java
index 128f87933b..b326855511 100644
---
a/pinot-common/src/test/java/org/apache/pinot/common/utils/TarGzCompressionUtilsTest.java
+++
b/pinot-common/src/test/java/org/apache/pinot/common/utils/TarCompressionUtilsTest.java
@@ -24,9 +24,11 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.util.List;
+import java.util.Map;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
-import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
+import org.apache.commons.compress.compressors.CompressorException;
+import org.apache.commons.compress.compressors.CompressorStreamFactory;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.testng.annotations.AfterMethod;
@@ -38,11 +40,12 @@ import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.fail;
-public class TarGzCompressionUtilsTest {
- private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(),
"TarGzCompressionUtilsTest");
+public class TarCompressionUtilsTest {
+ private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(),
TarCompressionUtilsTest.class.getName());
private static final File DATA_DIR = new File(TEMP_DIR, "dataDir");
private static final File TAR_DIR = new File(TEMP_DIR, "tarDir");
private static final File UNTAR_DIR = new File(TEMP_DIR, "untarDir");
+ private static final CompressorStreamFactory COMPRESSOR_STREAM_FACTORY =
CompressorStreamFactory.getSingleton();
@BeforeMethod
public void setUp()
@@ -61,35 +64,49 @@ public class TarGzCompressionUtilsTest {
@Test
public void testFile()
- throws IOException {
+ throws IOException, CompressorException {
+ for (String compressedTarFileExtension :
TarCompressionUtils.COMPRESSOR_NAME_BY_FILE_EXTENSIONS.keySet()) {
+ testFile(compressedTarFileExtension);
+ }
+ }
+
+ public void testFile(String compressedTarFileExtension)
+ throws IOException, CompressorException {
String fileName = "data";
String fileContent = "fileContent";
File dataFile = new File(DATA_DIR, fileName);
FileUtils.write(dataFile, fileContent);
- File tarGzFile = new File(TAR_DIR, fileName +
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
- TarGzCompressionUtils.createTarGzFile(dataFile, tarGzFile);
+ File compressedTarFile = new File(TAR_DIR, fileName +
compressedTarFileExtension);
+ TarCompressionUtils.createCompressedTarFile(dataFile, compressedTarFile);
- List<File> untarredFiles = TarGzCompressionUtils.untar(tarGzFile,
UNTAR_DIR);
+ List<File> untarredFiles = TarCompressionUtils.untar(compressedTarFile,
UNTAR_DIR);
assertEquals(untarredFiles.size(), 1);
File untarredFile = untarredFiles.get(0);
assertEquals(untarredFile, new File(UNTAR_DIR, fileName));
assertEquals(FileUtils.readFileToString(untarredFile), fileContent);
untarredFile = new File(UNTAR_DIR, "untarred");
- TarGzCompressionUtils.untarOneFile(tarGzFile, fileName, untarredFile);
+ TarCompressionUtils.untarOneFile(compressedTarFile, fileName,
untarredFile);
assertEquals(FileUtils.readFileToString(untarredFile), fileContent);
}
@Test
public void testDirectories()
- throws IOException {
+ throws IOException, CompressorException {
+ for (String compressedTarFileExtension :
TarCompressionUtils.COMPRESSOR_NAME_BY_FILE_EXTENSIONS.keySet()) {
+ testDirectories(compressedTarFileExtension);
+ }
+ }
+
+ public void testDirectories(String compressedTarFileExtension)
+ throws IOException, CompressorException {
String dirToTarName1 = "dir1";
String dirToTarName2 = "dir2";
File dir1 = new File(DATA_DIR, dirToTarName1);
File dir2 = new File(DATA_DIR, dirToTarName2);
- File[] dirsToTar = new File[] {dir1, dir2};
+ File[] dirsToTar = new File[]{dir1, dir2};
String fileName1 = "data1";
String fileContent1 = "fileContent1";
@@ -98,11 +115,11 @@ public class TarGzCompressionUtilsTest {
FileUtils.write(new File(dir1, fileName1), fileContent1);
FileUtils.write(new File(dir2, fileName2), fileContent2);
- String outputTarName = "output_tar" +
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION;
- File tarGzFile = new File(TAR_DIR, outputTarName);
- TarGzCompressionUtils.createTarGzFile(dirsToTar, tarGzFile);
+ String outputTarName = "output_tar" + compressedTarFileExtension;
+ File compressedTarFile = new File(TAR_DIR, outputTarName);
+ TarCompressionUtils.createCompressedTarFile(dirsToTar, compressedTarFile);
- List<File> untarredFiles = TarGzCompressionUtils.untar(tarGzFile,
UNTAR_DIR);
+ List<File> untarredFiles = TarCompressionUtils.untar(compressedTarFile,
UNTAR_DIR);
assertEquals(untarredFiles.size(), 4);
/*
@@ -125,7 +142,6 @@ public class TarGzCompressionUtilsTest {
assertEquals(filesDir1.length, 1);
assertEquals(FileUtils.readFileToString(new File(untarredFileDir1,
fileName1)), fileContent1);
-
File[] filesDir2 = untarredFileDir2.listFiles();
assertNotNull(filesDir2);
assertEquals(filesDir2.length, 1);
@@ -134,7 +150,14 @@ public class TarGzCompressionUtilsTest {
@Test
public void testDirectory()
- throws IOException {
+ throws IOException, CompressorException {
+ for (String compressedTarFileExtension :
TarCompressionUtils.COMPRESSOR_NAME_BY_FILE_EXTENSIONS.keySet()) {
+ testDirectory(compressedTarFileExtension);
+ }
+ }
+
+ public void testDirectory(String compressedTarFileExtension)
+ throws IOException, CompressorException {
String dirName = "dir";
File dir = new File(DATA_DIR, dirName);
String fileName1 = "data1";
@@ -144,10 +167,10 @@ public class TarGzCompressionUtilsTest {
FileUtils.write(new File(dir, fileName1), fileContent1);
FileUtils.write(new File(dir, fileName2), fileContent2);
- File tarGzFile = new File(TAR_DIR, dirName +
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
- TarGzCompressionUtils.createTarGzFile(dir, tarGzFile);
+ File compressedTarFile = new File(TAR_DIR, dirName +
compressedTarFileExtension);
+ TarCompressionUtils.createCompressedTarFile(dir, compressedTarFile);
- List<File> untarredFiles = TarGzCompressionUtils.untar(tarGzFile,
UNTAR_DIR);
+ List<File> untarredFiles = TarCompressionUtils.untar(compressedTarFile,
UNTAR_DIR);
assertEquals(untarredFiles.size(), 3);
File untarredFile = untarredFiles.get(0);
assertEquals(untarredFile, new File(UNTAR_DIR, dirName));
@@ -158,12 +181,12 @@ public class TarGzCompressionUtilsTest {
assertEquals(FileUtils.readFileToString(new File(untarredFile,
fileName2)), fileContent2);
untarredFile = new File(UNTAR_DIR, "untarred");
- TarGzCompressionUtils.untarOneFile(tarGzFile, fileName1, untarredFile);
+ TarCompressionUtils.untarOneFile(compressedTarFile, fileName1,
untarredFile);
assertEquals(FileUtils.readFileToString(untarredFile), fileContent1);
- TarGzCompressionUtils.untarOneFile(tarGzFile, fileName2, untarredFile);
+ TarCompressionUtils.untarOneFile(compressedTarFile, fileName2,
untarredFile);
assertEquals(FileUtils.readFileToString(untarredFile), fileContent2);
try {
- TarGzCompressionUtils.untarOneFile(tarGzFile, dirName, untarredFile);
+ TarCompressionUtils.untarOneFile(compressedTarFile, dirName,
untarredFile);
fail();
} catch (IOException e) {
// Expected
@@ -172,6 +195,13 @@ public class TarGzCompressionUtilsTest {
@Test
public void testSubDirectories()
+ throws IOException, CompressorException {
+ for (String compressedTarFileExtension :
TarCompressionUtils.COMPRESSOR_NAME_BY_FILE_EXTENSIONS.keySet()) {
+ testSubDirectories(compressedTarFileExtension);
+ }
+ }
+
+ public void testSubDirectories(String compressedTarFileExtension)
throws IOException {
String dirName = "dir";
File dir = new File(DATA_DIR, dirName);
@@ -186,10 +216,10 @@ public class TarGzCompressionUtilsTest {
FileUtils.write(new File(subDir1, fileName1), fileContent1);
FileUtils.write(new File(subDir2, fileName2), fileContent2);
- File tarGzFile = new File(TAR_DIR, dirName +
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
- TarGzCompressionUtils.createTarGzFile(dir, tarGzFile);
+ File compressedTarFile = new File(TAR_DIR, dirName +
compressedTarFileExtension);
+ TarCompressionUtils.createCompressedTarFile(dir, compressedTarFile);
- List<File> untarredFiles = TarGzCompressionUtils.untar(tarGzFile,
UNTAR_DIR);
+ List<File> untarredFiles = TarCompressionUtils.untar(compressedTarFile,
UNTAR_DIR);
assertEquals(untarredFiles.size(), 5);
File untarredFile = untarredFiles.get(0);
assertEquals(untarredFile, new File(UNTAR_DIR, dirName));
@@ -200,24 +230,24 @@ public class TarGzCompressionUtilsTest {
assertEquals(FileUtils.readFileToString(new File(new File(untarredFile,
subDirName2), fileName2)), fileContent2);
untarredFile = new File(UNTAR_DIR, "untarred");
- TarGzCompressionUtils.untarOneFile(tarGzFile, fileName1, untarredFile);
+ TarCompressionUtils.untarOneFile(compressedTarFile, fileName1,
untarredFile);
assertEquals(FileUtils.readFileToString(untarredFile), fileContent1);
- TarGzCompressionUtils.untarOneFile(tarGzFile, fileName2, untarredFile);
+ TarCompressionUtils.untarOneFile(compressedTarFile, fileName2,
untarredFile);
assertEquals(FileUtils.readFileToString(untarredFile), fileContent2);
try {
- TarGzCompressionUtils.untarOneFile(tarGzFile, dirName, untarredFile);
+ TarCompressionUtils.untarOneFile(compressedTarFile, dirName,
untarredFile);
fail();
} catch (IOException e) {
// Expected
}
try {
- TarGzCompressionUtils.untarOneFile(tarGzFile, subDirName1, untarredFile);
+ TarCompressionUtils.untarOneFile(compressedTarFile, subDirName1,
untarredFile);
fail();
} catch (IOException e) {
// Expected
}
try {
- TarGzCompressionUtils.untarOneFile(tarGzFile, subDirName2, untarredFile);
+ TarCompressionUtils.untarOneFile(compressedTarFile, subDirName2,
untarredFile);
fail();
} catch (IOException e) {
// Expected
@@ -226,15 +256,22 @@ public class TarGzCompressionUtilsTest {
@Test
public void testEmptyDirectory()
+ throws IOException, CompressorException {
+ for (String compressedTarFileExtension :
TarCompressionUtils.COMPRESSOR_NAME_BY_FILE_EXTENSIONS.keySet()) {
+ testEmptyDirectory(compressedTarFileExtension);
+ }
+ }
+
+ public void testEmptyDirectory(String compressedTarFileExtension)
throws IOException {
String dirName = "dir";
File dir = new File(DATA_DIR, dirName);
FileUtils.forceMkdir(dir);
- File tarGzFile = new File(TAR_DIR, dirName +
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
- TarGzCompressionUtils.createTarGzFile(dir, tarGzFile);
+ File compressedTarFile = new File(TAR_DIR, dirName +
compressedTarFileExtension);
+ TarCompressionUtils.createCompressedTarFile(dir, compressedTarFile);
- List<File> untarredFiles = TarGzCompressionUtils.untar(tarGzFile,
UNTAR_DIR);
+ List<File> untarredFiles = TarCompressionUtils.untar(compressedTarFile,
UNTAR_DIR);
assertEquals(untarredFiles.size(), 1);
File untarredFile = untarredFiles.get(0);
assertEquals(untarredFile, new File(UNTAR_DIR, dirName));
@@ -244,7 +281,7 @@ public class TarGzCompressionUtilsTest {
untarredFile = new File(UNTAR_DIR, "untarred");
try {
- TarGzCompressionUtils.untarOneFile(tarGzFile, dirName, untarredFile);
+ TarCompressionUtils.untarOneFile(compressedTarFile, dirName,
untarredFile);
fail();
} catch (IOException e) {
// Expected
@@ -254,26 +291,35 @@ public class TarGzCompressionUtilsTest {
@Test
public void testBadFilePath()
throws IOException {
+ for (Map.Entry<String, String> entry :
TarCompressionUtils.COMPRESSOR_NAME_BY_FILE_EXTENSIONS.entrySet()) {
+ testBadFilePath(entry.getKey(), entry.getValue());
+ }
+ }
+
+ public void testBadFilePath(String compressedTarFileExtension, String
compressorName)
+ throws IOException {
String fileName = "data";
String fileContent = "fileContent";
File dataFile = new File(DATA_DIR, fileName);
FileUtils.write(dataFile, fileContent);
- File badTarGzFile = new File(TAR_DIR, "bad" +
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
- try (OutputStream fileOut = Files.newOutputStream(badTarGzFile.toPath());
- OutputStream gzipOut = new GzipCompressorOutputStream(fileOut);
- TarArchiveOutputStream tarGzOut = new TarArchiveOutputStream(gzipOut))
{
- tarGzOut.setLongFileMode(TarArchiveOutputStream.LONGFILE_GNU);
+ File badCompressedTarFile = new File(TAR_DIR, "bad" +
compressedTarFileExtension);
+ try (OutputStream fileOut =
Files.newOutputStream(badCompressedTarFile.toPath());
+ OutputStream compressorOut =
COMPRESSOR_STREAM_FACTORY.createCompressorOutputStream(compressorName, fileOut);
+ TarArchiveOutputStream tarOut = new
TarArchiveOutputStream(compressorOut)) {
+ tarOut.setLongFileMode(TarArchiveOutputStream.LONGFILE_GNU);
TarArchiveEntry entry = new TarArchiveEntry(dataFile,
"../bad/path/data");
- tarGzOut.putArchiveEntry(entry);
+ tarOut.putArchiveEntry(entry);
try (InputStream in = Files.newInputStream(dataFile.toPath())) {
- IOUtils.copy(in, tarGzOut);
+ IOUtils.copy(in, tarOut);
}
- tarGzOut.closeArchiveEntry();
+ tarOut.closeArchiveEntry();
+ } catch (CompressorException e) {
+ throw new IOException(e);
}
try {
- TarGzCompressionUtils.untar(badTarGzFile, UNTAR_DIR);
+ TarCompressionUtils.untar(badCompressedTarFile, UNTAR_DIR);
fail();
} catch (IOException e) {
// Expected
@@ -281,7 +327,7 @@ public class TarGzCompressionUtilsTest {
// Allow untar one file to the given destination
File untarredFile = new File(UNTAR_DIR, "untarred");
- TarGzCompressionUtils.untarOneFile(badTarGzFile, fileName, untarredFile);
+ TarCompressionUtils.untarOneFile(badCompressedTarFile, fileName,
untarredFile);
assertEquals(FileUtils.readFileToString(untarredFile), fileContent);
}
}
diff --git
a/pinot-compatibility-verifier/src/main/java/org/apache/pinot/compat/SegmentOp.java
b/pinot-compatibility-verifier/src/main/java/org/apache/pinot/compat/SegmentOp.java
index a5a5736908..c8c0e0abc5 100644
---
a/pinot-compatibility-verifier/src/main/java/org/apache/pinot/compat/SegmentOp.java
+++
b/pinot-compatibility-verifier/src/main/java/org/apache/pinot/compat/SegmentOp.java
@@ -30,7 +30,7 @@ import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.SqlResultComparator;
-import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.common.utils.TarCompressionUtils;
import org.apache.pinot.controller.api.resources.TableViews;
import org.apache.pinot.controller.helix.ControllerTest;
import
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
@@ -208,8 +208,8 @@ public class SegmentOp extends BaseOp {
driver.build();
File indexDir = new File(outputDir, _segmentName);
LOGGER.info("Successfully created segment: {} at directory: {}",
_segmentName, indexDir);
- File segmentTarFile = new File(outputDir, _segmentName +
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
- TarGzCompressionUtils.createTarGzFile(indexDir, segmentTarFile);
+ File segmentTarFile = new File(outputDir, _segmentName +
TarCompressionUtils.TAR_GZ_FILE_EXTENSION);
+ TarCompressionUtils.createCompressedTarFile(indexDir, segmentTarFile);
LOGGER.info("Tarring segment from: {} to: {}", indexDir, segmentTarFile);
return segmentTarFile;
diff --git
a/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/FlinkSegmentWriter.java
b/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/FlinkSegmentWriter.java
index 9e98d28116..743d222b35 100644
---
a/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/FlinkSegmentWriter.java
+++
b/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/FlinkSegmentWriter.java
@@ -38,7 +38,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
-import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.common.utils.TarCompressionUtils;
import org.apache.pinot.core.util.SegmentProcessorAvroUtils;
import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
import org.apache.pinot.segment.local.recordtransformer.RecordTransformer;
@@ -241,7 +241,7 @@ public class FlinkSegmentWriter implements SegmentWriter {
segmentTarFile = new File(_outputDirURI,
String.format("%s_%d%s", segmentName, System.currentTimeMillis(),
Constants.TAR_GZ_FILE_EXT));
}
- TarGzCompressionUtils.createTarGzFile(new File(segmentDir, segmentName),
segmentTarFile);
+ TarCompressionUtils.createCompressedTarFile(new File(segmentDir,
segmentName), segmentTarFile);
LOGGER.info("Created segment tar: {} for segment: {} of Pinot table:
{}", segmentTarFile.getAbsolutePath(),
segmentName, _tableNameWithType);
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
index 993028585c..e1541460cc 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
@@ -41,7 +41,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.common.utils.TarCompressionUtils;
import org.apache.pinot.common.utils.URIUtils;
import org.apache.pinot.controller.api.access.AccessType;
import org.apache.pinot.controller.api.access.Authenticate;
@@ -441,11 +441,11 @@ public class LLCSegmentCompletionHandlers {
FileUtils.forceMkdir(tempIndexDir);
// Extract metadata.properties
- TarGzCompressionUtils.untarOneFile(segmentFile,
V1Constants.MetadataKeys.METADATA_FILE_NAME,
+ TarCompressionUtils.untarOneFile(segmentFile,
V1Constants.MetadataKeys.METADATA_FILE_NAME,
new File(tempIndexDir, V1Constants.MetadataKeys.METADATA_FILE_NAME));
// Extract creation.meta
- TarGzCompressionUtils.untarOneFile(segmentFile,
V1Constants.SEGMENT_CREATION_META,
+ TarCompressionUtils.untarOneFile(segmentFile,
V1Constants.SEGMENT_CREATION_META,
new File(tempIndexDir, V1Constants.SEGMENT_CREATION_META));
// Load segment metadata
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java
index 2c863b9e76..239ba7e6e0 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java
@@ -32,7 +32,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.common.utils.TarCompressionUtils;
import org.apache.pinot.controller.api.resources.SuccessResponse;
import org.apache.pinot.segment.local.utils.IngestionUtils;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
@@ -148,7 +148,7 @@ public class FileIngestionHelper {
// Tar segment dir
File segmentTarFile =
new File(segmentTarDir, segmentName +
org.apache.pinot.spi.ingestion.batch.spec.Constants.TAR_GZ_FILE_EXT);
- TarGzCompressionUtils.createTarGzFile(new File(outputDir, segmentName),
segmentTarFile);
+ TarCompressionUtils.createCompressedTarFile(new File(outputDir,
segmentName), segmentTarFile);
// Upload segment
IngestionConfig ingestionConfigOverride = new IngestionConfig();
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
index 0e69dd105e..6cf949563d 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
@@ -32,7 +32,7 @@ import
org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.utils.FileUploadDownloadClient.FileUploadType;
import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.common.utils.TarCompressionUtils;
import org.apache.pinot.common.utils.URIUtils;
import org.apache.pinot.controller.ControllerConf;
import
org.apache.pinot.controller.api.exception.ControllerApplicationException;
@@ -125,9 +125,9 @@ public class ZKOperatorTest {
SegmentIndexCreationDriverImpl driver = new
SegmentIndexCreationDriverImpl();
driver.init(config, new GenericRowRecordReader(rows));
driver.build();
- File segmentTar = new File(SEGMENT_DIR, SEGMENT_NAME +
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
- TarGzCompressionUtils.createTarGzFile(new File(outputDir, SEGMENT_NAME),
- new File(SEGMENT_DIR, SEGMENT_NAME +
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION));
+ File segmentTar = new File(SEGMENT_DIR, SEGMENT_NAME +
TarCompressionUtils.TAR_GZ_FILE_EXTENSION);
+ TarCompressionUtils.createCompressedTarFile(new File(outputDir,
SEGMENT_NAME),
+ new File(SEGMENT_DIR, SEGMENT_NAME +
TarCompressionUtils.TAR_GZ_FILE_EXTENSION));
FileUtils.deleteQuietly(outputDir);
return segmentTar;
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 97b7530b04..4f24dc3d52 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -54,7 +54,7 @@ import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
-import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.common.utils.TarCompressionUtils;
import org.apache.pinot.common.utils.config.TierConfigUtils;
import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
@@ -792,7 +792,7 @@ public abstract class BaseTableDataManager implements
TableDataManager {
failedAttempts.get());
}
} else {
- File segmentTarFile = new File(tempRootDir, segmentName +
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+ File segmentTarFile = new File(tempRootDir, segmentName +
TarCompressionUtils.TAR_GZ_FILE_EXTENSION);
SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(downloadUrl,
segmentTarFile, zkMetadata.getCrypterName());
_logger.info("Downloaded tarred segment: {} from: {} to: {}, file
length: {}", segmentName, downloadUrl,
segmentTarFile, segmentTarFile.length());
@@ -819,7 +819,7 @@ public abstract class BaseTableDataManager implements
TableDataManager {
_tableNameWithType);
_logger.info("Downloading segment: {} from peers", segmentName);
File tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "-" +
UUID.randomUUID());
- File segmentTarFile = new File(tempRootDir, segmentName +
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+ File segmentTarFile = new File(tempRootDir, segmentName +
TarCompressionUtils.TAR_GZ_FILE_EXTENSION);
try {
SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(segmentName,
_peerDownloadScheme, () -> {
List<URI> peerServerURIs =
@@ -848,7 +848,7 @@ public abstract class BaseTableDataManager implements
TableDataManager {
try {
// If an exception is thrown when untarring, it means the tar file is
broken or not found after the retry. Thus,
// there's no need to retry again.
- File untarredSegmentDir = TarGzCompressionUtils.untar(segmentTarFile,
untarDir).get(0);
+ File untarredSegmentDir = TarCompressionUtils.untar(segmentTarFile,
untarDir).get(0);
_logger.info("Untarred segment: {} into: {}", segmentName,
untarredSegmentDir);
return untarredSegmentDir;
} catch (Exception e) {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 648bed6080..73915c5a2e 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -48,7 +48,7 @@ import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.common.utils.TarCompressionUtils;
import
org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManager.ConsumptionRateLimiter;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager;
@@ -1060,9 +1060,9 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
TimeUnit.MILLISECONDS.toSeconds(waitTimeMillis));
if (forCommit) {
- File segmentTarFile = new File(dataDir, _segmentNameStr +
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+ File segmentTarFile = new File(dataDir, _segmentNameStr +
TarCompressionUtils.TAR_GZ_FILE_EXTENSION);
try {
- TarGzCompressionUtils.createTarGzFile(indexDir, segmentTarFile);
+ TarCompressionUtils.createCompressedTarFile(indexDir,
segmentTarFile);
} catch (IOException e) {
String errorMessage =
String.format("Caught exception while taring index directory
from: %s to: %s", indexDir, segmentTarFile);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/metadata/DefaultMetadataExtractor.java
b/pinot-core/src/main/java/org/apache/pinot/core/metadata/DefaultMetadataExtractor.java
index 18936d0525..2de6ca2209 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/metadata/DefaultMetadataExtractor.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/metadata/DefaultMetadataExtractor.java
@@ -19,7 +19,7 @@
package org.apache.pinot.core.metadata;
import java.io.File;
-import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.common.utils.TarCompressionUtils;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
@@ -35,7 +35,7 @@ public class DefaultMetadataExtractor implements
MetadataExtractor {
throws Exception {
// NOTE: While there is TarGzCompressionUtils.untarOneFile(), we use
untar() here to unpack all files in the segment
// in order to ensure the segment is not corrupted.
- File indexDir = TarGzCompressionUtils.untar(tarredSegmentFile,
unzippedSegmentDir).get(0);
+ File indexDir = TarCompressionUtils.untar(tarredSegmentFile,
unzippedSegmentDir).get(0);
return new SegmentMetadataImpl(indexDir);
}
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
index 1282e493fd..d10694ef78 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
@@ -34,7 +34,7 @@ import org.apache.helix.HelixManager;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.tier.TierFactory;
-import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.common.utils.TarCompressionUtils;
import org.apache.pinot.common.utils.fetcher.BaseSegmentFetcher;
import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
@@ -294,7 +294,7 @@ public class BaseTableDataManagerTest {
throws Exception {
File indexDir = createSegment(SegmentVersion.v3, 5);
SegmentZKMetadata zkMetadata =
- makeRawSegment(indexDir, new File(TEMP_DIR, SEGMENT_NAME +
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION), false);
+ makeRawSegment(indexDir, new File(TEMP_DIR, SEGMENT_NAME +
TarCompressionUtils.TAR_GZ_FILE_EXTENSION), false);
// Same CRC but force to download.
BaseTableDataManager tableDataManager = createTableManager();
@@ -567,9 +567,9 @@ public class BaseTableDataManagerTest {
File tempDir = new File(TEMP_DIR, "testDownloadAndDecrypt");
String fileName = "tmp.txt";
FileUtils.write(new File(tempDir, fileName), "this is from somewhere
remote");
- String tarFileName = SEGMENT_NAME +
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION;
+ String tarFileName = SEGMENT_NAME +
TarCompressionUtils.TAR_GZ_FILE_EXTENSION;
File tempTarFile = new File(TEMP_DIR, tarFileName);
- TarGzCompressionUtils.createTarGzFile(tempDir, tempTarFile);
+ TarCompressionUtils.createCompressedTarFile(tempDir, tempTarFile);
SegmentZKMetadata zkMetadata = mock(SegmentZKMetadata.class);
when(zkMetadata.getSegmentName()).thenReturn(SEGMENT_NAME);
@@ -607,10 +607,10 @@ public class BaseTableDataManagerTest {
File tempRootDir =
tableDataManager.getTmpSegmentDataDir("test-untar-move");
// All input and intermediate files are put in the tempRootDir.
- File tempTar = new File(tempRootDir, SEGMENT_NAME +
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+ File tempTar = new File(tempRootDir, SEGMENT_NAME +
TarCompressionUtils.TAR_GZ_FILE_EXTENSION);
File tempInputDir = new File(tempRootDir, "input");
FileUtils.write(new File(tempInputDir, "tmp.txt"), "this is in segment
dir");
- TarGzCompressionUtils.createTarGzFile(tempInputDir, tempTar);
+ TarCompressionUtils.createCompressedTarFile(tempInputDir, tempTar);
FileUtils.deleteQuietly(tempInputDir);
// The destination is the segment directory at the same level of
tempRootDir.
@@ -687,15 +687,14 @@ public class BaseTableDataManagerTest {
private static SegmentZKMetadata createRawSegment(SegmentVersion
segmentVersion, int numRows)
throws Exception {
File indexDir = createSegment(segmentVersion, numRows);
- return makeRawSegment(indexDir, new File(TEMP_DIR, SEGMENT_NAME +
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION),
- true);
+ return makeRawSegment(indexDir, new File(TEMP_DIR, SEGMENT_NAME +
TarCompressionUtils.TAR_GZ_FILE_EXTENSION), true);
}
private static SegmentZKMetadata makeRawSegment(File indexDir, File
rawSegmentFile, boolean deleteIndexDir)
throws Exception {
long crc = getCRC(indexDir);
SegmentZKMetadata zkMetadata = new SegmentZKMetadata(SEGMENT_NAME);
- TarGzCompressionUtils.createTarGzFile(indexDir, rawSegmentFile);
+ TarCompressionUtils.createCompressedTarFile(indexDir, rawSegmentFile);
zkMetadata.setDownloadUrl("file://" + rawSegmentFile.getAbsolutePath());
zkMetadata.setCrc(crc);
if (deleteIndexDir) {
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConfigUtils.java
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConfigUtils.java
index af9c4c8602..7028844169 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConfigUtils.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConfigUtils.java
@@ -25,7 +25,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.common.utils.TarCompressionUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.Schema;
@@ -102,7 +102,7 @@ public class FakeStreamConfigUtils {
FileUtils.deleteDirectory(outputDir);
}
File avroTarFile = getResourceFile(AVRO_TAR_FILE);
- return TarGzCompressionUtils.untar(avroTarFile, outputDir);
+ return TarCompressionUtils.untar(avroTarFile, outputDir);
}
/**
diff --git
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index 33bd9b008e..99a1b2221d 100644
---
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -39,7 +39,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.pinot.client.ConnectionFactory;
import org.apache.pinot.client.JsonAsyncHttpPinotClientTransportFactory;
import org.apache.pinot.client.ResultSetGroup;
-import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.common.utils.TarCompressionUtils;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.plugin.inputformat.csv.CSVMessageDecoder;
import org.apache.pinot.plugin.stream.kafka.KafkaStreamConfigProperties;
@@ -551,7 +551,7 @@ public abstract class BaseClusterIntegrationTest extends
ClusterTest {
throws Exception {
InputStream inputStream =
getClass().getClassLoader().getResourceAsStream(tarFileName);
Assert.assertNotNull(inputStream);
- return TarGzCompressionUtils.untar(inputStream, outputDir);
+ return TarCompressionUtils.untar(inputStream, outputDir);
}
/**
diff --git
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
index 0259126daf..594727646a 100644
---
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
+++
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
@@ -66,7 +66,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.pinot.client.ResultSetGroup;
import org.apache.pinot.common.request.PinotQuery;
import org.apache.pinot.common.request.context.OrderByExpressionContext;
-import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.common.utils.TarCompressionUtils;
import org.apache.pinot.core.query.request.context.QueryContext;
import
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
import org.apache.pinot.core.query.request.context.utils.QueryContextUtils;
@@ -356,8 +356,8 @@ public class ClusterIntegrationTestUtils {
// Tar the segment
String segmentName = driver.getSegmentName();
File indexDir = new File(segmentDir, segmentName);
- File segmentTarFile = new File(tarDir, segmentName +
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
- TarGzCompressionUtils.createTarGzFile(indexDir, segmentTarFile);
+ File segmentTarFile = new File(tarDir, segmentName +
TarCompressionUtils.TAR_GZ_FILE_EXTENSION);
+ TarCompressionUtils.createCompressedTarFile(indexDir, segmentTarFile);
}
/**
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
index 632ff2def9..0b7dda70a4 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
@@ -38,7 +38,7 @@ import org.apache.pinot.common.metrics.MetricValueUtils;
import org.apache.pinot.common.minion.MergeRollupTaskMetadata;
import org.apache.pinot.common.minion.MinionTaskMetadataUtils;
import org.apache.pinot.common.utils.SqlResultComparator;
-import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.common.utils.TarCompressionUtils;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
@@ -345,8 +345,8 @@ public class MergeRollupMinionClusterIntegrationTest
extends BaseClusterIntegrat
// Tar the segment
String segmentName = driver.getSegmentName();
File indexDir = new File(segmentDir, segmentName);
- File segmentTarFile = new File(tarDir, segmentName +
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
- TarGzCompressionUtils.createTarGzFile(indexDir, segmentTarFile);
+ File segmentTarFile = new File(tarDir, segmentName +
TarCompressionUtils.TAR_GZ_FILE_EXTENSION);
+ TarCompressionUtils.createCompressedTarFile(indexDir, segmentTarFile);
return null;
}));
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/startree/SegmentInfoProvider.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/startree/SegmentInfoProvider.java
index a3d9a75db4..42f091a74b 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/startree/SegmentInfoProvider.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/startree/SegmentInfoProvider.java
@@ -26,7 +26,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.common.utils.TarCompressionUtils;
import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.index.reader.Dictionary;
@@ -96,7 +96,7 @@ public class SegmentInfoProvider {
if (segmentFile.isFile()) {
tmpDir = File.createTempFile(SEGMENT_INFO_PROVIDER, null, new
File(TMP_DIR));
FileUtils.deleteQuietly(tmpDir);
- segmentDir = TarGzCompressionUtils.untar(segmentFile, tmpDir).get(0);
+ segmentDir = TarCompressionUtils.untar(segmentFile, tmpDir).get(0);
} else {
segmentDir = segmentFile;
}
diff --git
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkOfflineIndexReader.java
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkOfflineIndexReader.java
index 7d0962538c..aca52847be 100644
---
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkOfflineIndexReader.java
+++
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkOfflineIndexReader.java
@@ -27,7 +27,7 @@ import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.common.utils.TarCompressionUtils;
import org.apache.pinot.integration.tests.ClusterTest;
import org.apache.pinot.segment.local.segment.creator.SegmentTestUtils;
import
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
@@ -109,7 +109,7 @@ public class BenchmarkOfflineIndexReader {
FileUtils.deleteQuietly(TEMP_DIR);
File avroDir = new File(TEMP_DIR, "avro");
- TarGzCompressionUtils.untar(new
File(TestUtils.getFileFromResourceUrl(RESOURCE_URL)), avroDir);
+ TarCompressionUtils.untar(new
File(TestUtils.getFileFromResourceUrl(RESOURCE_URL)), avroDir);
File avroFile = new File(avroDir, AVRO_FILE_NAME);
File dataDir = new File(TEMP_DIR, "index");
diff --git
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationJobUtils.java
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationJobUtils.java
index 269f2f39e2..29c68ec3ec 100644
---
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationJobUtils.java
+++
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationJobUtils.java
@@ -28,7 +28,7 @@ import java.nio.file.SimpleFileVisitor;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.common.utils.TarCompressionUtils;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
@@ -78,7 +78,7 @@ public class SegmentGenerationJobUtils implements
Serializable {
}
});
LOGGER.info("Tarring metadata files from: [{}] to: {}", metadataFiles,
localMetadataTarFile);
- TarGzCompressionUtils.createTarGzFile(metadataFiles.toArray(new File[0]),
localMetadataTarFile);
+ TarCompressionUtils.createCompressedTarFile(metadataFiles.toArray(new
File[0]), localMetadataTarFile);
}
public static void moveLocalTarFileToRemote(File localMetadataTarFile, URI
outputMetadataTarURI, boolean overwrite)
diff --git
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentCreationMapper.java
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentCreationMapper.java
index 3efad6b05d..bd3f80c808 100644
---
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentCreationMapper.java
+++
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentCreationMapper.java
@@ -32,7 +32,7 @@ import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.pinot.common.segment.generation.SegmentGenerationUtils;
-import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.common.utils.TarCompressionUtils;
import org.apache.pinot.common.utils.URIUtils;
import
org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationJobUtils;
import
org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner;
@@ -79,7 +79,7 @@ public class HadoopSegmentCreationMapper extends
Mapper<LongWritable, Text, Long
if (localPluginsTarFile.exists()) {
File pluginsDirFile =
Files.createTempDirectory(PINOT_PLUGINS_DIR).toFile();
try {
- TarGzCompressionUtils.untar(localPluginsTarFile, pluginsDirFile);
+ TarCompressionUtils.untar(localPluginsTarFile, pluginsDirFile);
} catch (Exception e) {
LOGGER.error("Failed to untar local Pinot plugins tarball file [{}]",
localPluginsTarFile, e);
throw new RuntimeException(e);
@@ -178,7 +178,7 @@ public class HadoopSegmentCreationMapper extends
Mapper<LongWritable, Text, Long
String segmentTarFileName = URIUtils.encode(segmentName +
Constants.TAR_GZ_FILE_EXT);
File localSegmentTarFile = new File(localOutputTempDir,
segmentTarFileName);
LOGGER.info("Tarring segment from: {} to: {}", localSegmentDir,
localSegmentTarFile);
- TarGzCompressionUtils.createTarGzFile(localSegmentDir,
localSegmentTarFile);
+ TarCompressionUtils.createCompressedTarFile(localSegmentDir,
localSegmentTarFile);
long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir);
long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile);
LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: {}",
segmentName,
diff --git
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
index df9883622d..f63f424187 100644
---
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
+++
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
@@ -46,7 +46,7 @@ import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.pinot.common.segment.generation.SegmentGenerationUtils;
-import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.common.utils.TarCompressionUtils;
import
org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationJobUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.PinotFS;
@@ -375,7 +375,7 @@ public class HadoopSegmentGenerationJobRunner extends
Configured implements Inge
File pluginsTarGzFile = new File(PINOT_PLUGINS_TAR_GZ);
try {
File[] files = validPluginDirectories.toArray(new File[0]);
- TarGzCompressionUtils.createTarGzFile(files, pluginsTarGzFile);
+ TarCompressionUtils.createCompressedTarFile(files, pluginsTarGzFile);
// Copy to staging directory
Path cachedPluginsTarball = new Path(stagingDirURI.toString(),
SegmentGenerationUtils.PINOT_PLUGINS_TAR_GZ);
diff --git
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
index 6ae7ff97ab..dcaf01379a 100644
---
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
+++
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
@@ -32,7 +32,7 @@ import java.util.Map;
import java.util.UUID;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.segment.generation.SegmentGenerationUtils;
-import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.common.utils.TarCompressionUtils;
import org.apache.pinot.common.utils.URIUtils;
import
org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationJobUtils;
import
org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner;
@@ -228,7 +228,7 @@ public class SparkSegmentGenerationJobRunner implements
IngestionJobRunner, Seri
if (localPluginsTarFile.exists()) {
File pluginsDirFile = new File(PINOT_PLUGINS_DIR + "-" + idx);
try {
- TarGzCompressionUtils.untar(localPluginsTarFile, pluginsDirFile);
+ TarCompressionUtils.untar(localPluginsTarFile, pluginsDirFile);
} catch (Exception e) {
LOGGER.error("Failed to untar local Pinot plugins tarball file
[{}]", localPluginsTarFile, e);
throw new RuntimeException(e);
@@ -286,7 +286,7 @@ public class SparkSegmentGenerationJobRunner implements
IngestionJobRunner, Seri
String segmentTarFileName = URIUtils.encode(segmentName +
Constants.TAR_GZ_FILE_EXT);
File localSegmentTarFile = new File(localOutputTempDir,
segmentTarFileName);
LOGGER.info("Tarring segment from: {} to: {}", localSegmentDir,
localSegmentTarFile);
- TarGzCompressionUtils.createTarGzFile(localSegmentDir,
localSegmentTarFile);
+ TarCompressionUtils.createCompressedTarFile(localSegmentDir,
localSegmentTarFile);
long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir);
long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile);
LOGGER.info("Size for segment: {}, uncompressed: {}, compressed:
{}", segmentName,
@@ -372,7 +372,7 @@ public class SparkSegmentGenerationJobRunner implements
IngestionJobRunner, Seri
File pluginsTarGzFile = new File(PINOT_PLUGINS_TAR_GZ);
try {
File[] files = validPluginDirectories.toArray(new File[0]);
- TarGzCompressionUtils.createTarGzFile(files, pluginsTarGzFile);
+ TarCompressionUtils.createCompressedTarFile(files, pluginsTarGzFile);
} catch (IOException e) {
LOGGER.error("Failed to tar plugins directories", e);
}
diff --git
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentGenerationJobRunner.java
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentGenerationJobRunner.java
index ef1f6cea5d..f9844c4fb2 100644
---
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentGenerationJobRunner.java
+++
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentGenerationJobRunner.java
@@ -32,7 +32,7 @@ import java.util.Map;
import java.util.UUID;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.segment.generation.SegmentGenerationUtils;
-import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.common.utils.TarCompressionUtils;
import org.apache.pinot.common.utils.URIUtils;
import
org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationJobUtils;
import
org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner;
@@ -227,7 +227,7 @@ public class SparkSegmentGenerationJobRunner implements
IngestionJobRunner, Seri
if (localPluginsTarFile.exists()) {
File pluginsDirFile = new File(PINOT_PLUGINS_DIR + "-" + idx);
try {
- TarGzCompressionUtils.untar(localPluginsTarFile, pluginsDirFile);
+ TarCompressionUtils.untar(localPluginsTarFile, pluginsDirFile);
} catch (Exception e) {
LOGGER.error("Failed to untar local Pinot plugins tarball file
[{}]", localPluginsTarFile, e);
throw new RuntimeException(e);
@@ -284,7 +284,7 @@ public class SparkSegmentGenerationJobRunner implements
IngestionJobRunner, Seri
String segmentTarFileName = URIUtils.encode(segmentName +
Constants.TAR_GZ_FILE_EXT);
File localSegmentTarFile = new File(localOutputTempDir,
segmentTarFileName);
LOGGER.info("Tarring segment from: {} to: {}", localSegmentDir,
localSegmentTarFile);
- TarGzCompressionUtils.createTarGzFile(localSegmentDir,
localSegmentTarFile);
+ TarCompressionUtils.createCompressedTarFile(localSegmentDir,
localSegmentTarFile);
long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir);
long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile);
LOGGER.info("Size for segment: {}, uncompressed: {}, compressed:
{}", segmentName,
@@ -369,7 +369,7 @@ public class SparkSegmentGenerationJobRunner implements
IngestionJobRunner, Seri
File pluginsTarGzFile = new File(PINOT_PLUGINS_TAR_GZ);
try {
File[] files = validPluginDirectories.toArray(new File[0]);
- TarGzCompressionUtils.createTarGzFile(files, pluginsTarGzFile);
+ TarCompressionUtils.createCompressedTarFile(files, pluginsTarGzFile);
} catch (IOException e) {
LOGGER.error("Failed to tar plugins directories", e);
}
diff --git
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
index a85247fc3d..0df6671418 100644
---
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
+++
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
@@ -35,7 +35,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.segment.generation.SegmentGenerationUtils;
-import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.common.utils.TarCompressionUtils;
import org.apache.pinot.common.utils.URIUtils;
import
org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationJobUtils;
import
org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner;
@@ -268,7 +268,7 @@ public class SegmentGenerationJobRunner implements
IngestionJobRunner {
String segmentTarFileName = URIUtils.encode(segmentName +
Constants.TAR_GZ_FILE_EXT);
localSegmentTarFile = new File(localOutputTempDir, segmentTarFileName);
LOGGER.info("Tarring segment from: {} to: {}", localSegmentDir,
localSegmentTarFile);
- TarGzCompressionUtils.createTarGzFile(localSegmentDir,
localSegmentTarFile);
+ TarCompressionUtils.createCompressedTarFile(localSegmentDir,
localSegmentTarFile);
long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir);
long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile);
LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: {}",
segmentName,
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
index c9cfdc0619..248b0f2553 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
@@ -43,7 +43,7 @@ import
org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifi
import org.apache.pinot.common.metrics.MinionMeter;
import org.apache.pinot.common.restlet.resources.StartReplaceSegmentsRequest;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
-import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.common.utils.TarCompressionUtils;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.minion.MinionConf;
@@ -219,7 +219,7 @@ public abstract class
BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
String.format("Decompressing segment from: %s (%d out of %d)",
downloadURLs[i], (i + 1),
downloadURLs.length));
File segmentDir = new File(tempDataDir, "segmentDir_" + i);
- File indexDir = TarGzCompressionUtils.untar(tarredSegmentFile,
segmentDir).get(0);
+ File indexDir = TarCompressionUtils.untar(tarredSegmentFile,
segmentDir).get(0);
inputSegmentDirs.add(indexDir);
if (!FileUtils.deleteQuietly(tarredSegmentFile)) {
LOGGER.warn("Failed to delete tarred input segment: {}",
tarredSegmentFile.getAbsolutePath());
@@ -253,8 +253,8 @@ public abstract class
BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
String.format("Compressing segment: %s (%d out of %d)",
segmentConversionResult.getSegmentName(), count++,
numOutputSegments));
File convertedSegmentTarFile = new File(convertedTarredSegmentDir,
- segmentConversionResult.getSegmentName() +
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
- TarGzCompressionUtils.createTarGzFile(convertedSegmentDir,
convertedSegmentTarFile);
+ segmentConversionResult.getSegmentName() +
TarCompressionUtils.TAR_GZ_FILE_EXTENSION);
+ TarCompressionUtils.createCompressedTarFile(convertedSegmentDir,
convertedSegmentTarFile);
tarredSegmentFiles.add(convertedSegmentTarFile);
if (!FileUtils.deleteQuietly(convertedSegmentDir)) {
LOGGER.warn("Failed to delete converted segment: {}",
convertedSegmentDir.getAbsolutePath());
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
index 2af6face4d..c75fa99bf8 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
@@ -36,7 +36,7 @@ import org.apache.pinot.common.auth.AuthProviderUtils;
import
org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
import org.apache.pinot.common.metrics.MinionMeter;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
-import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.common.utils.TarCompressionUtils;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.minion.event.MinionEventObserver;
@@ -113,7 +113,7 @@ public abstract class BaseSingleSegmentConversionExecutor
extends BaseTaskExecut
// Un-tar the segment file
_eventObserver.notifyProgress(_pinotTaskConfig, "Decompressing segment
from: " + downloadURL);
File segmentDir = new File(tempDataDir, "segmentDir");
- File indexDir = TarGzCompressionUtils.untar(tarredSegmentFile,
segmentDir).get(0);
+ File indexDir = TarCompressionUtils.untar(tarredSegmentFile,
segmentDir).get(0);
if (!FileUtils.deleteQuietly(tarredSegmentFile)) {
LOGGER.warn("Failed to delete tarred input segment: {}",
tarredSegmentFile.getAbsolutePath());
}
@@ -149,9 +149,8 @@ public abstract class BaseSingleSegmentConversionExecutor
extends BaseTaskExecut
// Tar the converted segment
_eventObserver.notifyProgress(_pinotTaskConfig, "Compressing segment: "
+ segmentName);
- File convertedTarredSegmentFile =
- new File(tempDataDir, segmentName +
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
- TarGzCompressionUtils.createTarGzFile(convertedSegmentDir,
convertedTarredSegmentFile);
+ File convertedTarredSegmentFile = new File(tempDataDir, segmentName +
TarCompressionUtils.TAR_GZ_FILE_EXTENSION);
+ TarCompressionUtils.createCompressedTarFile(convertedSegmentDir,
convertedTarredSegmentFile);
if (!FileUtils.deleteQuietly(convertedSegmentDir)) {
LOGGER.warn("Failed to delete converted segment: {}",
convertedSegmentDir.getAbsolutePath());
}
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskExecutor.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskExecutor.java
index 898c1a1eb1..eb14fb590b 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskExecutor.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskExecutor.java
@@ -29,7 +29,7 @@ import java.util.UUID;
import org.apache.commons.io.FileUtils;
import
org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
import org.apache.pinot.common.segment.generation.SegmentGenerationUtils;
-import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.common.utils.TarCompressionUtils;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.minion.MinionContext;
import org.apache.pinot.minion.event.MinionEventObserver;
@@ -257,7 +257,7 @@ public class SegmentGenerationAndPushTaskExecutor extends
BaseTaskExecutor {
String segmentTarFileName = segmentName + Constants.TAR_GZ_FILE_EXT;
File localSegmentTarFile = new File(localOutputTempDir,
segmentTarFileName);
LOGGER.info("Tarring segment from: {} to: {}", localSegmentDir,
localSegmentTarFile);
- TarGzCompressionUtils.createTarGzFile(localSegmentDir,
localSegmentTarFile);
+ TarCompressionUtils.createCompressedTarFile(localSegmentDir,
localSegmentTarFile);
long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir);
long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile);
LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: {}",
segmentName,
diff --git
a/pinot-plugins/pinot-segment-writer/pinot-segment-writer-file-based/src/main/java/org/apache/pinot/plugin/segmentwriter/filebased/FileBasedSegmentWriter.java
b/pinot-plugins/pinot-segment-writer/pinot-segment-writer-file-based/src/main/java/org/apache/pinot/plugin/segmentwriter/filebased/FileBasedSegmentWriter.java
index 7ecf62513e..ee0550cac4 100644
---
a/pinot-plugins/pinot-segment-writer/pinot-segment-writer-file-based/src/main/java/org/apache/pinot/plugin/segmentwriter/filebased/FileBasedSegmentWriter.java
+++
b/pinot-plugins/pinot-segment-writer/pinot-segment-writer-file-based/src/main/java/org/apache/pinot/plugin/segmentwriter/filebased/FileBasedSegmentWriter.java
@@ -36,7 +36,7 @@ import org.apache.avro.generic.GenericDatumWriter;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.common.utils.TarCompressionUtils;
import org.apache.pinot.core.util.SegmentProcessorAvroUtils;
import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
import org.apache.pinot.segment.local.recordtransformer.RecordTransformer;
@@ -208,7 +208,7 @@ public class FileBasedSegmentWriter implements
SegmentWriter {
}
}
}
- TarGzCompressionUtils.createTarGzFile(new File(segmentDir, segmentName),
segmentTarFile);
+ TarCompressionUtils.createCompressedTarFile(new File(segmentDir,
segmentName), segmentTarFile);
LOGGER.info("Created segment tar: {} for segment: {} of table: {}",
segmentTarFile.getAbsolutePath(), segmentName,
_tableNameWithType);
diff --git
a/pinot-plugins/pinot-segment-writer/pinot-segment-writer-file-based/src/test/java/org/apache/pinot/plugin/segmentwriter/filebased/FileBasedSegmentWriterTest.java
b/pinot-plugins/pinot-segment-writer/pinot-segment-writer-file-based/src/test/java/org/apache/pinot/plugin/segmentwriter/filebased/FileBasedSegmentWriterTest.java
index a6e4b9c32f..844a782a29 100644
---
a/pinot-plugins/pinot-segment-writer/pinot-segment-writer-file-based/src/test/java/org/apache/pinot/plugin/segmentwriter/filebased/FileBasedSegmentWriterTest.java
+++
b/pinot-plugins/pinot-segment-writer/pinot-segment-writer-file-based/src/test/java/org/apache/pinot/plugin/segmentwriter/filebased/FileBasedSegmentWriterTest.java
@@ -29,7 +29,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.common.utils.TarCompressionUtils;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -170,7 +170,7 @@ public class FileBasedSegmentWriterTest {
// verify num docs and cardinality of aString
File segmentTar = new File(_outputDir,
"segmentWriter_1616238000000_1616241600000.tar.gz");
Assert.assertTrue(segmentTar.exists());
- TarGzCompressionUtils.untar(segmentTar, _outputDir);
+ TarCompressionUtils.untar(segmentTar, _outputDir);
SegmentMetadataImpl segmentMetadata =
new SegmentMetadataImpl(new File(_outputDir,
"segmentWriter_1616238000000_1616241600000"));
Assert.assertEquals(segmentMetadata.getTotalDocs(), 3);
@@ -186,7 +186,7 @@ public class FileBasedSegmentWriterTest {
// verify num docs and cardinality of aString
segmentTar = new File(_outputDir,
"segmentWriter_1616245200000_1616245200000.tar.gz");
Assert.assertTrue(segmentTar.exists());
- TarGzCompressionUtils.untar(segmentTar, _outputDir);
+ TarCompressionUtils.untar(segmentTar, _outputDir);
segmentMetadata = new SegmentMetadataImpl(new File(_outputDir,
"segmentWriter_1616245200000_1616245200000"));
Assert.assertEquals(segmentMetadata.getTotalDocs(), 2);
Assert.assertEquals(segmentMetadata.getColumnMetadataFor("aString").getCardinality(),
2);
@@ -214,7 +214,7 @@ public class FileBasedSegmentWriterTest {
File[] files = _outputDir.listFiles();
Assert.assertEquals(files.length, 1);
File segmentTar = files[0];
- TarGzCompressionUtils.untar(segmentTar, _outputDir);
+ TarCompressionUtils.untar(segmentTar, _outputDir);
SegmentMetadataImpl segmentMetadata =
new SegmentMetadataImpl(new File(_outputDir,
files[0].getName().split(Constants.TAR_GZ_FILE_EXT)[0]));
Assert.assertEquals(segmentMetadata.getTotalDocs(), 0);
@@ -260,7 +260,7 @@ public class FileBasedSegmentWriterTest {
// segment name should be customSegmentName
File[] segmentTars = _outputDir.listFiles();
Assert.assertEquals(segmentTars.length, 1);
- TarGzCompressionUtils.untar(segmentTars[0], _outputDir);
+ TarCompressionUtils.untar(segmentTars[0], _outputDir);
Assert.assertEquals(segmentTars[0].getName(), "customSegmentName.tar.gz");
FileUtils.deleteQuietly(_outputDir);
segmentWriter.close();
@@ -278,7 +278,7 @@ public class FileBasedSegmentWriterTest {
// segment name should be normalized for hours since epoch
segmentTars = _outputDir.listFiles();
Assert.assertEquals(segmentTars.length, 1);
- TarGzCompressionUtils.untar(segmentTars[0], _outputDir);
+ TarCompressionUtils.untar(segmentTars[0], _outputDir);
Assert.assertEquals(segmentTars[0].getName(),
"segmentWriter_2021-03-20-11_2021-03-20-12.tar.gz");
FileUtils.deleteQuietly(_outputDir);
@@ -296,7 +296,7 @@ public class FileBasedSegmentWriterTest {
// segment name should be simple
segmentTars = _outputDir.listFiles();
Assert.assertEquals(segmentTars.length, 1);
- TarGzCompressionUtils.untar(segmentTars[0], _outputDir);
+ TarCompressionUtils.untar(segmentTars[0], _outputDir);
Assert.assertEquals(segmentTars[0].getName(),
"segmentWriter_1616238000000_1616241600000_1001.tar.gz");
FileUtils.deleteQuietly(_outputDir);
}
@@ -333,7 +333,7 @@ public class FileBasedSegmentWriterTest {
File[] segmentTars = _outputDir.listFiles();
Assert.assertEquals(segmentTars.length, 1);
Assert.assertEquals(segmentTars[0].getName(),
"segmentWriter_1616238000000_1616238000000.tar.gz");
- TarGzCompressionUtils.untar(segmentTars[0], _outputDir);
+ TarCompressionUtils.untar(segmentTars[0], _outputDir);
File segmentDir = new File(_outputDir,
"segmentWriter_1616238000000_1616238000000");
SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(segmentDir);
Assert.assertEquals(segmentMetadata.getTotalDocs(), 3);
@@ -348,7 +348,7 @@ public class FileBasedSegmentWriterTest {
segmentTars = _outputDir.listFiles();
Assert.assertEquals(segmentTars.length, 1);
Assert.assertEquals(segmentTars[0].getName(),
"segmentWriter_1616238000000_1616238000000.tar.gz");
- TarGzCompressionUtils.untar(segmentTars[0], _outputDir);
+ TarCompressionUtils.untar(segmentTars[0], _outputDir);
segmentMetadata = new SegmentMetadataImpl(segmentDir);
Assert.assertEquals(segmentMetadata.getTotalDocs(), 2);
FileUtils.deleteQuietly(segmentDir);
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java
index 4a5dd21948..eac7525ee1 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java
@@ -42,7 +42,7 @@ import org.apache.pinot.common.auth.AuthProviderUtils;
import org.apache.pinot.common.exception.HttpErrorStatusException;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.SimpleHttpResponse;
-import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.common.utils.TarCompressionUtils;
import org.apache.pinot.common.utils.URIUtils;
import org.apache.pinot.common.utils.http.HttpClient;
import org.apache.pinot.segment.spi.V1Constants;
@@ -295,7 +295,7 @@ public class SegmentPushUtils implements Serializable {
LOGGER.info("Checking if metadata tar gz file {} exists",
metadataTarGzFilePath);
if (spec.getPushJobSpec().isPreferMetadataTarGz() &&
fileSystem.exists(metadataTarGzFilePath)) {
segmentMetadataFile = new File(FileUtils.getTempDirectory(),
- "segmentMetadata-" + UUID.randomUUID() +
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+ "segmentMetadata-" + UUID.randomUUID() +
TarCompressionUtils.TAR_GZ_FILE_EXTENSION);
if (segmentMetadataFile.exists()) {
FileUtils.forceDelete(segmentMetadataFile);
}
@@ -402,7 +402,7 @@ public class SegmentPushUtils implements Serializable {
throws Exception {
String uuid = UUID.randomUUID().toString();
File tarFile =
- new File(FileUtils.getTempDirectory(), "segmentTar-" + uuid +
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+ new File(FileUtils.getTempDirectory(), "segmentTar-" + uuid +
TarCompressionUtils.TAR_GZ_FILE_EXTENSION);
File segmentMetadataDir = new File(FileUtils.getTempDirectory(),
"segmentMetadataDir-" + uuid);
try {
if (fileSystem instanceof LocalPinotFS) {
@@ -419,21 +419,21 @@ public class SegmentPushUtils implements Serializable {
// Extract metadata.properties
LOGGER.info("Trying to untar Metadata file from: [{}] to [{}]", tarFile,
segmentMetadataDir);
- TarGzCompressionUtils.untarOneFile(tarFile,
V1Constants.MetadataKeys.METADATA_FILE_NAME,
+ TarCompressionUtils.untarOneFile(tarFile,
V1Constants.MetadataKeys.METADATA_FILE_NAME,
new File(segmentMetadataDir,
V1Constants.MetadataKeys.METADATA_FILE_NAME));
// Extract creation.meta
LOGGER.info("Trying to untar CreationMeta file from: [{}] to [{}]",
tarFile, segmentMetadataDir);
- TarGzCompressionUtils.untarOneFile(tarFile,
V1Constants.SEGMENT_CREATION_META,
+ TarCompressionUtils.untarOneFile(tarFile,
V1Constants.SEGMENT_CREATION_META,
new File(segmentMetadataDir, V1Constants.SEGMENT_CREATION_META));
File segmentMetadataTarFile = new File(FileUtils.getTempDirectory(),
- "segmentMetadata-" + uuid +
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+ "segmentMetadata-" + uuid +
TarCompressionUtils.TAR_GZ_FILE_EXTENSION);
if (segmentMetadataTarFile.exists()) {
FileUtils.forceDelete(segmentMetadataTarFile);
}
LOGGER.info("Trying to tar segment metadata dir [{}] to [{}]",
segmentMetadataDir, segmentMetadataTarFile);
- TarGzCompressionUtils.createTarGzFile(segmentMetadataDir,
segmentMetadataTarFile);
+ TarCompressionUtils.createCompressedTarFile(segmentMetadataDir,
segmentMetadataTarFile);
return segmentMetadataTarFile;
} finally {
if (!(fileSystem instanceof LocalPinotFS)) {
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
index b0f3b5dc94..228ba2277b 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
@@ -73,7 +73,7 @@ import
org.apache.pinot.common.restlet.resources.ValidDocIdsType;
import org.apache.pinot.common.utils.DatabaseUtils;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.RoaringBitmapUtils;
-import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.common.utils.TarCompressionUtils;
import org.apache.pinot.common.utils.URIUtils;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.core.data.manager.InstanceDataManager;
@@ -463,10 +463,11 @@ public class TablesResource {
tmpSegmentTarDir.mkdir();
File segmentTarFile =
org.apache.pinot.common.utils.FileUtils.concatAndValidateFile(tmpSegmentTarDir,
- tableNameWithType + "_" + segmentName + "_" + UUID.randomUUID() +
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION,
+ tableNameWithType + "_" + segmentName + "_" + UUID.randomUUID() +
TarCompressionUtils.TAR_GZ_FILE_EXTENSION,
"Invalid table / segment name: %s , %s", tableNameWithType,
segmentName);
- TarGzCompressionUtils.createTarGzFile(new
File(tableDataManager.getTableDataDir(), segmentName), segmentTarFile);
+ TarCompressionUtils.createCompressedTarFile(new
File(tableDataManager.getTableDataDir(), segmentName),
+ segmentTarFile);
Response.ResponseBuilder builder = Response.ok();
builder.entity((StreamingOutput) output -> {
try {
@@ -817,10 +818,11 @@ public class TablesResource {
segmentTarUploadDir.mkdir();
segmentTarFile =
org.apache.pinot.common.utils.FileUtils.concatAndValidateFile(segmentTarUploadDir,
- tableNameWithType + "_" + segmentName + "_" + UUID.randomUUID() +
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION,
+ tableNameWithType + "_" + segmentName + "_" + UUID.randomUUID() +
TarCompressionUtils.TAR_GZ_FILE_EXTENSION,
"Invalid table / segment name: %s, %s", tableNameWithType,
segmentName);
- TarGzCompressionUtils.createTarGzFile(new
File(tableDataManager.getTableDataDir(), segmentName), segmentTarFile);
+ TarCompressionUtils.createCompressedTarFile(new
File(tableDataManager.getTableDataDir(), segmentName),
+ segmentTarFile);
// Use segment uploader to upload the segment tar file to segment store
and return the segment download url.
SegmentUploader segmentUploader =
_serverInstance.getInstanceDataManager().getSegmentUploader();
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
index 8abb11103f..198cbb0434 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
@@ -25,7 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.commons.configuration2.ex.ConfigurationException;
-import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.common.utils.TarCompressionUtils;
import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
import org.apache.pinot.spi.env.PinotConfiguration;
@@ -93,7 +93,7 @@ public class HelixInstanceDataManagerConfig implements
InstanceDataManagerConfig
private static final String STREAM_SEGMENT_DOWNLOAD_UNTAR_RATE_LIMIT =
"segment.stream.download.untar.rate.limit.bytes.per.sec";
private static final long DEFAULT_STREAM_SEGMENT_DOWNLOAD_UNTAR_RATE_LIMIT =
- TarGzCompressionUtils.NO_DISK_WRITE_RATE_LIMIT;
+ TarCompressionUtils.NO_DISK_WRITE_RATE_LIMIT;
// Key of whether to use streamed server segment download-untar
private static final String ENABLE_STREAM_SEGMENT_DOWNLOAD_UNTAR =
"segment.stream.download.untar";
diff --git
a/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
b/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
index 6e7966ae4a..fe717fab2e 100644
---
a/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
+++
b/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
@@ -36,7 +36,7 @@ import org.apache.pinot.common.restlet.resources.TablesList;
import org.apache.pinot.common.restlet.resources.ValidDocIdsBitmapResponse;
import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
import org.apache.pinot.common.utils.RoaringBitmapUtils;
-import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.common.utils.TarCompressionUtils;
import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
import org.apache.pinot.segment.spi.ImmutableSegment;
@@ -363,11 +363,11 @@ public class TablesResourceTest extends BaseResourceTest {
FileUtils.forceMkdir(tempMetadataDir);
// Extract metadata.properties
- TarGzCompressionUtils.untarOneFile(segmentFile,
V1Constants.MetadataKeys.METADATA_FILE_NAME,
+ TarCompressionUtils.untarOneFile(segmentFile,
V1Constants.MetadataKeys.METADATA_FILE_NAME,
new File(tempMetadataDir,
V1Constants.MetadataKeys.METADATA_FILE_NAME));
// Extract creation.meta
- TarGzCompressionUtils.untarOneFile(segmentFile,
V1Constants.SEGMENT_CREATION_META,
+ TarCompressionUtils.untarOneFile(segmentFile,
V1Constants.SEGMENT_CREATION_META,
new File(tempMetadataDir, V1Constants.SEGMENT_CREATION_META));
// Load segment metadata
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/SegmentProcessorFrameworkCommand.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/SegmentProcessorFrameworkCommand.java
index c5811296d6..807d3cea9a 100644
---
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/SegmentProcessorFrameworkCommand.java
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/SegmentProcessorFrameworkCommand.java
@@ -24,7 +24,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.common.utils.TarCompressionUtils;
import
org.apache.pinot.core.segment.processing.framework.SegmentProcessorConfig;
import
org.apache.pinot.core.segment.processing.framework.SegmentProcessorFramework;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
@@ -104,7 +104,7 @@ public class SegmentProcessorFrameworkCommand extends
AbstractBaseAdminCommand i
// Untar the segments if needed
if (!segmentDir.isDirectory()) {
if (fileName.endsWith(".tar.gz") || fileName.endsWith(".tgz")) {
- finalSegmentDir = TarGzCompressionUtils.untar(segmentDir,
untarredSegmentsDir).get(0);
+ finalSegmentDir = TarCompressionUtils.untar(segmentDir,
untarredSegmentsDir).get(0);
} else {
throw new IllegalStateException("Unsupported segment format: " +
segmentDir.getAbsolutePath());
}
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/UploadSegmentCommand.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/UploadSegmentCommand.java
index 87a0543e6a..3b0e41f700 100644
---
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/UploadSegmentCommand.java
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/UploadSegmentCommand.java
@@ -27,7 +27,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.hc.core5.http.Header;
import org.apache.pinot.common.auth.AuthProviderUtils;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
-import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.common.utils.TarCompressionUtils;
import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.CommonConstants;
@@ -180,8 +180,8 @@ public class UploadSegmentCommand extends
AbstractBaseAdminCommand implements Co
// Tar the segment directory
String segmentName = segmentFile.getName();
LOGGER.info("Compressing segment: {}", segmentName);
- segmentTarFile = new File(tempDir, segmentName +
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
- TarGzCompressionUtils.createTarGzFile(segmentFile, segmentTarFile);
+ segmentTarFile = new File(tempDir, segmentName +
TarCompressionUtils.TAR_GZ_FILE_EXTENSION);
+ TarCompressionUtils.createCompressedTarFile(segmentFile,
segmentTarFile);
} else {
segmentTarFile = segmentFile;
}
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java
index c9555c9915..65660b00ba 100644
---
a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java
@@ -29,7 +29,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.configuration2.PropertiesConfiguration;
import org.apache.commons.configuration2.ex.ConfigurationException;
import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.common.utils.TarCompressionUtils;
import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
import
org.apache.pinot.segment.local.segment.index.forward.ForwardIndexCreatorFactory;
import org.apache.pinot.segment.spi.IndexSegment;
@@ -215,7 +215,7 @@ public class DictionaryToRawIndexConverter {
if (segmentDir.isFile()) {
if (segmentDir.getName().endsWith(".tar.gz") || segmentDir.getName().endsWith(".tgz")) {
LOGGER.info("Uncompressing input segment '{}'", segmentDir);
- newSegment = TarGzCompressionUtils.untar(segmentDir, outputDir).get(0);
+ newSegment = TarCompressionUtils.untar(segmentDir, outputDir).get(0);
} else {
LOGGER.warn("Skipping non-segment file '{}'", segmentDir.getAbsoluteFile());
return false;
@@ -236,8 +236,8 @@ public class DictionaryToRawIndexConverter {
if (compressOutput) {
LOGGER.info("Compressing segment '{}'", newSegment);
- File segmentTarFile = new File(outputDir, newSegment.getName() + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
- TarGzCompressionUtils.createTarGzFile(newSegment, segmentTarFile);
+ File segmentTarFile = new File(outputDir, newSegment.getName() + TarCompressionUtils.TAR_GZ_FILE_EXTENSION);
+ TarCompressionUtils.createCompressedTarFile(newSegment, segmentTarFile);
FileUtils.deleteQuietly(newSegment);
}
return true;
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentConvertCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentConvertCommand.java
index 46aee1265c..3476e56839 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentConvertCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentConvertCommand.java
@@ -22,7 +22,7 @@ import java.io.File;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.common.utils.TarCompressionUtils;
import org.apache.pinot.spi.data.readers.FileFormat;
import org.apache.pinot.tools.AbstractBaseCommand;
import org.apache.pinot.tools.Command;
@@ -114,7 +114,7 @@ public class PinotSegmentConvertCommand extends AbstractBaseCommand implements C
segmentPath.put(fileName, file.getAbsolutePath());
} else if (fileName.toLowerCase().endsWith(".tar.gz") || fileName.toLowerCase().endsWith(".tgz")) {
// Compressed segment.
- File segment = TarGzCompressionUtils.untar(file, new File(tempDir, fileName)).get(0);
+ File segment = TarCompressionUtils.untar(file, new File(tempDir, fileName)).get(0);
String segmentName = segment.getName();
if (segmentPath.containsKey(segmentName)) {
throw new RuntimeException("Multiple segments with the same segment name: " + fileName);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]