This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch template_ingestion_spec in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit ecd43a9e30720e7c9f67aee4243fdb619a55b42d Author: Xiang Fu <[email protected]> AuthorDate: Wed May 6 03:38:59 2020 -0700 Adding template support for Pinot Ingestion Job Spec --- pinot-spi/pom.xml | 4 + .../spi/ingestion/batch/IngestionJobLauncher.java | 22 ++++-- .../apache/pinot/spi/utils/JinjaTemplateUtils.java | 66 +++++++++++++++++ .../ingestion/batch/IngestionJobLauncherTest.java | 42 +++++++++++ .../pinot/spi/utils/JinjaTemplateUtilsTest.java | 85 ++++++++++++++++++++++ .../test/resources/ingestionJobSpecTemplate.yaml | 45 ++++++++++++ .../command/LaunchDataIngestionJobCommand.java | 19 ++++- pom.xml | 6 ++ 8 files changed, 283 insertions(+), 6 deletions(-) diff --git a/pinot-spi/pom.xml b/pinot-spi/pom.xml index 8ad0aec..b1a4b8b 100644 --- a/pinot-spi/pom.xml +++ b/pinot-spi/pom.xml @@ -100,6 +100,10 @@ <artifactId>jsr305</artifactId> </dependency> <dependency> + <groupId>com.hubspot.jinjava</groupId> + <artifactId>jinjava</artifactId> + </dependency> + <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-slf4j-impl</artifactId> </dependency> diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncher.java index 9bb740a..c3ae5ea 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncher.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncher.java @@ -20,13 +20,17 @@ package org.apache.pinot.spi.ingestion.batch; import java.io.BufferedReader; import java.io.FileReader; -import java.io.Reader; +import java.io.IOException; import java.io.StringWriter; import java.util.Arrays; +import java.util.List; +import java.util.Map; +import org.apache.commons.io.IOUtils; import org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner; import org.apache.pinot.spi.ingestion.batch.spec.ExecutionFrameworkSpec; import 
org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec; import org.apache.pinot.spi.plugin.PluginManager; +import org.apache.pinot.spi.utils.JinjaTemplateUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.yaml.snakeyaml.Yaml; @@ -49,11 +53,19 @@ public class IngestionJobLauncher { System.exit(1); } String jobSpecFilePath = args[0]; + List<String> valueList = Arrays.asList(args).subList(1, args.length); // key=value args after the spec path + SegmentGenerationJobSpec spec = + getSegmentGenerationJobSpec(jobSpecFilePath, JinjaTemplateUtils.getTemplateContext(valueList)); + runIngestionJob(spec); + } - try (Reader reader = new BufferedReader(new FileReader(jobSpecFilePath))) { - SegmentGenerationJobSpec spec = new Yaml().loadAs(reader, SegmentGenerationJobSpec.class); - runIngestionJob(spec); - } + public static SegmentGenerationJobSpec getSegmentGenerationJobSpec(String jobSpecFilePath, + Map<String, Object> context) + throws IOException { + try (BufferedReader reader = new BufferedReader(new FileReader(jobSpecFilePath))) { + String yamlStr = JinjaTemplateUtils.renderTemplate(IOUtils.toString(reader), context); + return new Yaml().loadAs(yamlStr, SegmentGenerationJobSpec.class); + } } public static void runIngestionJob(SegmentGenerationJobSpec spec) diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JinjaTemplateUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JinjaTemplateUtils.java new file mode 100644 index 0000000..76be359 --- /dev/null +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JinjaTemplateUtils.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spi.utils; + +import com.fasterxml.jackson.databind.util.StdDateFormat; +import com.hubspot.jinjava.Jinjava; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + + +public class JinjaTemplateUtils { + + private static final Jinjava JINJAVA = new Jinjava(); + + public static String renderTemplate(String template, Map<String, Object> newContext) { + Map<String, Object> contextMap = getDefaultJinjaContextMap(); + contextMap.putAll(newContext); + return JINJAVA.render(template, contextMap); + } + + public static Map<String, Object> getDefaultJinjaContextMap() { + Instant now = Instant.now(); // single clock read so now/today/yesterday agree + StdDateFormat dateFormat = new StdDateFormat(); // DateFormat is not thread-safe; never share one + Map<String, Object> defaultJinjaContextMap = new HashMap<>(); + defaultJinjaContextMap.put("now", now); + defaultJinjaContextMap.put("today", dateFormat.format(Date.from(now))); + defaultJinjaContextMap.put("yesterday", dateFormat.format(Date.from(now.minus(1, ChronoUnit.DAYS)))); + return defaultJinjaContextMap; + } + + public static Map<String, Object> getTemplateContext(List<String> values) { + Map<String, Object> context = new HashMap<>(); + for (String value : values) { + String[] splits = value.split("=", 2); + if (splits.length > 1) { + context.put(splits[0], splits[1]); + } + } + return context; + } + + public static String renderTemplate(String template) { + return
renderTemplate(template, Collections.emptyMap()); + } +} diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncherTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncherTest.java new file mode 100644 index 0000000..7d54f0f --- /dev/null +++ b/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncherTest.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.spi.ingestion.batch; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Map; +import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec; +import org.apache.pinot.spi.utils.JinjaTemplateUtils; +import org.testng.Assert; +import org.testng.annotations.Test; + + +public class IngestionJobLauncherTest { + + @Test + public void testIngestionJobLauncherWithTemplate() + throws IOException { + Map<String, Object> context = + JinjaTemplateUtils.getTemplateContext(Arrays.asList("year=2020", "month=05", "day=06")); + SegmentGenerationJobSpec spec = IngestionJobLauncher.getSegmentGenerationJobSpec( + JinjaTemplateUtils.class.getClassLoader().getResource("ingestionJobSpecTemplate.yaml").getFile(), context); + Assert.assertEquals(spec.getInputDirURI(), "file:///path/to/input/2020/05/06"); + Assert.assertEquals(spec.getOutputDirURI(), "file:///path/to/output/2020/05/06"); + } +} diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/utils/JinjaTemplateUtilsTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/JinjaTemplateUtilsTest.java new file mode 100644 index 0000000..8e13aed --- /dev/null +++ b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/JinjaTemplateUtilsTest.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spi.utils; + +import java.io.IOException; +import java.io.InputStream; +import java.text.SimpleDateFormat; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.Arrays; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.TimeZone; +import org.apache.commons.io.IOUtils; +import org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher; +import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec; +import org.testng.Assert; +import org.testng.annotations.Test; +import org.yaml.snakeyaml.Yaml; + + +public class JinjaTemplateUtilsTest { + + @Test + public void testDefaultRenderTemplate() { + Date today = new Date(Instant.now().toEpochMilli()); + Date yesterday = new Date(Instant.now().minus(1, ChronoUnit.DAYS).toEpochMilli()); + SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:"); + dateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); + Assert.assertEquals(JinjaTemplateUtils.renderTemplate("{{ today }}").substring(0, 17), dateFormat.format(today)); + Assert.assertEquals(JinjaTemplateUtils.renderTemplate("{{ yesterday }}").substring(0, 17), + dateFormat.format(yesterday)); + } + + @Test + public void testRenderTemplateWithGivenContextMap() { + Map<String, Object> contextMap = new HashMap<>(); + contextMap.put("first_date_2020", "2020-01-01"); + contextMap.put("name", "xiang"); + contextMap.put("ts", 1577836800); + contextMap.put("yyyy", "2020"); + contextMap.put("MM", "05"); + contextMap.put("dd", "06"); + Assert.assertEquals(JinjaTemplateUtils.renderTemplate("{{ first_date_2020 }}", contextMap), "2020-01-01"); + Assert.assertEquals(JinjaTemplateUtils.renderTemplate("{{first_date_2020}}", contextMap), "2020-01-01"); + Assert.assertEquals(JinjaTemplateUtils.renderTemplate("{{ name }}", contextMap), "xiang"); + 
Assert.assertEquals(JinjaTemplateUtils.renderTemplate("{{ name|upper }}", contextMap), "XIANG"); + Assert.assertEquals(JinjaTemplateUtils.renderTemplate("{{ ts }}", contextMap), "1577836800"); + Assert.assertEquals(JinjaTemplateUtils.renderTemplate("/var/rawdata/{{ yyyy }}/{{ MM }}/{{ dd }}", contextMap), + "/var/rawdata/2020/05/06"); + Assert.assertEquals(JinjaTemplateUtils.renderTemplate("/var/rawdata/{{yyyy}}/{{MM}}/{{dd}}", contextMap), + "/var/rawdata/2020/05/06"); + } + + @Test + public void testIngestionJobTemplate() + throws IOException { + InputStream resourceAsStream = + JinjaTemplateUtils.class.getClassLoader().getResourceAsStream("ingestionJobSpecTemplate.yaml"); + String yamlTemplate = IOUtils.toString(resourceAsStream); + Map<String, Object> context = + JinjaTemplateUtils.getTemplateContext(Arrays.asList("year=2020", "month=05", "day=06")); + String yamlStr = JinjaTemplateUtils.renderTemplate(yamlTemplate, context); + SegmentGenerationJobSpec spec = new Yaml().loadAs(yamlStr, SegmentGenerationJobSpec.class); + Assert.assertEquals(spec.getInputDirURI(), "file:///path/to/input/2020/05/06"); + Assert.assertEquals(spec.getOutputDirURI(), "file:///path/to/output/2020/05/06"); + } +} diff --git a/pinot-spi/src/test/resources/ingestionJobSpecTemplate.yaml b/pinot-spi/src/test/resources/ingestionJobSpecTemplate.yaml new file mode 100644 index 0000000..5032e17 --- /dev/null +++ b/pinot-spi/src/test/resources/ingestionJobSpecTemplate.yaml @@ -0,0 +1,45 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +executionFrameworkSpec: + name: 'standalone' + segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner' + segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner' + segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner' +jobType: SegmentCreationAndTarPush +inputDirURI: 'file:///path/to/input/{{ year }}/{{ month }}/{{ day }}' +includeFileNamePattern: 'glob:**/*.parquet' +excludeFileNamePattern: 'glob:**/*.avro' +outputDirURI: 'file:///path/to/output/{{year}}/{{month}}/{{day}}' +overwriteOutput: true +pinotFSSpecs: + - scheme: file + className: org.apache.pinot.spi.filesystem.LocalPinotFS +recordReaderSpec: + dataFormat: 'parquet' + className: 'org.apache.pinot.parquet.data.readers.ParquetRecordReader' +tableSpec: + tableName: 'myTable' + schemaURI: 'http://localhost:9000/tables/myTable/schema' + tableConfigURI: 'http://localhost:9000/tables/myTable' +pinotClusterSpecs: + - controllerURI: 'localhost:9000' +pushJobSpec: + pushAttempts: 2 + pushRetryIntervalMillis: 1000 \ No newline at end of file diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchDataIngestionJobCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchDataIngestionJobCommand.java index 49c31b2..3c491fc 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchDataIngestionJobCommand.java +++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchDataIngestionJobCommand.java @@ -18,11 +18,22 @@ */ package org.apache.pinot.tools.admin.command; +import java.io.BufferedReader; +import java.io.FileReader; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.commons.io.IOUtils; import org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher; +import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec; +import org.apache.pinot.spi.utils.JinjaTemplateUtils; import org.apache.pinot.tools.Command; import org.kohsuke.args4j.Option; +import org.kohsuke.args4j.spi.StringArrayOptionHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.yaml.snakeyaml.Yaml; /** @@ -35,6 +46,9 @@ public class LaunchDataIngestionJobCommand extends AbstractBaseAdminCommand impl @Option(name = "-jobSpecFile", required = true, metaVar = "<string>", usage = "Ingestion job spec file") private String _jobSpecFile; + @Option(name = "-values", required = false, metaVar = "<list<string>>", handler = StringArrayOptionHandler.class, usage = "values set in the job spec template") + private List<String> _values = new ArrayList<>(); + @Option(name = "-help", required = false, help = true, aliases = {"-h", "--h", "--help"}, usage = "Print this message.") private boolean _help = false; @@ -47,7 +61,10 @@ public class LaunchDataIngestionJobCommand extends AbstractBaseAdminCommand impl public boolean execute() throws Exception { try { - IngestionJobLauncher.main(new String[]{_jobSpecFile}); + List<String> arguments = new ArrayList<>(); + arguments.add(_jobSpecFile); + arguments.addAll(_values); + IngestionJobLauncher.main(arguments.toArray(new String[0])); } catch (Exception e) { LOGGER.error("Got exception to kick off standalone data ingestion job -", e); throw e; diff --git a/pom.xml b/pom.xml index d1494f3..846e803 100644 --- a/pom.xml +++ b/pom.xml @@ -135,6 +135,7 @@
<scala.binary.version>2.11</scala.binary.version> <scala.version>2.11.11</scala.version> <antlr.version>4.6</antlr.version> + <jinjava.version>2.5.3</jinjava.version> <calcite.version>1.19.0</calcite.version> <lucene.version>8.2.0</lucene.version> <!-- commons-configuration, hadoop-common, hadoop-client use commons-lang --> @@ -962,6 +963,11 @@ </exclusions> </dependency> <dependency> + <groupId>com.hubspot.jinjava</groupId> + <artifactId>jinjava</artifactId> + <version>${jinjava.version}</version> + </dependency> + <dependency> <groupId>org.apache.calcite</groupId> <artifactId>calcite-babel</artifactId> <version>${calcite.version}</version> --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
