This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch pinot-batch-ingestion-hadoop in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 8cbf1d2298787305817c8ebd8776703db5d949b8 Author: Xiang Fu <[email protected]> AuthorDate: Wed Jan 15 04:29:23 2020 -0800 Adding pinot-batch-ingestion-hadoop module --- .../pinot-batch-ingestion-hadoop/pom.xml | 132 ++++++++++ .../hadoop/HadoopSegmentGenerationJobRunner.java | 285 +++++++++++++++++++++ .../hadoop/HadoopSegmentTarPushJobRunner.java | 107 ++++++++ .../hadoop/HadoopSegmentUriPushJobRunner.java | 107 ++++++++ .../batch/hadoop/SegmentCreationMapper.java | 205 +++++++++++++++ .../segmentCreationAndTarPushJobSpec.yaml | 45 ++++ .../segmentCreationAndUriPushJobSpec.yaml | 45 ++++ .../src/main/resources/segmentCreationJobSpec.yaml | 42 +++ .../src/main/resources/segmentTarPushJobSpec.yaml | 45 ++++ .../src/main/resources/segmentUriPushJobSpec.yaml | 45 ++++ pinot-plugins/pinot-batch-ingestion/pom.xml | 1 + 11 files changed, 1059 insertions(+) diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/pom.xml b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/pom.xml new file mode 100644 index 0000000..3f0f5cd --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/pom.xml @@ -0,0 +1,132 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. 
See the License for the + specific language governing permissions and limitations + under the License. + +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>pinot-batch-ingestion</artifactId> + <groupId>org.apache.pinot</groupId> + <version>0.3.0-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + <artifactId>pinot-batch-ingestion-hadoop</artifactId> + <name>Pinot Batch Ingestion for Hadoop</name> + <url>https://pinot.apache.org/</url> + <properties> + <pinot.root>${basedir}/../../..</pinot.root> + <hadoop.version>2.7.0</hadoop.version> + </properties> + <dependencies> + <dependency> + <groupId>org.apache.pinot</groupId> + <artifactId>pinot-batch-ingestion-common</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.yaml</groupId> + <artifactId>snakeyaml</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${hadoop.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + <version>${hadoop.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + <version>${hadoop.version}</version> + <scope>provided</scope> + <exclusions> + <exclusion> + <groupId>commons-logging</groupId> + <artifactId>commons-logging</artifactId> + </exclusion> + <exclusion> + <groupId>commons-lang</groupId> + <artifactId>commons-lang</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.commons</groupId> + <artifactId>commons-math3</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>commons-logging</groupId> + 
<artifactId>commons-logging</artifactId> + </dependency> + <dependency> + <groupId>commons-lang</groupId> + <artifactId>commons-lang</artifactId> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-math3</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro-mapred</artifactId> + <classifier>hadoop2</classifier> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <version>2.5</version> + <configuration> + <forceCreation>true</forceCreation> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-enforcer-plugin</artifactId> + </plugin> + </plugins> + </build> +</project> diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java new file mode 100644 index 0000000..d773e2f --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java @@ -0,0 +1,285 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.ingestion.batch.hadoop; + +import com.google.common.base.Preconditions; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.Serializable; +import java.net.URI; +import java.nio.file.FileSystems; +import java.nio.file.PathMatcher; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.MapConfiguration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.pinot.common.utils.StringUtil; +import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationUtils; +import 
org.apache.pinot.spi.filesystem.PinotFS; +import org.apache.pinot.spi.filesystem.PinotFSFactory; +import org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner; +import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec; +import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec; +import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.yaml.snakeyaml.Yaml; + + +
/**
 * Hadoop implementation of {@link IngestionJobRunner} that generates Pinot segments with a map-only job.
 *
 * <p>The driver registers the configured PinotFS implementations, lists and filters the input files, writes one
 * small text file per data file (content: {@code "<dataFilePath> <sequenceId>"}) under {@code <stagingDir>/input},
 * and submits a job that requests one map task per data file. After a successful job the staging directory is
 * copied to the output directory; the staging directory is always deleted at the end of {@link #run()}.
 */
public class HadoopSegmentGenerationJobRunner extends Configured implements IngestionJobRunner, Serializable {

  /** Key under which the YAML-serialized job spec is stored in the Hadoop job configuration. */
  public static final String SEGMENT_GENERATION_JOB_SPEC = "segmentGenerationJobSpec";
  private static final Logger LOGGER = LoggerFactory.getLogger(HadoopSegmentGenerationJobRunner.class);
  // Keys looked up in 'executionFrameworkSpec.extraConfigs'
  private static final String DEPS_JAR_DIR = "dependencyJarDir";
  private static final String STAGING_DIR = "stagingDir";
  private SegmentGenerationJobSpec _spec;

  public HadoopSegmentGenerationJobRunner() {
  }

  public HadoopSegmentGenerationJobRunner(SegmentGenerationJobSpec spec) {
    init(spec);
  }

