This is an automated email from the ASF dual-hosted git repository.

jenniferdai pushed a commit to branch trypt
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 4419a3b86c19b4abc5127161eb3797cf57a66496
Author: Jennifer Dai <[email protected]>
AuthorDate: Tue Dec 3 14:57:41 2019 -0800

    Add targz to SegmentIndexCreationImpl
---
 .../generator/SegmentGeneratorConfig.java          | 10 ++++++++
 .../impl/SegmentIndexCreationDriverImpl.java       | 20 +++++++++-------
 .../hadoop/job/mappers/SegmentCreationMapper.java  | 27 ++++++++--------------
 .../pinot/ingestion/common/JobConfigConstants.java |  3 +++
 ...mentBuildPushOfflineClusterIntegrationTest.java |  4 +++-
 5 files changed, 38 insertions(+), 26 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
index 1d42001..4e57c6f 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
@@ -114,6 +114,7 @@ public class SegmentGeneratorConfig {
   private boolean _onHeap = false;
   private boolean _skipTimeValueCheck = false;
   private boolean _nullHandlingEnabled = false;
+  private boolean _createTarGz = false;
 
   public SegmentGeneratorConfig() {
   }
@@ -164,6 +165,7 @@ public class SegmentGeneratorConfig {
     _recordReaderPath = config._recordReaderPath;
     _skipTimeValueCheck = config._skipTimeValueCheck;
     _nullHandlingEnabled = config._nullHandlingEnabled;
+    _createTarGz = config._createTarGz;
   }
 
   /**
@@ -352,6 +354,14 @@ public class SegmentGeneratorConfig {
     _recordReaderPath = recordReaderPath;
   }
 
+  public boolean isCreateTarGz() {
+    return _createTarGz;
+  }
+
+  public void setCreateTarGz(boolean createTarGz) {
+    _createTarGz = createTarGz;
+  }
+
   public String getOutDir() {
     return _outDir;
   }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
index 3941f66..1fdaa7e 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
@@ -32,12 +32,8 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import org.apache.commons.io.FileUtils;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.MetricFieldSpec;
-import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.common.data.StarTreeIndexSpec;
-import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.core.data.readers.RecordReaderFactory;
 import org.apache.pinot.core.data.recordtransformer.CompositeTransformer;
 import org.apache.pinot.core.data.recordtransformer.RecordTransformer;
@@ -65,6 +61,11 @@ import org.apache.pinot.core.startree.hll.HllUtil;
 import org.apache.pinot.core.startree.v2.builder.MultipleTreesBuilder;
 import org.apache.pinot.core.startree.v2.builder.StarTreeV2BuilderConfig;
 import org.apache.pinot.core.util.CrcUtils;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.MetricFieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.startree.hll.HllConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -388,9 +389,6 @@ public class SegmentIndexCreationDriverImpl implements 
SegmentIndexCreationDrive
     // Move the temporary directory into its final location
     FileUtils.moveDirectory(tempIndexDir, segmentOutputDir);
 
-    // Delete the temporary directory
-    FileUtils.deleteQuietly(tempIndexDir);
-
     // Convert segment format if necessary
     convertFormatIfNeeded(segmentOutputDir);
 
@@ -415,6 +413,12 @@ public class SegmentIndexCreationDriverImpl implements 
SegmentIndexCreationDrive
     // Persist creation metadata to disk
     persistCreationMeta(segmentOutputDir, crc, creationTime);
 
+    if (config.isCreateTarGz()) {
+      File tarGzPath = new File(outputDir, segmentName + 
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+      
TarGzCompressionUtils.createTarGzOfDirectory(segmentOutputDir.getAbsolutePath(),
 tarGzPath.getAbsolutePath());
+      FileUtils.deleteQuietly(segmentOutputDir);
+    }
+
     LOGGER.info("Driver, record read time : {}", totalRecordReadTime);
     LOGGER.info("Driver, stats collector time : {}", totalStatsCollectorTime);
     LOGGER.info("Driver, indexing time : {}", totalIndexTime);
diff --git 
a/pinot-ingestion-jobs/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
 
b/pinot-ingestion-jobs/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
index 438fd9c..dfb02b6 100644
--- 
a/pinot-ingestion-jobs/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
+++ 
b/pinot-ingestion-jobs/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
@@ -36,14 +36,9 @@ import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.pinot.common.config.SegmentsValidationAndRetentionConfig;
 import org.apache.pinot.common.config.TableConfig;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.TimeFieldSpec;
 import org.apache.pinot.common.utils.DataSize;
-import org.apache.pinot.spi.utils.JsonUtils;
-import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.core.data.readers.CSVRecordReaderConfig;
 import org.apache.pinot.core.data.readers.FileFormat;
-import org.apache.pinot.spi.data.readers.RecordReaderConfig;
 import org.apache.pinot.core.data.readers.ThriftRecordReaderConfig;
 import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
@@ -52,6 +47,10 @@ import 
org.apache.pinot.core.segment.name.NormalizedDateSegmentNameGenerator;
 import org.apache.pinot.core.segment.name.SegmentNameGenerator;
 import org.apache.pinot.core.segment.name.SimpleSegmentNameGenerator;
 import org.apache.pinot.ingestion.common.JobConfigConstants;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.TimeFieldSpec;
+import org.apache.pinot.spi.data.readers.RecordReaderConfig;
+import org.apache.pinot.spi.utils.JsonUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,6 +72,7 @@ public class SegmentCreationMapper extends 
Mapper<LongWritable, Text, LongWritab
   protected TableConfig _tableConfig;
   protected String _recordReaderPath;
   protected Path _readerConfigFile;
+  protected boolean _createTarGz;
 
   // HDFS segment tar directory
   protected Path _hdfsSegmentTarDir;
@@ -81,7 +81,6 @@ public class SegmentCreationMapper extends 
Mapper<LongWritable, Text, LongWritab
   protected File _localStagingDir;
   protected File _localInputDir;
   protected File _localSegmentDir;
-  protected File _localSegmentTarDir;
 
   /**
    * Generate a relative output directory path when `useRelativePath` flag is 
on.
@@ -120,6 +119,7 @@ public class SegmentCreationMapper extends 
Mapper<LongWritable, Text, LongWritab
       _readerConfigFile = new Path(readerConfigFile);
     }
     _recordReaderPath = _jobConf.get(JobConfigConstants.RECORD_READER_PATH);
+    _createTarGz = Boolean.parseBoolean(_jobConf.get(JobConfigConstants.CREATE_TAR_GZ));
 
     // Set up segment name generator
     String segmentNameGeneratorType =
@@ -153,18 +153,16 @@ public class SegmentCreationMapper extends 
Mapper<LongWritable, Text, LongWritab
     _localStagingDir = new File(LOCAL_TEMP_DIR);
     _localInputDir = new File(_localStagingDir, "inputData");
     _localSegmentDir = new File(_localStagingDir, "segments");
-    _localSegmentTarDir = new File(_localStagingDir, 
JobConfigConstants.SEGMENT_TAR_DIR);
 
     if (_localStagingDir.exists()) {
       _logger.warn("Deleting existing file: {}", _localStagingDir);
       FileUtils.forceDelete(_localStagingDir);
     }
     _logger
-        .info("Making local temporary directories: {}, {}, {}", 
_localStagingDir, _localInputDir, _localSegmentTarDir);
+        .info("Making local temporary directories: {}, {}, {}", _localStagingDir, _localInputDir, _localSegmentDir);
     Preconditions.checkState(_localStagingDir.mkdirs());
     Preconditions.checkState(_localInputDir.mkdir());
     Preconditions.checkState(_localSegmentDir.mkdir());
-    Preconditions.checkState(_localSegmentTarDir.mkdir());
 
     
_logger.info("*********************************************************************");
     _logger.info("Raw Table Name: {}", _rawTableName);
@@ -176,7 +174,6 @@ public class SegmentCreationMapper extends 
Mapper<LongWritable, Text, LongWritab
     _logger.info("HDFS Segment Tar Directory: {}", _hdfsSegmentTarDir);
     _logger.info("Local Staging Directory: {}", _localStagingDir);
     _logger.info("Local Input Directory: {}", _localInputDir);
-    _logger.info("Local Segment Tar Directory: {}", _localSegmentTarDir);
     
_logger.info("*********************************************************************");
   }
 
@@ -233,6 +230,7 @@ public class SegmentCreationMapper extends 
Mapper<LongWritable, Text, LongWritab
       segmentGeneratorConfig.setReaderConfig(getReaderConfig(fileFormat));
     }
     segmentGeneratorConfig.setOnHeap(true);
+    segmentGeneratorConfig.setCreateTarGz(_createTarGz);
 
     addAdditionalSegmentGeneratorConfigs(segmentGeneratorConfig, 
hdfsInputFile, sequenceId);
 
@@ -260,16 +258,11 @@ public class SegmentCreationMapper extends 
Mapper<LongWritable, Text, LongWritab
     String segmentName = driver.getSegmentName();
     _logger.info("Finish creating segment: {} with sequence id: {}", 
segmentName, sequenceId);
 
-    File localSegmentDir = new File(_localSegmentDir, segmentName);
     String segmentTarFileName = segmentName + 
JobConfigConstants.TAR_GZ_FILE_EXT;
-    File localSegmentTarFile = new File(_localSegmentTarDir, 
segmentTarFileName);
-    _logger.info("Tarring segment from: {} to: {}", localSegmentDir, 
localSegmentTarFile);
-    TarGzCompressionUtils.createTarGzOfDirectory(localSegmentDir.getPath(), 
localSegmentTarFile.getPath());
+    File localSegmentTarFile = new File(_localSegmentDir, segmentTarFileName);
 
-    long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir);
     long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile);
-    _logger.info("Size for segment: {}, uncompressed: {}, compressed: {}", 
segmentName,
-        DataSize.fromBytes(uncompressedSegmentSize), 
DataSize.fromBytes(compressedSegmentSize));
+    _logger.info("Size for segment: {}, compressed: {}", segmentName, 
DataSize.fromBytes(compressedSegmentSize));
 
     Path hdfsSegmentTarFile = new Path(_hdfsSegmentTarDir, segmentTarFileName);
     if (_useRelativePath) {
diff --git 
a/pinot-ingestion-jobs/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/common/JobConfigConstants.java
 
b/pinot-ingestion-jobs/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/common/JobConfigConstants.java
index 9c9238e..9cb35b8 100644
--- 
a/pinot-ingestion-jobs/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/common/JobConfigConstants.java
+++ 
b/pinot-ingestion-jobs/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/common/JobConfigConstants.java
@@ -76,4 +76,7 @@ public class JobConfigConstants {
 
   // Assign sequence ids to input files based at each local directory level
   public static final String LOCAL_DIRECTORY_SEQUENCE_ID = 
"local.directory.sequence.id";
+
+  // Creates a tar gz file
+  public static final String CREATE_TAR_GZ = "create.tar.gz";
 }
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HadoopSegmentBuildPushOfflineClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HadoopSegmentBuildPushOfflineClusterIntegrationTest.java
index 4c99f8c..73dc51c 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HadoopSegmentBuildPushOfflineClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HadoopSegmentBuildPushOfflineClusterIntegrationTest.java
@@ -40,7 +40,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
 import org.apache.pinot.common.config.ColumnPartitionConfig;
 import org.apache.pinot.common.config.SegmentPartitionConfig;
-import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.core.data.partition.PartitionFunction;
 import org.apache.pinot.core.data.partition.PartitionFunctionFactory;
 import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
@@ -48,6 +47,7 @@ import org.apache.pinot.hadoop.job.HadoopSegmentCreationJob;
 import org.apache.pinot.hadoop.job.HadoopSegmentPreprocessingJob;
 import org.apache.pinot.ingestion.common.JobConfigConstants;
 import org.apache.pinot.ingestion.jobs.SegmentTarPushJob;
+import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.util.TestUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -174,6 +174,8 @@ public class 
HadoopSegmentBuildPushOfflineClusterIntegrationTest extends BaseClu
     properties.setProperty(JobConfigConstants.PUSH_TO_HOSTS, 
getDefaultControllerConfiguration().getControllerHost());
     properties.setProperty(JobConfigConstants.PUSH_TO_PORT, 
getDefaultControllerConfiguration().getControllerPort());
 
+    properties.setProperty(JobConfigConstants.CREATE_TAR_GZ, "true");
+
     Properties preComputeProperties = new Properties();
     preComputeProperties.putAll(properties);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to