This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch pinot_spark_move_lambda_expression_to_inner_function in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 8f63792287253dd6412e2caaf113806563ae1ef3 Author: Xiang Fu <[email protected]> AuthorDate: Wed Jul 22 21:06:25 2020 -0700 Move lambda expression to inner function in pinot-spark --- .../spark/SparkSegmentGenerationJobRunner.java | 164 +++++++++++---------- 1 file changed, 85 insertions(+), 79 deletions(-) diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java index b0aac11..dc28dfb 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java @@ -54,6 +54,7 @@ import org.apache.pinot.spi.utils.DataSizeUtils; import org.apache.spark.SparkContext; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.VoidFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -209,93 +210,98 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri .get(PLUGINS_INCLUDE_PROPERTY_NAME) : null; final URI finalInputDirURI = inputDirURI; final URI finalOutputDirURI = (stagingDirURI == null) ? 
outputDirURI : stagingDirURI; - pathRDD.foreach(pathAndIdx -> { - for (PinotFSSpec pinotFSSpec : _spec.getPinotFSSpecs()) { - PinotFSFactory.register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec)); - } - PinotFS finalOutputDirFS = PinotFSFactory.create(finalOutputDirURI.getScheme()); - String[] splits = pathAndIdx.split(" "); - String path = splits[0]; - int idx = Integer.valueOf(splits[1]); - // Load Pinot Plugins copied from Distributed cache. - File localPluginsTarFile = new File(PINOT_PLUGINS_TAR_GZ); - if (localPluginsTarFile.exists()) { - File pluginsDirFile = new File(PINOT_PLUGINS_DIR + "-" + idx); - try { - TarGzCompressionUtils.untar(localPluginsTarFile, pluginsDirFile); - } catch (Exception e) { - LOGGER.error("Failed to untar local Pinot plugins tarball file [{}]", localPluginsTarFile, e); - throw new RuntimeException(e); + pathRDD.foreach(new VoidFunction<String>() { + @Override + public void call(String pathAndIdx) + throws Exception { + PluginManager.get().init(); + for (PinotFSSpec pinotFSSpec : _spec.getPinotFSSpecs()) { + PinotFSFactory.register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec)); } - LOGGER.info("Trying to set System Property: [{}={}]", PLUGINS_DIR_PROPERTY_NAME, - pluginsDirFile.getAbsolutePath()); - System.setProperty(PLUGINS_DIR_PROPERTY_NAME, pluginsDirFile.getAbsolutePath()); - if (pluginsInclude != null) { - LOGGER.info("Trying to set System Property: [{}={}]", PLUGINS_INCLUDE_PROPERTY_NAME, pluginsInclude); - System.setProperty(PLUGINS_INCLUDE_PROPERTY_NAME, pluginsInclude); + PinotFS finalOutputDirFS = PinotFSFactory.create(finalOutputDirURI.getScheme()); + String[] splits = pathAndIdx.split(" "); + String path = splits[0]; + int idx = Integer.valueOf(splits[1]); + // Load Pinot Plugins copied from Distributed cache. 
+ File localPluginsTarFile = new File(PINOT_PLUGINS_TAR_GZ); + if (localPluginsTarFile.exists()) { + File pluginsDirFile = new File(PINOT_PLUGINS_DIR + "-" + idx); + try { + TarGzCompressionUtils.untar(localPluginsTarFile, pluginsDirFile); + } catch (Exception e) { + LOGGER.error("Failed to untar local Pinot plugins tarball file [{}]", localPluginsTarFile, e); + throw new RuntimeException(e); + } + LOGGER.info("Trying to set System Property: [{}={}]", PLUGINS_DIR_PROPERTY_NAME, + pluginsDirFile.getAbsolutePath()); + System.setProperty(PLUGINS_DIR_PROPERTY_NAME, pluginsDirFile.getAbsolutePath()); + if (pluginsInclude != null) { + LOGGER.info("Trying to set System Property: [{}={}]", PLUGINS_INCLUDE_PROPERTY_NAME, pluginsInclude); + System.setProperty(PLUGINS_INCLUDE_PROPERTY_NAME, pluginsInclude); + } + LOGGER.info("Pinot plugins System Properties are set at [{}], plugins includes [{}]", + System.getProperty(PLUGINS_DIR_PROPERTY_NAME), System.getProperty(PLUGINS_INCLUDE_PROPERTY_NAME)); + } else { + LOGGER.warn("Cannot find local Pinot plugins tar file at [{}]", localPluginsTarFile.getAbsolutePath()); + } + URI inputFileURI = URI.create(path); + if (inputFileURI.getScheme() == null) { + inputFileURI = + new URI(finalInputDirURI.getScheme(), inputFileURI.getSchemeSpecificPart(), inputFileURI.getFragment()); } - LOGGER.info("Pinot plugins System Properties are set at [{}], plugins includes [{}]", - System.getProperty(PLUGINS_DIR_PROPERTY_NAME), System.getProperty(PLUGINS_INCLUDE_PROPERTY_NAME)); - } else { - LOGGER.warn("Cannot find local Pinot plugins tar file at [{}]", localPluginsTarFile.getAbsolutePath()); - } - URI inputFileURI = URI.create(path); - if (inputFileURI.getScheme() == null) { - inputFileURI = - new URI(finalInputDirURI.getScheme(), inputFileURI.getSchemeSpecificPart(), inputFileURI.getFragment()); - } - //create localTempDir for input and output - File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-" + UUID.randomUUID()); - File 
localInputTempDir = new File(localTempDir, "input"); - FileUtils.forceMkdir(localInputTempDir); - File localOutputTempDir = new File(localTempDir, "output"); - FileUtils.forceMkdir(localOutputTempDir); + //create localTempDir for input and output + File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-" + UUID.randomUUID()); + File localInputTempDir = new File(localTempDir, "input"); + FileUtils.forceMkdir(localInputTempDir); + File localOutputTempDir = new File(localTempDir, "output"); + FileUtils.forceMkdir(localOutputTempDir); - //copy input path to local - File localInputDataFile = new File(localInputTempDir, getFileName(inputFileURI)); - LOGGER.info("Trying to copy input file from {} to {}", inputFileURI, localInputDataFile); - PinotFSFactory.create(inputFileURI.getScheme()).copyToLocalFile(inputFileURI, localInputDataFile); + //copy input path to local + File localInputDataFile = new File(localInputTempDir, getFileName(inputFileURI)); + LOGGER.info("Trying to copy input file from {} to {}", inputFileURI, localInputDataFile); + PinotFSFactory.create(inputFileURI.getScheme()).copyToLocalFile(inputFileURI, localInputDataFile); - //create task spec - SegmentGenerationTaskSpec taskSpec = new SegmentGenerationTaskSpec(); - taskSpec.setInputFilePath(localInputDataFile.getAbsolutePath()); - taskSpec.setOutputDirectoryPath(localOutputTempDir.getAbsolutePath()); - taskSpec.setRecordReaderSpec(_spec.getRecordReaderSpec()); - taskSpec.setSchema(SegmentGenerationUtils.getSchema(_spec.getTableSpec().getSchemaURI())); - taskSpec.setTableConfig( - SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI()).toJsonNode()); - taskSpec.setSequenceId(idx); - taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec()); + //create task spec + SegmentGenerationTaskSpec taskSpec = new SegmentGenerationTaskSpec(); + taskSpec.setInputFilePath(localInputDataFile.getAbsolutePath()); + 
taskSpec.setOutputDirectoryPath(localOutputTempDir.getAbsolutePath()); + taskSpec.setRecordReaderSpec(_spec.getRecordReaderSpec()); + taskSpec.setSchema(SegmentGenerationUtils.getSchema(_spec.getTableSpec().getSchemaURI())); + taskSpec.setTableConfig( + SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI()).toJsonNode()); + taskSpec.setSequenceId(idx); + taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec()); - SegmentGenerationTaskRunner taskRunner = new SegmentGenerationTaskRunner(taskSpec); - String segmentName = taskRunner.run(); + SegmentGenerationTaskRunner taskRunner = new SegmentGenerationTaskRunner(taskSpec); + String segmentName = taskRunner.run(); - // Tar segment directory to compress file - File localSegmentDir = new File(localOutputTempDir, segmentName); - String segmentTarFileName = segmentName + Constants.TAR_GZ_FILE_EXT; - File localSegmentTarFile = new File(localOutputTempDir, segmentTarFileName); - LOGGER.info("Tarring segment from: {} to: {}", localSegmentDir, localSegmentTarFile); - TarGzCompressionUtils.createTarGzFile(localSegmentDir, localSegmentTarFile); - long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir); - long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile); - LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName, - DataSizeUtils.fromBytes(uncompressedSegmentSize), DataSizeUtils.fromBytes(compressedSegmentSize)); - //move segment to output PinotFS - URI outputSegmentTarURI = - SegmentGenerationUtils.getRelativeOutputPath(finalInputDirURI, inputFileURI, finalOutputDirURI) - .resolve(segmentTarFileName); - LOGGER.info("Trying to move segment tar file from: [{}] to [{}]", localSegmentTarFile, outputSegmentTarURI); - if (!_spec.isOverwriteOutput() && PinotFSFactory.create(outputSegmentTarURI.getScheme()) - .exists(outputSegmentTarURI)) { - LOGGER - .warn("Not overwrite existing output segment tar file: {}", 
finalOutputDirFS.exists(outputSegmentTarURI)); - } else { - finalOutputDirFS.copyFromLocalFile(localSegmentTarFile, outputSegmentTarURI); + // Tar segment directory to compress file + File localSegmentDir = new File(localOutputTempDir, segmentName); + String segmentTarFileName = segmentName + Constants.TAR_GZ_FILE_EXT; + File localSegmentTarFile = new File(localOutputTempDir, segmentTarFileName); + LOGGER.info("Tarring segment from: {} to: {}", localSegmentDir, localSegmentTarFile); + TarGzCompressionUtils.createTarGzFile(localSegmentDir, localSegmentTarFile); + long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir); + long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile); + LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName, + DataSizeUtils.fromBytes(uncompressedSegmentSize), DataSizeUtils.fromBytes(compressedSegmentSize)); + //move segment to output PinotFS + URI outputSegmentTarURI = + SegmentGenerationUtils.getRelativeOutputPath(finalInputDirURI, inputFileURI, finalOutputDirURI) + .resolve(segmentTarFileName); + LOGGER.info("Trying to move segment tar file from: [{}] to [{}]", localSegmentTarFile, outputSegmentTarURI); + if (!_spec.isOverwriteOutput() && PinotFSFactory.create(outputSegmentTarURI.getScheme()) + .exists(outputSegmentTarURI)) { + LOGGER + .warn("Not overwrite existing output segment tar file: {}", finalOutputDirFS.exists(outputSegmentTarURI)); + } else { + finalOutputDirFS.copyFromLocalFile(localSegmentTarFile, outputSegmentTarURI); + } + FileUtils.deleteQuietly(localSegmentDir); + FileUtils.deleteQuietly(localSegmentTarFile); + FileUtils.deleteQuietly(localInputDataFile); } - FileUtils.deleteQuietly(localSegmentDir); - FileUtils.deleteQuietly(localSegmentTarFile); - FileUtils.deleteQuietly(localInputDataFile); }); if (stagingDirURI != null) { LOGGER.info("Trying to copy segment tars from staging directory: [{}] to output directory [{}]", stagingDirURI, 
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