  /**
   * Stores the job spec after validating the mandatory properties.
   *
   * <p>When 'schemaURI' or 'tableConfigURI' is absent from the table spec, it is derived from the controller URI of
   * the first Pinot cluster spec. A missing 'extraConfigs' map is replaced by an empty one.
   *
   * @param spec segment generation job spec
   * @throws RuntimeException if a mandatory property is missing
   */
  @Override
  public void init(SegmentGenerationJobSpec spec) {
    _spec = spec;
    if (_spec.getInputDirURI() == null) {
      throw new RuntimeException("Missing property 'inputDirURI' in 'jobSpec' file");
    }
    if (_spec.getOutputDirURI() == null) {
      throw new RuntimeException("Missing property 'outputDirURI' in 'jobSpec' file");
    }
    if (_spec.getRecordReaderSpec() == null) {
      throw new RuntimeException("Missing property 'recordReaderSpec' in 'jobSpec' file");
    }
    if (_spec.getTableSpec() == null) {
      throw new RuntimeException("Missing property 'tableSpec' in 'jobSpec' file");
    }
    if (_spec.getTableSpec().getTableName() == null) {
      throw new RuntimeException("Missing property 'tableName' in 'tableSpec'");
    }
    if (_spec.getTableSpec().getSchemaURI() == null) {
      if (_spec.getPinotClusterSpecs() == null || _spec.getPinotClusterSpecs().length == 0) {
        throw new RuntimeException("Missing property 'schemaURI' in 'tableSpec'");
      }
      PinotClusterSpec pinotClusterSpec = _spec.getPinotClusterSpecs()[0];
      String schemaURI = SegmentGenerationUtils
          .generateSchemaURI(pinotClusterSpec.getControllerURI(), _spec.getTableSpec().getTableName());
      _spec.getTableSpec().setSchemaURI(schemaURI);
    }
    if (_spec.getTableSpec().getTableConfigURI() == null) {
      if (_spec.getPinotClusterSpecs() == null || _spec.getPinotClusterSpecs().length == 0) {
        throw new RuntimeException("Missing property 'tableConfigURI' in 'tableSpec'");
      }
      PinotClusterSpec pinotClusterSpec = _spec.getPinotClusterSpecs()[0];
      String tableConfigURI = SegmentGenerationUtils
          .generateTableConfigURI(pinotClusterSpec.getControllerURI(), _spec.getTableSpec().getTableName());
      _spec.getTableSpec().setTableConfigURI(tableConfigURI);
    }
    // Fail with the same style of message as the other checks instead of a bare NullPointerException
    if (_spec.getExecutionFrameworkSpec() == null) {
      throw new RuntimeException("Missing property 'executionFrameworkSpec' in 'jobSpec' file");
    }
    if (_spec.getExecutionFrameworkSpec().getExtraConfigs() == null) {
      _spec.getExecutionFrameworkSpec().setExtraConfigs(new HashMap<>());
    }
  }

