This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch pinot-batch-ingestion-hadoop
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 1eb8dbaa8c10a748ca85fc12330b041d55789312
Author: Xiang Fu <[email protected]>
AuthorDate: Wed Jan 15 01:31:18 2020 -0800

    refactor common utils
---
 .../batch/common/SegmentGenerationUtils.java       | 147 +++++++++++++++++++++
 .../spark/SparkSegmentGenerationJobRunner.java     | 135 ++-----------------
 .../standalone/SegmentGenerationJobRunner.java     | 134 ++-----------------
 3 files changed, 166 insertions(+), 250 deletions(-)

diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationUtils.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationUtils.java
new file mode 100644
index 0000000..a54a990
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationUtils.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.ingestion.batch.common;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import org.apache.commons.io.IOUtils;
+import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+
+
+public class SegmentGenerationUtils {
+
+  private static final String OFFLINE = "OFFLINE";
+
+  public static String generateSchemaURI(String controllerUri, String table) {
+    return String.format("%s/tables/%s/schema", controllerUri, table);
+  }
+
+  public static String generateTableConfigURI(String controllerUri, String 
table) {
+    return String.format("%s/tables/%s", controllerUri, table);
+  }
+
+  public static Schema getSchema(String schemaURIString) {
+    URI schemaURI;
+    try {
+      schemaURI = new URI(schemaURIString);
+    } catch (URISyntaxException e) {
+      throw new RuntimeException("Schema URI is not valid - '" + 
schemaURIString + "'", e);
+    }
+    String scheme = schemaURI.getScheme();
+    String schemaJson;
+    if (PinotFSFactory.isSchemeSupported(scheme)) {
+      // Try to use PinotFS to read schema URI
+      PinotFS pinotFS = PinotFSFactory.create(scheme);
+      InputStream schemaStream;
+      try {
+        schemaStream = pinotFS.open(schemaURI);
+      } catch (IOException e) {
+        throw new RuntimeException("Failed to fetch schema from PinotFS - '" + 
schemaURI + "'", e);
+      }
+      try {
+        schemaJson = IOUtils.toString(schemaStream, StandardCharsets.UTF_8);
+      } catch (IOException e) {
+        throw new RuntimeException("Failed to read from schema file data 
stream on Pinot fs - '" + schemaURI + "'", e);
+      }
+    } else {
+      // Try to directly read from URI.
+      try {
+        schemaJson = IOUtils.toString(schemaURI, StandardCharsets.UTF_8);
+      } catch (IOException e) {
+        throw new RuntimeException("Failed to read from Schema URI - '" + 
schemaURI + "'", e);
+      }
+    }
+    try {
+      return Schema.fromString(schemaJson);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to decode Pinot schema from json 
string - '" + schemaJson + "'", e);
+    }
+  }
+
+  public static TableConfig getTableConfig(String tableConfigURIStr) {
+    URI tableConfigURI;
+    try {
+      tableConfigURI = new URI(tableConfigURIStr);
+    } catch (URISyntaxException e) {
+      throw new RuntimeException("Table config URI is not valid - '" + 
tableConfigURIStr + "'", e);
+    }
+    String scheme = tableConfigURI.getScheme();
+    String tableConfigJson;
+    if (PinotFSFactory.isSchemeSupported(scheme)) {
+      // Try to use PinotFS to read table config URI
+      PinotFS pinotFS = PinotFSFactory.create(scheme);
+      try {
+        tableConfigJson = IOUtils.toString(pinotFS.open(tableConfigURI), 
StandardCharsets.UTF_8);
+      } catch (IOException e) {
+        throw new RuntimeException("Failed to open table config file stream on 
Pinot fs - '" + tableConfigURI + "'", e);
+      }
+    } else {
+      try {
+        tableConfigJson = IOUtils.toString(tableConfigURI, 
StandardCharsets.UTF_8);
+      } catch (IOException e) {
+        throw new RuntimeException(
+            "Failed to read from table config file data stream on Pinot fs - 
'" + tableConfigURI + "'", e);
+      }
+    }
+    // Controller API returns a wrapper of table config.
+    JsonNode tableJsonNode;
+    try {
+      tableJsonNode = new ObjectMapper().readTree(tableConfigJson);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to decode table config into JSON from 
String - '" + tableConfigJson + "'", e);
+    }
+    if (tableJsonNode.has(OFFLINE)) {
+      tableJsonNode = tableJsonNode.get(OFFLINE);
+    }
+    try {
+      return TableConfig.fromJsonConfig(tableJsonNode);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to decode table config from JSON - '" 
+ tableJsonNode + "'", e);
+    }
+  }
+
+  /**
+   * Generate a relative output directory path when `useRelativePath` flag is 
on.
+   * This method will compute the relative path based on `inputFile` and 
`baseInputDir`,
+   * then apply only the directory part of relative path to `outputDir`.
+   * E.g.
+   *    baseInputDir = "/path/to/input"
+   *    inputFile = "/path/to/input/a/b/c/d.avro"
+   *    outputDir = "/path/to/output"
+   *    getRelativeOutputPath(baseInputDir, inputFile, outputDir) = 
/path/to/output/a/b/c
+   */
+  public static URI getRelativeOutputPath(URI baseInputDir, URI inputFile, URI 
outputDir) {
+    URI relativePath = baseInputDir.relativize(inputFile);
+    Preconditions.checkState(relativePath.getPath().length() > 0 && 
!relativePath.equals(inputFile),
+        "Unable to extract out the relative path based on base input path: " + 
baseInputDir);
+    String outputDirStr = outputDir.toString();
+    outputDir = !outputDirStr.endsWith("/") ? 
URI.create(outputDirStr.concat("/")) : outputDir;
+    URI relativeOutputURI = outputDir.resolve(relativePath).resolve(".");
+    return relativeOutputURI;
+  }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
index 8a36575..599bb0c 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
@@ -18,16 +18,10 @@
  */
 package org.apache.pinot.plugin.ingestion.batch.spark;
 
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Preconditions;
 import java.io.File;
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.Serializable;
 import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.charset.StandardCharsets;
 import java.nio.file.FileSystems;
 import java.nio.file.PathMatcher;
 import java.nio.file.Paths;
@@ -37,11 +31,9 @@ import java.util.List;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.MapConfiguration;
 import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.IOUtils;
-import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import 
org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner;
-import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationUtils;
 import org.apache.pinot.spi.filesystem.PinotFS;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
 import org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner;
@@ -61,7 +53,6 @@ import org.slf4j.LoggerFactory;
 public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, 
Serializable {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SparkSegmentGenerationJobRunner.class);
-  private static final String OFFLINE = "OFFLINE";
   private static final String DEPS_JAR_DIR = "dependencyJarDir";
   private static final String STAGING_DIR = "stagingDir";
 
@@ -74,34 +65,6 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri
     init(spec);
   }
 
-  private static String generateSchemaURI(String controllerUri, String table) {
-    return String.format("%s/tables/%s/schema", controllerUri, table);
-  }
-
-  private static String generateTableConfigURI(String controllerUri, String 
table) {
-    return String.format("%s/tables/%s", controllerUri, table);
-  }
-
-  /**
-   * Generate a relative output directory path when `useRelativePath` flag is 
on.
-   * This method will compute the relative path based on `inputFile` and 
`baseInputDir`,
-   * then apply only the directory part of relative path to `outputDir`.
-   * E.g.
-   *    baseInputDir = "/path/to/input"
-   *    inputFile = "/path/to/input/a/b/c/d.avro"
-   *    outputDir = "/path/to/output"
-   *    getRelativeOutputPath(baseInputDir, inputFile, outputDir) = 
/path/to/output/a/b/c
-   */
-  public static URI getRelativeOutputPath(URI baseInputDir, URI inputFile, URI 
outputDir) {
-    URI relativePath = baseInputDir.relativize(inputFile);
-    Preconditions.checkState(relativePath.getPath().length() > 0 && 
!relativePath.equals(inputFile),
-        "Unable to extract out the relative path based on base input path: " + 
baseInputDir);
-    String outputDirStr = outputDir.toString();
-    outputDir = !outputDirStr.endsWith("/") ? 
URI.create(outputDirStr.concat("/")) : outputDir;
-    URI relativeOutputURI = outputDir.resolve(relativePath).resolve(".");
-    return relativeOutputURI;
-  }
-
   @Override
   public void init(SegmentGenerationJobSpec spec) {
     _spec = spec;
@@ -125,7 +88,8 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri
         throw new RuntimeException("Missing property 'schemaURI' in 
'tableSpec'");
       }
       PinotClusterSpec pinotClusterSpec = _spec.getPinotClusterSpecs()[0];
-      String schemaURI = 
generateSchemaURI(pinotClusterSpec.getControllerURI(), 
_spec.getTableSpec().getTableName());
+      String schemaURI = SegmentGenerationUtils
+          .generateSchemaURI(pinotClusterSpec.getControllerURI(), 
_spec.getTableSpec().getTableName());
       _spec.getTableSpec().setSchemaURI(schemaURI);
     }
     if (_spec.getTableSpec().getTableConfigURI() == null) {
@@ -133,8 +97,8 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri
         throw new RuntimeException("Missing property 'tableConfigURI' in 
'tableSpec'");
       }
       PinotClusterSpec pinotClusterSpec = _spec.getPinotClusterSpecs()[0];
-      String tableConfigURI =
-          generateTableConfigURI(pinotClusterSpec.getControllerURI(), 
_spec.getTableSpec().getTableName());
+      String tableConfigURI = SegmentGenerationUtils
+          .generateTableConfigURI(pinotClusterSpec.getControllerURI(), 
_spec.getTableSpec().getTableName());
       _spec.getTableSpec().setTableConfigURI(tableConfigURI);
     }
     if (_spec.getExecutionFrameworkSpec().getExtraConfigs() == null) {
@@ -252,8 +216,9 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri
         taskSpec.setInputFilePath(localInputDataFile.getAbsolutePath());
         taskSpec.setOutputDirectoryPath(localOutputTempDir.getAbsolutePath());
         taskSpec.setRecordReaderSpec(_spec.getRecordReaderSpec());
-        taskSpec.setSchema(getSchema());
-        taskSpec.setTableConfig(getTableConfig().toJsonNode());
+        
taskSpec.setSchema(SegmentGenerationUtils.getSchema(_spec.getTableSpec().getSchemaURI()));
+        taskSpec.setTableConfig(
+            
SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI()).toJsonNode());
         taskSpec.setSequenceId(idx);
         
taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec());
 
@@ -272,7 +237,8 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri
             DataSize.fromBytes(uncompressedSegmentSize), 
DataSize.fromBytes(compressedSegmentSize));
         //move segment to output PinotFS
         URI outputSegmentTarURI =
