This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch pinot_spark_move_lambda_expression_to_inner_function
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 8f63792287253dd6412e2caaf113806563ae1ef3
Author: Xiang Fu <[email protected]>
AuthorDate: Wed Jul 22 21:06:25 2020 -0700

    Move lambda expression to inner function in pinot-spark
---
 .../spark/SparkSegmentGenerationJobRunner.java     | 164 +++++++++++----------
 1 file changed, 85 insertions(+), 79 deletions(-)

diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
index b0aac11..dc28dfb 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
@@ -54,6 +54,7 @@ import org.apache.pinot.spi.utils.DataSizeUtils;
 import org.apache.spark.SparkContext;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.VoidFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -209,93 +210,98 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri
               .get(PLUGINS_INCLUDE_PROPERTY_NAME) : null;
       final URI finalInputDirURI = inputDirURI;
       final URI finalOutputDirURI = (stagingDirURI == null) ? outputDirURI : 
stagingDirURI;
-      pathRDD.foreach(pathAndIdx -> {
-        for (PinotFSSpec pinotFSSpec : _spec.getPinotFSSpecs()) {
-          PinotFSFactory.register(pinotFSSpec.getScheme(), 
pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
-        }
-        PinotFS finalOutputDirFS = 
PinotFSFactory.create(finalOutputDirURI.getScheme());
-        String[] splits = pathAndIdx.split(" ");
-        String path = splits[0];
-        int idx = Integer.valueOf(splits[1]);
-        // Load Pinot Plugins copied from Distributed cache.
-        File localPluginsTarFile = new File(PINOT_PLUGINS_TAR_GZ);
-        if (localPluginsTarFile.exists()) {
-          File pluginsDirFile = new File(PINOT_PLUGINS_DIR + "-" + idx);
-          try {
-            TarGzCompressionUtils.untar(localPluginsTarFile, pluginsDirFile);
-          } catch (Exception e) {
-            LOGGER.error("Failed to untar local Pinot plugins tarball file 
[{}]", localPluginsTarFile, e);
-            throw new RuntimeException(e);
+      pathRDD.foreach(new VoidFunction<String>() {
+        @Override
+        public void call(String pathAndIdx)
+            throws Exception {
+          PluginManager.get().init();
+          for (PinotFSSpec pinotFSSpec : _spec.getPinotFSSpecs()) {
+            PinotFSFactory.register(pinotFSSpec.getScheme(), 
pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
           }
-          LOGGER.info("Trying to set System Property: [{}={}]", 
PLUGINS_DIR_PROPERTY_NAME,
-              pluginsDirFile.getAbsolutePath());
-          System.setProperty(PLUGINS_DIR_PROPERTY_NAME, 
pluginsDirFile.getAbsolutePath());
-          if (pluginsInclude != null) {
-            LOGGER.info("Trying to set System Property: [{}={}]", 
PLUGINS_INCLUDE_PROPERTY_NAME, pluginsInclude);
-            System.setProperty(PLUGINS_INCLUDE_PROPERTY_NAME, pluginsInclude);
+          PinotFS finalOutputDirFS = 
PinotFSFactory.create(finalOutputDirURI.getScheme());
+          String[] splits = pathAndIdx.split(" ");
+          String path = splits[0];
+          int idx = Integer.valueOf(splits[1]);
+          // Load Pinot Plugins copied from Distributed cache.
+          File localPluginsTarFile = new File(PINOT_PLUGINS_TAR_GZ);
+          if (localPluginsTarFile.exists()) {
+            File pluginsDirFile = new File(PINOT_PLUGINS_DIR + "-" + idx);
+            try {
+              TarGzCompressionUtils.untar(localPluginsTarFile, pluginsDirFile);
+            } catch (Exception e) {
+              LOGGER.error("Failed to untar local Pinot plugins tarball file 
[{}]", localPluginsTarFile, e);
+              throw new RuntimeException(e);
+            }
+            LOGGER.info("Trying to set System Property: [{}={}]", 
PLUGINS_DIR_PROPERTY_NAME,
+                pluginsDirFile.getAbsolutePath());
+            System.setProperty(PLUGINS_DIR_PROPERTY_NAME, 
pluginsDirFile.getAbsolutePath());
+            if (pluginsInclude != null) {
+              LOGGER.info("Trying to set System Property: [{}={}]", 
PLUGINS_INCLUDE_PROPERTY_NAME, pluginsInclude);
+              System.setProperty(PLUGINS_INCLUDE_PROPERTY_NAME, 
pluginsInclude);
+            }
+            LOGGER.info("Pinot plugins System Properties are set at [{}], 
plugins includes [{}]",
+                System.getProperty(PLUGINS_DIR_PROPERTY_NAME), 
System.getProperty(PLUGINS_INCLUDE_PROPERTY_NAME));
+          } else {
+            LOGGER.warn("Cannot find local Pinot plugins tar file at [{}]", 
localPluginsTarFile.getAbsolutePath());
+          }
+          URI inputFileURI = URI.create(path);
+          if (inputFileURI.getScheme() == null) {
+            inputFileURI =
+                new URI(finalInputDirURI.getScheme(), 
inputFileURI.getSchemeSpecificPart(), inputFileURI.getFragment());
           }
-          LOGGER.info("Pinot plugins System Properties are set at [{}], 
plugins includes [{}]",
-              System.getProperty(PLUGINS_DIR_PROPERTY_NAME), 
System.getProperty(PLUGINS_INCLUDE_PROPERTY_NAME));
-        } else {
-          LOGGER.warn("Cannot find local Pinot plugins tar file at [{}]", 
localPluginsTarFile.getAbsolutePath());
-        }
-        URI inputFileURI = URI.create(path);
-        if (inputFileURI.getScheme() == null) {
-          inputFileURI =
-              new URI(finalInputDirURI.getScheme(), 
inputFileURI.getSchemeSpecificPart(), inputFileURI.getFragment());
-        }
 
-        //create localTempDir for input and output
-        File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-" + 
UUID.randomUUID());
-        File localInputTempDir = new File(localTempDir, "input");
-        FileUtils.forceMkdir(localInputTempDir);
-        File localOutputTempDir = new File(localTempDir, "output");
-        FileUtils.forceMkdir(localOutputTempDir);
+          //create localTempDir for input and output
+          File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-" 
+ UUID.randomUUID());
+          File localInputTempDir = new File(localTempDir, "input");
+          FileUtils.forceMkdir(localInputTempDir);
+          File localOutputTempDir = new File(localTempDir, "output");
+          FileUtils.forceMkdir(localOutputTempDir);
 
-        //copy input path to local
-        File localInputDataFile = new File(localInputTempDir, 
getFileName(inputFileURI));
-        LOGGER.info("Trying to copy input file from {} to {}", inputFileURI, 
localInputDataFile);
-        
PinotFSFactory.create(inputFileURI.getScheme()).copyToLocalFile(inputFileURI, 
localInputDataFile);
+          //copy input path to local
+          File localInputDataFile = new File(localInputTempDir, 
getFileName(inputFileURI));
+          LOGGER.info("Trying to copy input file from {} to {}", inputFileURI, 
localInputDataFile);
+          
PinotFSFactory.create(inputFileURI.getScheme()).copyToLocalFile(inputFileURI, 
localInputDataFile);
 
-        //create task spec
-        SegmentGenerationTaskSpec taskSpec = new SegmentGenerationTaskSpec();
-        taskSpec.setInputFilePath(localInputDataFile.getAbsolutePath());
-        taskSpec.setOutputDirectoryPath(localOutputTempDir.getAbsolutePath());
-        taskSpec.setRecordReaderSpec(_spec.getRecordReaderSpec());
-        
taskSpec.setSchema(SegmentGenerationUtils.getSchema(_spec.getTableSpec().getSchemaURI()));
-        taskSpec.setTableConfig(
-            
SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI()).toJsonNode());
-        taskSpec.setSequenceId(idx);
-        
taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec());
+          //create task spec
+          SegmentGenerationTaskSpec taskSpec = new SegmentGenerationTaskSpec();
+          taskSpec.setInputFilePath(localInputDataFile.getAbsolutePath());
+          
taskSpec.setOutputDirectoryPath(localOutputTempDir.getAbsolutePath());
+          taskSpec.setRecordReaderSpec(_spec.getRecordReaderSpec());
+          
taskSpec.setSchema(SegmentGenerationUtils.getSchema(_spec.getTableSpec().getSchemaURI()));
+          taskSpec.setTableConfig(
+              
SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI()).toJsonNode());
+          taskSpec.setSequenceId(idx);
+          
taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec());
 
-        SegmentGenerationTaskRunner taskRunner = new 
SegmentGenerationTaskRunner(taskSpec);
-        String segmentName = taskRunner.run();
+          SegmentGenerationTaskRunner taskRunner = new 
SegmentGenerationTaskRunner(taskSpec);
+          String segmentName = taskRunner.run();
 
-        // Tar segment directory to compress file
-        File localSegmentDir = new File(localOutputTempDir, segmentName);
-        String segmentTarFileName = segmentName + Constants.TAR_GZ_FILE_EXT;
-        File localSegmentTarFile = new File(localOutputTempDir, 
segmentTarFileName);
-        LOGGER.info("Tarring segment from: {} to: {}", localSegmentDir, 
localSegmentTarFile);
-        TarGzCompressionUtils.createTarGzFile(localSegmentDir, 
localSegmentTarFile);
-        long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir);
-        long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile);
-        LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: {}", 
segmentName,
-            DataSizeUtils.fromBytes(uncompressedSegmentSize), 
DataSizeUtils.fromBytes(compressedSegmentSize));
-        //move segment to output PinotFS
-        URI outputSegmentTarURI =
-            SegmentGenerationUtils.getRelativeOutputPath(finalInputDirURI, 
inputFileURI, finalOutputDirURI)
-                .resolve(segmentTarFileName);
-        LOGGER.info("Trying to move segment tar file from: [{}] to [{}]", 
localSegmentTarFile, outputSegmentTarURI);
-        if (!_spec.isOverwriteOutput() && 
PinotFSFactory.create(outputSegmentTarURI.getScheme())
-            .exists(outputSegmentTarURI)) {
-          LOGGER
-              .warn("Not overwrite existing output segment tar file: {}", 
finalOutputDirFS.exists(outputSegmentTarURI));
-        } else {
-          finalOutputDirFS.copyFromLocalFile(localSegmentTarFile, 
outputSegmentTarURI);
+          // Tar segment directory to compress file
+          File localSegmentDir = new File(localOutputTempDir, segmentName);
+          String segmentTarFileName = segmentName + Constants.TAR_GZ_FILE_EXT;
+          File localSegmentTarFile = new File(localOutputTempDir, 
segmentTarFileName);
+          LOGGER.info("Tarring segment from: {} to: {}", localSegmentDir, 
localSegmentTarFile);
+          TarGzCompressionUtils.createTarGzFile(localSegmentDir, 
localSegmentTarFile);
+          long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir);
+          long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile);
+          LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: 
{}", segmentName,
+              DataSizeUtils.fromBytes(uncompressedSegmentSize), 
DataSizeUtils.fromBytes(compressedSegmentSize));
+          //move segment to output PinotFS
+          URI outputSegmentTarURI =
+              SegmentGenerationUtils.getRelativeOutputPath(finalInputDirURI, 
inputFileURI, finalOutputDirURI)
+                  .resolve(segmentTarFileName);
+          LOGGER.info("Trying to move segment tar file from: [{}] to [{}]", 
localSegmentTarFile, outputSegmentTarURI);
+          if (!_spec.isOverwriteOutput() && 
PinotFSFactory.create(outputSegmentTarURI.getScheme())
+              .exists(outputSegmentTarURI)) {
+            LOGGER
+                .warn("Not overwrite existing output segment tar file: {}", 
finalOutputDirFS.exists(outputSegmentTarURI));
+          } else {
+            finalOutputDirFS.copyFromLocalFile(localSegmentTarFile, 
outputSegmentTarURI);
+          }
+          FileUtils.deleteQuietly(localSegmentDir);
+          FileUtils.deleteQuietly(localSegmentTarFile);
+          FileUtils.deleteQuietly(localInputDataFile);
         }
-        FileUtils.deleteQuietly(localSegmentDir);
-        FileUtils.deleteQuietly(localSegmentTarFile);
-        FileUtils.deleteQuietly(localInputDataFile);
       });
       if (stagingDirURI != null) {
         LOGGER.info("Trying to copy segment tars from staging directory: [{}] 
to output directory [{}]", stagingDirURI,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to