  /**
   * Prepares the staging directory, submits the Hadoop job and waits for it to finish.
   *
   * @throws RuntimeException if the staging and output directories use different schemes, if no data file matches,
   *         or if the Hadoop job is not successful
   * @throws Exception on any file system or job submission failure
   */
  @Override
  public void run()
      throws Exception {
    // Register all configured file systems before any PinotFS is created
    List<PinotFSSpec> pinotFSSpecs = _spec.getPinotFSSpecs();
    for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
      Configuration config = new MapConfiguration(pinotFSSpec.getConfigs());
      PinotFSFactory.register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), config);
    }

    // PinotFS for reading input; a scheme-less URI is treated as a local file path
    URI inputDirURI = new URI(_spec.getInputDirURI());
    if (inputDirURI.getScheme() == null) {
      inputDirURI = new File(_spec.getInputDirURI()).toURI();
    }
    PinotFS inputDirFS = PinotFSFactory.create(inputDirURI.getScheme());

    // PinotFS for writing output pinot segments
    URI outputDirURI = new URI(_spec.getOutputDirURI());
    if (outputDirURI.getScheme() == null) {
      outputDirURI = new File(_spec.getOutputDirURI()).toURI();
    }
    PinotFS outputDirFS = PinotFSFactory.create(outputDirURI.getScheme());
    outputDirFS.mkdir(outputDirURI);

    // Staging directory for temporary job input/output
    String stagingDir = _spec.getExecutionFrameworkSpec().getExtraConfigs().get(STAGING_DIR);
    Preconditions.checkNotNull(stagingDir, "Please set config: stagingDir under 'executionFrameworkSpec.extraConfigs'");
    URI stagingDirURI = URI.create(stagingDir);
    if (stagingDirURI.getScheme() == null) {
      stagingDirURI = new File(stagingDir).toURI();
    }
    Path stagingInputDir = new Path(stagingDirURI.toString(), "input");
    // The staging directory is created, copied and deleted through outputDirFS, so both must share a scheme
    if (!outputDirURI.getScheme().equals(stagingDirURI.getScheme())) {
      throw new RuntimeException(String
          .format("The scheme of staging directory URI [%s] and output directory URI [%s] has to be same.",
              stagingDirURI, outputDirURI));
    }
    outputDirFS.mkdir(stagingDirURI);

    // Everything after the staging directory exists runs under try/finally so that it is always cleaned up,
    // including when listing fails or no data file matches.
    try {
      List<String> filteredFiles = listDataFiles(inputDirFS, inputDirURI);
      int numDataFiles = filteredFiles.size();
      if (numDataFiles == 0) {
        String errorMessage = String
            .format("No data file founded in [%s], with include file pattern: [%s] and exclude file pattern [%s]",
                _spec.getInputDirURI(), _spec.getIncludeFileNamePattern(), _spec.getExcludeFileNamePattern());
        LOGGER.error(errorMessage);
        throw new RuntimeException(errorMessage);
      }
      LOGGER.info("Creating segments with data files: {}", filteredFiles);
      writeMapperInputFiles(filteredFiles, outputDirFS, stagingInputDir);

      // Set up the job
      Job job = Job.getInstance(getConf());
      job.setJarByClass(getClass());
      job.setJobName(getClass().getName());

      org.apache.hadoop.conf.Configuration jobConf = job.getConfiguration();
      String hadoopTokenFileLocation = System.getenv("HADOOP_TOKEN_FILE_LOCATION");
      if (hadoopTokenFileLocation != null) {
        jobConf.set("mapreduce.job.credentials.binary", hadoopTokenFileLocation);
      }
      jobConf.setInt(JobContext.NUM_MAPS, numDataFiles);

      // Add dependency jars
      if (_spec.getExecutionFrameworkSpec().getExtraConfigs().containsKey(DEPS_JAR_DIR)) {
        addDepsJarToDistributedCache(job, _spec.getExecutionFrameworkSpec().getExtraConfigs().get(DEPS_JAR_DIR));
      }

      // The mapper reads the whole job spec back from the job configuration
      jobConf.set(SEGMENT_GENERATION_JOB_SPEC, new Yaml().dump(_spec));

      job.setMapperClass(getMapperClass());
      job.setNumReduceTasks(0);

      job.setInputFormatClass(TextInputFormat.class);
      job.setOutputFormatClass(TextOutputFormat.class);

      job.setMapOutputKeyClass(LongWritable.class);
      job.setMapOutputValueClass(Text.class);

      FileInputFormat.addInputPath(job, stagingInputDir);
      // Build the output path from the normalized staging URI, like the input path, so that both live on the
      // same file system even when 'stagingDir' was given without a scheme
      FileOutputFormat.setOutputPath(job, new Path(stagingDirURI.toString(), "output"));

      // Submit the job
      job.waitForCompletion(true);
      if (!job.isSuccessful()) {
        throw new RuntimeException("Job failed: " + job);
      }

      LOGGER.info("Trying to copy segment tars from staging directory: [{}] to output directory [{}]", stagingDirURI,
          outputDirURI);
      outputDirFS.copy(stagingDirURI, outputDirURI);
    } finally {
      LOGGER.info("Trying to clean up staging directory: [{}]", stagingDirURI);
      outputDirFS.delete(stagingDirURI, true);
    }
  }

  /**
   * Lists all non-directory files under the input directory (recursively) that match the include pattern, if any,
   * and do not match the exclude pattern, if any.
   */
  private List<String> listDataFiles(PinotFS inputDirFS, URI inputDirURI)
      throws Exception {
    String[] files = inputDirFS.listFiles(inputDirURI, true);

    //TODO: sort input files based on creation time
    PathMatcher includeFilePathMatcher = null;
    if (_spec.getIncludeFileNamePattern() != null) {
      includeFilePathMatcher = FileSystems.getDefault().getPathMatcher(_spec.getIncludeFileNamePattern());
    }
    PathMatcher excludeFilePathMatcher = null;
    if (_spec.getExcludeFileNamePattern() != null) {
      excludeFilePathMatcher = FileSystems.getDefault().getPathMatcher(_spec.getExcludeFileNamePattern());
    }

    List<String> filteredFiles = new ArrayList<>();
    for (String file : files) {
      if (includeFilePathMatcher != null && !includeFilePathMatcher.matches(Paths.get(file))) {
        continue;
      }
      if (excludeFilePathMatcher != null && excludeFilePathMatcher.matches(Paths.get(file))) {
        continue;
      }
      if (!inputDirFS.isDirectory(new URI(file))) {
        filteredFiles.add(file);
      }
    }
    return filteredFiles;
  }

  /**
   * Writes one mapper input file per data file into the staging input directory. File {@code i} contains
   * {@code "<dataFilePath> <i>"}.
   */
  private void writeMapperInputFiles(List<String> dataFiles, PinotFS stagingFS, Path stagingInputDir)
      throws Exception {
    // Use a unique temporary file instead of a fixed "tmp" file in the working directory, and remove it afterwards
    File localFile = File.createTempFile("pinot-hadoop-segment-generation-", ".tmp");
    try {
      for (int i = 0; i < dataFiles.size(); i++) {
        // Close the stream before copying so the full content is on disk
        try (DataOutputStream dataOutputStream = new DataOutputStream(new FileOutputStream(localFile))) {
          dataOutputStream.write(StringUtil.encodeUtf8(dataFiles.get(i) + " " + i));
        }
        stagingFS.copyFromLocalFile(localFile, new Path(stagingInputDir, Integer.toString(i)).toUri());
      }
    } finally {
      if (!localFile.delete()) {
        LOGGER.warn("Failed to delete local temporary file: {}", localFile);
      }
    }
  }

  /**
   * Can be overridden to plug in custom mapper.
   */
  protected Class<? extends Mapper<LongWritable, Text, LongWritable, Text>> getMapperClass() {
    return SegmentCreationMapper.class;
  }

  /**
   * Adds every {@code .jar} file found (recursively) under the given directory to the job's cache archives.
   *
   * @param job job to add the jars to
   * @param depsJarDir directory URI or local path holding the dependency jars; ignored when null
   * @throws IOException if listing the directory fails
   */
  protected void addDepsJarToDistributedCache(Job job, String depsJarDir)
      throws IOException {
    if (depsJarDir == null) {
      return;
    }
    URI depsJarDirURI = URI.create(depsJarDir);
    if (depsJarDirURI.getScheme() == null) {
      depsJarDirURI = new File(depsJarDir).toURI();
    }
    PinotFS pinotFS = PinotFSFactory.create(depsJarDirURI.getScheme());
    String[] files = pinotFS.listFiles(depsJarDirURI, true);
    for (String file : files) {
      URI fileURI = URI.create(file);
      if (!pinotFS.isDirectory(fileURI) && file.endsWith(".jar")) {
        LOGGER.info("Adding deps jar: {} to distributed cache", file);
        job.addCacheArchive(fileURI);
      }
    }
  }
}
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentTarPushJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentTarPushJobRunner.java new file mode 100644 index 0000000..f687b60 --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentTarPushJobRunner.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.ingestion.batch.hadoop; + +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.List; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.MapConfiguration; +import org.apache.pinot.plugin.ingestion.batch.common.SegmentPushUtils; +import org.apache.pinot.spi.filesystem.PinotFS; +import org.apache.pinot.spi.filesystem.PinotFSFactory; +import org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner; +import org.apache.pinot.spi.ingestion.batch.spec.Constants; +import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec; +import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec; +import org.apache.pinot.spi.utils.retry.AttemptsExceededException; +import org.apache.pinot.spi.utils.retry.RetriableOperationException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +
/**
 * {@link IngestionJobRunner} that pushes every segment tar file found under the output directory.
 *
 * <p>The push itself is delegated to {@code SegmentPushUtils.pushSegments} and is issued from the driver.
 */