-            getRelativeOutputPath(finalInputDirURI, inputFileURI, 
finalOutputDirURI).resolve(segmentTarFileName);
+            SegmentGenerationUtils.getRelativeOutputPath(finalInputDirURI, 
inputFileURI, finalOutputDirURI)
+                .resolve(segmentTarFileName);
         LOGGER.info("Trying to move segment tar file from: [{}] to [{}]", 
localSegmentTarFile, outputSegmentTarURI);
         if (!_spec.isOverwriteOutput() && 
PinotFSFactory.create(outputSegmentTarURI.getScheme())
             .exists(outputSegmentTarURI)) {
@@ -297,87 +263,6 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri
     }
   }
 
-  private Schema getSchema() {
-    URI schemaURI;
-    try {
-      schemaURI = new URI(_spec.getTableSpec().getSchemaURI());
-    } catch (URISyntaxException e) {
-      throw new RuntimeException("Schema URI is not valid - '" + 
_spec.getTableSpec().getSchemaURI() + "'", e);
-    }
-    String scheme = schemaURI.getScheme();
-    String schemaJson;
-    if (PinotFSFactory.isSchemeSupported(scheme)) {
-      // Try to use PinotFS to read schema URI
-      PinotFS pinotFS = PinotFSFactory.create(scheme);
-      InputStream schemaStream;
-      try {
-        schemaStream = pinotFS.open(schemaURI);
-      } catch (IOException e) {
-        throw new RuntimeException("Failed to fetch schema from PinotFS - '" + 
schemaURI + "'", e);
-      }
-      try {
-        schemaJson = IOUtils.toString(schemaStream, StandardCharsets.UTF_8);
-      } catch (IOException e) {
-        throw new RuntimeException("Failed to read from schema file data 
stream on Pinot fs - '" + schemaURI + "'", e);
-      }
-    } else {
-      // Try to directly read from URI.
-      try {
-        schemaJson = IOUtils.toString(schemaURI, StandardCharsets.UTF_8);
-      } catch (IOException e) {
-        throw new RuntimeException("Failed to read from Schema URI - '" + 
schemaURI + "'", e);
-      }
-    }
-    try {
-      return Schema.fromString(schemaJson);
-    } catch (IOException e) {
-      throw new RuntimeException("Failed to decode Pinot schema from json 
string - '" + schemaJson + "'", e);
-    }
-  }
-
-  private TableConfig getTableConfig() {
-    URI tableConfigURI;
-    try {
-      tableConfigURI = new URI(_spec.getTableSpec().getTableConfigURI());
-    } catch (URISyntaxException e) {
-      throw new RuntimeException("Table config URI is not valid - '" + 
_spec.getTableSpec().getTableConfigURI() + "'",
-          e);
-    }
-    String scheme = tableConfigURI.getScheme();
-    String tableConfigJson;
-    if (PinotFSFactory.isSchemeSupported(scheme)) {
-      // Try to use PinotFS to read table config URI
-      PinotFS pinotFS = PinotFSFactory.create(scheme);
-      try {
-        tableConfigJson = IOUtils.toString(pinotFS.open(tableConfigURI), 
StandardCharsets.UTF_8);
-      } catch (IOException e) {
-        throw new RuntimeException("Failed to open table config file stream on 
Pinot fs - '" + tableConfigURI + "'", e);
-      }
-    } else {
-      try {
-        tableConfigJson = IOUtils.toString(tableConfigURI, 
StandardCharsets.UTF_8);
-      } catch (IOException e) {
-        throw new RuntimeException(
-            "Failed to read from table config file data stream on Pinot fs - 
'" + tableConfigURI + "'", e);
-      }
-    }
-    // Controller API returns a wrapper of table config.
-    JsonNode tableJsonNode;
-    try {
-      tableJsonNode = new ObjectMapper().readTree(tableConfigJson);
-    } catch (IOException e) {
-      throw new RuntimeException("Failed to decode table config into JSON from 
String - '" + tableConfigJson + "'", e);
-    }
-    if (tableJsonNode.has(OFFLINE)) {
-      tableJsonNode = tableJsonNode.get(OFFLINE);
-    }
-    try {
-      return TableConfig.fromJsonConfig(tableJsonNode);
-    } catch (IOException e) {
-      throw new RuntimeException("Failed to decode table config from JSON - '" 
+ tableJsonNode + "'", e);
-    }
-  }
-
   protected void addDepsJarToDistributedCache(JavaSparkContext sparkContext, 
String depsJarDir)
       throws IOException {
     if (depsJarDir != null) {
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
index d9b6c32..b98a661 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
@@ -18,15 +18,8 @@
  */
 package org.apache.pinot.plugin.ingestion.batch.standalone;
 
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Preconditions;
 import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
 import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.charset.StandardCharsets;
 import java.nio.file.FileSystems;
 import java.nio.file.PathMatcher;
 import java.nio.file.Paths;
@@ -35,10 +28,10 @@ import java.util.List;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.MapConfiguration;
 import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.IOUtils;
 import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import 
org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner;
+import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationUtils;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.filesystem.PinotFS;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
@@ -56,7 +49,6 @@ import org.slf4j.LoggerFactory;
 public class SegmentGenerationJobRunner implements IngestionJobRunner {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentGenerationJobRunner.class);
-  private static final String OFFLINE = "OFFLINE";
 
   private SegmentGenerationJobSpec _spec;
 
@@ -67,34 +59,6 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner {
     init(spec);
   }
 
-  private static String generateSchemaURI(String controllerUri, String table) {
-    return String.format("%s/tables/%s/schema", controllerUri, table);
-  }
-
-  private static String generateTableConfigURI(String controllerUri, String 
table) {
-    return String.format("%s/tables/%s", controllerUri, table);
-  }
-
-  /**
-   * Generate a relative output directory path when `useRelativePath` flag is 
on.
-   * This method will compute the relative path based on `inputFile` and 
`baseInputDir`,
-   * then apply only the directory part of relative path to `outputDir`.
-   * E.g.
-   *    baseInputDir = "/path/to/input"
-   *    inputFile = "/path/to/input/a/b/c/d.avro"
-   *    outputDir = "/path/to/output"
-   *    getRelativeOutputPath(baseInputDir, inputFile, outputDir) = 
/path/to/output/a/b/c
-   */
-  public static URI getRelativeOutputPath(URI baseInputDir, URI inputFile, URI 
outputDir) {
-    URI relativePath = baseInputDir.relativize(inputFile);
-    Preconditions.checkState(relativePath.getPath().length() > 0 && 
!relativePath.equals(inputFile),
-        "Unable to extract out the relative path based on base input path: " + 
baseInputDir);
-    String outputDirStr = outputDir.toString();
-    outputDir = !outputDirStr.endsWith("/") ? 
URI.create(outputDirStr.concat("/")) : outputDir;
-    URI relativeOutputURI = outputDir.resolve(relativePath).resolve(".");
-    return relativeOutputURI;
-  }
-
   @Override
   public void init(SegmentGenerationJobSpec spec) {
     _spec = spec;
@@ -118,7 +82,8 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner {
         throw new RuntimeException("Missing property 'schemaURI' in 
'tableSpec'");
       }
       PinotClusterSpec pinotClusterSpec = _spec.getPinotClusterSpecs()[0];
-      String schemaURI = 
generateSchemaURI(pinotClusterSpec.getControllerURI(), 
_spec.getTableSpec().getTableName());
+      String schemaURI = SegmentGenerationUtils
+          .generateSchemaURI(pinotClusterSpec.getControllerURI(), 
_spec.getTableSpec().getTableName());
       _spec.getTableSpec().setSchemaURI(schemaURI);
     }
     if (_spec.getTableSpec().getTableConfigURI() == null) {
@@ -126,8 +91,8 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner {
         throw new RuntimeException("Missing property 'tableConfigURI' in 
'tableSpec'");
       }
       PinotClusterSpec pinotClusterSpec = _spec.getPinotClusterSpecs()[0];
-      String tableConfigURI =
-          generateTableConfigURI(pinotClusterSpec.getControllerURI(), 
_spec.getTableSpec().getTableName());
+      String tableConfigURI = SegmentGenerationUtils
+          .generateTableConfigURI(pinotClusterSpec.getControllerURI(), 
_spec.getTableSpec().getTableName());
       _spec.getTableSpec().setTableConfigURI(tableConfigURI);
     }
   }
@@ -196,8 +161,8 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner {
       FileUtils.forceMkdir(localOutputTempDir);
 
       //Read TableConfig, Schema
-      Schema schema = getSchema();
-      TableConfig tableConfig = getTableConfig();
+      Schema schema = 
SegmentGenerationUtils.getSchema(_spec.getTableSpec().getSchemaURI());
+      TableConfig tableConfig = 
SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI());
 
       //iterate on the file list, for each
       for (int i = 0; i < filteredFiles.size(); i++) {
@@ -235,8 +200,8 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner {
         LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: {}", 
segmentName,
             DataSize.fromBytes(uncompressedSegmentSize), 
DataSize.fromBytes(compressedSegmentSize));
         //move segment to output PinotFS
-        URI outputSegmentTarURI =
-            getRelativeOutputPath(inputDirURI, inputFileURI, 
outputDirURI).resolve(segmentTarFileName);
+        URI outputSegmentTarURI = 
SegmentGenerationUtils.getRelativeOutputPath(inputDirURI, inputFileURI, 
outputDirURI)
+            .resolve(segmentTarFileName);
         if (!_spec.isOverwriteOutput() && 
outputDirFS.exists(outputSegmentTarURI)) {
           LOGGER.warn("Not overwrite existing output segment tar file: {}", 
outputDirFS.exists(outputSegmentTarURI));
         } else {
@@ -251,85 +216,4 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner {
       FileUtils.deleteDirectory(localTempDir);
     }
   }
-
-  private Schema getSchema() {
-    URI schemaURI;
-    try {
-      schemaURI = new URI(_spec.getTableSpec().getSchemaURI());
-    } catch (URISyntaxException e) {
-      throw new RuntimeException("Schema URI is not valid - '" + 
_spec.getTableSpec().getSchemaURI() + "'", e);
-    }
-    String scheme = schemaURI.getScheme();
-    String schemaJson;
-    if (PinotFSFactory.isSchemeSupported(scheme)) {
-      // Try to use PinotFS to read schema URI
-      PinotFS pinotFS = PinotFSFactory.create(scheme);
-      InputStream schemaStream;
-      try {
-        schemaStream = pinotFS.open(schemaURI);
-      } catch (IOException e) {
-        throw new RuntimeException("Failed to fetch schema from PinotFS - '" + 
schemaURI + "'", e);
-      }
-      try {
-        schemaJson = IOUtils.toString(schemaStream, StandardCharsets.UTF_8);
-      } catch (IOException e) {
-        throw new RuntimeException("Failed to read from schema file data 
stream on Pinot fs - '" + schemaURI + "'", e);
-      }
-    } else {
-      // Try to directly read from URI.
-      try {
-        schemaJson = IOUtils.toString(schemaURI, StandardCharsets.UTF_8);
-      } catch (IOException e) {
-        throw new RuntimeException("Failed to read from Schema URI - '" + 
schemaURI + "'", e);
-      }
-    }
-    try {
-      return Schema.fromString(schemaJson);
-    } catch (IOException e) {
-      throw new RuntimeException("Failed to decode Pinot schema from json 
string - '" + schemaJson + "'", e);
-    }
-  }
-
-  private TableConfig getTableConfig() {
-    URI tableConfigURI;
-    try {
-      tableConfigURI = new URI(_spec.getTableSpec().getTableConfigURI());
-    } catch (URISyntaxException e) {
-      throw new RuntimeException("Table config URI is not valid - '" + 
_spec.getTableSpec().getTableConfigURI() + "'",
-          e);
-    }
-    String scheme = tableConfigURI.getScheme();
-    String tableConfigJson;
-    if (PinotFSFactory.isSchemeSupported(scheme)) {
-      // Try to use PinotFS to read table config URI
-      PinotFS pinotFS = PinotFSFactory.create(scheme);
-      try {
-        tableConfigJson = IOUtils.toString(pinotFS.open(tableConfigURI), 
StandardCharsets.UTF_8);
-      } catch (IOException e) {
-        throw new RuntimeException("Failed to open table config file stream on 
Pinot fs - '" + tableConfigURI + "'", e);
-      }
-    } else {
-      try {
-        tableConfigJson = IOUtils.toString(tableConfigURI, 
StandardCharsets.UTF_8);
-      } catch (IOException e) {
-        throw new RuntimeException(
-            "Failed to read from table config file data stream on Pinot fs - 
'" + tableConfigURI + "'", e);
-      }
-    }
-    // Controller API returns a wrapper of table config.
-    JsonNode tableJsonNode;
-    try {
-      tableJsonNode = new ObjectMapper().readTree(tableConfigJson);
-    } catch (IOException e) {
-      throw new RuntimeException("Failed to decode table config into JSON from 
String - '" + tableConfigJson + "'", e);
-    }
-    if (tableJsonNode.has(OFFLINE)) {
-      tableJsonNode = tableJsonNode.get(OFFLINE);
-    }
-    try {
-      return TableConfig.fromJsonConfig(tableJsonNode);
-    } catch (IOException e) {
-      throw new RuntimeException("Failed to decode table config from JSON - '" 
+ tableJsonNode + "'", e);
-    }
-  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to