This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch template_ingestion_spec
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 42a1b6f575f7b1749cee27947ef5bdf631145e3d
Author: Xiang Fu <[email protected]>
AuthorDate: Tue May 12 02:52:25 2020 -0700

    Re-work on adding template support for Pinot Ingestion Job Spec
---
 .../spi/ingestion/batch/IngestionJobLauncher.java  | 29 ++++++--
 .../pinot/spi/utils/GroovyTemplateUtils.java       | 78 +++++++++++++++++++
 .../ingestion/batch/IngestionJobLauncherTest.java  | 42 +++++++++++
 .../pinot/spi/utils/GroovyTemplateUtilsTest.java   | 87 ++++++++++++++++++++++
 .../test/resources/ingestionJobSpecTemplate.yaml   | 45 +++++++++++
 .../command/LaunchDataIngestionJobCommand.java     | 16 +++-
 6 files changed, 287 insertions(+), 10 deletions(-)

diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncher.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncher.java
index 9bb740a..03cef2f 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncher.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncher.java
@@ -20,13 +20,18 @@ package org.apache.pinot.spi.ingestion.batch;
 
 import java.io.BufferedReader;
 import java.io.FileReader;
-import java.io.Reader;
+import java.io.IOException;
 import java.io.StringWriter;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.io.IOUtils;
 import org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner;
 import org.apache.pinot.spi.ingestion.batch.spec.ExecutionFrameworkSpec;
 import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
 import org.apache.pinot.spi.plugin.PluginManager;
