This is an automated email from the ASF dual-hosted git repository. jenniferdai pushed a commit to branch preprocess in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit ca51acce57eab0a40ba6fbf990499a37abfac6fc Author: Jennifer Dai <[email protected]> AuthorDate: Wed Sep 25 10:12:07 2019 -0700 Enabling alternative controller rest API classes in preprocess --- .../apache/pinot/hadoop/job/BaseSegmentJob.java | 35 ++++++++++++++++++++++ .../pinot/hadoop/job/SegmentCreationJob.java | 17 ----------- .../pinot/hadoop/job/SegmentPreprocessingJob.java | 25 +++++----------- 3 files changed, 42 insertions(+), 35 deletions(-) diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/BaseSegmentJob.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/BaseSegmentJob.java index 488c587..2d72fbb 100644 --- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/BaseSegmentJob.java +++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/BaseSegmentJob.java @@ -18,17 +18,21 @@ */ package org.apache.pinot.hadoop.job; +import com.google.common.base.Preconditions; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Properties; import javax.annotation.Nullable; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.pinot.common.Utils; +import org.apache.pinot.common.config.TableConfig; +import org.apache.pinot.hadoop.utils.PushLocation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,6 +41,8 @@ public abstract class BaseSegmentJob extends Configured { protected final Logger _logger = LoggerFactory.getLogger(getClass()); protected final Properties _properties; protected final Configuration _conf; + protected final List<PushLocation> _pushLocations; + protected final String _rawTableName; protected BaseSegmentJob(Properties properties) { _properties = properties; @@ -44,6 +50,35 @@ public abstract class BaseSegmentJob extends Configured { setConf(_conf); Utils.logVersions(); logProperties(); + + // Optional push location and table parameters. If set, will use the table config and schema from the push hosts. + String pushHostsString = _properties.getProperty(JobConfigConstants.PUSH_TO_HOSTS); + String pushPortString = _properties.getProperty(JobConfigConstants.PUSH_TO_PORT); + if (pushHostsString != null && pushPortString != null) { + _pushLocations = + PushLocation.getPushLocations(StringUtils.split(pushHostsString, ','), Integer.parseInt(pushPortString)); + } else { + _pushLocations = null; + } + + _rawTableName = Preconditions.checkNotNull(_properties.getProperty(JobConfigConstants.SEGMENT_TABLE_NAME)); + + } + + @Nullable + protected TableConfig getTableConfig() + throws IOException { + try (ControllerRestApi controllerRestApi = getControllerRestApi()) { + return controllerRestApi != null ? controllerRestApi.getTableConfig() : null; + } + } + + /** + * Can be overridden to provide custom controller Rest API. + */ + @Nullable + protected ControllerRestApi getControllerRestApi() { + return _pushLocations != null ? new DefaultControllerRestApi(_pushLocations, _rawTableName) : null; } protected void logProperties() { diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentCreationJob.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentCreationJob.java index aa68f10..427095f 100644 --- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentCreationJob.java +++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentCreationJob.java @@ -26,7 +26,6 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.UUID; -import javax.annotation.Nullable; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -194,14 +193,6 @@ public class SegmentCreationJob extends BaseSegmentJob { _fileSystem.delete(_stagingDir, true); } - @Nullable - protected TableConfig getTableConfig() - throws IOException { - try (ControllerRestApi controllerRestApi = getControllerRestApi()) { - return controllerRestApi != null ? controllerRestApi.getTableConfig() : null; - } - } - protected Schema getSchema() throws IOException { try (ControllerRestApi controllerRestApi = getControllerRestApi()) { @@ -215,14 +206,6 @@ public class SegmentCreationJob extends BaseSegmentJob { } } - /** - * Can be overridden to provide custom controller Rest API. - */ - @Nullable - protected ControllerRestApi getControllerRestApi() { - return _pushLocations != null ? new DefaultControllerRestApi(_pushLocations, _rawTableName) : null; - } - protected void validateTableConfig(TableConfig tableConfig) { SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig(); diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java index 89946ed..07fd2af 100644 --- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java +++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java @@ -20,6 +20,7 @@ package org.apache.pinot.hadoop.job; import com.google.common.base.Preconditions; import java.io.IOException; +import java.io.InputStream; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -27,7 +28,6 @@ import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.zip.GZIPInputStream; -import javax.annotation.Nullable; import org.apache.avro.Schema; import org.apache.avro.file.DataFileStream; import org.apache.avro.generic.GenericDatumReader; @@ -92,6 +92,7 @@ public class SegmentPreprocessingJob extends BaseSegmentJob { // Optional. private final Path _pathToDependencyJar; + protected final Path _schemaFile; private TableConfig _tableConfig; private org.apache.pinot.common.data.Schema _pinotTableSchema; @@ -107,7 +108,9 @@ public class SegmentPreprocessingJob extends BaseSegmentJob { _preprocessedOutputDir = getPathFromProperty(JobConfigConstants.PREPROCESS_PATH_TO_OUTPUT); _rawTableName = Preconditions.checkNotNull(_properties.getProperty(JobConfigConstants.SEGMENT_TABLE_NAME)); + // Optional _pathToDependencyJar = getPathFromProperty(JobConfigConstants.PATH_TO_DEPS_JAR); + _schemaFile = getPathFromProperty(JobConfigConstants.PATH_TO_SCHEMA); // Optional push location and table parameters. If set, will use the table config and schema from the push hosts. String pushHostsString = _properties.getProperty(JobConfigConstants.PUSH_TO_HOSTS); @@ -374,29 +377,15 @@ public class SegmentPreprocessingJob extends BaseSegmentJob { fieldSet.add(hashCodeField); } - /** - * Can be overridden to provide custom controller Rest API. - */ - @Nullable - protected ControllerRestApi getControllerRestApi() { - return _pushLocations != null ? new DefaultControllerRestApi(_pushLocations, _rawTableName) : null; - } - - @Nullable - protected TableConfig getTableConfig() - throws IOException { - try (ControllerRestApi controllerRestApi = getControllerRestApi()) { - return controllerRestApi != null ? controllerRestApi.getTableConfig() : null; - } - } - protected org.apache.pinot.common.data.Schema getSchema() throws IOException { try (ControllerRestApi controllerRestApi = getControllerRestApi()) { if (controllerRestApi != null) { return controllerRestApi.getSchema(); } else { - throw new RuntimeException("Could not get schema"); + try (InputStream inputStream = _fileSystem.open(_schemaFile)) { + return org.apache.pinot.common.data.Schema.fromInputSteam(inputStream); + } } } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
