This is an automated email from the ASF dual-hosted git repository. jenniferdai pushed a commit to branch trypt in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 05247b0257d8435f96d4dbc948bad7650ce2da31 Author: Jennifer Dai <[email protected]> AuthorDate: Tue Dec 3 14:43:36 2019 -0800 Adding tar gz compression during segment creation --- .../generator/SegmentGeneratorConfig.java | 10 ++++++++++ .../creator/impl/SegmentIndexCreationDriverImpl.java | 20 ++++++++++++-------- .../hadoop/job/mappers/SegmentCreationMapper.java | 19 ++++++++----------- .../pinot/ingestion/common/JobConfigConstants.java | 3 +++ ...egmentBuildPushOfflineClusterIntegrationTest.java | 4 +++- 5 files changed, 36 insertions(+), 20 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java index 1d42001..4e57c6f 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java @@ -114,6 +114,7 @@ public class SegmentGeneratorConfig { private boolean _onHeap = false; private boolean _skipTimeValueCheck = false; private boolean _nullHandlingEnabled = false; + private boolean _createTarGz = false; public SegmentGeneratorConfig() { } @@ -164,6 +165,7 @@ public class SegmentGeneratorConfig { _recordReaderPath = config._recordReaderPath; _skipTimeValueCheck = config._skipTimeValueCheck; _nullHandlingEnabled = config._nullHandlingEnabled; + _createTarGz = config._createTarGz; } /** @@ -352,6 +354,14 @@ public class SegmentGeneratorConfig { _recordReaderPath = recordReaderPath; } + public boolean isCreateTarGz() { + return _createTarGz; + } + + public void setCreateTarGz(boolean createTarGz) { + _createTarGz = createTarGz; + } + public String getOutDir() { return _outDir; } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java index 3941f66..1fdaa7e 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java @@ -32,12 +32,8 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import org.apache.commons.io.FileUtils; -import org.apache.pinot.spi.data.FieldSpec; -import org.apache.pinot.spi.data.MetricFieldSpec; -import org.apache.pinot.spi.data.Schema; import org.apache.pinot.common.data.StarTreeIndexSpec; -import org.apache.pinot.spi.data.readers.GenericRow; -import org.apache.pinot.spi.data.readers.RecordReader; +import org.apache.pinot.common.utils.TarGzCompressionUtils; import org.apache.pinot.core.data.readers.RecordReaderFactory; import org.apache.pinot.core.data.recordtransformer.CompositeTransformer; import org.apache.pinot.core.data.recordtransformer.RecordTransformer; @@ -65,6 +61,11 @@ import org.apache.pinot.core.startree.hll.HllUtil; import org.apache.pinot.core.startree.v2.builder.MultipleTreesBuilder; import org.apache.pinot.core.startree.v2.builder.StarTreeV2BuilderConfig; import org.apache.pinot.core.util.CrcUtils; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.MetricFieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.data.readers.RecordReader; import org.apache.pinot.startree.hll.HllConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -388,9 +389,6 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive // Move the temporary directory into its final location FileUtils.moveDirectory(tempIndexDir, segmentOutputDir); - // Delete the temporary directory - FileUtils.deleteQuietly(tempIndexDir); - // Convert segment format if 
necessary convertFormatIfNeeded(segmentOutputDir); @@ -415,6 +413,12 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive // Persist creation metadata to disk persistCreationMeta(segmentOutputDir, crc, creationTime); + if (config.isCreateTarGz()) { + File tarGzPath = new File(outputDir, segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION); + TarGzCompressionUtils.createTarGzOfDirectory(segmentOutputDir.getAbsolutePath(), tarGzPath.getAbsolutePath()); + FileUtils.deleteQuietly(segmentOutputDir); + } + LOGGER.info("Driver, record read time : {}", totalRecordReadTime); LOGGER.info("Driver, stats collector time : {}", totalStatsCollectorTime); LOGGER.info("Driver, indexing time : {}", totalIndexTime); diff --git a/pinot-ingestion-jobs/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java b/pinot-ingestion-jobs/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java index 438fd9c..c211d32 100644 --- a/pinot-ingestion-jobs/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java +++ b/pinot-ingestion-jobs/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java @@ -36,14 +36,9 @@ import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.pinot.common.config.SegmentsValidationAndRetentionConfig; import org.apache.pinot.common.config.TableConfig; -import org.apache.pinot.spi.data.Schema; -import org.apache.pinot.spi.data.TimeFieldSpec; import org.apache.pinot.common.utils.DataSize; -import org.apache.pinot.spi.utils.JsonUtils; -import org.apache.pinot.common.utils.TarGzCompressionUtils; import org.apache.pinot.core.data.readers.CSVRecordReaderConfig; import org.apache.pinot.core.data.readers.FileFormat; -import org.apache.pinot.spi.data.readers.RecordReaderConfig; import org.apache.pinot.core.data.readers.ThriftRecordReaderConfig; import 
org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig; import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver; @@ -52,6 +47,10 @@ import org.apache.pinot.core.segment.name.NormalizedDateSegmentNameGenerator; import org.apache.pinot.core.segment.name.SegmentNameGenerator; import org.apache.pinot.core.segment.name.SimpleSegmentNameGenerator; import org.apache.pinot.ingestion.common.JobConfigConstants; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.TimeFieldSpec; +import org.apache.pinot.spi.data.readers.RecordReaderConfig; +import org.apache.pinot.spi.utils.JsonUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,6 +72,7 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab protected TableConfig _tableConfig; protected String _recordReaderPath; protected Path _readerConfigFile; + protected boolean _createTarGz; // HDFS segment tar directory protected Path _hdfsSegmentTarDir; @@ -120,6 +120,7 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab _readerConfigFile = new Path(readerConfigFile); } _recordReaderPath = _jobConf.get(JobConfigConstants.RECORD_READER_PATH); + _createTarGz = Boolean.parseBoolean(_jobConf.get(JobConfigConstants.CREATE_TAR_GZ)); // Set up segment name generator String segmentNameGeneratorType = @@ -233,6 +234,7 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab segmentGeneratorConfig.setReaderConfig(getReaderConfig(fileFormat)); } segmentGeneratorConfig.setOnHeap(true); + segmentGeneratorConfig.setCreateTarGz(_createTarGz); addAdditionalSegmentGeneratorConfigs(segmentGeneratorConfig, hdfsInputFile, sequenceId); @@ -260,16 +262,11 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab String segmentName = driver.getSegmentName(); _logger.info("Finish creating segment: {} with sequence id: {}", segmentName, sequenceId); - File localSegmentDir = new
File(_localSegmentDir, segmentName); String segmentTarFileName = segmentName + JobConfigConstants.TAR_GZ_FILE_EXT; File localSegmentTarFile = new File(_localSegmentTarDir, segmentTarFileName); - _logger.info("Tarring segment from: {} to: {}", localSegmentDir, localSegmentTarFile); - TarGzCompressionUtils.createTarGzOfDirectory(localSegmentDir.getPath(), localSegmentTarFile.getPath()); - long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir); long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile); - _logger.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName, - DataSize.fromBytes(uncompressedSegmentSize), DataSize.fromBytes(compressedSegmentSize)); + _logger.info("Size for segment: {}, compressed: {}", segmentName, DataSize.fromBytes(compressedSegmentSize)); Path hdfsSegmentTarFile = new Path(_hdfsSegmentTarDir, segmentTarFileName); if (_useRelativePath) { diff --git a/pinot-ingestion-jobs/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/common/JobConfigConstants.java b/pinot-ingestion-jobs/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/common/JobConfigConstants.java index 9c9238e..9cb35b8 100644 --- a/pinot-ingestion-jobs/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/common/JobConfigConstants.java +++ b/pinot-ingestion-jobs/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/common/JobConfigConstants.java @@ -76,4 +76,7 @@ public class JobConfigConstants { // Assign sequence ids to input files based at each local directory level public static final String LOCAL_DIRECTORY_SEQUENCE_ID = "local.directory.sequence.id"; + + // Creates a tar gz file + public static final String CREATE_TAR_GZ = "create.tar.gz"; } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HadoopSegmentBuildPushOfflineClusterIntegrationTest.java 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HadoopSegmentBuildPushOfflineClusterIntegrationTest.java index 4c99f8c..73dc51c 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HadoopSegmentBuildPushOfflineClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HadoopSegmentBuildPushOfflineClusterIntegrationTest.java @@ -40,7 +40,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster; import org.apache.pinot.common.config.ColumnPartitionConfig; import org.apache.pinot.common.config.SegmentPartitionConfig; -import org.apache.pinot.spi.data.Schema; import org.apache.pinot.core.data.partition.PartitionFunction; import org.apache.pinot.core.data.partition.PartitionFunctionFactory; import org.apache.pinot.core.indexsegment.generator.SegmentVersion; @@ -48,6 +47,7 @@ import org.apache.pinot.hadoop.job.HadoopSegmentCreationJob; import org.apache.pinot.hadoop.job.HadoopSegmentPreprocessingJob; import org.apache.pinot.ingestion.common.JobConfigConstants; import org.apache.pinot.ingestion.jobs.SegmentTarPushJob; +import org.apache.pinot.spi.data.Schema; import org.apache.pinot.util.TestUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -174,6 +174,8 @@ public class HadoopSegmentBuildPushOfflineClusterIntegrationTest extends BaseClu properties.setProperty(JobConfigConstants.PUSH_TO_HOSTS, getDefaultControllerConfiguration().getControllerHost()); properties.setProperty(JobConfigConstants.PUSH_TO_PORT, getDefaultControllerConfiguration().getControllerPort()); + properties.setProperty(JobConfigConstants.CREATE_TAR_GZ, "true"); + Properties preComputeProperties = new Properties(); preComputeProperties.putAll(properties); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
