This is an automated email from the ASF dual-hosted git repository.

jenniferdai pushed a commit to branch preprocess
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit ca51acce57eab0a40ba6fbf990499a37abfac6fc
Author: Jennifer Dai <[email protected]>
AuthorDate: Wed Sep 25 10:12:07 2019 -0700

    Enabling alternative controller rest API classes in preprocess
---
 .../apache/pinot/hadoop/job/BaseSegmentJob.java    | 35 ++++++++++++++++++++++
 .../pinot/hadoop/job/SegmentCreationJob.java       | 17 -----------
 .../pinot/hadoop/job/SegmentPreprocessingJob.java  | 25 +++++-----------
 3 files changed, 42 insertions(+), 35 deletions(-)

diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/BaseSegmentJob.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/BaseSegmentJob.java
index 488c587..2d72fbb 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/BaseSegmentJob.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/BaseSegmentJob.java
@@ -18,17 +18,21 @@
  */
 package org.apache.pinot.hadoop.job;
 
+import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
 import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.pinot.common.Utils;
+import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.hadoop.utils.PushLocation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,6 +41,8 @@ public abstract class BaseSegmentJob extends Configured {
   protected final Logger _logger = LoggerFactory.getLogger(getClass());
   protected final Properties _properties;
   protected final Configuration _conf;
+  protected final List<PushLocation> _pushLocations;
+  protected final String _rawTableName;
 
   protected BaseSegmentJob(Properties properties) {
     _properties = properties;
@@ -44,6 +50,35 @@ public abstract class BaseSegmentJob extends Configured {
     setConf(_conf);
     Utils.logVersions();
     logProperties();
+
+    // Optional push location and table parameters. If set, will use the table config and schema from the push hosts.
+    String pushHostsString = _properties.getProperty(JobConfigConstants.PUSH_TO_HOSTS);
+    String pushPortString = _properties.getProperty(JobConfigConstants.PUSH_TO_PORT);
+    if (pushHostsString != null && pushPortString != null) {
+      _pushLocations =
+          PushLocation.getPushLocations(StringUtils.split(pushHostsString, ','), Integer.parseInt(pushPortString));
+    } else {
+      _pushLocations = null;
+    }
+
+    _rawTableName = Preconditions.checkNotNull(_properties.getProperty(JobConfigConstants.SEGMENT_TABLE_NAME));
+
+  }
+
+  @Nullable
+  protected TableConfig getTableConfig()
+      throws IOException {
+    try (ControllerRestApi controllerRestApi = getControllerRestApi()) {
+      return controllerRestApi != null ? controllerRestApi.getTableConfig() : null;
+    }
+  }
+
+  /**
+   * Can be overridden to provide custom controller Rest API.
+   */
+  @Nullable
+  protected ControllerRestApi getControllerRestApi() {
+    return _pushLocations != null ? new DefaultControllerRestApi(_pushLocations, _rawTableName) : null;
   }
 
   protected void logProperties() {
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentCreationJob.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentCreationJob.java
index aa68f10..427095f 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentCreationJob.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentCreationJob.java
@@ -26,7 +26,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
-import javax.annotation.Nullable;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -194,14 +193,6 @@ public class SegmentCreationJob extends BaseSegmentJob {
     _fileSystem.delete(_stagingDir, true);
   }
 
-  @Nullable
-  protected TableConfig getTableConfig()
-      throws IOException {
-    try (ControllerRestApi controllerRestApi = getControllerRestApi()) {
-      return controllerRestApi != null ? controllerRestApi.getTableConfig() : null;
-    }
-  }
-
   protected Schema getSchema()
       throws IOException {
     try (ControllerRestApi controllerRestApi = getControllerRestApi()) {
@@ -215,14 +206,6 @@ public class SegmentCreationJob extends BaseSegmentJob {
     }
   }
 
-  /**
-   * Can be overridden to provide custom controller Rest API.
-   */
-  @Nullable
-  protected ControllerRestApi getControllerRestApi() {
-    return _pushLocations != null ? new DefaultControllerRestApi(_pushLocations, _rawTableName) : null;
-  }
-
   protected void validateTableConfig(TableConfig tableConfig) {
     SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig();
 
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java
index 89946ed..07fd2af 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java
@@ -20,6 +20,7 @@ package org.apache.pinot.hadoop.job;
 
 import com.google.common.base.Preconditions;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -27,7 +28,6 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.zip.GZIPInputStream;
-import javax.annotation.Nullable;
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericDatumReader;
@@ -92,6 +92,7 @@ public class SegmentPreprocessingJob extends BaseSegmentJob {
 
   // Optional.
   private final Path _pathToDependencyJar;
+  protected final Path _schemaFile;
 
   private TableConfig _tableConfig;
   private org.apache.pinot.common.data.Schema _pinotTableSchema;
@@ -107,7 +108,9 @@ public class SegmentPreprocessingJob extends BaseSegmentJob {
     _preprocessedOutputDir = getPathFromProperty(JobConfigConstants.PREPROCESS_PATH_TO_OUTPUT);
     _rawTableName = Preconditions.checkNotNull(_properties.getProperty(JobConfigConstants.SEGMENT_TABLE_NAME));
 
+    // Optional
     _pathToDependencyJar = getPathFromProperty(JobConfigConstants.PATH_TO_DEPS_JAR);
+    _schemaFile = getPathFromProperty(JobConfigConstants.PATH_TO_SCHEMA);
 
     // Optional push location and table parameters. If set, will use the table config and schema from the push hosts.
     String pushHostsString = _properties.getProperty(JobConfigConstants.PUSH_TO_HOSTS);
@@ -374,29 +377,15 @@ public class SegmentPreprocessingJob extends BaseSegmentJob {
     fieldSet.add(hashCodeField);
   }
 
-  /**
-   * Can be overridden to provide custom controller Rest API.
-   */
-  @Nullable
-  protected ControllerRestApi getControllerRestApi() {
-    return _pushLocations != null ? new DefaultControllerRestApi(_pushLocations, _rawTableName) : null;
-  }
-
-  @Nullable
-  protected TableConfig getTableConfig()
-      throws IOException {
-    try (ControllerRestApi controllerRestApi = getControllerRestApi()) {
-      return controllerRestApi != null ? controllerRestApi.getTableConfig() : null;
-    }
-  }
-
   protected org.apache.pinot.common.data.Schema getSchema()
       throws IOException {
     try (ControllerRestApi controllerRestApi = getControllerRestApi()) {
       if (controllerRestApi != null) {
         return controllerRestApi.getSchema();
       } else {
-        throw new RuntimeException("Could not get schema");
+        try (InputStream inputStream = _fileSystem.open(_schemaFile)) {
+          return org.apache.pinot.common.data.Schema.fromInputSteam(inputStream);
+        }
       }
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to