This is an automated email from the ASF dual-hosted git repository. kishoreg pushed a commit to branch standalone-pbnj in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit f43624559a06fbb76b3f4bac1cdcec44712a8099 Author: kishoreg <[email protected]> AuthorDate: Sun Dec 1 18:55:25 2019 -0800 Adding pinot-standalone module to generate segments --- .../apache/pinot/filesystem/PinotFSFactory.java | 24 +-- .../{ => pinot-standalone}/pom.xml | 77 +++++++--- .../pinot/ingestion/common/PinotClusterSpec.java | 32 ++++ .../apache/pinot/ingestion/common/PinotFSSpec.java | 55 +++++++ .../pinot/ingestion/common/RecordReaderSpec.java | 65 +++++++++ .../ingestion/common/SegmentGenerationJobSpec.java | 162 +++++++++++++++++++++ .../common/SegmentGenerationTaskRunner.java | 132 +++++++++++++++++ .../common/SegmentGenerationTaskSpec.java | 99 +++++++++++++ .../ingestion/common/SegmentNameGeneratorSpec.java | 45 ++++++ .../apache/pinot/ingestion/common/TableSpec.java | 52 +++++++ .../standalone/SegmentGenerationJobRunner.java | 114 +++++++++++++++ .../standalone/StandaloneIngestionJobLauncher.java | 45 ++++++ pinot-ingestion-jobs/pom.xml | 1 + 13 files changed, 868 insertions(+), 35 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/filesystem/PinotFSFactory.java b/pinot-common/src/main/java/org/apache/pinot/filesystem/PinotFSFactory.java index 9f0c98c..4e41dab 100644 --- a/pinot-common/src/main/java/org/apache/pinot/filesystem/PinotFSFactory.java +++ b/pinot-common/src/main/java/org/apache/pinot/filesystem/PinotFSFactory.java @@ -43,6 +43,18 @@ public class PinotFSFactory { } + public static void register(String scheme, String fsClassName, Configuration configuration) { + try { + LOGGER.info("Initializing PinotFS for scheme {}, classname {}", scheme, fsClassName); + PinotFS pinotFS = (PinotFS) Class.forName(fsClassName).newInstance(); + pinotFS.init(configuration); + _fileSystemMap.put(scheme, pinotFS); + } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) { + LOGGER.error("Could not instantiate file system for class {}", fsClassName, e); + throw new RuntimeException(e); + } + } + public static void init(Configuration fsConfig) { // Get schemes and their respective classes Iterator<String> keys = fsConfig.subset(CLASS).getKeys(); @@ -53,17 +65,7 @@ public class PinotFSFactory { String key = keys.next(); String fsClassName = (String) fsConfig.getProperty(CLASS + "." + key); LOGGER.info("Got scheme {}, classname {}, starting to initialize", key, fsClassName); - - try { - PinotFS pinotFS = (PinotFS) Class.forName(fsClassName).newInstance(); - pinotFS.init(fsConfig.subset(key)); - - LOGGER.info("Initializing PinotFS for scheme {}, classname {}", key, fsClassName); - _fileSystemMap.put(key, pinotFS); - } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) { - LOGGER.error("Could not instantiate file system for class {}", fsClassName, e); - throw new RuntimeException(e); - } + register(key, fsClassName, fsConfig.subset(key)); } if (!_fileSystemMap.containsKey(DEFAULT_FS_SCHEME)) { diff --git a/pinot-ingestion-jobs/pom.xml b/pinot-ingestion-jobs/pinot-standalone/pom.xml similarity index 51% copy from pinot-ingestion-jobs/pom.xml copy to pinot-ingestion-jobs/pinot-standalone/pom.xml index b89a8f6..cd32f53 100644 --- a/pinot-ingestion-jobs/pom.xml +++ b/pinot-ingestion-jobs/pinot-standalone/pom.xml @@ -1,4 +1,4 @@ -<?xml version="1.0"?> +<?xml version="1.0" encoding="UTF-8"?> <!-- Licensed to the Apache Software Foundation (ASF) under one @@ -22,45 +22,57 @@ <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> <parent> - <artifactId>pinot</artifactId> + <artifactId>pinot-ingestion-jobs</artifactId> <groupId>org.apache.pinot</groupId> <version>0.3.0-SNAPSHOT</version> + <relativePath>..</relativePath> </parent> - - <artifactId>pinot-ingestion-jobs</artifactId> - <packaging>pom</packaging> - <name>Pinot Ingestion Jobs</name> + <modelVersion>4.0.0</modelVersion> + <artifactId>pinot-standalone</artifactId> + <name>Pinot Ingestion Standalone</name> <url>https://pinot.apache.org/</url> <properties> - <pinot.root>${basedir}/..</pinot.root> + <pinot.root>${basedir}/../..</pinot.root> </properties> - <modules> - <module>pinot-ingestion-common</module> - <module>pinot-hadoop</module> - <module>pinot-spark</module> - </modules> - <dependencies> <dependency> - <groupId>org.apache.pinot</groupId> - <artifactId>pinot-common</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.pinot</groupId> - <artifactId>pinot-core</artifactId> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> <scope>provided</scope> <exclusions> <exclusion> - <groupId>org.scala-lang</groupId> - <artifactId>scala-library</artifactId> + <groupId>commons-logging</groupId> + <artifactId>commons-logging</artifactId> + </exclusion> + <exclusion> + <groupId>commons-lang</groupId> + <artifactId>commons-lang</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.commons</groupId> + <artifactId>commons-math3</artifactId> </exclusion> </exclusions> </dependency> - + <dependency> + <groupId>commons-logging</groupId> + <artifactId>commons-logging</artifactId> + </dependency> + <dependency> + <groupId>commons-lang</groupId> + <artifactId>commons-lang</artifactId> + </dependency> + <dependency> + <groupId>org.yaml</groupId> + <artifactId>snakeyaml</artifactId> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-math3</artifactId> + <scope>provided</scope> + </dependency> <!-- test --> <dependency> <groupId>org.testng</groupId> @@ -69,4 +81,21 @@ </dependency> </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <version>2.5</version> + <configuration> + <forceCreation>true</forceCreation> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-enforcer-plugin</artifactId> + </plugin> + </plugins> + </build> </project> + diff --git a/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/PinotClusterSpec.java b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/PinotClusterSpec.java new file mode 100644 index 0000000..2c8195d --- /dev/null +++ b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/PinotClusterSpec.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.ingestion.common; + +public class PinotClusterSpec { + + String _controllerURI; + + public String getControllerURI() { + return _controllerURI; + } + + public void setControllerURI(String controllerURI) { + _controllerURI = controllerURI; + } +} diff --git a/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/PinotFSSpec.java b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/PinotFSSpec.java new file mode 100644 index 0000000..078c8e5 --- /dev/null +++ b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/PinotFSSpec.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.ingestion.common; + +import java.util.Map; + + +public class PinotFSSpec { + + String _scheme; + + String _className; + + Map<String, String> _configs; + + public String getScheme() { + return _scheme; + } + + public void setScheme(String scheme) { + _scheme = scheme; + } + + public String getClassName() { + return _className; + } + + public void setClassName(String className) { + _className = className; + } + + public Map<String, String> getConfigs() { + return _configs; + } + + public void setConfigs(Map<String, String> configs) { + _configs = configs; + } +} diff --git a/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/RecordReaderSpec.java b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/RecordReaderSpec.java new file mode 100644 index 0000000..f374a66 --- /dev/null +++ b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/RecordReaderSpec.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.ingestion.common; + +import java.util.Map; + + +public class RecordReaderSpec { + + String _dataFormat; + + String _className; + + String _configClassName; + + Map<String, String> _configs; + + public String getDataFormat() { + return _dataFormat; + } + + public void setDataFormat(String dataFormat) { + _dataFormat = dataFormat; + } + + public String getClassName() { + return _className; + } + + public void setClassName(String className) { + _className = className; + } + + public Map<String, String> getConfigs() { + return _configs; + } + + public void setConfigs(Map<String, String> configs) { + _configs = configs; + } + + public String getConfigClassName() { + return _configClassName; + } + + public void setConfigClassName(String configClassName) { + _configClassName = configClassName; + } +} diff --git a/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/SegmentGenerationJobSpec.java b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/SegmentGenerationJobSpec.java new file mode 100644 index 0000000..1b7b7c9 --- /dev/null +++ b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/SegmentGenerationJobSpec.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.ingestion.common; + +import java.io.BufferedReader; +import java.io.FileReader; +import java.io.Reader; +import java.io.StringWriter; +import java.util.List; +import org.yaml.snakeyaml.DumperOptions; +import org.yaml.snakeyaml.Yaml; + + +public class SegmentGenerationJobSpec { + + String jobType; + + String _inputDirURI; + + String _includeFileNamePattern; + + String _excludeFileNamePattern; + + String _outputDirURI; + + boolean _overwriteOutput; + + List<PinotFSSpec> _pinotFSSpecs; + + TableSpec _tableSpec; + + RecordReaderSpec _recordReaderSpec; + + SegmentNameGeneratorSpec _segmentNameGeneratorSpec; + + PinotClusterSpec[] _pinotClusterSpecs; + + public String getJobType() { + return jobType; + } + + public void setJobType(String jobType) { + this.jobType = jobType; + } + + public String getInputDirURI() { + return _inputDirURI; + } + + public void setInputDirURI(String inputDirURI) { + _inputDirURI = inputDirURI; + } + + public String getIncludeFileNamePattern() { + return _includeFileNamePattern; + } + + public void setIncludeFileNamePattern(String includeFileNamePattern) { + _includeFileNamePattern = includeFileNamePattern; + } + + public String getExcludeFileNamePattern() { + return _excludeFileNamePattern; + } + + public void setExcludeFileNamePattern(String excludeFileNamePattern) { + _excludeFileNamePattern = excludeFileNamePattern; + } + + public String getOutputDirURI() { + return _outputDirURI; + } + + public void setOutputDirURI(String outputDirURI) { + _outputDirURI = outputDirURI; + } + + public boolean isOverwriteOutput() { + return _overwriteOutput; + } + + public void setOverwriteOutput(boolean overwriteOutput) { + _overwriteOutput = overwriteOutput; + } + + public List<PinotFSSpec> getPinotFSSpecs() { + return _pinotFSSpecs; + } + + public void setPinotFSSpecs(List<PinotFSSpec> pinotFSSpecs) { + _pinotFSSpecs = pinotFSSpecs; + } + + public TableSpec getTableSpec() { + return _tableSpec; + } + + public void setTableSpec(TableSpec tableSpec) { + _tableSpec = tableSpec; + } + + public RecordReaderSpec getRecordReaderSpec() { + return _recordReaderSpec; + } + + public void setRecordReaderSpec(RecordReaderSpec recordReaderSpec) { + _recordReaderSpec = recordReaderSpec; + } + + public PinotClusterSpec[] getPinotClusterSpecs() { + return _pinotClusterSpecs; + } + + public void setPinotClusterSpecs(PinotClusterSpec[] pinotClusterSpecs) { + _pinotClusterSpecs = pinotClusterSpecs; + } + + public SegmentNameGeneratorSpec getSegmentNameGeneratorSpec() { + return _segmentNameGeneratorSpec; + } + + public void setSegmentNameGeneratorSpec(SegmentNameGeneratorSpec segmentNameGeneratorSpec) { + _segmentNameGeneratorSpec = segmentNameGeneratorSpec; + } + + public static void main(String[] args) + throws Exception { + DumperOptions options = new DumperOptions(); + options.setDefaultFlowStyle(DumperOptions.FlowStyle.BLOCK); + options.setPrettyFlow(true); + Yaml yaml = new Yaml(options); + + Reader reader; + reader = new BufferedReader(new FileReader("/Users/kishoreg/Documents/testJobSpec.yaml")); + SegmentGenerationJobSpec loadedSpec = yaml.loadAs(reader, SegmentGenerationJobSpec.class); + System.out.println("loadedSpec = " + loadedSpec); + + SegmentGenerationJobSpec spec = new SegmentGenerationJobSpec(); + + StringWriter sw = new StringWriter(); + yaml.dump(loadedSpec, sw); + System.out.println("dump = " + sw.toString()); + } +} + + diff --git a/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/SegmentGenerationTaskRunner.java b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/SegmentGenerationTaskRunner.java new file mode 100644 index 0000000..d0a7bcc --- /dev/null +++ b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/SegmentGenerationTaskRunner.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.ingestion.common; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import java.io.File; +import java.net.URLClassLoader; +import java.util.HashMap; +import java.util.Map; +import org.apache.pinot.common.config.SegmentsValidationAndRetentionConfig; +import org.apache.pinot.common.config.TableConfig; +import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig; +import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl; +import org.apache.pinot.core.segment.name.NormalizedDateSegmentNameGenerator; +import org.apache.pinot.core.segment.name.SegmentNameGenerator; +import org.apache.pinot.core.segment.name.SimpleSegmentNameGenerator; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.TimeFieldSpec; +import org.apache.pinot.spi.data.readers.RecordReader; +import org.apache.pinot.spi.data.readers.RecordReaderConfig; +import org.apache.pinot.spi.utils.JsonUtils; + + +public class SegmentGenerationTaskRunner { + + public static final String SEGMENT_NAME_GENERATOR_TYPE = "segment.name.generator.type"; + public static final String SIMPLE_SEGMENT_NAME_GENERATOR = "simple"; + public static final String NORMALIZED_DATE_SEGMENT_NAME_GENERATOR = "normalizedDate"; + public static final String DEFAULT_SEGMENT_NAME_GENERATOR = SIMPLE_SEGMENT_NAME_GENERATOR; + + // For SimpleSegmentNameGenerator + public static final String SEGMENT_NAME_POSTFIX = "segment.name.postfix"; + + // For NormalizedDateSegmentNameGenerator + public static final String SEGMENT_NAME_PREFIX = "segment.name.prefix"; + public static final String EXCLUDE_SEQUENCE_ID = "exclude.sequence.id"; + + private SegmentGenerationTaskSpec _taskSpec; + + public SegmentGenerationTaskRunner(SegmentGenerationTaskSpec taskSpec) { + _taskSpec = taskSpec; + } + + public String run() + throws Exception { + + String tableName = _taskSpec.getTableConfig().getTableName(); + TableConfig tableConfig = _taskSpec._tableConfig; + Schema schema = _taskSpec.getSchema(); + + //init record reader config + String readerConfigClassName = _taskSpec.getRecordReaderSpec().getConfigClassName(); + RecordReaderConfig recordReaderConfig = null; + + if (readerConfigClassName != null) { + Map<String, String> configs = _taskSpec.getRecordReaderSpec().getConfigs(); + if(configs == null) { + configs = new HashMap<>(); + } + JsonNode jsonNode = new ObjectMapper().valueToTree(configs); + JsonUtils.jsonNodeToObject(jsonNode, Class.forName(readerConfigClassName)); + recordReaderConfig = (RecordReaderConfig) Class.forName(readerConfigClassName).newInstance(); + } + + //init record reader + String readerClassName = _taskSpec.getRecordReaderSpec().getClassName(); + RecordReader recordReader = (RecordReader) Class.forName(readerClassName).newInstance(); + recordReader.init(new File(_taskSpec.getInputFilePath()), schema, recordReaderConfig); + + //init segmentName Generator + String segmentNameGeneratorType = _taskSpec.getSegmentNameGeneratorSpec().getType(); + if (segmentNameGeneratorType == null) { + segmentNameGeneratorType = SIMPLE_SEGMENT_NAME_GENERATOR; + } + Map<String, String> segmentNameGeneratorConfigs = _taskSpec.getSegmentNameGeneratorSpec().getConfigs(); + SegmentNameGenerator segmentNameGenerator; + switch (segmentNameGeneratorType) { + case SIMPLE_SEGMENT_NAME_GENERATOR: + segmentNameGenerator = + new SimpleSegmentNameGenerator(tableName, segmentNameGeneratorConfigs.get(SEGMENT_NAME_POSTFIX)); + break; + case NORMALIZED_DATE_SEGMENT_NAME_GENERATOR: + Preconditions.checkState(tableConfig != null, + "In order to use NormalizedDateSegmentNameGenerator, table config must be provided"); + SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig(); + String timeFormat = null; + TimeFieldSpec timeFieldSpec = schema.getTimeFieldSpec(); + if (timeFieldSpec != null) { + timeFormat = timeFieldSpec.getOutgoingGranularitySpec().getTimeFormat(); + } + segmentNameGenerator = + new NormalizedDateSegmentNameGenerator(tableName, segmentNameGeneratorConfigs.get(SEGMENT_NAME_PREFIX), + Boolean.valueOf(segmentNameGeneratorConfigs.get(EXCLUDE_SEQUENCE_ID)), + validationConfig.getSegmentPushType(), validationConfig.getSegmentPushFrequency(), + validationConfig.getTimeType(), timeFormat); + break; + default: + throw new UnsupportedOperationException("Unsupported segment name generator type: " + segmentNameGeneratorType); + } + + //init segment generation config + SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema); + segmentGeneratorConfig.setTableName(tableName); + segmentGeneratorConfig.setOutDir(_taskSpec.getOutputDirectoryPath()); + segmentGeneratorConfig.setSegmentNameGenerator(segmentNameGenerator); + segmentGeneratorConfig.setSequenceId(_taskSpec.getSequenceId()); + + //build segment + SegmentIndexCreationDriverImpl segmentIndexCreationDriver = new SegmentIndexCreationDriverImpl(); + segmentIndexCreationDriver.init(segmentGeneratorConfig, recordReader); + segmentIndexCreationDriver.build(); + return segmentIndexCreationDriver.getSegmentName(); + } +} diff --git a/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/SegmentGenerationTaskSpec.java b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/SegmentGenerationTaskSpec.java new file mode 100644 index 0000000..04fdbf3 --- /dev/null +++ b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/SegmentGenerationTaskSpec.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.ingestion.common; + +import java.net.URI; +import org.apache.pinot.common.config.TableConfig; +import org.apache.pinot.core.segment.name.SegmentNameGenerator; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.RecordReader; + + +public class SegmentGenerationTaskSpec { + + TableConfig _tableConfig; + + Schema _schema; + + RecordReaderSpec _recordReaderSpec; + + SegmentNameGeneratorSpec _segmentNameGeneratorSpec; + + String _inputFilePath; + + String _outputDirectoryPath; + + int _sequenceId; + + public TableConfig getTableConfig() { + return _tableConfig; + } + + public void setTableConfig(TableConfig tableConfig) { + _tableConfig = tableConfig; + } + + public Schema getSchema() { + return _schema; + } + + public void setSchema(Schema schema) { + _schema = schema; + } + + public RecordReaderSpec getRecordReaderSpec() { + return _recordReaderSpec; + } + + public void setRecordReaderSpec(RecordReaderSpec recordReaderSpec) { + _recordReaderSpec = recordReaderSpec; + } + + public SegmentNameGeneratorSpec getSegmentNameGeneratorSpec() { + return _segmentNameGeneratorSpec; + } + + public void setSegmentNameGeneratorSpec(SegmentNameGeneratorSpec segmentNameGeneratorSpec) { + _segmentNameGeneratorSpec = segmentNameGeneratorSpec; + } + + public String getInputFilePath() { + return _inputFilePath; + } + + public void setInputFilePath(String inputFilePath) { + _inputFilePath = inputFilePath; + } + + public String getOutputDirectoryPath() { + return _outputDirectoryPath; + } + + public void setOutputDirectoryPath(String outputDirectoryPath) { + _outputDirectoryPath = outputDirectoryPath; + } + + public int getSequenceId() { + return _sequenceId; + } + + public void setSequenceId(int sequenceId) { + _sequenceId = sequenceId; + } +} diff --git a/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/SegmentNameGeneratorSpec.java b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/SegmentNameGeneratorSpec.java new file mode 100644 index 0000000..0b1c215 --- /dev/null +++ b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/SegmentNameGeneratorSpec.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.ingestion.common; + +import java.util.Map; + + +public class SegmentNameGeneratorSpec { + + String _type; + + Map<String, String> _configs; + + public String getType() { + return _type; + } + + public void setType(String type) { + _type = type; + } + + public Map<String, String> getConfigs() { + return _configs; + } + + public void setConfigs(Map<String, String> configs) { + _configs = configs; + } +} diff --git a/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/TableSpec.java b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/TableSpec.java new file mode 100644 index 0000000..ae2745e --- /dev/null +++ b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/TableSpec.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.ingestion.common; + +public class TableSpec { + + String _tableName; + + String _schemaURI; + + String _tableConfigURI; + + public String getTableName() { + return _tableName; + } + + public void setTableName(String tableName) { + _tableName = tableName; + } + + public String getSchemaURI() { + return _schemaURI; + } + + public void setSchemaURI(String schemaURI) { + _schemaURI = schemaURI; + } + + public String getTableConfigURI() { + return _tableConfigURI; + } + + public void setTableConfigURI(String tableConfigURI) { + _tableConfigURI = tableConfigURI; + } +} diff --git a/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/standalone/SegmentGenerationJobRunner.java b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/standalone/SegmentGenerationJobRunner.java new file mode 100644 index 0000000..d6127bc --- /dev/null +++ b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/standalone/SegmentGenerationJobRunner.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.ingestion.standalone; + +import java.io.File; +import java.io.FilenameFilter; +import java.net.URI; +import java.util.List; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.MapConfiguration; +import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; +import org.apache.pinot.common.config.TableConfig; +import org.apache.pinot.filesystem.PinotFSFactory; +import org.apache.pinot.ingestion.common.PinotFSSpec; +import org.apache.pinot.ingestion.common.SegmentGenerationJobSpec; +import org.apache.pinot.ingestion.common.SegmentGenerationTaskRunner; +import org.apache.pinot.ingestion.common.SegmentGenerationTaskSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.filesystem.PinotFS; + + +public class SegmentGenerationJobRunner { + + private SegmentGenerationJobSpec _spec; + + public SegmentGenerationJobRunner(SegmentGenerationJobSpec spec) { + + _spec = spec; + } + + public void run() + throws Exception { + //init all file systems + List<PinotFSSpec> pinotFSSpecs = _spec.getPinotFSSpecs(); + for (PinotFSSpec pinotFSSpec : pinotFSSpecs) { + Configuration config = new MapConfiguration(pinotFSSpec.getConfigs()); + PinotFSFactory.register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), config); + } + + //Get pinotFS for input + URI inputDirURI = new URI(_spec.getInputDirURI()); + PinotFS inputDirFS = PinotFSFactory.create(inputDirURI.getScheme()); + + //Get outputFS for writing output pinot segments + URI outputDirURI = new URI(_spec.getOutputDirURI()); + PinotFS outputDirFS = PinotFSFactory.create(outputDirURI.getScheme()); + + //Get list of files to process + String[] files = inputDirFS.listFiles(inputDirURI, true); + //TODO: sort input files based on creation time + //TODO: handle input file name filters + + //create tempDirectory for input and output + File tempDirectory = FileUtils.getTempDirectory(); + File localInputDir = new File(tempDirectory, "input"); + FileUtils.forceMkdir(localInputDir); + File localOutputTempDirectory = new File(tempDirectory, "output"); + FileUtils.forceMkdir(localOutputTempDirectory); + + //Read TableConfig, Schema + String schemaJson = IOUtils.toString(new URI(_spec.getTableSpec().getSchemaURI()), "UTF-8"); + Schema schema = Schema.fromString(schemaJson); + String tableConfigJson = IOUtils.toString(new URI(_spec.getTableSpec().getTableConfigURI()), "UTF-8"); + TableConfig tableConfig = TableConfig.fromJsonString(tableConfigJson); + //iterate on the file list, for each + for (int i = 0; i < files.length; i++) { + //copy input path to local + File inputDataFile = new File(localInputDir, new File(files[i]).getName()); + inputDirFS.copyToLocalFile(new URI(files[i]), inputDataFile); + + //create taskspec + SegmentGenerationTaskSpec taskSpec = new SegmentGenerationTaskSpec(); + taskSpec.setInputFilePath(inputDataFile.getAbsolutePath()); + taskSpec.setOutputDirectoryPath(localOutputTempDirectory.getAbsolutePath()); + taskSpec.setRecordReaderSpec(_spec.getRecordReaderSpec()); + taskSpec.setSchema(schema); + taskSpec.setTableConfig(tableConfig); + taskSpec.setSequenceId(i); + taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec()); + + //invoke segmentGenerationTask + SegmentGenerationTaskRunner taskRunner = new SegmentGenerationTaskRunner(taskSpec); + String segmentName = taskRunner.run(); + + //move segment to output PinotFS + + File outputSegmentFile = new File(localOutputTempDirectory, segmentName + ".tar.gz"); + outputDirFS.copyFromLocalFile(outputSegmentFile, outputDirURI); + + FileUtils.deleteQuietly(outputSegmentFile); + FileUtils.deleteQuietly(inputDataFile); + } + //clean up + FileUtils.deleteDirectory(tempDirectory); + } +} diff --git a/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/standalone/StandaloneIngestionJobLauncher.java b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/standalone/StandaloneIngestionJobLauncher.java new file mode 100644 index 0000000..b1d3741 --- /dev/null +++ b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/standalone/StandaloneIngestionJobLauncher.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.ingestion.standalone; + +import java.io.BufferedReader; +import java.io.FileReader; +import java.io.Reader; +import java.io.StringWriter; +import org.apache.pinot.ingestion.common.SegmentGenerationJobSpec; +import org.yaml.snakeyaml.Yaml; + + +public class StandaloneIngestionJobLauncher { + + + public static void main(String[] args) throws Exception{ + String jobSpecFilePath = args[0]; + + try(Reader reader = new BufferedReader(new FileReader(jobSpecFilePath))) { + + Yaml yaml = new Yaml(); + SegmentGenerationJobSpec spec = yaml.loadAs(reader, SegmentGenerationJobSpec.class); + StringWriter sw = new StringWriter(); + yaml.dump(spec, sw); + System.out.println("dump = " + sw.toString()); + + } + } +} diff --git a/pinot-ingestion-jobs/pom.xml b/pinot-ingestion-jobs/pom.xml index b89a8f6..d9cffb3 100644 --- a/pinot-ingestion-jobs/pom.xml +++ b/pinot-ingestion-jobs/pom.xml @@ -41,6 +41,7 @@ <module>pinot-ingestion-common</module> <module>pinot-hadoop</module> <module>pinot-spark</module> + <module>pinot-standalone</module> </modules> <dependencies> --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
