This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch pinot-batch-ingestion-hadoop in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 1eb8dbaa8c10a748ca85fc12330b041d55789312 Author: Xiang Fu <[email protected]> AuthorDate: Wed Jan 15 01:31:18 2020 -0800 refactor common utils --- .../batch/common/SegmentGenerationUtils.java | 147 +++++++++++++++++++++ .../spark/SparkSegmentGenerationJobRunner.java | 135 ++----------------- .../standalone/SegmentGenerationJobRunner.java | 134 ++----------------- 3 files changed, 166 insertions(+), 250 deletions(-) diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationUtils.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationUtils.java new file mode 100644 index 0000000..a54a990 --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationUtils.java @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.ingestion.batch.common; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import org.apache.commons.io.IOUtils; +import org.apache.pinot.common.config.TableConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.filesystem.PinotFS; +import org.apache.pinot.spi.filesystem.PinotFSFactory; + + +public class SegmentGenerationUtils { + + private static final String OFFLINE = "OFFLINE"; + + public static String generateSchemaURI(String controllerUri, String table) { + return String.format("%s/tables/%s/schema", controllerUri, table); + } + + public static String generateTableConfigURI(String controllerUri, String table) { + return String.format("%s/tables/%s", controllerUri, table); + } + + public static Schema getSchema(String schemaURIString) { + URI schemaURI; + try { + schemaURI = new URI(schemaURIString); + } catch (URISyntaxException e) { + throw new RuntimeException("Schema URI is not valid - '" + schemaURIString + "'", e); + } + String scheme = schemaURI.getScheme(); + String schemaJson; + if (PinotFSFactory.isSchemeSupported(scheme)) { + // Try to use PinotFS to read schema URI + PinotFS pinotFS = PinotFSFactory.create(scheme); + InputStream schemaStream; + try { + schemaStream = pinotFS.open(schemaURI); + } catch (IOException e) { + throw new RuntimeException("Failed to fetch schema from PinotFS - '" + schemaURI + "'", e); + } + try { + schemaJson = IOUtils.toString(schemaStream, StandardCharsets.UTF_8); + } catch (IOException e) { + throw new RuntimeException("Failed to read from schema file data stream on Pinot fs - '" + schemaURI + "'", e); + } + } else { + // Try to directly read from URI. + try { + schemaJson = IOUtils.toString(schemaURI, StandardCharsets.UTF_8); + } catch (IOException e) { + throw new RuntimeException("Failed to read from Schema URI - '" + schemaURI + "'", e); + } + } + try { + return Schema.fromString(schemaJson); + } catch (IOException e) { + throw new RuntimeException("Failed to decode Pinot schema from json string - '" + schemaJson + "'", e); + } + } + + public static TableConfig getTableConfig(String tableConfigURIStr) { + URI tableConfigURI; + try { + tableConfigURI = new URI(tableConfigURIStr); + } catch (URISyntaxException e) { + throw new RuntimeException("Table config URI is not valid - '" + tableConfigURIStr + "'", e); + } + String scheme = tableConfigURI.getScheme(); + String tableConfigJson; + if (PinotFSFactory.isSchemeSupported(scheme)) { + // Try to use PinotFS to read table config URI + PinotFS pinotFS = PinotFSFactory.create(scheme); + try { + tableConfigJson = IOUtils.toString(pinotFS.open(tableConfigURI), StandardCharsets.UTF_8); + } catch (IOException e) { + throw new RuntimeException("Failed to open table config file stream on Pinot fs - '" + tableConfigURI + "'", e); + } + } else { + try { + tableConfigJson = IOUtils.toString(tableConfigURI, StandardCharsets.UTF_8); + } catch (IOException e) { + throw new RuntimeException( + "Failed to read from table config file data stream on Pinot fs - '" + tableConfigURI + "'", e); + } + } + // Controller API returns a wrapper of table config. + JsonNode tableJsonNode; + try { + tableJsonNode = new ObjectMapper().readTree(tableConfigJson); + } catch (IOException e) { + throw new RuntimeException("Failed to decode table config into JSON from String - '" + tableConfigJson + "'", e); + } + if (tableJsonNode.has(OFFLINE)) { + tableJsonNode = tableJsonNode.get(OFFLINE); + } + try { + return TableConfig.fromJsonConfig(tableJsonNode); + } catch (IOException e) { + throw new RuntimeException("Failed to decode table config from JSON - '" + tableJsonNode + "'", e); + } + } + + /** + * Generate a relative output directory path when `useRelativePath` flag is on. + * This method will compute the relative path based on `inputFile` and `baseInputDir`, + * then apply only the directory part of relative path to `outputDir`. + * E.g. + * baseInputDir = "/path/to/input" + * inputFile = "/path/to/input/a/b/c/d.avro" + * outputDir = "/path/to/output" + * getRelativeOutputPath(baseInputDir, inputFile, outputDir) = /path/to/output/a/b/c + */ + public static URI getRelativeOutputPath(URI baseInputDir, URI inputFile, URI outputDir) { + URI relativePath = baseInputDir.relativize(inputFile); + Preconditions.checkState(relativePath.getPath().length() > 0 && !relativePath.equals(inputFile), + "Unable to extract out the relative path based on base input path: " + baseInputDir); + String outputDirStr = outputDir.toString(); + outputDir = !outputDirStr.endsWith("/") ? URI.create(outputDirStr.concat("/")) : outputDir; + URI relativeOutputURI = outputDir.resolve(relativePath).resolve("."); + return relativeOutputURI; + } +} diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java index 8a36575..599bb0c 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java @@ -18,16 +18,10 @@ */ package org.apache.pinot.plugin.ingestion.batch.spark; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Preconditions; import java.io.File; import java.io.IOException; -import java.io.InputStream; import java.io.Serializable; import java.net.URI; -import java.net.URISyntaxException; -import java.nio.charset.StandardCharsets; import java.nio.file.FileSystems; import java.nio.file.PathMatcher; import java.nio.file.Paths; @@ -37,11 +31,9 @@ import java.util.List; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.MapConfiguration; import org.apache.commons.io.FileUtils; -import org.apache.commons.io.IOUtils; -import org.apache.pinot.common.config.TableConfig; import org.apache.pinot.common.utils.TarGzCompressionUtils; import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner; -import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationUtils; import org.apache.pinot.spi.filesystem.PinotFS; import org.apache.pinot.spi.filesystem.PinotFSFactory; import org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner; @@ -61,7 +53,6 @@ import org.slf4j.LoggerFactory; public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Serializable { private static final Logger LOGGER = LoggerFactory.getLogger(SparkSegmentGenerationJobRunner.class); - private static final String OFFLINE = "OFFLINE"; private static final String DEPS_JAR_DIR = "dependencyJarDir"; private static final String STAGING_DIR = "stagingDir"; @@ -74,34 +65,6 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri init(spec); } - private static String generateSchemaURI(String controllerUri, String table) { - return String.format("%s/tables/%s/schema", controllerUri, table); - } - - private static String generateTableConfigURI(String controllerUri, String table) { - return String.format("%s/tables/%s", controllerUri, table); - } - - /** - * Generate a relative output directory path when `useRelativePath` flag is on. - * This method will compute the relative path based on `inputFile` and `baseInputDir`, - * then apply only the directory part of relative path to `outputDir`. - * E.g. - * baseInputDir = "/path/to/input" - * inputFile = "/path/to/input/a/b/c/d.avro" - * outputDir = "/path/to/output" - * getRelativeOutputPath(baseInputDir, inputFile, outputDir) = /path/to/output/a/b/c - */ - public static URI getRelativeOutputPath(URI baseInputDir, URI inputFile, URI outputDir) { - URI relativePath = baseInputDir.relativize(inputFile); - Preconditions.checkState(relativePath.getPath().length() > 0 && !relativePath.equals(inputFile), - "Unable to extract out the relative path based on base input path: " + baseInputDir); - String outputDirStr = outputDir.toString(); - outputDir = !outputDirStr.endsWith("/") ? URI.create(outputDirStr.concat("/")) : outputDir; - URI relativeOutputURI = outputDir.resolve(relativePath).resolve("."); - return relativeOutputURI; - } - @Override public void init(SegmentGenerationJobSpec spec) { _spec = spec; @@ -125,7 +88,8 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri throw new RuntimeException("Missing property 'schemaURI' in 'tableSpec'"); } PinotClusterSpec pinotClusterSpec = _spec.getPinotClusterSpecs()[0]; - String schemaURI = generateSchemaURI(pinotClusterSpec.getControllerURI(), _spec.getTableSpec().getTableName()); + String schemaURI = SegmentGenerationUtils + .generateSchemaURI(pinotClusterSpec.getControllerURI(), _spec.getTableSpec().getTableName()); _spec.getTableSpec().setSchemaURI(schemaURI); } if (_spec.getTableSpec().getTableConfigURI() == null) { @@ -133,8 +97,8 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri throw new RuntimeException("Missing property 'tableConfigURI' in 'tableSpec'"); } PinotClusterSpec pinotClusterSpec = _spec.getPinotClusterSpecs()[0]; - String tableConfigURI = - generateTableConfigURI(pinotClusterSpec.getControllerURI(), _spec.getTableSpec().getTableName()); + String tableConfigURI = SegmentGenerationUtils + .generateTableConfigURI(pinotClusterSpec.getControllerURI(), _spec.getTableSpec().getTableName()); _spec.getTableSpec().setTableConfigURI(tableConfigURI); } if (_spec.getExecutionFrameworkSpec().getExtraConfigs() == null) { @@ -252,8 +216,9 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri taskSpec.setInputFilePath(localInputDataFile.getAbsolutePath()); taskSpec.setOutputDirectoryPath(localOutputTempDir.getAbsolutePath()); taskSpec.setRecordReaderSpec(_spec.getRecordReaderSpec()); - taskSpec.setSchema(getSchema()); - taskSpec.setTableConfig(getTableConfig().toJsonNode()); + taskSpec.setSchema(SegmentGenerationUtils.getSchema(_spec.getTableSpec().getSchemaURI())); + taskSpec.setTableConfig( + SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI()).toJsonNode()); taskSpec.setSequenceId(idx); taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec()); @@ -272,7 +237,8 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri DataSize.fromBytes(uncompressedSegmentSize), DataSize.fromBytes(compressedSegmentSize)); //move segment to output PinotFS URI outputSegmentTarURI = - getRelativeOutputPath(finalInputDirURI, inputFileURI, finalOutputDirURI).resolve(segmentTarFileName); + SegmentGenerationUtils.getRelativeOutputPath(finalInputDirURI, inputFileURI, finalOutputDirURI) + .resolve(segmentTarFileName); LOGGER.info("Trying to move segment tar file from: [{}] to [{}]", localSegmentTarFile, outputSegmentTarURI); if (!_spec.isOverwriteOutput() && PinotFSFactory.create(outputSegmentTarURI.getScheme()) .exists(outputSegmentTarURI)) { @@ -297,87 +263,6 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri } } - private Schema getSchema() { - URI schemaURI; - try { - schemaURI = new URI(_spec.getTableSpec().getSchemaURI()); - } catch (URISyntaxException e) { - throw new RuntimeException("Schema URI is not valid - '" + _spec.getTableSpec().getSchemaURI() + "'", e); - } - String scheme = schemaURI.getScheme(); - String schemaJson; - if (PinotFSFactory.isSchemeSupported(scheme)) { - // Try to use PinotFS to read schema URI - PinotFS pinotFS = PinotFSFactory.create(scheme); - InputStream schemaStream; - try { - schemaStream = pinotFS.open(schemaURI); - } catch (IOException e) { - throw new RuntimeException("Failed to fetch schema from PinotFS - '" + schemaURI + "'", e); - } - try { - schemaJson = IOUtils.toString(schemaStream, StandardCharsets.UTF_8); - } catch (IOException e) { - throw new RuntimeException("Failed to read from schema file data stream on Pinot fs - '" + schemaURI + "'", e); - } - } else { - // Try to directly read from URI. - try { - schemaJson = IOUtils.toString(schemaURI, StandardCharsets.UTF_8); - } catch (IOException e) { - throw new RuntimeException("Failed to read from Schema URI - '" + schemaURI + "'", e); - } - } - try { - return Schema.fromString(schemaJson); - } catch (IOException e) { - throw new RuntimeException("Failed to decode Pinot schema from json string - '" + schemaJson + "'", e); - } - } - - private TableConfig getTableConfig() { - URI tableConfigURI; - try { - tableConfigURI = new URI(_spec.getTableSpec().getTableConfigURI()); - } catch (URISyntaxException e) { - throw new RuntimeException("Table config URI is not valid - '" + _spec.getTableSpec().getTableConfigURI() + "'", - e); - } - String scheme = tableConfigURI.getScheme(); - String tableConfigJson; - if (PinotFSFactory.isSchemeSupported(scheme)) { - // Try to use PinotFS to read table config URI - PinotFS pinotFS = PinotFSFactory.create(scheme); - try { - tableConfigJson = IOUtils.toString(pinotFS.open(tableConfigURI), StandardCharsets.UTF_8); - } catch (IOException e) { - throw new RuntimeException("Failed to open table config file stream on Pinot fs - '" + tableConfigURI + "'", e); - } - } else { - try { - tableConfigJson = IOUtils.toString(tableConfigURI, StandardCharsets.UTF_8); - } catch (IOException e) { - throw new RuntimeException( - "Failed to read from table config file data stream on Pinot fs - '" + tableConfigURI + "'", e); - } - } - // Controller API returns a wrapper of table config. - JsonNode tableJsonNode; - try { - tableJsonNode = new ObjectMapper().readTree(tableConfigJson); - } catch (IOException e) { - throw new RuntimeException("Failed to decode table config into JSON from String - '" + tableConfigJson + "'", e); - } - if (tableJsonNode.has(OFFLINE)) { - tableJsonNode = tableJsonNode.get(OFFLINE); - } - try { - return TableConfig.fromJsonConfig(tableJsonNode); - } catch (IOException e) { - throw new RuntimeException("Failed to decode table config from JSON - '" + tableJsonNode + "'", e); - } - } - protected void addDepsJarToDistributedCache(JavaSparkContext sparkContext, String depsJarDir) throws IOException { if (depsJarDir != null) { diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java index d9b6c32..b98a661 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java @@ -18,15 +18,8 @@ */ package org.apache.pinot.plugin.ingestion.batch.standalone; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Preconditions; import java.io.File; -import java.io.IOException; -import java.io.InputStream; import java.net.URI; -import java.net.URISyntaxException; -import java.nio.charset.StandardCharsets; import java.nio.file.FileSystems; import java.nio.file.PathMatcher; import java.nio.file.Paths; @@ -35,10 +28,10 @@ import java.util.List; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.MapConfiguration; import org.apache.commons.io.FileUtils; -import org.apache.commons.io.IOUtils; import org.apache.pinot.common.config.TableConfig; import org.apache.pinot.common.utils.TarGzCompressionUtils; import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner; +import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationUtils; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.filesystem.PinotFS; import org.apache.pinot.spi.filesystem.PinotFSFactory; @@ -56,7 +49,6 @@ import org.slf4j.LoggerFactory; public class SegmentGenerationJobRunner implements IngestionJobRunner { private static final Logger LOGGER = LoggerFactory.getLogger(SegmentGenerationJobRunner.class); - private static final String OFFLINE = "OFFLINE"; private SegmentGenerationJobSpec _spec; @@ -67,34 +59,6 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner { init(spec); } - private static String generateSchemaURI(String controllerUri, String table) { - return String.format("%s/tables/%s/schema", controllerUri, table); - } - - private static String generateTableConfigURI(String controllerUri, String table) { - return String.format("%s/tables/%s", controllerUri, table); - } - - /** - * Generate a relative output directory path when `useRelativePath` flag is on. - * This method will compute the relative path based on `inputFile` and `baseInputDir`, - * then apply only the directory part of relative path to `outputDir`. - * E.g. - * baseInputDir = "/path/to/input" - * inputFile = "/path/to/input/a/b/c/d.avro" - * outputDir = "/path/to/output" - * getRelativeOutputPath(baseInputDir, inputFile, outputDir) = /path/to/output/a/b/c - */ - public static URI getRelativeOutputPath(URI baseInputDir, URI inputFile, URI outputDir) { - URI relativePath = baseInputDir.relativize(inputFile); - Preconditions.checkState(relativePath.getPath().length() > 0 && !relativePath.equals(inputFile), - "Unable to extract out the relative path based on base input path: " + baseInputDir); - String outputDirStr = outputDir.toString(); - outputDir = !outputDirStr.endsWith("/") ? URI.create(outputDirStr.concat("/")) : outputDir; - URI relativeOutputURI = outputDir.resolve(relativePath).resolve("."); - return relativeOutputURI; - } - @Override public void init(SegmentGenerationJobSpec spec) { _spec = spec; @@ -118,7 +82,8 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner { throw new RuntimeException("Missing property 'schemaURI' in 'tableSpec'"); } PinotClusterSpec pinotClusterSpec = _spec.getPinotClusterSpecs()[0]; - String schemaURI = generateSchemaURI(pinotClusterSpec.getControllerURI(), _spec.getTableSpec().getTableName()); + String schemaURI = SegmentGenerationUtils + .generateSchemaURI(pinotClusterSpec.getControllerURI(), _spec.getTableSpec().getTableName()); _spec.getTableSpec().setSchemaURI(schemaURI); } if (_spec.getTableSpec().getTableConfigURI() == null) { @@ -126,8 +91,8 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner { throw new RuntimeException("Missing property 'tableConfigURI' in 'tableSpec'"); } PinotClusterSpec pinotClusterSpec = _spec.getPinotClusterSpecs()[0]; - String tableConfigURI = - generateTableConfigURI(pinotClusterSpec.getControllerURI(), _spec.getTableSpec().getTableName()); + String tableConfigURI = SegmentGenerationUtils + .generateTableConfigURI(pinotClusterSpec.getControllerURI(), _spec.getTableSpec().getTableName()); _spec.getTableSpec().setTableConfigURI(tableConfigURI); } } @@ -196,8 +161,8 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner { FileUtils.forceMkdir(localOutputTempDir); //Read TableConfig, Schema - Schema schema = getSchema(); - TableConfig tableConfig = getTableConfig(); + Schema schema = SegmentGenerationUtils.getSchema(_spec.getTableSpec().getSchemaURI()); + TableConfig tableConfig = SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI()); //iterate on the file list, for each for (int i = 0; i < filteredFiles.size(); i++) { @@ -235,8 +200,8 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner { LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName, DataSize.fromBytes(uncompressedSegmentSize), DataSize.fromBytes(compressedSegmentSize)); //move segment to output PinotFS - URI outputSegmentTarURI = - getRelativeOutputPath(inputDirURI, inputFileURI, outputDirURI).resolve(segmentTarFileName); + URI outputSegmentTarURI = SegmentGenerationUtils.getRelativeOutputPath(inputDirURI, inputFileURI, outputDirURI) + .resolve(segmentTarFileName); if (!_spec.isOverwriteOutput() && outputDirFS.exists(outputSegmentTarURI)) { LOGGER.warn("Not overwrite existing output segment tar file: {}", outputDirFS.exists(outputSegmentTarURI)); } else { @@ -251,85 +216,4 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner { FileUtils.deleteDirectory(localTempDir); } } - - private Schema getSchema() { - URI schemaURI; - try { - schemaURI = new URI(_spec.getTableSpec().getSchemaURI()); - } catch (URISyntaxException e) { - throw new RuntimeException("Schema URI is not valid - '" + _spec.getTableSpec().getSchemaURI() + "'", e); - } - String scheme = schemaURI.getScheme(); - String schemaJson; - if (PinotFSFactory.isSchemeSupported(scheme)) { - // Try to use PinotFS to read schema URI - PinotFS pinotFS = PinotFSFactory.create(scheme); - InputStream schemaStream; - try { - schemaStream = pinotFS.open(schemaURI); - } catch (IOException e) { - throw new RuntimeException("Failed to fetch schema from PinotFS - '" + schemaURI + "'", e); - } - try { - schemaJson = IOUtils.toString(schemaStream, StandardCharsets.UTF_8); - } catch (IOException e) { - throw new RuntimeException("Failed to read from schema file data stream on Pinot fs - '" + schemaURI + "'", e); - } - } else { - // Try to directly read from URI. - try { - schemaJson = IOUtils.toString(schemaURI, StandardCharsets.UTF_8); - } catch (IOException e) { - throw new RuntimeException("Failed to read from Schema URI - '" + schemaURI + "'", e); - } - } - try { - return Schema.fromString(schemaJson); - } catch (IOException e) { - throw new RuntimeException("Failed to decode Pinot schema from json string - '" + schemaJson + "'", e); - } - } - - private TableConfig getTableConfig() { - URI tableConfigURI; - try { - tableConfigURI = new URI(_spec.getTableSpec().getTableConfigURI()); - } catch (URISyntaxException e) { - throw new RuntimeException("Table config URI is not valid - '" + _spec.getTableSpec().getTableConfigURI() + "'", - e); - } - String scheme = tableConfigURI.getScheme(); - String tableConfigJson; - if (PinotFSFactory.isSchemeSupported(scheme)) { - // Try to use PinotFS to read table config URI - PinotFS pinotFS = PinotFSFactory.create(scheme); - try { - tableConfigJson = IOUtils.toString(pinotFS.open(tableConfigURI), StandardCharsets.UTF_8); - } catch (IOException e) { - throw new RuntimeException("Failed to open table config file stream on Pinot fs - '" + tableConfigURI + "'", e); - } - } else { - try { - tableConfigJson = IOUtils.toString(tableConfigURI, StandardCharsets.UTF_8); - } catch (IOException e) { - throw new RuntimeException( - "Failed to read from table config file data stream on Pinot fs - '" + tableConfigURI + "'", e); - } - } - // Controller API returns a wrapper of table config. - JsonNode tableJsonNode; - try { - tableJsonNode = new ObjectMapper().readTree(tableConfigJson); - } catch (IOException e) { - throw new RuntimeException("Failed to decode table config into JSON from String - '" + tableConfigJson + "'", e); - } - if (tableJsonNode.has(OFFLINE)) { - tableJsonNode = tableJsonNode.get(OFFLINE); - } - try { - return TableConfig.fromJsonConfig(tableJsonNode); - } catch (IOException e) { - throw new RuntimeException("Failed to decode table config from JSON - '" + tableJsonNode + "'", e); - } - } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
