This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch parallelize_batch_ingestion_standalone_job in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 3ea8a59a8efc90087421a2acd34c434eacf0dee6 Author: Xiang Fu <[email protected]> AuthorDate: Thu Oct 29 17:52:29 2020 -0700 Support running pinot batch ingestion standalone job in a parallel mode --- .../standalone/SegmentGenerationJobRunner.java | 118 +++++++++++++-------- 1 file changed, 75 insertions(+), 43 deletions(-) diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java index 661134f..518e0d1 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java @@ -20,6 +20,7 @@ package org.apache.pinot.plugin.ingestion.batch.standalone; import java.io.File; import java.net.URI; +import java.net.URISyntaxException; import java.nio.file.FileSystems; import java.nio.file.PathMatcher; import java.nio.file.Paths; @@ -27,6 +28,9 @@ import java.util.ArrayList; import java.util.List; import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.apache.commons.io.FileUtils; import org.apache.pinot.common.utils.TarGzCompressionUtils; import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner; @@ -52,6 +56,7 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner { private static final Logger LOGGER = LoggerFactory.getLogger(SegmentGenerationJobRunner.class); private SegmentGenerationJobSpec _spec; + private ExecutorService _executorService; public SegmentGenerationJobRunner() { } @@ -83,8 +88,8 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner { throw new RuntimeException("Missing property 'schemaURI' in 'tableSpec'"); } PinotClusterSpec pinotClusterSpec = _spec.getPinotClusterSpecs()[0]; - String schemaURI = SegmentGenerationUtils.generateSchemaURI(pinotClusterSpec.getControllerURI(), - _spec.getTableSpec().getTableName()); + String schemaURI = SegmentGenerationUtils + .generateSchemaURI(pinotClusterSpec.getControllerURI(), _spec.getTableSpec().getTableName()); _spec.getTableSpec().setSchemaURI(schemaURI); } if (_spec.getTableSpec().getTableConfigURI() == null) { @@ -96,6 +101,9 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner { .generateTableConfigURI(pinotClusterSpec.getControllerURI(), _spec.getTableSpec().getTableName()); _spec.getTableSpec().setTableConfigURI(tableConfigURI); } + int numThreads = (_spec.getSegmentCreationJobParallelism() > 0) ? _spec.getSegmentCreationJobParallelism() : 1; + LOGGER.info("Creating an executor service with {} threads.", numThreads); + _executorService = Executors.newFixedThreadPool(numThreads); } @Override @@ -108,17 +116,11 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner { } //Get pinotFS for input - URI inputDirURI = new URI(_spec.getInputDirURI()); - if (inputDirURI.getScheme() == null) { - inputDirURI = new File(_spec.getInputDirURI()).toURI(); - } + final URI inputDirURI = getDirectoryUri(_spec.getInputDirURI()); PinotFS inputDirFS = PinotFSFactory.create(inputDirURI.getScheme()); //Get outputFS for writing output pinot segments - URI outputDirURI = new URI(_spec.getOutputDirURI()); - if (outputDirURI.getScheme() == null) { - outputDirURI = new File(_spec.getOutputDirURI()).toURI(); - } + final URI outputDirURI = getDirectoryUri(_spec.getOutputDirURI()); PinotFS outputDirFS = PinotFSFactory.create(outputDirURI.getScheme()); outputDirFS.mkdir(outputDirURI); @@ -151,7 +153,6 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner { filteredFiles.add(file); } } - File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-" + UUID.randomUUID()); try { //create localTempDir for input and output @@ -164,13 +165,12 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner { Schema schema = SegmentGenerationUtils.getSchema(_spec.getTableSpec().getSchemaURI()); TableConfig tableConfig = SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI()); + int numInputFiles = filteredFiles.size(); + CountDownLatch segmentCreationTaskCountDownLatch = new CountDownLatch(numInputFiles); //iterate on the file list, for each - for (int i = 0; i < filteredFiles.size(); i++) { - URI inputFileURI = URI.create(filteredFiles.get(i)); - if (inputFileURI.getScheme() == null) { - inputFileURI = - new URI(inputDirURI.getScheme(), inputFileURI.getSchemeSpecificPart(), inputFileURI.getFragment()); - } + for (int i = 0; i < numInputFiles; i++) { + final URI inputFileURI = getFileURI(filteredFiles.get(i), inputDirURI.getScheme()); + //copy input path to local File localInputDataFile = new File(localInputTempDir, new File(inputFileURI.getPath()).getName()); inputDirFS.copyToLocalFile(inputFileURI, localInputDataFile); @@ -185,35 +185,67 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner { taskSpec.setSequenceId(i); taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec()); - //invoke segmentGenerationTask - SegmentGenerationTaskRunner taskRunner = new SegmentGenerationTaskRunner(taskSpec); - String segmentName = taskRunner.run(); - - // Tar segment directory to compress file - File localSegmentDir = new File(localOutputTempDir, segmentName); - String segmentTarFileName = segmentName + Constants.TAR_GZ_FILE_EXT; - File localSegmentTarFile = new File(localOutputTempDir, segmentTarFileName); - LOGGER.info("Tarring segment from: {} to: {}", localSegmentDir, localSegmentTarFile); - TarGzCompressionUtils.createTarGzFile(localSegmentDir, localSegmentTarFile); - long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir); - long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile); - LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName, - DataSizeUtils.fromBytes(uncompressedSegmentSize), DataSizeUtils.fromBytes(compressedSegmentSize)); - //move segment to output PinotFS - URI outputSegmentTarURI = SegmentGenerationUtils.getRelativeOutputPath(inputDirURI, inputFileURI, outputDirURI) - .resolve(segmentTarFileName); - if (!_spec.isOverwriteOutput() && outputDirFS.exists(outputSegmentTarURI)) { - LOGGER.warn("Not overwrite existing output segment tar file: {}", outputDirFS.exists(outputSegmentTarURI)); - } else { - outputDirFS.copyFromLocalFile(localSegmentTarFile, outputSegmentTarURI); - } - FileUtils.deleteQuietly(localSegmentDir); - FileUtils.deleteQuietly(localSegmentTarFile); - FileUtils.deleteQuietly(localInputDataFile); + LOGGER.info("Submitting one Segment Generation Task for {}", inputFileURI); + _executorService.submit(() -> { + File localSegmentDir = null; + File localSegmentTarFile = null; + try { + //invoke segmentGenerationTask + SegmentGenerationTaskRunner taskRunner = new SegmentGenerationTaskRunner(taskSpec); + String segmentName = taskRunner.run(); + // Tar segment directory to compress file + localSegmentDir = new File(localOutputTempDir, segmentName); + String segmentTarFileName = segmentName + Constants.TAR_GZ_FILE_EXT; + localSegmentTarFile = new File(localOutputTempDir, segmentTarFileName); + LOGGER.info("Tarring segment from: {} to: {}", localSegmentDir, localSegmentTarFile); + TarGzCompressionUtils.createTarGzFile(localSegmentDir, localSegmentTarFile); + long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir); + long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile); + LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName, + DataSizeUtils.fromBytes(uncompressedSegmentSize), DataSizeUtils.fromBytes(compressedSegmentSize)); + //move segment to output PinotFS + URI outputSegmentTarURI = + SegmentGenerationUtils.getRelativeOutputPath(inputDirURI, inputFileURI, outputDirURI) + .resolve(segmentTarFileName); + if (!_spec.isOverwriteOutput() && outputDirFS.exists(outputSegmentTarURI)) { + LOGGER + .warn("Not overwrite existing output segment tar file: {}", outputDirFS.exists(outputSegmentTarURI)); + } else { + outputDirFS.copyFromLocalFile(localSegmentTarFile, outputSegmentTarURI); + } + } catch (Exception e) { + LOGGER.error("Failed to generate Pinot segment for file - {}", inputFileURI, e); + } finally { + segmentCreationTaskCountDownLatch.countDown(); + FileUtils.deleteQuietly(localSegmentDir); + FileUtils.deleteQuietly(localSegmentTarFile); + FileUtils.deleteQuietly(localInputDataFile); + } + }); } + segmentCreationTaskCountDownLatch.await(); } finally { //clean up - FileUtils.deleteDirectory(localTempDir); + FileUtils.deleteQuietly(localTempDir); + _executorService.shutdown(); + } + } + + private URI getDirectoryUri(String uriStr) + throws URISyntaxException { + URI uri = new URI(uriStr); + if (uri.getScheme() == null) { + uri = new File(uriStr).toURI(); + } + return uri; + } + + private URI getFileURI(String uriStr, String fallbackScheme) + throws URISyntaxException { + URI fileURI = URI.create(uriStr); + if (fileURI.getScheme() == null) { + return new URI(fallbackScheme, fileURI.getSchemeSpecificPart(), fileURI.getFragment()); } + return fileURI; } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