+import org.apache.pinot.spi.utils.GroovyTemplateUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.yaml.snakeyaml.Yaml;
@@ -36,7 +41,7 @@ public class IngestionJobLauncher {
 
   public static final Logger LOGGER = 
LoggerFactory.getLogger(IngestionJobLauncher.class);
 
-  private static final String USAGE = "usage: [jobSpec.yaml]";
+  private static final String USAGE = "usage: [jobSpec.yaml] 
[template_key=template_value]...";
 
   private static void usage() {
     System.err.println(USAGE);
@@ -44,16 +49,26 @@ public class IngestionJobLauncher {
 
   public static void main(String[] args)
       throws Exception {
-    if (args.length != 1) {
+    if (args.length < 1) {
       usage();
       System.exit(1);
     }
     String jobSpecFilePath = args[0];
-
-    try (Reader reader = new BufferedReader(new FileReader(jobSpecFilePath))) {
-      SegmentGenerationJobSpec spec = new Yaml().loadAs(reader, 
SegmentGenerationJobSpec.class);
-      runIngestionJob(spec);
+    List<String> valueList = new ArrayList<>();
+    for (int i = 1; i < args.length; i++) {
+      valueList.add(args[i]);
     }
+    SegmentGenerationJobSpec spec =
+        getSegmentGenerationJobSpec(jobSpecFilePath, 
GroovyTemplateUtils.getTemplateContext(valueList));
+    runIngestionJob(spec);
+  }
+
+  public static SegmentGenerationJobSpec getSegmentGenerationJobSpec(String 
jobSpecFilePath,
+      Map<String, Object> context)
+      throws IOException, ClassNotFoundException {
+    String yamlTemplate = IOUtils.toString(new BufferedReader(new 
FileReader(jobSpecFilePath)));
+    String yamlStr = GroovyTemplateUtils.renderTemplate(yamlTemplate, context);
+    return new Yaml().loadAs(yamlStr, SegmentGenerationJobSpec.class);
   }
 
   public static void runIngestionJob(SegmentGenerationJobSpec spec)
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/GroovyTemplateUtils.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/GroovyTemplateUtils.java
new file mode 100644
index 0000000..5319b1d
--- /dev/null
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/GroovyTemplateUtils.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.utils;
+
+import groovy.text.SimpleTemplateEngine;
+import groovy.text.TemplateEngine;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+
+
+public class GroovyTemplateUtils {
+  private static final TemplateEngine GROOVY_TEMPLATE_ENGINE = new 
SimpleTemplateEngine();
+  private static final SimpleDateFormat DATE_FORMAT = new 
SimpleDateFormat("yyyy-MM-dd");
+
+  public static String renderTemplate(String template, Map<String, Object> 
newContext)
+      throws IOException, ClassNotFoundException {
+    Map<String, Object> contextMap = getDefaultContextMap();
+    contextMap.putAll(newContext);
+    return 
GROOVY_TEMPLATE_ENGINE.createTemplate(template).make(contextMap).toString();
+  }
+
+  /**
+   Construct default template context:
+   today : today's date in format `yyyy-MM-dd`, example value: '2020-05-06'
+   yesterday : yesterday's date in format `yyyy-MM-dd`, example value: 
'2020-05-05'
+   */
+  public static Map<String, Object> getDefaultContextMap() {
+    Map<String, Object> defaultContextMap = new HashMap<>();
+    Instant now = Instant.now();
+    defaultContextMap.put("today", DATE_FORMAT.format(new 
Date(now.toEpochMilli())));
+    defaultContextMap.put("yesterday", DATE_FORMAT.format(new 
Date(now.minus(1, ChronoUnit.DAYS).toEpochMilli())));
+    return defaultContextMap;
+  }
+
+  public static Map<String, Object> getTemplateContext(List<String> values) {
+    Map<String, Object> context = new HashMap<>();
+    for (String value : values) {
+      String[] splits = value.split("=", 2);
+      if (splits.length > 1) {
+        context.put(splits[0], splits[1]);
+      }
+    }
+    return context;
+  }
+
+  public static String renderTemplate(String template)
+      throws IOException, ClassNotFoundException {
+    return renderTemplate(template, Collections.emptyMap());
+  }
+
+  static {
+    DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("UTC"));
+  }
+}
diff --git 
a/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncherTest.java
 
b/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncherTest.java
new file mode 100644
index 0000000..5d98c06
--- /dev/null
+++ 
b/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncherTest.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.ingestion.batch;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
+import org.apache.pinot.spi.utils.GroovyTemplateUtils;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class IngestionJobLauncherTest {
+
+  @Test
+  public void testIngestionJobLauncherWithTemplate()
+      throws IOException, ClassNotFoundException {
+    Map<String, Object> context =
+        GroovyTemplateUtils.getTemplateContext(Arrays.asList("year=2020", 
"month=05", "day=06"));
+    SegmentGenerationJobSpec spec = 
IngestionJobLauncher.getSegmentGenerationJobSpec(
+        
GroovyTemplateUtils.class.getClassLoader().getResource("ingestionJobSpecTemplate.yaml").getFile(),
 context);
+    Assert.assertEquals(spec.getInputDirURI(), 
"file:///path/to/input/2020/05/06");
+    Assert.assertEquals(spec.getOutputDirURI(), 
"file:///path/to/output/2020/05/06");
+  }
+}
diff --git 
a/pinot-spi/src/test/java/org/apache/pinot/spi/utils/GroovyTemplateUtilsTest.java
 
b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/GroovyTemplateUtilsTest.java
new file mode 100644
index 0000000..4719db6
--- /dev/null
+++ 
b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/GroovyTemplateUtilsTest.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TimeZone;
+import org.apache.commons.io.IOUtils;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import org.yaml.snakeyaml.Yaml;
+
+
+public class GroovyTemplateUtilsTest {
+
+  @Test
+  public void testDefaultRenderTemplate()
+      throws IOException, ClassNotFoundException {
+    Date today = new Date(Instant.now().toEpochMilli());
+    Date yesterday = new Date(Instant.now().minus(1, 
ChronoUnit.DAYS).toEpochMilli());
+    SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
+    dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+    Assert.assertEquals(GroovyTemplateUtils.renderTemplate("${ today }"), 
dateFormat.format(today));
+    Assert.assertEquals(GroovyTemplateUtils.renderTemplate("${ yesterday }"), 
dateFormat.format(yesterday));
+  }
+
+  @Test
+  public void testRenderTemplateWithGivenContextMap()
+      throws IOException, ClassNotFoundException {
+    Map<String, Object> contextMap = new HashMap<>();
+    contextMap.put("first_date_2020", "2020-01-01");
+    contextMap.put("name", "xiang");
+    contextMap.put("ts", 1577836800);
+    contextMap.put("yyyy", "2020");
+    contextMap.put("YYYY", "1919");
+    contextMap.put("MM", "05");
+    contextMap.put("dd", "06");
+    Assert.assertEquals(GroovyTemplateUtils.renderTemplate("$first_date_2020", 
contextMap), "2020-01-01");
+    
Assert.assertEquals(GroovyTemplateUtils.renderTemplate("${first_date_2020}", 
contextMap), "2020-01-01");
+    Assert.assertEquals(GroovyTemplateUtils.renderTemplate("${ name }", 
contextMap), "xiang");
+    Assert.assertEquals(GroovyTemplateUtils.renderTemplate("${ ts }", 
contextMap), "1577836800");
+    Assert.assertEquals(GroovyTemplateUtils.renderTemplate("/var/rawdata/${ 
yyyy }/${ MM }/${ dd }", contextMap),
+        "/var/rawdata/2020/05/06");
+    
Assert.assertEquals(GroovyTemplateUtils.renderTemplate("/var/rawdata/${yyyy}/${MM}/${dd}",
 contextMap),
+        "/var/rawdata/2020/05/06");
+    
Assert.assertEquals(GroovyTemplateUtils.renderTemplate("/var/rawdata/${YYYY}/${MM}/${dd}",
 contextMap),
+        "/var/rawdata/1919/05/06");
+  }
+
+  @Test
+  public void testIngestionJobTemplate()
+      throws IOException, ClassNotFoundException {
+    InputStream resourceAsStream =
+        
GroovyTemplateUtils.class.getClassLoader().getResourceAsStream("ingestionJobSpecTemplate.yaml");
+    String yamlTemplate = IOUtils.toString(resourceAsStream);
+    Map<String, Object> context =
+        GroovyTemplateUtils.getTemplateContext(Arrays.asList("year=2020", 
"month=05", "day=06"));
+    String yamlStr = GroovyTemplateUtils.renderTemplate(yamlTemplate, context);
+    SegmentGenerationJobSpec spec = new Yaml().loadAs(yamlStr, 
SegmentGenerationJobSpec.class);
+    Assert.assertEquals(spec.getInputDirURI(), 
"file:///path/to/input/2020/05/06");
+    Assert.assertEquals(spec.getOutputDirURI(), 
"file:///path/to/output/2020/05/06");
+  }
+}
diff --git a/pinot-spi/src/test/resources/ingestionJobSpecTemplate.yaml 
b/pinot-spi/src/test/resources/ingestionJobSpecTemplate.yaml
new file mode 100644
index 0000000..c20e88a
--- /dev/null
+++ b/pinot-spi/src/test/resources/ingestionJobSpecTemplate.yaml
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+executionFrameworkSpec:
+  name: 'standalone'
+  segmentGenerationJobRunnerClassName: 
'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
+  segmentTarPushJobRunnerClassName: 
'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
+  segmentUriPushJobRunnerClassName: 
'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner'
+jobType: SegmentCreationAndTarPush
+inputDirURI: 'file:///path/to/input/${ year }/${ month }/${ day }'
+includeFileNamePattern: 'glob:**/*.parquet'
+excludeFileNamePattern: 'glob:**/*.avro'
+outputDirURI: 'file:///path/to/output/${year}/${month}/${day}'
+overwriteOutput: true
+pinotFSSpecs:
+  - scheme: file
+    className: org.apache.pinot.spi.filesystem.LocalPinotFS
+recordReaderSpec:
+  dataFormat: 'parquet'
+  className: 'org.apache.pinot.parquet.data.readers.ParquetRecordReader'
+tableSpec:
+  tableName: 'myTable'
+  schemaURI: 'http://localhost:9000/tables/myTable/schema'
+  tableConfigURI: 'http://localhost:9000/tables/myTable'
+pinotClusterSpecs:
+  - controllerURI: 'localhost:9000'
+pushJobSpec:
+  pushAttempts: 2
+  pushRetryIntervalMillis: 1000
\ No newline at end of file
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchDataIngestionJobCommand.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchDataIngestionJobCommand.java
index 49c31b2..bd37ebb 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchDataIngestionJobCommand.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchDataIngestionJobCommand.java
@@ -18,9 +18,13 @@
  */
 package org.apache.pinot.tools.admin.command;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher;
 import org.apache.pinot.tools.Command;
 import org.kohsuke.args4j.Option;
+import org.kohsuke.args4j.spi.StringArrayOptionHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,6 +39,9 @@ public class LaunchDataIngestionJobCommand extends 
AbstractBaseAdminCommand impl
   @Option(name = "-jobSpecFile", required = true, metaVar = "<string>", usage 
= "Ingestion job spec file")
   private String _jobSpecFile;
 
+  @Option(name = "-values", required = false, metaVar = "<template context>", 
handler = StringArrayOptionHandler.class, usage = "Context values set to the 
job spec template")
+  private List<String> _values;
+
   @Option(name = "-help", required = false, help = true, aliases = {"-h", 
"--h", "--help"}, usage = "Print this message.")
   private boolean _help = false;
 
@@ -47,9 +54,12 @@ public class LaunchDataIngestionJobCommand extends 
AbstractBaseAdminCommand impl
   public boolean execute()
       throws Exception {
     try {
-      IngestionJobLauncher.main(new String[]{_jobSpecFile});
+      List<String> arguments = new ArrayList();
+      arguments.add(_jobSpecFile);
+      arguments.addAll(_values);
+      IngestionJobLauncher.main(arguments.toArray(new String[0]));
     } catch (Exception e) {
-      LOGGER.error("Got exception to kick off standalone data ingestion job 
-", e);
+      LOGGER.error("Got exception to kick off standalone data ingestion job - 
", e);
       throw e;
     }
     return true;
@@ -62,7 +72,7 @@ public class LaunchDataIngestionJobCommand extends 
AbstractBaseAdminCommand impl
 
   @Override
   public String toString() {
-    return ("LaunchDataIngestionJob -jobSpecFile " + _jobSpecFile);
+    return ("LaunchDataIngestionJob -jobSpecFile " + _jobSpecFile + " -values 
" + Arrays.toString(_values.toArray()));
   }
 
   @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to