This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch default_local_seq_id in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 27426f5d1209f713e5d215ec08483cb3f17aabba Author: Xiang Fu <[email protected]> AuthorDate: Sat Feb 20 03:15:24 2021 -0800 Default to use local directory sequence id for segment name generation --- .../batch/common/SegmentGenerationJobUtils.java | 37 ++++++ .../hadoop/HadoopSegmentGenerationJobRunner.java | 45 +++++-- .../spark/SparkSegmentGenerationJobRunner.java | 10 +- .../standalone/SegmentGenerationJobRunner.java | 144 +++++++++++++-------- 4 files changed, 162 insertions(+), 74 deletions(-) diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationJobUtils.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationJobUtils.java new file mode 100644 index 0000000..c89d0bc --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationJobUtils.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.plugin.ingestion.batch.common; + +import java.io.Serializable; +import org.apache.pinot.spi.ingestion.batch.spec.SegmentNameGeneratorSpec; + + +public class SegmentGenerationJobUtils implements Serializable { + + /** + * Always use local directory sequence id unless explicitly configured. + * Returns {@code true} when the config key is absent; only an explicitly configured value can disable it. + */ + public static boolean useLocalDirectorySequenceId(SegmentNameGeneratorSpec spec) { + if (spec == null || spec.getConfigs() == null || !spec.getConfigs().containsKey(SegmentGenerationTaskRunner.LOCAL_DIRECTORY_SEQUENCE_ID)) { + return true; + } + return Boolean.parseBoolean(spec.getConfigs().get(SegmentGenerationTaskRunner.LOCAL_DIRECTORY_SEQUENCE_ID)); + } +} diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java index 513ac24..ee866da 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java @@ -31,9 +31,11 @@ import java.nio.file.FileSystems; import java.nio.file.PathMatcher; import java.nio.file.Paths; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; @@ -50,6 +52,7 @@ import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.pinot.common.utils.StringUtil; import org.apache.pinot.common.utils.TarGzCompressionUtils; import org.apache.pinot.common.segment.generation.SegmentGenerationUtils; +import 
org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationJobUtils; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.filesystem.PinotFS; import org.apache.pinot.spi.filesystem.PinotFSFactory; @@ -218,15 +221,27 @@ public class HadoopSegmentGenerationJobRunner extends Configured implements Inge throw new RuntimeException(errorMessage); } else { LOGGER.info("Creating segments with data files: {}", filteredFiles); - for (int i = 0; i < numDataFiles; i++) { - // Typically PinotFS implementations list files without a protocol, so we lose (for example) the - // hdfs:// portion of the path. Call getFileURI() to fix this up. - URI inputFileURI = SegmentGenerationUtils.getFileURI(filteredFiles.get(i), inputDirURI); - File localFile = File.createTempFile("pinot-filepath-", ".txt"); - try (DataOutputStream dataOutputStream = new DataOutputStream(new FileOutputStream(localFile))) { - dataOutputStream.write(StringUtil.encodeUtf8(inputFileURI + " " + i)); - dataOutputStream.flush(); - outputDirFS.copyFromLocalFile(localFile, new Path(stagingInputDir, Integer.toString(i)).toUri()); + if (SegmentGenerationJobUtils.useLocalDirectorySequenceId(_spec.getSegmentNameGeneratorSpec())) { + Map<String, List<String>> localDirIndex = new HashMap<>(); + for (String filteredFile : filteredFiles) { + java.nio.file.Path filteredParentPath = Paths.get(filteredFile).getParent(); + if (!localDirIndex.containsKey(filteredParentPath.toString())) { + localDirIndex.put(filteredParentPath.toString(), new ArrayList<>()); + } + localDirIndex.get(filteredParentPath.toString()).add(filteredFile); + } + for (String parentPath : localDirIndex.keySet()) { + List<String> siblingFiles = localDirIndex.get(parentPath); + Collections.sort(siblingFiles); + for (int i = 0; i < siblingFiles.size(); i++) { + URI inputFileURI = SegmentGenerationUtils.getFileURI(siblingFiles.get(i), URI.create(parentPath)); + createInputFileUriAndSeqIdFile(inputFileURI, outputDirFS, stagingInputDir, 
i); + } + } + } else { + for (int i = 0; i < numDataFiles; i++) { + URI inputFileURI = SegmentGenerationUtils.getFileURI(filteredFiles.get(i), inputDirURI); + createInputFileUriAndSeqIdFile(inputFileURI, outputDirFS, stagingInputDir, i); } } } @@ -308,6 +323,20 @@ public class HadoopSegmentGenerationJobRunner extends Configured implements Inge } } + private void createInputFileUriAndSeqIdFile(URI inputFileURI, PinotFS outputDirFS, Path stagingInputDir, int seqId) + throws Exception { + // Typically PinotFS implementations list files without a protocol, so we lose (for example) the + // hdfs:// portion of the path. Call getFileURI() to fix this up. + File localFile = File.createTempFile("pinot-filepath-", ".txt"); + try (DataOutputStream dataOutputStream = new DataOutputStream(new FileOutputStream(localFile))) { + dataOutputStream.write(StringUtil.encodeUtf8(inputFileURI + " " + seqId)); + dataOutputStream.flush(); + // Per-directory sequence ids restart at 0, so seqId alone is not unique across input sub-directories; + // derive the staging file name from the full input path so staging files cannot overwrite each other. + outputDirFS.copyFromLocalFile(localFile, new Path(stagingInputDir, inputFileURI.getPath().replace('/', '_')).toUri()); + } + } + /** * Move all files from the <sourceDir> to the <destDir>, but don't delete existing contents of destDir.
* If <overwrite> is true, and the source file exists in the destination directory, then replace it, otherwise diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java index a315d15..4bb97a1 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java @@ -43,6 +43,7 @@ import java.util.UUID; import org.apache.commons.io.FileUtils; import org.apache.pinot.common.utils.TarGzCompressionUtils; +import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationJobUtils; import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner; import org.apache.pinot.common.segment.generation.SegmentGenerationUtils; import org.apache.pinot.spi.env.PinotConfiguration; @@ -210,7 +211,7 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri } List<String> pathAndIdxList = new ArrayList<>(); - if (getLocalDirectorySequenceId(_spec.getSegmentNameGeneratorSpec())) { + if (SegmentGenerationJobUtils.useLocalDirectorySequenceId(_spec.getSegmentNameGeneratorSpec())) { Map<String, List<String>> localDirIndex = new HashMap<>(); for (String filteredFile : filteredFiles) { Path filteredParentPath = Paths.get(filteredFile).getParent(); @@ -351,13 +352,6 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri } } - private static boolean getLocalDirectorySequenceId(SegmentNameGeneratorSpec spec) { - if (spec == null || spec.getConfigs() == null) { - return false; - } - 
return Boolean.parseBoolean(spec.getConfigs().get(LOCAL_DIRECTORY_SEQUENCE_ID)); - } - protected void addDepsJarToDistributedCache(JavaSparkContext sparkContext, String depsJarDir) throws IOException { if (depsJarDir != null) { diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java index efe1679..7f15939 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java @@ -25,16 +25,19 @@ import java.nio.file.FileSystems; import java.nio.file.PathMatcher; import java.nio.file.Paths; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.UUID; - import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.commons.io.FileUtils; +import org.apache.pinot.common.segment.generation.SegmentGenerationUtils; import org.apache.pinot.common.utils.TarGzCompressionUtils; +import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationJobUtils; import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner; -import org.apache.pinot.common.segment.generation.SegmentGenerationUtils; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.env.PinotConfiguration; @@ -170,62 +173,39 @@ public class SegmentGenerationJobRunner implements 
IngestionJobRunner { int numInputFiles = filteredFiles.size(); CountDownLatch segmentCreationTaskCountDownLatch = new CountDownLatch(numInputFiles); - //iterate on the file list, for each - for (int i = 0; i < numInputFiles; i++) { - final URI inputFileURI = SegmentGenerationUtils.getFileURI(filteredFiles.get(i), inputDirURI); - - //copy input path to local - File localInputDataFile = new File(localInputTempDir, new File(inputFileURI.getPath()).getName()); - inputDirFS.copyToLocalFile(inputFileURI, localInputDataFile); - - //create task spec - SegmentGenerationTaskSpec taskSpec = new SegmentGenerationTaskSpec(); - taskSpec.setInputFilePath(localInputDataFile.getAbsolutePath()); - taskSpec.setOutputDirectoryPath(localOutputTempDir.getAbsolutePath()); - taskSpec.setRecordReaderSpec(_spec.getRecordReaderSpec()); - taskSpec.setSchema(schema); - taskSpec.setTableConfig(tableConfig); - taskSpec.setSequenceId(i); - taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec()); - taskSpec.setCustomProperty(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY, inputFileURI.toString()); - - LOGGER.info("Submitting one Segment Generation Task for {}", inputFileURI); - _executorService.submit(() -> { - File localSegmentDir = null; - File localSegmentTarFile = null; - try { - //invoke segmentGenerationTask - SegmentGenerationTaskRunner taskRunner = new SegmentGenerationTaskRunner(taskSpec); - String segmentName = taskRunner.run(); - // Tar segment directory to compress file - localSegmentDir = new File(localOutputTempDir, segmentName); - String segmentTarFileName = URLEncoder.encode(segmentName + Constants.TAR_GZ_FILE_EXT, "UTF-8"); - localSegmentTarFile = new File(localOutputTempDir, segmentTarFileName); - LOGGER.info("Tarring segment from: {} to: {}", localSegmentDir, localSegmentTarFile); - TarGzCompressionUtils.createTarGzFile(localSegmentDir, localSegmentTarFile); - long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir); - long compressedSegmentSize = 
FileUtils.sizeOf(localSegmentTarFile); - LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName, - DataSizeUtils.fromBytes(uncompressedSegmentSize), DataSizeUtils.fromBytes(compressedSegmentSize)); - //move segment to output PinotFS - URI outputSegmentTarURI = - SegmentGenerationUtils.getRelativeOutputPath(inputDirURI, inputFileURI, outputDirURI) - .resolve(segmentTarFileName); - if (!_spec.isOverwriteOutput() && outputDirFS.exists(outputSegmentTarURI)) { - LOGGER - .warn("Not overwrite existing output segment tar file: {}", outputDirFS.exists(outputSegmentTarURI)); - } else { - outputDirFS.copyFromLocalFile(localSegmentTarFile, outputSegmentTarURI); - } - } catch (Exception e) { - LOGGER.error("Failed to generate Pinot segment for file - {}", inputFileURI, e); - } finally { - segmentCreationTaskCountDownLatch.countDown(); - FileUtils.deleteQuietly(localSegmentDir); - FileUtils.deleteQuietly(localSegmentTarFile); - FileUtils.deleteQuietly(localInputDataFile); + //create task spec + SegmentGenerationTaskSpec taskSpec = new SegmentGenerationTaskSpec(); + taskSpec.setOutputDirectoryPath(localOutputTempDir.getAbsolutePath()); + taskSpec.setRecordReaderSpec(_spec.getRecordReaderSpec()); + taskSpec.setSchema(schema); + taskSpec.setTableConfig(tableConfig); + taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec()); + + if (SegmentGenerationJobUtils.useLocalDirectorySequenceId(_spec.getSegmentNameGeneratorSpec())) { + Map<String, List<String>> localDirIndex = new HashMap<>(); + for (String filteredFile : filteredFiles) { + java.nio.file.Path filteredParentPath = Paths.get(filteredFile).getParent(); + if (!localDirIndex.containsKey(filteredParentPath.toString())) { + localDirIndex.put(filteredParentPath.toString(), new ArrayList<>()); } - }); + localDirIndex.get(filteredParentPath.toString()).add(filteredFile); + } + for (String parentPath : localDirIndex.keySet()) { + List<String> siblingFiles = localDirIndex.get(parentPath); + 
Collections.sort(siblingFiles); + for (int i = 0; i < siblingFiles.size(); i++) { + URI inputFileURI = SegmentGenerationUtils.getFileURI(siblingFiles.get(i), URI.create(parentPath)); + submitSegmentGenTask(inputDirFS, outputDirFS, inputDirURI, outputDirURI, localInputTempDir, + localOutputTempDir, taskSpec, inputFileURI, i, segmentCreationTaskCountDownLatch); + } + } + } else { + //iterate on the file list, for each + for (int i = 0; i < numInputFiles; i++) { + final URI inputFileURI = SegmentGenerationUtils.getFileURI(filteredFiles.get(i), inputDirURI); + submitSegmentGenTask(inputDirFS, outputDirFS, inputDirURI, outputDirURI, localInputTempDir, + localOutputTempDir, taskSpec, inputFileURI, i, segmentCreationTaskCountDownLatch); + } } segmentCreationTaskCountDownLatch.await(); } finally { @@ -234,4 +214,63 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner { _executorService.shutdown(); } } + + private void submitSegmentGenTask(PinotFS inputDirFS, PinotFS outputDirFS, URI inputDirURI, URI outputDirURI, + File localInputTempDir, File localOutputTempDir, SegmentGenerationTaskSpec taskSpec, URI inputFileURI, int seqId, + CountDownLatch segmentCreationTaskCountDownLatch) + throws Exception { + //copy input path to local; encode the full input path into the local file name so that same-named files + //from different input directories cannot overwrite each other in the shared local temp directory + File localInputDataFile = new File(localInputTempDir, URLEncoder.encode(inputFileURI.getPath(), "UTF-8")); + inputDirFS.copyToLocalFile(inputFileURI, localInputDataFile); + + //Build a per-file task spec from the shared template. The template must not be mutated here: tasks already + //queued on the executor still hold a reference to it and would otherwise observe this file's input path and + //sequence id instead of their own. + SegmentGenerationTaskSpec localTaskSpec = new SegmentGenerationTaskSpec(); + localTaskSpec.setOutputDirectoryPath(taskSpec.getOutputDirectoryPath()); + localTaskSpec.setRecordReaderSpec(taskSpec.getRecordReaderSpec()); + localTaskSpec.setSchema(taskSpec.getSchema()); + localTaskSpec.setTableConfig(taskSpec.getTableConfig()); + localTaskSpec.setSegmentNameGeneratorSpec(taskSpec.getSegmentNameGeneratorSpec()); + localTaskSpec.setInputFilePath(localInputDataFile.getAbsolutePath()); + localTaskSpec.setSequenceId(seqId); + localTaskSpec.setCustomProperty(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY, inputFileURI.toString()); + + LOGGER.info("Submitting one Segment Generation Task for {}", inputFileURI); + _executorService.submit(() -> { + File localSegmentDir = null; + File localSegmentTarFile = null; + try { + //invoke segmentGenerationTask + SegmentGenerationTaskRunner taskRunner = new SegmentGenerationTaskRunner(localTaskSpec); + String segmentName = taskRunner.run(); + // Tar segment directory to compress file + localSegmentDir = new File(localOutputTempDir, segmentName); + String segmentTarFileName = URLEncoder.encode(segmentName + Constants.TAR_GZ_FILE_EXT, "UTF-8"); + localSegmentTarFile = new File(localOutputTempDir, segmentTarFileName); + LOGGER.info("Tarring segment from: {} to: {}", localSegmentDir, localSegmentTarFile); + TarGzCompressionUtils.createTarGzFile(localSegmentDir, localSegmentTarFile); + long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir); + long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile); + LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName, + DataSizeUtils.fromBytes(uncompressedSegmentSize), DataSizeUtils.fromBytes(compressedSegmentSize)); + //move segment to output PinotFS + URI outputSegmentTarURI = SegmentGenerationUtils.getRelativeOutputPath(inputDirURI, inputFileURI, outputDirURI) + .resolve(segmentTarFileName); + if (!_spec.isOverwriteOutput() && outputDirFS.exists(outputSegmentTarURI)) { + LOGGER.warn("Not overwrite existing output segment tar file: {}", outputDirFS.exists(outputSegmentTarURI)); + } else { + outputDirFS.copyFromLocalFile(localSegmentTarFile, outputSegmentTarURI); + } + } catch (Exception e) { + LOGGER.error("Failed to generate Pinot segment for file - {}", inputFileURI, e); + } finally { + segmentCreationTaskCountDownLatch.countDown(); + FileUtils.deleteQuietly(localSegmentDir); + FileUtils.deleteQuietly(localSegmentTarFile); + FileUtils.deleteQuietly(localInputDataFile); + } + }); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
