This is an automated email from the ASF dual-hosted git repository. jenniferdai pushed a commit to branch preprocess in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 7a1a5aaaba3b238d343123e7694cf0f7667b2165 Author: Jennifer Dai <[email protected]> AuthorDate: Wed Sep 25 10:14:40 2019 -0700 Enabling alternative controller rest API classes in preprocess --- .../apache/pinot/hadoop/job/BaseSegmentJob.java | 35 +++++++++++++++++++ .../pinot/hadoop/job/SegmentCreationJob.java | 17 --------- .../pinot/hadoop/job/SegmentPreprocessingJob.java | 40 ++++++++++++++++++---- 3 files changed, 68 insertions(+), 24 deletions(-) diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/BaseSegmentJob.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/BaseSegmentJob.java index 488c587..2d72fbb 100644 --- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/BaseSegmentJob.java +++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/BaseSegmentJob.java @@ -18,17 +18,21 @@ */ package org.apache.pinot.hadoop.job; +import com.google.common.base.Preconditions; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Properties; import javax.annotation.Nullable; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.pinot.common.Utils; +import org.apache.pinot.common.config.TableConfig; +import org.apache.pinot.hadoop.utils.PushLocation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,6 +41,8 @@ public abstract class BaseSegmentJob extends Configured { protected final Logger _logger = LoggerFactory.getLogger(getClass()); protected final Properties _properties; protected final Configuration _conf; + protected final List<PushLocation> _pushLocations; + protected final String _rawTableName; protected BaseSegmentJob(Properties properties) { _properties = properties; @@ -44,6 +50,35 @@ public abstract class BaseSegmentJob extends Configured { setConf(_conf); Utils.logVersions(); logProperties(); + + // Optional push location and table parameters. If set, will use the table config and schema from the push hosts. + String pushHostsString = _properties.getProperty(JobConfigConstants.PUSH_TO_HOSTS); + String pushPortString = _properties.getProperty(JobConfigConstants.PUSH_TO_PORT); + if (pushHostsString != null && pushPortString != null) { + _pushLocations = + PushLocation.getPushLocations(StringUtils.split(pushHostsString, ','), Integer.parseInt(pushPortString)); + } else { + _pushLocations = null; + } + + _rawTableName = Preconditions.checkNotNull(_properties.getProperty(JobConfigConstants.SEGMENT_TABLE_NAME)); + + } + + @Nullable + protected TableConfig getTableConfig() + throws IOException { + try (ControllerRestApi controllerRestApi = getControllerRestApi()) { + return controllerRestApi != null ? controllerRestApi.getTableConfig() : null; + } + } + + /** + * Can be overridden to provide custom controller Rest API. + */ + @Nullable + protected ControllerRestApi getControllerRestApi() { + return _pushLocations != null ? new DefaultControllerRestApi(_pushLocations, _rawTableName) : null; } protected void logProperties() { diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentCreationJob.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentCreationJob.java index aa68f10..427095f 100644 --- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentCreationJob.java +++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentCreationJob.java @@ -26,7 +26,6 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.UUID; -import javax.annotation.Nullable; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -194,14 +193,6 @@ public class SegmentCreationJob extends BaseSegmentJob { _fileSystem.delete(_stagingDir, true); } - @Nullable - protected TableConfig getTableConfig() - throws IOException { - try (ControllerRestApi controllerRestApi = getControllerRestApi()) { - return controllerRestApi != null ? controllerRestApi.getTableConfig() : null; - } - } - protected Schema getSchema() throws IOException { try (ControllerRestApi controllerRestApi = getControllerRestApi()) { @@ -215,14 +206,6 @@ public class SegmentCreationJob extends BaseSegmentJob { } } - /** - * Can be overridden to provide custom controller Rest API. - */ - @Nullable - protected ControllerRestApi getControllerRestApi() { - return _pushLocations != null ? new DefaultControllerRestApi(_pushLocations, _rawTableName) : null; - } - protected void validateTableConfig(TableConfig tableConfig) { SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig(); diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java index 942fb6d..07fd2af 100644 --- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java +++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java @@ -20,6 +20,7 @@ package org.apache.pinot.hadoop.job; import com.google.common.base.Preconditions; import java.io.IOException; +import java.io.InputStream; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -87,10 +88,11 @@ public class SegmentPreprocessingJob extends BaseSegmentJob { private final Path _inputSegmentDir; private final Path _preprocessedOutputDir; protected final String _rawTableName; - protected final List<PushLocation> _pushLocations; + protected List<PushLocation> _pushLocations; // Optional. private final Path _pathToDependencyJar; + protected final Path _schemaFile; private TableConfig _tableConfig; private org.apache.pinot.common.data.Schema _pinotTableSchema; @@ -106,7 +108,9 @@ public class SegmentPreprocessingJob extends BaseSegmentJob { _preprocessedOutputDir = getPathFromProperty(JobConfigConstants.PREPROCESS_PATH_TO_OUTPUT); _rawTableName = Preconditions.checkNotNull(_properties.getProperty(JobConfigConstants.SEGMENT_TABLE_NAME)); + // Optional _pathToDependencyJar = getPathFromProperty(JobConfigConstants.PATH_TO_DEPS_JAR); + _schemaFile = getPathFromProperty(JobConfigConstants.PATH_TO_SCHEMA); // Optional push location and table parameters. If set, will use the table config and schema from the push hosts. String pushHostsString = _properties.getProperty(JobConfigConstants.PUSH_TO_HOSTS); @@ -115,9 +119,7 @@ public class SegmentPreprocessingJob extends BaseSegmentJob { _pushLocations = PushLocation.getPushLocations(StringUtils.split(pushHostsString, ','), Integer.parseInt(pushPortString)); } else { - throw new RuntimeException(String - .format("Push location is mis-configured! %s: %s, %s: %s", JobConfigConstants.PUSH_TO_HOSTS, pushHostsString, - JobConfigConstants.PUSH_TO_PORT, pushPortString)); + _pushLocations = null; } _logger.info("*********************************************************************"); @@ -375,13 +377,37 @@ public class SegmentPreprocessingJob extends BaseSegmentJob { fieldSet.add(hashCodeField); } + protected org.apache.pinot.common.data.Schema getSchema() + throws IOException { + try (ControllerRestApi controllerRestApi = getControllerRestApi()) { + if (controllerRestApi != null) { + return controllerRestApi.getSchema(); + } else { + try (InputStream inputStream = _fileSystem.open(_schemaFile)) { + return org.apache.pinot.common.data.Schema.fromInputSteam(inputStream); + } + } + } + } + + /** + * Can be overridden to set additional job properties. + */ + @SuppressWarnings("unused") + protected void addAdditionalJobProperties(Job job) { + } + private void setTableConfigAndSchema() throws IOException { // If push locations, table config, and schema are not configured, this does not necessarily mean that segments // cannot be created. We should allow the user to go to the next step rather than failing the job. Preconditions.checkState(!_pushLocations.isEmpty(), "Push locations cannot be empty."); - try(ControllerRestApi controllerRestApi = new DefaultControllerRestApi(_pushLocations, _rawTableName)) { - _tableConfig = controllerRestApi.getTableConfig(); - _pinotTableSchema = controllerRestApi.getSchema(); + try(ControllerRestApi controllerRestApi = getControllerRestApi()) { + if (controllerRestApi != null) { + _tableConfig = controllerRestApi.getTableConfig(); + _pinotTableSchema = controllerRestApi.getSchema(); + } else { + throw new RuntimeException("Controller REST API not initialized"); + } } Preconditions.checkState(_tableConfig != null, "Table config cannot be null."); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