public class HadoopSegmentTarPushJobRunner implements IngestionJobRunner, Serializable {

  private static final Logger LOGGER = LoggerFactory.getLogger(HadoopSegmentTarPushJobRunner.class);

  private SegmentGenerationJobSpec _spec;

  public HadoopSegmentTarPushJobRunner() {
  }

  public HadoopSegmentTarPushJobRunner(SegmentGenerationJobSpec spec) {
    init(spec);
  }

  /**
   * Stores the job spec.
   *
   * @param spec segment generation job spec
   * @throws RuntimeException if the spec has no push job spec (same check as the URI push runner; previously this
   *         surfaced later as a NullPointerException in {@link #run()})
   */
  @Override
  public void init(SegmentGenerationJobSpec spec) {
    _spec = spec;
    if (_spec.getPushJobSpec() == null) {
      throw new RuntimeException("Missing PushJobSpec");
    }
  }

  /**
   * Registers the configured file systems, lists the output directory recursively and pushes every file whose name
   * ends with the tar.gz extension.
   *
   * @throws RuntimeException if the output directory URI is invalid, cannot be listed, or the push fails
   */
  @Override
  public void run() {
    //init all file systems
    List<PinotFSSpec> pinotFSSpecs = _spec.getPinotFSSpecs();
    for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
      Configuration config = new MapConfiguration(pinotFSSpec.getConfigs());
      PinotFSFactory.register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), config);
    }

    //Get outputFS for writing output pinot segments
    URI outputDirURI;
    try {
      outputDirURI = new URI(_spec.getOutputDirURI());
      if (outputDirURI.getScheme() == null) {
        // A scheme-less URI is treated as a local file path
        outputDirURI = new File(_spec.getOutputDirURI()).toURI();
      }
    } catch (URISyntaxException e) {
      // Keep the cause so the offending character/index is visible in the stack trace
      throw new RuntimeException("outputDirURI is not valid - '" + _spec.getOutputDirURI() + "'", e);
    }
    PinotFS outputDirFS = PinotFSFactory.create(outputDirURI.getScheme());

    //Get list of files to process
    String[] files;
    try {
      files = outputDirFS.listFiles(outputDirURI, true);
    } catch (IOException e) {
      throw new RuntimeException("Unable to list all files under outputDirURI - '" + outputDirURI + "'", e);
    }

    List<String> segmentsToPush = new ArrayList<>();
    for (String file : files) {
      if (file.endsWith(Constants.TAR_GZ_FILE_EXT)) {
        segmentsToPush.add(file);
      }
    }

    // Push from driver
    try {
      SegmentPushUtils.pushSegments(_spec, outputDirFS, segmentsToPush);
    } catch (RetriableOperationException | AttemptsExceededException e) {
      throw new RuntimeException(e);
    }
  }
}
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentUriPushJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentUriPushJobRunner.java new file mode 100644
index 0000000..8358a0a --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentUriPushJobRunner.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.plugin.ingestion.batch.hadoop; + +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.List; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.MapConfiguration; +import org.apache.pinot.plugin.ingestion.batch.common.SegmentPushUtils; +import org.apache.pinot.spi.filesystem.PinotFS; +import org.apache.pinot.spi.filesystem.PinotFSFactory; +import org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner; +import org.apache.pinot.spi.ingestion.batch.spec.Constants; +import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec; +import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec; +import org.apache.pinot.spi.utils.retry.AttemptsExceededException; +import org.apache.pinot.spi.utils.retry.RetriableOperationException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +
/**
 * {@link IngestionJobRunner} that sends the URIs of the segment tar files found under the output directory.
 *
 * <p>Each segment URI is built as {@code segmentUriPrefix + <raw path of the tar file> + segmentUriSuffix} and
 * handed to {@code SegmentPushUtils.sendSegmentUris} from the driver.
 */
