jackjlli commented on a change in pull request #4462: Time-aware resizing
URL: https://github.com/apache/incubator-pinot/pull/4462#discussion_r306520054
##########
File path:
pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java
##########
@@ -137,8 +162,195 @@ public SegmentPreprocessingJob(final Properties
properties) {
public void run()
throws Exception {
- _logger.info("Pre-processing job is disabled.");
- return;
+ // TODO: Remove once the job is ready
+ _enablePartitioning = false;
+ _enableSorting = false;
+ _enableResizing = false;
+
+ if (!_enablePartitioning && !_enableSorting && !_enableResizing) {
+ _logger.info("Pre-processing job is disabled.");
+ return;
+ } else {
+ _logger.info("Starting {}", getClass().getSimpleName());
+ }
+
+ _fileSystem = FileSystem.get(_conf);
+ final List<Path> inputDataPath = getDataFilePaths(_inputSegmentDir);
+
+ if (_fileSystem.exists(_preprocessedOutputDir)) {
+ _logger.warn("Found the output folder {}, deleting it",
_preprocessedOutputDir);
+ _fileSystem.delete(_preprocessedOutputDir, true);
+ }
+ JobPreparationHelper.setDirPermission(_fileSystem, _preprocessedOutputDir,
_defaultPermissionsMask);
+
+ // If push locations, table config, and schema are not configured, this
does not necessarily mean that segments
+ // cannot be created. We should allow the user to go to the next step
rather than failing the job.
+ if (_pushLocations.isEmpty()) {
+ _logger.error("Push locations cannot be empty. "
+ + "They are needed to get the table config and schema needed for
this step. Skipping pre-processing");
+ return;
+ }
+
+ try (ControllerRestApi controllerRestApi = getControllerRestApi()) {
+ _tableConfig = controllerRestApi.getTableConfig();
+ _schema = controllerRestApi.getSchema();
+ }
+
+ if (_tableConfig == null) {
+ _logger.error("Table config cannot be null. Skipping pre-processing");
+ return;
+ }
+
+ if (_schema == null) {
+ _logger.error("Schema cannot be null. Skipping pre-processing");
+ }
+
+ SegmentsValidationAndRetentionConfig validationConfig =
_tableConfig.getValidationConfig();
+
+ _logger.info("Initializing a pre-processing job");
+ Job job = Job.getInstance(_conf);
+
+ // TODO: Serialize and deserialize validation config by creating toJson
and fromJson
+ // If the use case is an append use case, check that one time unit is
contained in one file. If there is more than one,
+ // the job should be disabled, as we should not resize for these use
cases. Therefore, setting the time column name
+ // and value
+ if (validationConfig.getSegmentPushType().equalsIgnoreCase("APPEND")) {
+ job.getConfiguration().set(IS_APPEND, "true");
+ String timeColumnName = _schema.getTimeFieldSpec().getName();
+ job.getConfiguration().set(TIME_COLUMN_CONFIG, timeColumnName);
+ job.getConfiguration().set(SEGMENT_TIME_TYPE,
validationConfig.getTimeType().toString());
+ job.getConfiguration().set(SEGMENT_TIME_FORMAT,
_schema.getTimeFieldSpec().getOutgoingGranularitySpec().getTimeFormat());
+ job.getConfiguration().set(SEGMENT_PUSH_FREQUENCY,
validationConfig.getSegmentPushFrequency());
+ DataFileStream<GenericRecord> dataStreamReader =
getAvroReader(inputDataPath.get(0));
+ job.getConfiguration().set(TIME_COLUMN_VALUE, (String)
dataStreamReader.next().get(timeColumnName));
+ dataStreamReader.close();
+ }
+
+ if (_enablePartitioning) {
+ fetchPartitioningConfig();
+ _logger.info("{}: {}", PARTITION_COLUMN_CONFIG, _partitionColumn);
+ _logger.info("{}: {}", NUM_PARTITIONS_CONFIG, _numberOfPartitions);
+ _logger.info("{}: {}", PARTITION_FUNCTION_CONFIG, _partitionColumn);
+ }
+
+ if (_enableSorting) {
+ fetchSortingConfig();
+ _logger.info("{}: {}", SORTED_COLUMN_CONFIG, _sortedColumn);
+ }
+
+ if (_enableResizing) {
+ fetchResizingConfig();
+ _logger.info("minimum number of output files: {}", _numberOfOutputFiles);
+ }
+
+ job.getConfiguration().set(JobContext.JOB_NAME, this.getClass().getName());
+ // Turn this on to always firstly use class paths that user specifies.
+ job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST,
"true");
+ // Turn this off since we don't need an empty file in the output directory
+
job.getConfiguration().set(FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER,
"false");
+
+ job.setJarByClass(SegmentPreprocessingJob.class);
+
+ String hadoopTokenFileLocation = System.getenv(HADOOP_TOKEN_FILE_LOCATION);
+ if (hadoopTokenFileLocation != null) {
+ job.getConfiguration().set(MAPREDUCE_JOB_CREDENTIALS_BINARY,
hadoopTokenFileLocation);
+ }
+
+ // Schema configs.
Review comment:
Adjust the comment to `// Arvo schema configs`?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]