This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch template_ingestion_spec in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 42a1b6f575f7b1749cee27947ef5bdf631145e3d Author: Xiang Fu <[email protected]> AuthorDate: Tue May 12 02:52:25 2020 -0700 Re-work on adding template support for Pinot Ingestion Job Spec --- .../spi/ingestion/batch/IngestionJobLauncher.java | 29 ++++++-- .../pinot/spi/utils/GroovyTemplateUtils.java | 78 +++++++++++++++++++ .../ingestion/batch/IngestionJobLauncherTest.java | 42 +++++++++++ .../pinot/spi/utils/GroovyTemplateUtilsTest.java | 87 ++++++++++++++++++++++ .../test/resources/ingestionJobSpecTemplate.yaml | 45 +++++++++++ .../command/LaunchDataIngestionJobCommand.java | 16 +++- 6 files changed, 287 insertions(+), 10 deletions(-) diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncher.java index 9bb740a..03cef2f 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncher.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncher.java @@ -20,13 +20,18 @@ package org.apache.pinot.spi.ingestion.batch; import java.io.BufferedReader; import java.io.FileReader; -import java.io.Reader; +import java.io.IOException; import java.io.StringWriter; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; +import java.util.Map; +import org.apache.commons.io.IOUtils; import org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner; import org.apache.pinot.spi.ingestion.batch.spec.ExecutionFrameworkSpec; import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec; import org.apache.pinot.spi.plugin.PluginManager; +import org.apache.pinot.spi.utils.GroovyTemplateUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.yaml.snakeyaml.Yaml; @@ -36,7 +41,7 @@ public class IngestionJobLauncher { public static final Logger LOGGER = LoggerFactory.getLogger(IngestionJobLauncher.class); - private static final String USAGE = "usage: [jobSpec.yaml]"; + private static final String USAGE = "usage: [jobSpec.yaml] [template_key=template_value]..."; private static void usage() { System.err.println(USAGE); @@ -44,16 +49,26 @@ public class IngestionJobLauncher { public static void main(String[] args) throws Exception { - if (args.length != 1) { + if (args.length < 1) { usage(); System.exit(1); } String jobSpecFilePath = args[0]; - - try (Reader reader = new BufferedReader(new FileReader(jobSpecFilePath))) { - SegmentGenerationJobSpec spec = new Yaml().loadAs(reader, SegmentGenerationJobSpec.class); - runIngestionJob(spec); + List<String> valueList = new ArrayList<>(); + for (int i = 1; i < args.length; i++) { + valueList.add(args[i]); } + SegmentGenerationJobSpec spec = + getSegmentGenerationJobSpec(jobSpecFilePath, GroovyTemplateUtils.getTemplateContext(valueList)); + runIngestionJob(spec); + } + + public static SegmentGenerationJobSpec getSegmentGenerationJobSpec(String jobSpecFilePath, + Map<String, Object> context) + throws IOException, ClassNotFoundException { + String yamlTemplate = IOUtils.toString(new BufferedReader(new FileReader(jobSpecFilePath))); + String yamlStr = GroovyTemplateUtils.renderTemplate(yamlTemplate, context); + return new Yaml().loadAs(yamlStr, SegmentGenerationJobSpec.class); } public static void runIngestionJob(SegmentGenerationJobSpec spec) diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/GroovyTemplateUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/GroovyTemplateUtils.java new file mode 100644 index 0000000..5319b1d --- /dev/null +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/GroovyTemplateUtils.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spi.utils; + +import groovy.text.SimpleTemplateEngine; +import groovy.text.TemplateEngine; +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TimeZone; + + +public class GroovyTemplateUtils { + private static final TemplateEngine GROOVY_TEMPLATE_ENGINE = new SimpleTemplateEngine(); + private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd"); + + public static String renderTemplate(String template, Map<String, Object> newContext) + throws IOException, ClassNotFoundException { + Map<String, Object> contextMap = getDefaultContextMap(); + contextMap.putAll(newContext); + return GROOVY_TEMPLATE_ENGINE.createTemplate(template).make(contextMap).toString(); + } + + /** + Construct default template context: + today : today's date in format `yyyy-MM-dd`, example value: '2020-05-06' + yesterday : yesterday's date in format `yyyy-MM-dd`, example value: '2020-05-06' + */ + public static Map<String, Object> getDefaultContextMap() { + Map<String, Object> defaultContextMap = new HashMap<>(); + Instant now = Instant.now(); + defaultContextMap.put("today", DATE_FORMAT.format(new Date(now.toEpochMilli()))); + defaultContextMap.put("yesterday", DATE_FORMAT.format(new Date(now.minus(1, ChronoUnit.DAYS).toEpochMilli()))); + return defaultContextMap; + } + + public static Map<String, Object> getTemplateContext(List<String> values) { + Map<String, Object> context = new HashMap<>(); + for (String value : values) { + String[] splits = value.split("=", 2); + if (splits.length > 1) { + context.put(splits[0], splits[1]); + } + } + return context; + } + + public static String renderTemplate(String template) + throws IOException, ClassNotFoundException { + return renderTemplate(template, Collections.emptyMap()); + } + + static { + DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("UTC")); + } +} diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncherTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncherTest.java new file mode 100644 index 0000000..5d98c06 --- /dev/null +++ b/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncherTest.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spi.ingestion.batch; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Map; +import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec; +import org.apache.pinot.spi.utils.GroovyTemplateUtils; +import org.testng.Assert; +import org.testng.annotations.Test; + + +public class IngestionJobLauncherTest { + + @Test + public void testIngestionJobLauncherWithTemplate() + throws IOException, ClassNotFoundException { + Map<String, Object> context = + GroovyTemplateUtils.getTemplateContext(Arrays.asList("year=2020", "month=05", "day=06")); + SegmentGenerationJobSpec spec = IngestionJobLauncher.getSegmentGenerationJobSpec( + GroovyTemplateUtils.class.getClassLoader().getResource("ingestionJobSpecTemplate.yaml").getFile(), context); + Assert.assertEquals(spec.getInputDirURI(), "file:///path/to/input/2020/05/06"); + Assert.assertEquals(spec.getOutputDirURI(), "file:///path/to/output/2020/05/06"); + } +} diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/utils/GroovyTemplateUtilsTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/GroovyTemplateUtilsTest.java new file mode 100644 index 0000000..4719db6 --- /dev/null +++ b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/GroovyTemplateUtilsTest.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spi.utils; + +import java.io.IOException; +import java.io.InputStream; +import java.text.SimpleDateFormat; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.Arrays; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.TimeZone; +import org.apache.commons.io.IOUtils; +import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec; +import org.testng.Assert; +import org.testng.annotations.Test; +import org.yaml.snakeyaml.Yaml; + + +public class GroovyTemplateUtilsTest { + + @Test + public void testDefaultRenderTemplate() + throws IOException, ClassNotFoundException { + Date today = new Date(Instant.now().toEpochMilli()); + Date yesterday = new Date(Instant.now().minus(1, ChronoUnit.DAYS).toEpochMilli()); + SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd"); + dateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); + Assert.assertEquals(GroovyTemplateUtils.renderTemplate("${ today }"), dateFormat.format(today)); + Assert.assertEquals(GroovyTemplateUtils.renderTemplate("${ yesterday }"), dateFormat.format(yesterday)); + } + + @Test + public void testRenderTemplateWithGivenContextMap() + throws IOException, ClassNotFoundException { + Map<String, Object> contextMap = new HashMap<>(); + contextMap.put("first_date_2020", "2020-01-01"); + contextMap.put("name", "xiang"); + contextMap.put("ts", 1577836800); + contextMap.put("yyyy", "2020"); + contextMap.put("YYYY", "1919"); + contextMap.put("MM", "05"); + contextMap.put("dd", "06"); + Assert.assertEquals(GroovyTemplateUtils.renderTemplate("$first_date_2020", contextMap), "2020-01-01"); + Assert.assertEquals(GroovyTemplateUtils.renderTemplate("${first_date_2020}", contextMap), "2020-01-01"); + Assert.assertEquals(GroovyTemplateUtils.renderTemplate("${ name }", contextMap), "xiang"); + Assert.assertEquals(GroovyTemplateUtils.renderTemplate("${ ts }", contextMap), "1577836800"); + Assert.assertEquals(GroovyTemplateUtils.renderTemplate("/var/rawdata/${ yyyy }/${ MM }/${ dd }", contextMap), + "/var/rawdata/2020/05/06"); + Assert.assertEquals(GroovyTemplateUtils.renderTemplate("/var/rawdata/${yyyy}/${MM}/${dd}", contextMap), + "/var/rawdata/2020/05/06"); + Assert.assertEquals(GroovyTemplateUtils.renderTemplate("/var/rawdata/${YYYY}/${MM}/${dd}", contextMap), + "/var/rawdata/1919/05/06"); + } + + @Test + public void testIngestionJobTemplate() + throws IOException, ClassNotFoundException { + InputStream resourceAsStream = + GroovyTemplateUtils.class.getClassLoader().getResourceAsStream("ingestionJobSpecTemplate.yaml"); + String yamlTemplate = IOUtils.toString(resourceAsStream); + Map<String, Object> context = + GroovyTemplateUtils.getTemplateContext(Arrays.asList("year=2020", "month=05", "day=06")); + String yamlStr = GroovyTemplateUtils.renderTemplate(yamlTemplate, context); + SegmentGenerationJobSpec spec = new Yaml().loadAs(yamlStr, SegmentGenerationJobSpec.class); + Assert.assertEquals(spec.getInputDirURI(), "file:///path/to/input/2020/05/06"); + Assert.assertEquals(spec.getOutputDirURI(), "file:///path/to/output/2020/05/06"); + } +} diff --git a/pinot-spi/src/test/resources/ingestionJobSpecTemplate.yaml b/pinot-spi/src/test/resources/ingestionJobSpecTemplate.yaml new file mode 100644 index 0000000..c20e88a --- /dev/null +++ b/pinot-spi/src/test/resources/ingestionJobSpecTemplate.yaml @@ -0,0 +1,45 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +executionFrameworkSpec: + name: 'standalone' + segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner' + segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner' + segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner' +jobType: SegmentCreationAndTarPush +inputDirURI: 'file:///path/to/input/${ year }/${ month }/${ day }' +includeFileNamePattern: 'glob:**/*.parquet' +excludeFileNamePattern: 'glob:**/*.avro' +outputDirURI: 'file:///path/to/output/${year}/${month}/${day}' +overwriteOutput: true +pinotFSSpecs: + - scheme: file + className: org.apache.pinot.spi.filesystem.LocalPinotFS +recordReaderSpec: + dataFormat: 'parquet' + className: 'org.apache.pinot.parquet.data.readers.ParquetRecordReader' +tableSpec: + tableName: 'myTable' + schemaURI: 'http://localhost:9000/tables/myTable/schema' + tableConfigURI: 'http://localhost:9000/tables/myTable' +pinotClusterSpecs: + - controllerURI: 'localhost:9000' +pushJobSpec: + pushAttempts: 2 + pushRetryIntervalMillis: 1000 \ No newline at end of file diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchDataIngestionJobCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchDataIngestionJobCommand.java index 49c31b2..bd37ebb 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchDataIngestionJobCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchDataIngestionJobCommand.java @@ -18,9 +18,13 @@ */ package org.apache.pinot.tools.admin.command; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher; import org.apache.pinot.tools.Command; import org.kohsuke.args4j.Option; +import org.kohsuke.args4j.spi.StringArrayOptionHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,6 +39,9 @@ public class LaunchDataIngestionJobCommand extends AbstractBaseAdminCommand impl @Option(name = "-jobSpecFile", required = true, metaVar = "<string>", usage = "Ingestion job spec file") private String _jobSpecFile; + @Option(name = "-values", required = false, metaVar = "<template context>", handler = StringArrayOptionHandler.class, usage = "Context values set to the job spec template") + private List<String> _values; + @Option(name = "-help", required = false, help = true, aliases = {"-h", "--h", "--help"}, usage = "Print this message.") private boolean _help = false; @@ -47,9 +54,12 @@ public class LaunchDataIngestionJobCommand extends AbstractBaseAdminCommand impl public boolean execute() throws Exception { try { - IngestionJobLauncher.main(new String[]{_jobSpecFile}); + List<String> arguments = new ArrayList(); + arguments.add(_jobSpecFile); + arguments.addAll(_values); + IngestionJobLauncher.main(arguments.toArray(new String[0])); } catch (Exception e) { - LOGGER.error("Got exception to kick off standalone data ingestion job -", e); + LOGGER.error("Got exception to kick off standalone data ingestion job - ", e); throw e; } return true; @@ -62,7 +72,7 @@ public class LaunchDataIngestionJobCommand extends AbstractBaseAdminCommand impl @Override public String toString() { - return ("LaunchDataIngestionJob -jobSpecFile " + _jobSpecFile); + return ("LaunchDataIngestionJob -jobSpecFile " + _jobSpecFile + " -values " + Arrays.toString(_values.toArray())); } @Override --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