public class HadoopSegmentUriPushJobRunner implements IngestionJobRunner, Serializable {

  private static final Logger LOGGER = LoggerFactory.getLogger(HadoopSegmentUriPushJobRunner.class);

  private SegmentGenerationJobSpec _spec;

  public HadoopSegmentUriPushJobRunner() {
  }

  public HadoopSegmentUriPushJobRunner(SegmentGenerationJobSpec spec) {
    init(spec);
  }

  /**
   * Stores the job spec.
   *
   * @param spec segment generation job spec
   * @throws RuntimeException if the spec has no push job spec
   */
  @Override
  public void init(SegmentGenerationJobSpec spec) {
    _spec = spec;
    if (_spec.getPushJobSpec() == null) {
      throw new RuntimeException("Missing PushJobSpec");
    }
  }

  /**
   * Registers the configured file systems, lists the output directory recursively and sends the URI of every file
   * whose path ends with the tar.gz extension.
   *
   * @throws RuntimeException if the output directory URI is invalid, cannot be listed, or sending fails
   */
  @Override
  public void run() {
    //init all file systems
    List<PinotFSSpec> pinotFSSpecs = _spec.getPinotFSSpecs();
    for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
      Configuration config = new MapConfiguration(pinotFSSpec.getConfigs());
      PinotFSFactory.register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), config);
    }

    //Get outputFS for writing output Pinot segments
    URI outputDirURI;
    try {
      outputDirURI = new URI(_spec.getOutputDirURI());
      if (outputDirURI.getScheme() == null) {
        // A scheme-less URI is treated as a local file path
        outputDirURI = new File(_spec.getOutputDirURI()).toURI();
      }
    } catch (URISyntaxException e) {
      // Keep the cause so the offending character/index is visible in the stack trace
      throw new RuntimeException("outputDirURI is not valid - '" + _spec.getOutputDirURI() + "'", e);
    }
    PinotFS outputDirFS = PinotFSFactory.create(outputDirURI.getScheme());

    //Get list of files to process
    String[] files;
    try {
      files = outputDirFS.listFiles(outputDirURI, true);
    } catch (IOException e) {
      throw new RuntimeException("Unable to list all files under outputDirURI - '" + outputDirURI + "'", e);
    }

    // An unset prefix/suffix must contribute nothing; plain string concatenation would otherwise embed the
    // literal text "null" into every segment URI.
    String configuredPrefix = _spec.getPushJobSpec().getSegmentUriPrefix();
    String configuredSuffix = _spec.getPushJobSpec().getSegmentUriSuffix();
    String segmentUriPrefix = configuredPrefix == null ? "" : configuredPrefix;
    String segmentUriSuffix = configuredSuffix == null ? "" : configuredSuffix;

    List<String> segmentUris = new ArrayList<>();
    for (String file : files) {
      URI uri = URI.create(file);
      if (uri.getPath().endsWith(Constants.TAR_GZ_FILE_EXT)) {
        segmentUris.add(segmentUriPrefix + uri.getRawPath() + segmentUriSuffix);
      }
    }

    // Push from driver
    try {
      SegmentPushUtils.sendSegmentUris(_spec, segmentUris);
    } catch (RetriableOperationException | AttemptsExceededException e) {
      throw new RuntimeException(e);
    }
  }
}
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/SegmentCreationMapper.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/SegmentCreationMapper.java new file mode 100644 index 0000000..de98972 --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/SegmentCreationMapper.java @@ -0,0 +1,205 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.ingestion.batch.hadoop; + +import com.google.common.base.Preconditions; +import java.io.File; +import java.io.IOException; +import java.net.URI; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.pinot.common.utils.TarGzCompressionUtils; +import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner; +import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationUtils; +import org.apache.pinot.spi.filesystem.PinotFS; +import org.apache.pinot.spi.filesystem.PinotFSFactory; +import org.apache.pinot.spi.ingestion.batch.spec.Constants; +import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec; +import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationTaskSpec; +import org.apache.pinot.spi.utils.DataSize; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.yaml.snakeyaml.Yaml; + +import static org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentGenerationJobRunner.SEGMENT_GENERATION_JOB_SPEC; + + +public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritable, Text> { + protected static final String LOCAL_TEMP_DIR = 
"pinot_hadoop_tmp"; + protected static final String PROGRESS_REPORTER_THREAD_NAME = "pinot-hadoop-progress-reporter"; + protected static final String SEGMENT_TAR_DIR = "segmentTar"; + protected static final long PROGRESS_REPORTER_JOIN_WAIT_TIME_MS = 5_000L; + + protected final Logger LOGGER = LoggerFactory.getLogger(SegmentCreationMapper.class); + + protected Configuration _jobConf; + protected SegmentGenerationJobSpec _spec; + private File _localTempDir; + + @Override + public void setup(Context context) + throws IOException, InterruptedException { + _jobConf = context.getConfiguration(); + Yaml yaml = new Yaml(); + String segmentGenerationJobSpecStr = _jobConf.get(SEGMENT_GENERATION_JOB_SPEC); + _spec = yaml.loadAs(segmentGenerationJobSpecStr, SegmentGenerationJobSpec.class); + LOGGER.info("Segment generation job spec : {}", segmentGenerationJobSpecStr); + _localTempDir = new File(FileUtils.getTempDirectory(), "pinot-" + System.currentTimeMillis()); + } + + @Override + protected void map(LongWritable key, Text value, Context context) + throws IOException, InterruptedException { + try { + String[] splits = StringUtils.split(value.toString(), ' '); + Preconditions.checkState(splits.length == 2, "Illegal input value: {}", value); + + String path = splits[0]; + int idx = Integer.valueOf(splits[1]); + LOGGER.info("Generating segment with input file: {}, sequence id: {}", path, idx); + + URI inputDirURI = new URI(_spec.getInputDirURI()); + if (inputDirURI.getScheme() == null) { + inputDirURI = new File(_spec.getInputDirURI()).toURI(); + } + + URI outputDirURI = new URI(_spec.getOutputDirURI()); + if (outputDirURI.getScheme() == null) { + outputDirURI = new File(_spec.getOutputDirURI()).toURI(); + } + PinotFS outputDirFS = PinotFSFactory.create(outputDirURI.getScheme()); + + URI inputFileURI = URI.create(path); + if (inputFileURI.getScheme() == null) { + inputFileURI = + new URI(inputDirURI.getScheme(), inputFileURI.getSchemeSpecificPart(), inputFileURI.getFragment()); 
+ } + + //create localTempDir for input and output + File localInputTempDir = new File(_localTempDir, "input"); + FileUtils.forceMkdir(localInputTempDir); + File localOutputTempDir = new File(_localTempDir, "output"); + FileUtils.forceMkdir(localOutputTempDir); + + //copy input path to local + File localInputDataFile = new File(localInputTempDir, new File(inputFileURI).getName()); + PinotFSFactory.create(inputFileURI.getScheme()).copyToLocalFile(inputFileURI, localInputDataFile); + + //create task spec + SegmentGenerationTaskSpec taskSpec = new SegmentGenerationTaskSpec(); + taskSpec.setInputFilePath(localInputDataFile.getAbsolutePath()); + taskSpec.setOutputDirectoryPath(localOutputTempDir.getAbsolutePath()); + taskSpec.setRecordReaderSpec(_spec.getRecordReaderSpec()); + taskSpec.setSchema(SegmentGenerationUtils.getSchema(_spec.getTableSpec().getSchemaURI())); + taskSpec + .setTableConfig(SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI()).toJsonNode()); + taskSpec.setSequenceId(idx); + taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec()); + + // Start a thread that reports progress every minute during segment generation to prevent job getting killed + Thread progressReporterThread = new Thread(getProgressReporter(context)); + progressReporterThread.setName(PROGRESS_REPORTER_THREAD_NAME); + progressReporterThread.start(); + String segmentName; + try { + SegmentGenerationTaskRunner taskRunner = new SegmentGenerationTaskRunner(taskSpec); + segmentName = taskRunner.run(); + } catch (Exception e) { + LOGGER.error("Caught exception while creating segment with input file: {}, sequence id: {}", path, idx, e); + throw new RuntimeException(e); + } finally { + progressReporterThread.interrupt(); + progressReporterThread.join(PROGRESS_REPORTER_JOIN_WAIT_TIME_MS); + if (progressReporterThread.isAlive()) { + LOGGER.error("Failed to interrupt progress reporter thread: {}", progressReporterThread); + } + } + + // Tar segment 
directory to compress file + File localSegmentDir = new File(localOutputTempDir, segmentName); + String segmentTarFileName = segmentName + Constants.TAR_GZ_FILE_EXT; + File localSegmentTarFile = new File(localOutputTempDir, segmentTarFileName); + LOGGER.info("Tarring segment from: {} to: {}", localSegmentDir, localSegmentTarFile); + TarGzCompressionUtils.createTarGzOfDirectory(localSegmentDir.getPath(), localSegmentTarFile.getPath()); + long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir); + long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile); + LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName, + DataSize.fromBytes(uncompressedSegmentSize), DataSize.fromBytes(compressedSegmentSize)); + //move segment to output PinotFS + URI outputSegmentTarURI = SegmentGenerationUtils.getRelativeOutputPath(inputDirURI, inputFileURI, outputDirURI) + .resolve(segmentTarFileName); + LOGGER.info("Trying to move segment tar file from: [{}] to [{}]", localSegmentTarFile, outputSegmentTarURI); + if (!_spec.isOverwriteOutput() && outputDirFS.exists(outputSegmentTarURI)) { + LOGGER.warn("Not overwrite existing output segment tar file: {}", outputDirFS.exists(outputSegmentTarURI)); + } else { + outputDirFS.copyFromLocalFile(localSegmentTarFile, outputSegmentTarURI); + } + FileUtils.deleteQuietly(localSegmentDir); + FileUtils.deleteQuietly(localSegmentTarFile); + FileUtils.deleteQuietly(localInputDataFile); + + context.write(new LongWritable(idx), new Text(segmentTarFileName)); + LOGGER.info("Finish generating segment: {} with input file: {}, sequence id: {}", segmentName, inputFileURI, idx); + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + FileUtils.deleteQuietly(_localTempDir); + } + } + + protected Runnable getProgressReporter(Context context) { + return new ProgressReporter(context); + } + + @Override + public void cleanup(Context context) { + LOGGER.info("Deleting local temporary directory: {}", 
_localTempDir); + FileUtils.deleteQuietly(_localTempDir); + } + + private static class ProgressReporter implements Runnable { + private static final Logger LOGGER = LoggerFactory.getLogger(ProgressReporter.class); + private static final long PROGRESS_REPORTER_INTERVAL_MS = 60_000L; + + private final Context _context; + + ProgressReporter(Context context) { + _context = context; + } + + @Override + public void run() { + LOGGER.info("Starting progress reporter thread: {}", Thread.currentThread()); + while (true) { + try { + Thread.sleep(PROGRESS_REPORTER_INTERVAL_MS); + LOGGER.info("============== Reporting progress =============="); + _context.progress(); + } catch (InterruptedException e) { + LOGGER.info("Progress reporter thread: {} interrupted", Thread.currentThread()); + return; + } + } + } + } +} \ No newline at end of file diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/resources/segmentCreationAndTarPushJobSpec.yaml b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/resources/segmentCreationAndTarPushJobSpec.yaml new file mode 100644 index 0000000..b8fbf1a --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/resources/segmentCreationAndTarPushJobSpec.yaml @@ -0,0 +1,45 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. +# + +executionFrameworkSpec: + name: 'hadoop' + segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentGenerationJobRunner' + segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentTarPushJobRunner' + segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentUriPushJobRunner' +jobType: SegmentCreationAndTarPush +inputDirURI: 'file:///path/to/input' +includeFileNamePattern: 'glob:**/*.parquet' +excludeFileNamePattern: 'glob:**/*.avro' +outputDirURI: 'file:///path/to/output' +overwriteOutput: true +pinotFSSpecs: + - scheme: file + className: org.apache.pinot.spi.filesystem.LocalPinotFS +recordReaderSpec: + dataFormat: 'parquet' + className: 'org.apache.pinot.parquet.data.readers.ParquetRecordReader' +tableSpec: + tableName: 'myTable' + schemaURI: 'http://localhost:9000/tables/myTable/schema' + tableConfigURI: 'http://localhost:9000/tables/myTable' +pinotClusterSpecs: + - controllerURI: 'localhost:9000' +pushJobSpec: + pushAttempts: 2 + pushRetryIntervalMillis: 1000 \ No newline at end of file diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/resources/segmentCreationAndUriPushJobSpec.yaml b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/resources/segmentCreationAndUriPushJobSpec.yaml new file mode 100644 index 0000000..f0e02e1 --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/resources/segmentCreationAndUriPushJobSpec.yaml @@ -0,0 +1,45 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +executionFrameworkSpec: + name: 'hadoop' + segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentGenerationJobRunner' + segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentTarPushJobRunner' + segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentUriPushJobRunner' +jobType: SegmentCreationAndUriPush +inputDirURI: 'file:///path/to/input' +includeFileNamePattern: 'glob:**/*.parquet' +excludeFileNamePattern: 'glob:**/*.avro' +outputDirURI: 'file:///path/to/output' +overwriteOutput: true +pinotFSSpecs: + - scheme: file + className: org.apache.pinot.spi.filesystem.LocalPinotFS +recordReaderSpec: + dataFormat: 'parquet' + className: 'org.apache.pinot.parquet.data.readers.ParquetRecordReader' +tableSpec: + tableName: 'myTable' + schemaURI: 'http://localhost:9000/tables/myTable/schema' + tableConfigURI: 'http://localhost:9000/tables/myTable' +pinotClusterSpecs: + - controllerURI: 'localhost:9000' +pushJobSpec: + segmentUriPrefix: 'file://' + segmentUriSuffix: '' \ No newline at end of file diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/resources/segmentCreationJobSpec.yaml b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/resources/segmentCreationJobSpec.yaml new file mode 100644 index 0000000..a8b667b --- 
/dev/null +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/resources/segmentCreationJobSpec.yaml @@ -0,0 +1,42 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +executionFrameworkSpec: + name: 'hadoop' + segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentGenerationJobRunner' + segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentTarPushJobRunner' + segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentUriPushJobRunner' +jobType: SegmentCreation +inputDirURI: 'file:///path/to/input' +includeFileNamePattern: 'glob:**/*.parquet' +excludeFileNamePattern: 'glob:**/*.avro' +outputDirURI: 'file:///path/to/output' +overwriteOutput: true +pinotFSSpecs: + - scheme: file + className: org.apache.pinot.spi.filesystem.LocalPinotFS +recordReaderSpec: + dataFormat: 'parquet' + className: 'org.apache.pinot.parquet.data.readers.ParquetRecordReader' +tableSpec: + tableName: 'myTable' + schemaURI: 'http://localhost:9000/tables/myTable/schema' + tableConfigURI: 'http://localhost:9000/tables/myTable' +pinotClusterSpecs: + - controllerURI: 'localhost:9000' diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/resources/segmentTarPushJobSpec.yaml b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/resources/segmentTarPushJobSpec.yaml new file mode 100644 index 0000000..7ce85af --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/resources/segmentTarPushJobSpec.yaml @@ -0,0 +1,45 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +executionFrameworkSpec: + name: 'hadoop' + segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentGenerationJobRunner' + segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentTarPushJobRunner' + segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentUriPushJobRunner' +jobType: SegmentTarPush +inputDirURI: 'file:///path/to/input' +includeFileNamePattern: 'glob:**/*.parquet' +excludeFileNamePattern: 'glob:**/*.avro' +outputDirURI: 'file:///path/to/output' +overwriteOutput: true +pinotFSSpecs: + - scheme: file + className: org.apache.pinot.spi.filesystem.LocalPinotFS +recordReaderSpec: + dataFormat: 'parquet' + className: 'org.apache.pinot.parquet.data.readers.ParquetRecordReader' +tableSpec: + tableName: 'myTable' + schemaURI: 'http://localhost:9000/tables/myTable/schema' + tableConfigURI: 'http://localhost:9000/tables/myTable' +pinotClusterSpecs: + - controllerURI: 'localhost:9000' +pushJobSpec: + pushAttempts: 2 + pushRetryIntervalMillis: 1000 \ No newline at end of file diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/resources/segmentUriPushJobSpec.yaml b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/resources/segmentUriPushJobSpec.yaml new file mode 100644 index 0000000..cf65805 --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/resources/segmentUriPushJobSpec.yaml @@ -0,0 +1,45 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +executionFrameworkSpec: + name: 'hadoop' + segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentGenerationJobRunner' + segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentTarPushJobRunner' + segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentUriPushJobRunner' +jobType: SegmentUriPush +inputDirURI: 'file:///path/to/input' +includeFileNamePattern: 'glob:**/*.parquet' +excludeFileNamePattern: 'glob:**/*.avro' +outputDirURI: 'file:///path/to/output' +overwriteOutput: true +pinotFSSpecs: + - scheme: file + className: org.apache.pinot.spi.filesystem.LocalPinotFS +recordReaderSpec: + dataFormat: 'parquet' + className: 'org.apache.pinot.parquet.data.readers.ParquetRecordReader' +tableSpec: + tableName: 'myTable' + schemaURI: 'http://localhost:9000/tables/myTable/schema' + tableConfigURI: 'http://localhost:9000/tables/myTable' +pinotClusterSpecs: + - controllerURI: 'localhost:9000' +pushJobSpec: + segmentUriPrefix: 'file://' + segmentUriSuffix: '' \ No newline at end of file diff --git a/pinot-plugins/pinot-batch-ingestion/pom.xml b/pinot-plugins/pinot-batch-ingestion/pom.xml index 9f0c00b..92b2e93 100644 --- a/pinot-plugins/pinot-batch-ingestion/pom.xml +++ b/pinot-plugins/pinot-batch-ingestion/pom.xml @@ -40,6 +40,7 @@ <modules> <module>pinot-batch-ingestion-common</module> <module>pinot-batch-ingestion-spark</module> + <module>pinot-batch-ingestion-hadoop</module> <module>pinot-batch-ingestion-standalone</module> 
<module>v0_deprecated</module> </modules> --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
