This is an automated email from the ASF dual-hosted git repository. rzo1 pushed a commit to branch STORM-3988 in repository https://gitbox.apache.org/repos/asf/storm.git
commit 6c09b6e2650cbff11b5e53129dcf35c686f289d5 Author: Richard Zowalla <[email protected]> AuthorDate: Thu Oct 19 08:59:36 2023 +0200 STORM-3988 - Remove "storm-pmml" --- examples/storm-pmml-examples/pom.xml | 96 -------- .../apache/storm/pmml/JpmmlRunnerTestTopology.java | 196 ---------------- .../apache/storm/pmml/RawInputFromCSVSpout.java | 133 ----------- .../src/main/resources/Audit.50.csv | 51 ---- ...KNIME_PMML_4.1_Examples_single_audit_logreg.xml | 259 --------------------- .../src/main/resources/README.md | 6 - external/storm-pmml/README.md | 61 ----- external/storm-pmml/pom.xml | 103 -------- .../org/apache/storm/pmml/PMMLPredictorBolt.java | 106 --------- .../org/apache/storm/pmml/model/ModelOutputs.java | 47 ---- .../storm/pmml/model/jpmml/JpmmlModelOutputs.java | 156 ------------- .../org/apache/storm/pmml/runner/ModelRunner.java | 39 ---- .../storm/pmml/runner/ModelRunnerFactory.java | 25 -- .../apache/storm/pmml/runner/PmmlModelRunner.java | 56 ----- .../storm/pmml/runner/jpmml/JPmmlModelRunner.java | 156 ------------- .../storm/pmml/runner/jpmml/JpmmlFactory.java | 229 ------------------ pom.xml | 2 - 17 files changed, 1721 deletions(-) diff --git a/examples/storm-pmml-examples/pom.xml b/examples/storm-pmml-examples/pom.xml deleted file mode 100644 index afced750e..000000000 --- a/examples/storm-pmml-examples/pom.xml +++ /dev/null @@ -1,96 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one - ~ or more contributor license agreements. See the NOTICE file - ~ distributed with this work for additional information - ~ regarding copyright ownership. The ASF licenses this file - ~ to you under the Apache License, Version 2.0 (the - ~ "License"); you may not use this file except in compliance - ~ with the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> - -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <parent> - <artifactId>storm</artifactId> - <groupId>org.apache.storm</groupId> - <version>2.6.0-SNAPSHOT</version> - <relativePath>../../pom.xml</relativePath> - </parent> - <modelVersion>4.0.0</modelVersion> - - <artifactId>storm-pmml-examples</artifactId> - <name>storm-pmml-examples</name> - - <properties> - <!-- Required downgrade by pmml-evaluator 1.0.22 --> - <guava.version>16.0.1</guava.version> - </properties> - - <dependencies> - <dependency> - <groupId>org.apache.storm</groupId> - <artifactId>storm-client</artifactId> - <version>${project.version}</version> - <scope>${provided.scope}</scope> - </dependency> - <dependency> - <groupId>org.apache.storm</groupId> - <artifactId>storm-pmml</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-compress</artifactId> - </dependency> - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - <version>${guava.version}</version> - </dependency> - </dependencies> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-shade-plugin</artifactId> - <configuration> - <createDependencyReducedPom>true</createDependencyReducedPom> - </configuration> - <executions> - <execution> - <phase>package</phase> - <goals> - <goal>shade</goal> - </goals> - <configuration> - <transformers> - <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" /> - <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> - <mainClass>org.apache.storm.pmml.JpmmlRunnerTestTopology</mainClass> - </transformer> - </transformers> - </configuration> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-checkstyle-plugin</artifactId> - <!--Note - the version would be inherited--> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-pmd-plugin</artifactId> - </plugin> - </plugins> - </build> -</project> diff --git a/examples/storm-pmml-examples/src/main/java/org/apache/storm/pmml/JpmmlRunnerTestTopology.java b/examples/storm-pmml-examples/src/main/java/org/apache/storm/pmml/JpmmlRunnerTestTopology.java deleted file mode 100644 index 95dff1590..000000000 --- a/examples/storm-pmml-examples/src/main/java/org/apache/storm/pmml/JpmmlRunnerTestTopology.java +++ /dev/null @@ -1,196 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.pmml; - -import com.google.common.collect.Lists; - -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.nio.file.Files; -import java.util.Arrays; -import java.util.List; - -import org.apache.commons.compress.utils.IOUtils; -import org.apache.storm.Config; -import org.apache.storm.StormSubmitter; -import org.apache.storm.generated.StormTopology; -import org.apache.storm.pmml.model.ModelOutputs; -import org.apache.storm.pmml.model.jpmml.JpmmlModelOutputs; -import org.apache.storm.pmml.runner.jpmml.JpmmlFactory; -import org.apache.storm.topology.BasicOutputCollector; -import org.apache.storm.topology.IRichBolt; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.topology.TopologyBuilder; -import org.apache.storm.topology.base.BaseBasicBolt; -import org.apache.storm.tuple.Tuple; -import org.apache.storm.utils.Utils; - -/** - * Topology that loads a PMML Model and raw input data from a CSV file. The {@link RawInputFromCSVSpout} - * creates a stream of tuples with raw inputs, and the {@link PMMLPredictorBolt} computes the predicted scores. - * - * <p>The location of the PMML Model and CSV files can be specified as CLI argument. Alternatively, the PMML Model can also - * be uploaded to the Blobstore and used in the topology specifying the blobKey. If no arguments are given, - * it loads the default example as described in the README file - */ -public class JpmmlRunnerTestTopology { - private static final String PMML_MODEL_FILE = "KNIME_PMML_4.1_Examples_single_audit_logreg.xml"; - private static final String RAW_INPUTS_FILE = "Audit.50.csv"; - - private static final String RAW_INPUT_FROM_CSV_SPOUT = "rawInputFromCsvSpout"; - private static final String PMML_PREDICTOR_BOLT = "pmmLPredictorBolt"; - private static final String PRINT_BOLT_1 = "printBolt1"; - private static final String PRINT_BOLT_2 = "printBolt2"; - private static final String NON_DEFAULT_STREAM_ID = "NON_DEFAULT_STREAM_ID"; - - private File rawInputs; // Raw input data to be scored (predicted) - private File pmml; // PMML Model read from file - null if using Blobstore - private String blobKey; // PMML Model downloaded from Blobstore - null if using File - private String tplgyName = "test"; - - public static void main(String[] args) throws Exception { - try { - JpmmlRunnerTestTopology testTopology = new JpmmlRunnerTestTopology(); - testTopology.parseArgs(args); - testTopology.run(); - } catch (Exception e) { - e.printStackTrace(); - printUsage(); - } - } - - private void parseArgs(String[] args) { - if (Arrays.stream(args).anyMatch(option -> option.equals("-h"))) { - printUsage(); - } else if (Arrays.stream(args).anyMatch(option -> option.equals("-f")) - && Arrays.stream(args).anyMatch(option -> option.equals("-b"))) { - System.out.println("Please specify only one option of [-b, -f]"); - printUsage(); - } else { - try { - for (int i = 0; i < args.length; ) { - switch (args[i]) { - case "-f": - pmml = new File(args[i + 1]); - i += 2; - break; - case "-b": - blobKey = args[i + 1]; - i += 2; - break; - case "-r": - rawInputs = new File(args[i + 1]); - i += 2; - break; - default: - tplgyName = args[i]; - i++; - break; - } - } - setDefaults(); - } catch (Exception e) { - e.printStackTrace(); - printUsage(); - } - } - } - - private void setDefaults() { - if (blobKey == null) { // blob key not specified, use file - if (pmml == null) { - pmml = loadExample(pmml, PMML_MODEL_FILE); - } - } - - if (rawInputs == null) { - rawInputs = loadExample(rawInputs, RAW_INPUTS_FILE); - } - - if (tplgyName == null) { - tplgyName = "pmmlPredictorLocal"; - } - } - - private File loadExample(File file, String example) { - try (InputStream stream = Thread.currentThread().getContextClassLoader().getResourceAsStream(example)) { - file = Files.createTempFile("pmml-example", ".tmp").toFile(); - IOUtils.copy(stream, new FileOutputStream(file)); - } catch (IOException e) { - throw new RuntimeException("Error loading example " + example, e); - } - return file; - } - - private static void printUsage() { - System.out.println("Usage: " + JpmmlRunnerTestTopology.class.getName() - + " [[[-f <PMML model file path>] [-b <Blobstore key used to upload PMML Model>]] " - + "-r <Raw inputs CSV file path>] [topology_name]"); - System.exit(1); - } - - private void run() throws Exception { - System.out.println(String.format("Running topology using PMML model loaded from [%s] and raw input data loaded from [%s]", - blobKey != null ? "Blobstore with blob key [" + blobKey + "]" : pmml.getAbsolutePath(), rawInputs.getAbsolutePath())); - submitTopologyRemoteCluster(newTopology(), newConfig()); - } - - private StormTopology newTopology() throws Exception { - final TopologyBuilder builder = new TopologyBuilder(); - builder.setSpout(RAW_INPUT_FROM_CSV_SPOUT, RawInputFromCSVSpout.newInstance(rawInputs)); - builder.setBolt(PMML_PREDICTOR_BOLT, newBolt()).shuffleGrouping(RAW_INPUT_FROM_CSV_SPOUT); - builder.setBolt(PRINT_BOLT_1, new PrinterBolt()).shuffleGrouping(PMML_PREDICTOR_BOLT); - builder.setBolt(PRINT_BOLT_2, new PrinterBolt()).shuffleGrouping(PMML_PREDICTOR_BOLT, NON_DEFAULT_STREAM_ID); - return builder.createTopology(); - } - - private void submitTopologyRemoteCluster(StormTopology topology, Config config) throws Exception { - StormSubmitter.submitTopology(tplgyName, config, topology); - } - - private Config newConfig() { - Config config = new Config(); - config.setDebug(true); - return config; - } - - private IRichBolt newBolt() throws Exception { - final List<String> streams = Lists.newArrayList(Utils.DEFAULT_STREAM_ID, NON_DEFAULT_STREAM_ID); - if (blobKey != null) { // Load PMML Model from Blob store - final ModelOutputs outFields = JpmmlModelOutputs.toStreams(blobKey, streams); - return new PMMLPredictorBolt(new JpmmlFactory.ModelRunnerFromBlobStore(blobKey, outFields), outFields); - } else { // Load PMML Model from File - final ModelOutputs outFields = JpmmlModelOutputs.toStreams(pmml, streams); - return new PMMLPredictorBolt(new JpmmlFactory.ModelRunnerFromFile(pmml, outFields), outFields); - } - } - - private static class PrinterBolt extends BaseBasicBolt { - @Override - public void execute(Tuple tuple, BasicOutputCollector collector) { - System.out.println(tuple); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer ofd) { - } - } -} diff --git a/examples/storm-pmml-examples/src/main/java/org/apache/storm/pmml/RawInputFromCSVSpout.java b/examples/storm-pmml-examples/src/main/java/org/apache/storm/pmml/RawInputFromCSVSpout.java deleted file mode 100644 index b5e763651..000000000 --- a/examples/storm-pmml-examples/src/main/java/org/apache/storm/pmml/RawInputFromCSVSpout.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.pmml; - -import java.io.BufferedReader; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStreamReader; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.Objects; - -import org.apache.storm.spout.SpoutOutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.topology.base.BaseRichSpout; -import org.apache.storm.tuple.Fields; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@SuppressWarnings("checkstyle:AbbreviationAsWordInName") -public class RawInputFromCSVSpout extends BaseRichSpout { - private static final Logger LOG = LoggerFactory.getLogger(RawInputFromCSVSpout.class); - - private File csv; - private List<String> outputFields; - private BufferedReader br; - private SpoutOutputCollector collector; - - public RawInputFromCSVSpout(File rawInputCsv, List<String> outputFields) throws FileNotFoundException { - Objects.requireNonNull(rawInputCsv); - Objects.requireNonNull(outputFields); - - this.csv = rawInputCsv; - this.outputFields = outputFields; - } - - public static RawInputFromCSVSpout newInstance(File csv) throws IOException { - List<String> outputFields; - try (BufferedReader br = newReader(csv)) { - String header = br.readLine(); - LOG.debug("Header: {}", header); - header = header.replaceAll("\"", ""); - LOG.debug("Processed header: {}", header); - final String[] inputNames = header.split(","); - outputFields = Arrays.asList(inputNames); - } - return new RawInputFromCSVSpout(csv, outputFields); - } - - private static BufferedReader newReader(File csv) throws FileNotFoundException { - return new BufferedReader(new InputStreamReader(new FileInputStream(csv))); - } - - @Override - public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) { - this.collector = collector; - openReader(); - } - - @Override - public void nextTuple() { - try { - String line = null; - while ((line = br.readLine()) != null) { - collector.emit(Arrays.asList(line.split(","))); - } - } catch (IOException e) { - closeReader(); - e.printStackTrace(); - } - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields(outputFields)); - } - - @Override - public void activate() { - } - - @Override - public void deactivate() { - } - - @Override - public void close() { - closeReader(); - } - - // ===== - - private void openReader() { - try { - br = newReader(csv); - br.readLine(); // disregard first line because it has header, already read - } catch (IOException e) { - closeReader(); - throw new RuntimeException(e); - } - } - - private void closeReader() { - if (br != null) { - try { - br.close(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - } -} diff --git a/examples/storm-pmml-examples/src/main/resources/Audit.50.csv b/examples/storm-pmml-examples/src/main/resources/Audit.50.csv deleted file mode 100644 index bbad02b02..000000000 --- a/examples/storm-pmml-examples/src/main/resources/Audit.50.csv +++ /dev/null @@ -1,51 +0,0 @@ -ID,Age,Employment,Education,Marital,Occupation,Income,Gender,Deductions,Hours,IGNORE_Accounts,RISK_Adjustment,TARGET_Adjusted -1004641,38,"Private","College","Unmarried","Service",81838,"Female",0,72,"UnitedStates",0,0 -1010229,35,"Private","Associate","Absent","Transport",72099,"Male",0,30,"Jamaica",0,0 -1024587,32,"Private","HSgrad","Divorced","Clerical",154676.74,"Male",0,40,"UnitedStates",0,0 -1038288,45,"Private","Bachelor","Married","Repair",27743.82,"Male",0,55,"UnitedStates",7298,1 -1044221,60,"Private","College","Married","Executive",7568.23,"Male",0,40,"UnitedStates",15024,1 -1047095,74,"Private","HSgrad","Married","Service",33144.4,"Male",0,30,"UnitedStates",0,0 -1047698,43,"Private","Bachelor","Married","Executive",43391.17,"Male",0,50,"UnitedStates",22418,1 -1053888,35,"Private","Yr12","Married","Machinist",59906.65,"Male",0,40,"UnitedStates",0,0 -1061323,25,"Private","Associate","Divorced","Clerical",126888.91,"Female",0,40,"UnitedStates",0,0 -1062363,22,"Private","HSgrad","Absent","Sales",52466.49,"Female",0,37,"UnitedStates",0,0 -1068642,48,"Private","College","Divorced","Service",291416.11,"Female",0,35,"UnitedStates",0,0 -1071615,60,"Private","Vocational","Widowed","Clerical",24155.31,"Male",0,40,"UnitedStates",0,0 -1071878,21,"Private","College","Absent","Service",143254.86,"Female",0,35,"UnitedStates",0,0 -1077831,21,"Private","College","Absent","Machinist",120554.81,"Male",0,40,"UnitedStates",0,0 -1084870,50,"Private","Master","Married","Executive",34919.16,"Male",0,40,"Philippines",7298,1 -1085176,37,"Private","HSgrad","Divorced","Executive",67176.79,"Male",0,35,"UnitedStates",0,0 -1093662,30,"Consultant","HSgrad","Divorced","Repair",9608.48,"Male",0,40,"UnitedStates",0,0 -1094029,32,"Private","HSgrad","Married","Machinist",12475.84,"Male",0,40,"UnitedStates",0,0 -1099084,65,"SelfEmp","College","Married","Sales",32963.39,"Male",0,40,"UnitedStates",0,0 -1099829,28,"Private","College","Married","Executive",31534.97,"Male",0,55,"UnitedStates",0,0 -1110947,40,"PSLocal","Vocational","Divorced","Executive",182165.08,"Female",0,40,"UnitedStates",0,0 -1113899,41,"PSState","Bachelor","Divorced","Executive",70603.7,"Male",0,40,"UnitedStates",0,0 -1126025,30,"Private","HSgrad","Absent","Service",88125.97,"Male",0,30,"UnitedStates",0,0 -1129687,38,"Private","HSgrad","Married","Repair",8670.9,"Male",0,40,"UnitedStates",0,0 -1133761,23,"Private","Yr11","Unmarried","Professional",260405.44,"Male",0,35,"UnitedStates",0,0 -1135666,42,"PSState","College","Absent","Executive",66139.36,"Female",0,40,"UnitedStates",0,0 -1139749,26,"Private","Bachelor","Absent","Sales",73751.48,"Female",0,40,"UnitedStates",0,0 -1141887,32,"Consultant","HSgrad","Married","Sales",1428.27,"Male",0,60,"UnitedStates",28235,1 -1142177,49,"PSFederal","College","Married","Support",15345.33,"Male",0,40,"UnitedStates",0,1 -1142367,26,"Private","HSgrad","Married","Repair",48114.39,"Male",0,40,"Mexico",0,0 -1142837,28,"Private","Yr10","Married","Machinist",33493.89,"Male",0,40,"UnitedStates",0,0 -1145025,41,"PSFederal","Bachelor","Married","Support",54653.36,"Male",0,24,"UnitedStates",0,0 -1146358,46,"Private","HSgrad","Absent","Service",229077.27,"Female",0,24,"UnitedStates",0,0 -1147562,42,"Private","College","Absent","Machinist",59201.06,"Female",0,40,"UnitedStates",0,0 -1151685,39,"Private","College","Divorced","Clerical",31036.73,"Female",0,40,"UnitedStates",0,0 -1153241,50,"Private","Yr11","Absent","Machinist",187250.07,"Female",0,40,"UnitedStates",0,0 -1155672,47,"PSLocal","Doctorate","Absent","Professional",161837.75,"Female",0,40,"UnitedStates",0,0 -1157546,24,"Private","Associate","Unmarried","Repair",193135.59,"Male",0,40,"UnitedStates",8614,1 -1158519,45,"Private","Vocational","Married","Repair",26717.49,"Male",0,40,"UnitedStates",0,0 -1169595,40,"PSFederal","Associate","Absent","Clerical",99748.58,"Female",0,40,"UnitedStates",0,0 -1172381,51,"SelfEmp","Doctorate","Married","Professional",13612.07,"Male",0,40,"UnitedStates",15024,1 -1172752,77,"Private","HSgrad","Married","Service",39950.92,"Male",0,25,"Cuba",0,0 -1177328,35,"Private","College","Married","Support",44130.45,"Male",0,45,"UnitedStates",0,0 -1178076,39,"Private","Yr9","Divorced","Cleaner",78516.3,"Male",0,50,"UnitedStates",0,0 -1197102,39,"PSState","Bachelor","Absent","Executive",92268.68,"Female",0,40,"UnitedStates",0,0 -1199127,63,"SelfEmp","Bachelor","Married","Farming",9092.19,"Male",0,40,"UnitedStates",12467,1 -1205484,64,"Private","HSgrad","Widowed","Service",148865.82,"Female",0,38,"UnitedStates",0,0 -1210411,39,"Private","Bachelor","Married","Professional",21190.02,"Male",0,40,"UnitedStates",3611,1 -1211118,66,"Private","Yr5t6","Married-spouse-absent","Cleaner",139087.01,"Female",0,40,"Malaysia",0,0 -1212771,18,"Unemployed","Yr11","Absent",NA,148836.93,"Female",0,10,"UnitedStates",0,0 \ No newline at end of file diff --git a/examples/storm-pmml-examples/src/main/resources/KNIME_PMML_4.1_Examples_single_audit_logreg.xml b/examples/storm-pmml-examples/src/main/resources/KNIME_PMML_4.1_Examples_single_audit_logreg.xml deleted file mode 100644 index 6a90b61af..000000000 --- a/examples/storm-pmml-examples/src/main/resources/KNIME_PMML_4.1_Examples_single_audit_logreg.xml +++ /dev/null @@ -1,259 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<PMML version="4.1" xmlns="http://www.dmg.org/PMML-4_1"> - <Header copyright="KNIME"> - <Application name="KNIME" version="2.8.0"/> - </Header> - <DataDictionary numberOfFields="10"> - <DataField dataType="integer" name="Age" optype="continuous"> - <Interval closure="closedClosed" leftMargin="17.0" rightMargin="90.0"/> - </DataField> - <DataField dataType="string" name="Employment" optype="categorical"> - <Value value="Private"/> - <Value value="Consultant"/> - <Value value="SelfEmp"/> - <Value value="PSLocal"/> - <Value value="PSState"/> - <Value value="PSFederal"/> - <Value value="Unemployed"/> - <Value value="NA"/> - <Value value="Volunteer"/> - </DataField> - <DataField dataType="string" name="Education" optype="categorical"> - <Value value="College"/> - <Value value="Associate"/> - <Value value="HSgrad"/> - <Value value="Bachelor"/> - <Value value="Yr12"/> - <Value value="Vocational"/> - <Value value="Master"/> - <Value value="Yr11"/> - <Value value="Yr10"/> - <Value value="Doctorate"/> - <Value value="Yr9"/> - <Value value="Yr5t6"/> - <Value value="Professional"/> - <Value value="Yr7t8"/> - <Value value="Preschool"/> - <Value value="Yr1t4"/> - </DataField> - <DataField dataType="string" name="Marital" optype="categorical"> - <Value value="Unmarried"/> - <Value value="Absent"/> - <Value value="Divorced"/> - <Value value="Married"/> - <Value value="Widowed"/> - <Value value="Married-spouse-absent"/> - </DataField> - <DataField dataType="string" name="Occupation" optype="categorical"> - <Value value="Service"/> - <Value value="Transport"/> - <Value value="Clerical"/> - <Value value="Repair"/> - <Value value="Executive"/> - <Value value="Machinist"/> - <Value value="Sales"/> - <Value value="Professional"/> - <Value value="Support"/> - <Value value="Cleaner"/> - <Value value="Farming"/> - <Value value="NA"/> - <Value value="Protective"/> - <Value value="Home"/> - <Value value="Military"/> - </DataField> - <DataField dataType="double" name="Income" optype="continuous"> - <Interval closure="closedClosed" leftMargin="609.72" rightMargin="481259.5"/> - </DataField> - <DataField dataType="string" name="Gender" optype="categorical"> - <Value value="Female"/> - <Value value="Male"/> - </DataField> - <DataField dataType="double" name="Deductions" optype="continuous"> - <Interval closure="closedClosed" leftMargin="0.0" rightMargin="2904.0"/> - </DataField> - <DataField dataType="integer" name="Hours" optype="continuous"> - <Interval closure="closedClosed" leftMargin="1.0" rightMargin="99.0"/> - </DataField> - <DataField dataType="string" name="TARGET_Adjusted" optype="categorical"> - <Value value="0"/> - <Value value="1"/> - </DataField> - </DataDictionary> - <GeneralRegressionModel modelType="multinomialLogistic" functionName="classification" algorithmName="LogisticRegression" - modelName="KNIME Logistic Regression"> - <MiningSchema> - <MiningField name="Age" invalidValueTreatment="asIs"/> - <MiningField name="Employment" invalidValueTreatment="asIs"/> - <MiningField name="Education" invalidValueTreatment="asIs"/> - <MiningField name="Marital" invalidValueTreatment="asIs"/> - <MiningField name="Occupation" invalidValueTreatment="asIs"/> - <MiningField name="Income" invalidValueTreatment="asIs"/> - <MiningField name="Gender" invalidValueTreatment="asIs"/> - <MiningField name="Deductions" invalidValueTreatment="asIs"/> - <MiningField name="Hours" invalidValueTreatment="asIs"/> - <MiningField name="TARGET_Adjusted" invalidValueTreatment="asIs" usageType="predicted"/> - </MiningSchema> - <ParameterList> - <Parameter name="p0" label="Intercept"/> - <Parameter name="p1" label="Age"/> - <Parameter name="p2" label="[Employment=NA]"/> - <Parameter name="p3" label="[Employment=PSFederal]"/> - <Parameter name="p4" label="[Employment=PSLocal]"/> - <Parameter name="p5" label="[Employment=PSState]"/> - <Parameter name="p6" label="[Employment=Private]"/> - <Parameter name="p7" label="[Employment=SelfEmp]"/> - <Parameter name="p8" label="[Employment=Unemployed]"/> - <Parameter name="p9" label="[Employment=Volunteer]"/> - <Parameter name="p10" label="[Education=Bachelor]"/> - <Parameter name="p11" label="[Education=College]"/> - <Parameter name="p12" label="[Education=Doctorate]"/> - <Parameter name="p13" label="[Education=HSgrad]"/> - <Parameter name="p14" label="[Education=Master]"/> - <Parameter name="p15" label="[Education=Preschool]"/> - <Parameter name="p16" label="[Education=Professional]"/> - <Parameter name="p17" label="[Education=Vocational]"/> - <Parameter name="p18" label="[Education=Yr10]"/> - <Parameter name="p19" label="[Education=Yr11]"/> - <Parameter name="p20" label="[Education=Yr12]"/> - <Parameter name="p21" label="[Education=Yr1t4]"/> - <Parameter name="p22" label="[Education=Yr5t6]"/> - <Parameter name="p23" label="[Education=Yr7t8]"/> - <Parameter name="p24" label="[Education=Yr9]"/> - <Parameter name="p25" label="[Marital=Divorced]"/> - <Parameter name="p26" label="[Marital=Married]"/> - <Parameter name="p27" label="[Marital=Married-spouse-absent]"/> - <Parameter name="p28" label="[Marital=Unmarried]"/> - <Parameter name="p29" label="[Marital=Widowed]"/> - <Parameter name="p30" label="[Occupation=Clerical]"/> - <Parameter name="p31" label="[Occupation=Executive]"/> - <Parameter name="p32" label="[Occupation=Farming]"/> - <Parameter name="p33" label="[Occupation=Home]"/> - <Parameter name="p34" label="[Occupation=Machinist]"/> - <Parameter name="p35" label="[Occupation=Military]"/> - <Parameter name="p36" label="[Occupation=NA]"/> - <Parameter name="p37" label="[Occupation=Professional]"/> - <Parameter name="p38" label="[Occupation=Protective]"/> - <Parameter name="p39" label="[Occupation=Repair]"/> - <Parameter name="p40" label="[Occupation=Sales]"/> - <Parameter name="p41" label="[Occupation=Service]"/> - <Parameter name="p42" label="[Occupation=Support]"/> - <Parameter name="p43" label="[Occupation=Transport]"/> - <Parameter name="p44" label="Income"/> - <Parameter name="p45" label="[Gender=Male]"/> - <Parameter name="p46" label="Deductions"/> - <Parameter name="p47" label="Hours"/> - </ParameterList> - <FactorList> - <Predictor name="Employment"/> - <Predictor name="Education"/> - <Predictor name="Marital"/> - <Predictor name="Occupation"/> - <Predictor name="Gender"/> - </FactorList> - <CovariateList> - <Predictor name="Age"/> - <Predictor name="Income"/> - <Predictor name="Deductions"/> - <Predictor name="Hours"/> - </CovariateList> - <PPMatrix> - <PPCell value="1" predictorName="Age" parameterName="p1"/> - <PPCell value="NA" predictorName="Employment" parameterName="p2"/> - <PPCell value="PSFederal" predictorName="Employment" parameterName="p3"/> - <PPCell value="PSLocal" predictorName="Employment" parameterName="p4"/> - <PPCell value="PSState" predictorName="Employment" parameterName="p5"/> - <PPCell value="Private" predictorName="Employment" parameterName="p6"/> - <PPCell value="SelfEmp" predictorName="Employment" parameterName="p7"/> - <PPCell value="Unemployed" predictorName="Employment" parameterName="p8"/> - <PPCell value="Volunteer" predictorName="Employment" parameterName="p9"/> - <PPCell value="Bachelor" predictorName="Education" parameterName="p10"/> - <PPCell value="College" predictorName="Education" parameterName="p11"/> - <PPCell value="Doctorate" predictorName="Education" parameterName="p12"/> - <PPCell value="HSgrad" predictorName="Education" parameterName="p13"/> - <PPCell value="Master" predictorName="Education" parameterName="p14"/> - <PPCell value="Preschool" predictorName="Education" parameterName="p15"/> - <PPCell value="Professional" predictorName="Education" parameterName="p16"/> - <PPCell value="Vocational" predictorName="Education" parameterName="p17"/> - <PPCell value="Yr10" predictorName="Education" parameterName="p18"/> - <PPCell value="Yr11" predictorName="Education" parameterName="p19"/> - <PPCell value="Yr12" predictorName="Education" parameterName="p20"/> - <PPCell value="Yr1t4" predictorName="Education" parameterName="p21"/> - <PPCell value="Yr5t6" predictorName="Education" parameterName="p22"/> - <PPCell value="Yr7t8" predictorName="Education" parameterName="p23"/> - <PPCell value="Yr9" predictorName="Education" parameterName="p24"/> - <PPCell value="Divorced" predictorName="Marital" parameterName="p25"/> - <PPCell value="Married" predictorName="Marital" parameterName="p26"/> - <PPCell value="Married-spouse-absent" predictorName="Marital" parameterName="p27"/> - <PPCell value="Unmarried" predictorName="Marital" parameterName="p28"/> - <PPCell value="Widowed" predictorName="Marital" parameterName="p29"/> - <PPCell value="Clerical" predictorName="Occupation" parameterName="p30"/> - <PPCell value="Executive" predictorName="Occupation" parameterName="p31"/> - <PPCell value="Farming" predictorName="Occupation" parameterName="p32"/> - <PPCell value="Home" predictorName="Occupation" parameterName="p33"/> - <PPCell value="Machinist" predictorName="Occupation" parameterName="p34"/> - <PPCell value="Military" predictorName="Occupation" parameterName="p35"/> - <PPCell value="NA" predictorName="Occupation" parameterName="p36"/> - <PPCell value="Professional" predictorName="Occupation" parameterName="p37"/> - <PPCell value="Protective" predictorName="Occupation" parameterName="p38"/> - <PPCell value="Repair" predictorName="Occupation" parameterName="p39"/> - <PPCell value="Sales" predictorName="Occupation" parameterName="p40"/> - <PPCell value="Service" predictorName="Occupation" parameterName="p41"/> - <PPCell value="Support" predictorName="Occupation" parameterName="p42"/> - <PPCell value="Transport" predictorName="Occupation" parameterName="p43"/> - <PPCell value="1" predictorName="Income" parameterName="p44"/> - <PPCell value="Male" predictorName="Gender" parameterName="p45"/> - <PPCell value="1" predictorName="Deductions" parameterName="p46"/> - <PPCell value="1" predictorName="Hours" parameterName="p47"/> - </PPMatrix> - <ParamMatrix> - <PCell targetCategory="0" parameterName="p0" beta="6.590813672854236" df="1"/> - <PCell targetCategory="0" parameterName="p1" beta="-0.029869599127058277" df="1"/> - <PCell targetCategory="0" parameterName="p2" beta="40.4456937237571" df="1"/> - <PCell targetCategory="0" parameterName="p3" beta="-0.2899581729437417" df="1"/> - <PCell targetCategory="0" parameterName="p4" beta="-0.0984223511273083" df="1"/> - <PCell targetCategory="0" parameterName="p5" beta="-0.3005414372546424" df="1"/> - <PCell targetCategory="0" parameterName="p6" beta="-0.33869790096570024" df="1"/> - <PCell targetCategory="0" parameterName="p7" beta="-0.13877470555494242" df="1"/> - <PCell targetCategory="0" parameterName="p8" beta="45.419663519813206" df="1"/> - <PCell targetCategory="0" parameterName="p9" beta="11.217152193009843" df="1"/> - <PCell targetCategory="0" parameterName="p10" beta="-0.09886743198111589" df="1"/> - <PCell targetCategory="0" parameterName="p11" beta="0.8552318245772677" df="1"/> - <PCell targetCategory="0" parameterName="p12" beta="-1.0114417270619689" df="1"/> - <PCell targetCategory="0" parameterName="p13" beta="1.1549450209966883" df="1"/> - <PCell targetCategory="0" parameterName="p14" beta="-0.4820048758605844" df="1"/> - <PCell targetCategory="0" parameterName="p15" beta="9.303779247915049" df="1"/> - <PCell targetCategory="0" parameterName="p16" beta="-1.7325688523048413" df="1"/> - <PCell targetCategory="0" parameterName="p17" beta="0.9832530229680194" df="1"/> - <PCell targetCategory="0" parameterName="p18" beta="1.5463320662301587" df="1"/> - <PCell targetCategory="0" parameterName="p19" beta="1.6012097402313625" df="1"/> - <PCell targetCategory="0" parameterName="p20" beta="1.7386040769196414" df="1"/> - <PCell targetCategory="0" parameterName="p21" beta="10.743967513050661" df="1"/> - <PCell targetCategory="0" parameterName="p22" beta="2.224165886749301" df="1"/> - <PCell targetCategory="0" parameterName="p23" beta="10.351794009168092" df="1"/> - <PCell targetCategory="0" parameterName="p24" beta="2.930377281785839" df="1"/> - <PCell targetCategory="0" parameterName="p25" beta="0.06347405092149999" df="1"/> - <PCell targetCategory="0" parameterName="p26" beta="-2.6814260841607522" df="1"/> - <PCell targetCategory="0" parameterName="p27" beta="-0.3562056962831216" df="1"/> - <PCell targetCategory="0" parameterName="p28" beta="-0.5921257059634838" df="1"/> - <PCell targetCategory="0" parameterName="p29" beta="0.13396307591544007" df="1"/> - <PCell targetCategory="0" parameterName="p30" beta="-1.180531651304198" df="1"/> - <PCell targetCategory="0" parameterName="p31" beta="-1.587006162425804" df="1"/> - <PCell targetCategory="0" parameterName="p32" beta="-0.024950851362548977" df="1"/> - <PCell targetCategory="0" parameterName="p33" beta="6.1440262648569774" df="1"/> - <PCell targetCategory="0" parameterName="p34" beta="-0.4817767619556052" df="1"/> - <PCell targetCategory="0" parameterName="p35" beta="6.563012726032708" df="1"/> - <PCell targetCategory="0" parameterName="p36" beta="-41.166666666666664" df="1"/> - <PCell targetCategory="0" parameterName="p37" beta="-1.233279040328492" df="1"/> - <PCell targetCategory="0" parameterName="p38" beta="-1.8657296177571157" df="1"/> - <PCell targetCategory="0" parameterName="p39" beta="-0.6785629372964469" df="1"/> - <PCell targetCategory="0" parameterName="p40" beta="-0.9624742082723919" df="1"/> - <PCell targetCategory="0" parameterName="p41" beta="0.3746732371075393" df="1"/> - <PCell targetCategory="0" parameterName="p42" beta="-1.278738658416571" df="1"/> - <PCell targetCategory="0" parameterName="p43" beta="-0.24723060056530316" df="1"/> - <PCell targetCategory="0" parameterName="p44" beta="-2.405253765147117E-6" df="1"/> - <PCell targetCategory="0" parameterName="p45" beta="-0.19105976220912327" df="1"/> - <PCell targetCategory="0" parameterName="p46" beta="-0.0010528530766729122" df="1"/> - <PCell targetCategory="0" parameterName="p47" beta="-0.03465036938390506" df="1"/> - </ParamMatrix> - </GeneralRegressionModel> -</PMML> \ No newline at end of file diff --git a/examples/storm-pmml-examples/src/main/resources/README.md b/examples/storm-pmml-examples/src/main/resources/README.md deleted file mode 100644 index 8b2d3f25a..000000000 --- a/examples/storm-pmml-examples/src/main/resources/README.md +++ /dev/null @@ -1,6 +0,0 @@ -The PMML model file, as well as the sample raw input data file are from the `4.1 Regression KNIME KNIME 2.8` -[example] (http://dmg.org/pmml/pmml_examples/index.html) - - The PMML Model file matches the [example file] (http://dmg.org/pmml/pmml_examples/KNIME_PMML_4.1_Examples/single_audit_mlp.xml) - - The file Audit.50.csv contains the first 50 lines of raw input data of the [Audit example] (http://dmg.org/pmml/pmml_examples/index.html#Audit). \ No newline at end of file diff --git a/external/storm-pmml/README.md b/external/storm-pmml/README.md deleted file mode 100644 index d237f5564..000000000 --- a/external/storm-pmml/README.md +++ /dev/null @@ -1,61 +0,0 @@ -# Storm PMML Bolt - Storm integration to load PMML models and compute predictive scores for running tuples. The PMML model represents - the machine learning (predictive) model used to do prediction on raw input data. The model is typically loaded into a - runtime environment, which will score the raw data that comes in the tuples. - -## Create Instance of PMML Bolt - To create an instance of the `PMMLPredictorBolt`, you must provide the `ModelOutputs`, and a `ModelRunner` using a - `ModelRunnerFactory`. The `ModelOutputs` represents the streams and output fields declared by the `PMMLPredictorBolt`. - The `ModelRunner` represents the runtime environment to execute the predictive scoring. It has only one method: - - ```java - Map<String, List<Object>> scoredTuplePerStream(Tuple input); - ``` - - This method contains the logic to compute the scored tuples from the raw inputs tuple. It's up to the discretion of the - implementation to define which scored values are to be assigned to each stream. The keys of this map are the stream ids, - and the values the predicted scores. - - The `PmmlModelRunner` is an extension of `ModelRunner` that represents the typical steps involved - in predictive scoring. Hence, it allows for the **extraction** of raw inputs from the tuple, **pre process** the - raw inputs, and **predict** the scores from the preprocessed data. - - The `JPmmlModelRunner` is an implementation of `PmmlModelRunner` that uses [JPMML](https://github.com/jpmml/jpmml) as - runtime environment. This implementation extracts the raw inputs from the tuple for all `active fields`, - and builds a tuple with the predicted scores for the `predicted fields` and `output fields`. - In this implementation all the declared streams will have the same scored tuple. - - The `predicted`, `active`, and `output` fields are extracted from the PMML model. - -## Run Bundled Examples - -To run the examples you must execute the following command: - - ```java - STORM-HOME/bin/storm jar STORM-HOME/examples/storm-pmml-examples/storm-pmml-examples-2.0.0-SNAPSHOT.jar - org.apache.storm.pmml.JpmmlRunnerTestTopology jpmmlTopology PMMLModel.xml RawInputData.csv - ``` - -## License - -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. - - -## Committer Sponsors - * Sriharsha Chintalapani ([[email protected]](mailto:[email protected])) - * P. Taylor Goetz ([[email protected]](mailto:[email protected])) diff --git a/external/storm-pmml/pom.xml b/external/storm-pmml/pom.xml deleted file mode 100644 index 17cca0591..000000000 --- a/external/storm-pmml/pom.xml +++ /dev/null @@ -1,103 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one - ~ or more contributor license agreements. See the NOTICE file - ~ distributed with this work for additional information - ~ regarding copyright ownership. The ASF licenses this file - ~ to you under the Apache License, Version 2.0 (the - ~ "License"); you may not use this file except in compliance - ~ with the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> - -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <parent> - <artifactId>storm</artifactId> - <groupId>org.apache.storm</groupId> - <version>2.6.0-SNAPSHOT</version> - <relativePath>../../pom.xml</relativePath> - </parent> - <modelVersion>4.0.0</modelVersion> - - <artifactId>storm-pmml</artifactId> - <name>storm-pmml</name> - - <packaging>jar</packaging> - - <properties> - <!-- Required downgrade by pmml-evaluator 1.0.22 --> - <guava.version>16.0.1</guava.version> - </properties> - - <developers> - <developer> - <id>hmcl</id> - <name>Hugo Louro</name> - <email>[email protected]</email> - </developer> - </developers> - - <dependencies> - <!--parent module dependency--> - <dependency> - <groupId>org.apache.storm</groupId> - <artifactId>storm-client</artifactId> - <version>${project.version}</version> - <scope>${provided.scope}</scope> - </dependency> - - <!-- JPMML Compile Time dependencies LICENSING WARNING! - For Licensing compliance the artifactIds cannot be changed from pmml to jpmml. - Notice the 'j' prefix. Artifacts prefixed with 'j' should not be used. - --> - <!-- Class model classes --> - <dependency> - <groupId>org.jpmml</groupId> - <artifactId>pmml-model</artifactId> - <version>${jpmml.version}</version> - </dependency> - <!-- Class model annotations --> - <dependency> - <groupId>org.jpmml</groupId> - <artifactId>pmml-schema</artifactId> - <version>${jpmml.version}</version> - </dependency> - <dependency> - <groupId>org.jpmml</groupId> - <artifactId>pmml-evaluator</artifactId> - <version>${jpmml.version}</version> - </dependency> - <!-- JAXB api --> - <dependency> - <groupId>javax.xml.bind</groupId> - <artifactId>jaxb-api</artifactId> - </dependency> - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - <version>${guava.version}</version> - </dependency> - </dependencies> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-checkstyle-plugin</artifactId> - <!--Note - the version would be inherited--> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-pmd-plugin</artifactId> - </plugin> - </plugins> - </build> - -</project> diff --git a/external/storm-pmml/src/main/java/org/apache/storm/pmml/PMMLPredictorBolt.java b/external/storm-pmml/src/main/java/org/apache/storm/pmml/PMMLPredictorBolt.java deleted file mode 100644 index 0d9bc4cf0..000000000 --- a/external/storm-pmml/src/main/java/org/apache/storm/pmml/PMMLPredictorBolt.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.pmml; - -import java.util.List; -import java.util.Map; - -import org.apache.storm.pmml.model.ModelOutputs; -import org.apache.storm.pmml.runner.ModelRunner; -import org.apache.storm.pmml.runner.ModelRunnerFactory; -import org.apache.storm.task.OutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.topology.base.BaseRichBolt; -import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Tuple; -import org.apache.storm.utils.TupleUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@SuppressWarnings("checkstyle:AbbreviationAsWordInName") -public class PMMLPredictorBolt extends BaseTickTupleAwareRichBolt { - protected static final Logger LOG = LoggerFactory.getLogger(PMMLPredictorBolt.class); - - private final ModelOutputs outputs; - private final ModelRunnerFactory runnerFactory; - private ModelRunner runner; - private OutputCollector collector; - - /* - * Passing a factory rather than the actual object to avoid enforcing the strong - * requirement of having to have ModelRunner to be Serializable - */ - - /** - * Creates an instance of {@link PMMLPredictorBolt} that executes, for every tuple, the runner constructed with - * the {@link ModelRunnerFactory} specified in the parameter - * The {@link PMMLPredictorBolt} instantiated with this constructor declares the output fields as specified - * by the {@link ModelOutputs} parameter. - */ - public PMMLPredictorBolt(ModelRunnerFactory modelRunnerFactory, ModelOutputs modelOutputs) { - this.outputs = modelOutputs; - this.runnerFactory = modelRunnerFactory; - LOG.info("Instantiated {}", this); - } - - @Override - public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) { - this.runner = runnerFactory.newModelRunner(); - this.collector = collector; - } - - @Override - protected void process(Tuple input) { - try { - final Map<String, List<Object>> scoresPerStream = runner.scoredTuplePerStream(input); - LOG.debug("Input tuple [{}] generated predicted scores [{}]", input, scoresPerStream); - if (scoresPerStream != null) { - for (Map.Entry<String, List<Object>> streamToTuple : scoresPerStream.entrySet()) { - collector.emit(streamToTuple.getKey(), input, streamToTuple.getValue()); - } - collector.ack(input); - } else { - LOG.debug("Input tuple [{}] generated NULL scores", input); - } - } catch (Exception e) { - collector.reportError(e); - collector.fail(input); - } - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - LOG.info("Declaring output fields [{}]", outputs); - for (Map.Entry<String, ? extends Fields> streamToFields : outputs.streamFields().entrySet()) { - declarer.declareStream(streamToFields.getKey(), streamToFields.getValue()); - } - } - - @Override - public String toString() { - return "PMMLPredictorBolt{" - + "outputFields=" + outputs - + ", runnerFactory=" + runnerFactory.getClass().getName() - + ", runner=" + runner - + ", collector=" + collector - + "} "; - } -} diff --git a/external/storm-pmml/src/main/java/org/apache/storm/pmml/model/ModelOutputs.java b/external/storm-pmml/src/main/java/org/apache/storm/pmml/model/ModelOutputs.java deleted file mode 100644 index 8322a8a86..000000000 --- a/external/storm-pmml/src/main/java/org/apache/storm/pmml/model/ModelOutputs.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.pmml.model; - -import java.io.Serializable; -import java.util.Map; -import java.util.Set; - -import org.apache.storm.pmml.PMMLPredictorBolt; -import org.apache.storm.tuple.Fields; - -/** - * Represents the streams and output fields declared by the {@link PMMLPredictorBolt}. - */ -public interface ModelOutputs extends Serializable { - - /** - * Stream fields. - * @return a map with the output fields declared for each stream by the {@link PMMLPredictorBolt} - */ - Map<String, ? extends Fields> streamFields(); - - /** - * Convenience method that returns a set with all the streams declared by the {@link PMMLPredictorBolt}. - * By default this this method calls {@link #streamFields()}{@code .keySet()}. - * @return The set with all declared streams - */ - default Set<String> streams() { - return streamFields().keySet(); - } -} diff --git a/external/storm-pmml/src/main/java/org/apache/storm/pmml/model/jpmml/JpmmlModelOutputs.java b/external/storm-pmml/src/main/java/org/apache/storm/pmml/model/jpmml/JpmmlModelOutputs.java deleted file mode 100644 index 69cf3ec5b..000000000 --- a/external/storm-pmml/src/main/java/org/apache/storm/pmml/model/jpmml/JpmmlModelOutputs.java +++ /dev/null @@ -1,156 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.pmml.model.jpmml; - -import java.io.File; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.Collections; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.function.Function; -import java.util.stream.Collectors; - -import org.apache.storm.pmml.model.ModelOutputs; -import org.apache.storm.pmml.runner.jpmml.JpmmlFactory; -import org.apache.storm.tuple.Fields; -import org.apache.storm.utils.Utils; -import org.dmg.pmml.FieldName; -import org.dmg.pmml.PMML; -import org.jpmml.evaluator.Evaluator; - -public class JpmmlModelOutputs implements ModelOutputs { - private final Map<String, ? extends Fields> declared; - - public JpmmlModelOutputs(Map<String, ? extends Fields> declaredFields) { - this.declared = declaredFields; - } - - @Override - public Map<String, ? extends Fields> streamFields() { - return declared; - } - - @Override - public String toString() { - return "JpmmlModelOutputs{" + declared + '}'; - } - - // ================= Factory Methods Declaring ModelOutputs to Default Stream ================== - - /** - * Factory method that creates an instance of {@link ModelOutputs} that declares - * the {@code predicted} and {@code output} fields specified in the {@link PMML} model - * specified as argument into the {@code default} stream. - */ - public static ModelOutputs toDefaultStream(PMML pmmlModel) { - Objects.requireNonNull(pmmlModel); - return create(pmmlModel, Collections.singletonList(Utils.DEFAULT_STREAM_ID)); - } - - public static ModelOutputs toDefaultStream(File pmmlModel) { - try { - return toDefaultStream(JpmmlFactory.newPmml(pmmlModel)); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - public static ModelOutputs toDefaultStream(InputStream pmmlModel) { - try { - return toDefaultStream(JpmmlFactory.newPmml(pmmlModel)); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - public static ModelOutputs toDefaultStream(String blobKey) { - return toDefaultStream(blobKey, Utils.readStormConfig()); - } - - public static ModelOutputs toDefaultStream(String blobKey, Map<String, Object> config) { - try { - return toDefaultStream(JpmmlFactory.newPmml(blobKey, config)); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - // ================= Factory Methods Declaring ModelOutputs to Multiple Streams ================== - - /** - * Factory method that creates an instance of {@link ModelOutputs} that declares - * the {@code predicted} and {@code output} fields specified in the {@link PMML} model - * specified as argument into the list of streams specified. - */ - public static ModelOutputs toStreams(PMML pmmlModel, List<String> streams) { - return create(pmmlModel, streams); - } - - public static ModelOutputs toStreams(File pmmlModel, List<String> streams) { - try { - return toStreams(JpmmlFactory.newPmml(pmmlModel), streams); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - public static ModelOutputs toStreams(InputStream pmmlModel, List<String> streams) { - try { - return toStreams(JpmmlFactory.newPmml(pmmlModel), streams); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - public static ModelOutputs toStreams(String blobKey, List<String> streams) { - return toStreams(blobKey, Utils.readStormConfig(), streams); - } - - public static ModelOutputs toStreams(String blobKey, Map<String, Object> config, List<String> streams) { - try { - return toStreams(JpmmlFactory.newPmml(blobKey, config), streams); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - // ====== - - private static ModelOutputs create(PMML pmmlModel, List<String> streams) { - final Set<String> fieldNames = new LinkedHashSet<>(); - final Evaluator evaluator = JpmmlFactory.newEvaluator(pmmlModel); - - for (FieldName predictedField : evaluator.getPredictedFields()) { - fieldNames.add(predictedField.getValue()); - } - - for (FieldName outputField : evaluator.getOutputFields()) { - fieldNames.add(outputField.getValue()); - } - - final Map<String, Fields> toDeclare = streams.stream() - .collect(Collectors.toMap(Function.identity(), (x) -> new Fields(new ArrayList<>(fieldNames)))); - - return new JpmmlModelOutputs(toDeclare); - } -} diff --git a/external/storm-pmml/src/main/java/org/apache/storm/pmml/runner/ModelRunner.java b/external/storm-pmml/src/main/java/org/apache/storm/pmml/runner/ModelRunner.java deleted file mode 100644 index ef1e74145..000000000 --- a/external/storm-pmml/src/main/java/org/apache/storm/pmml/runner/ModelRunner.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.pmml.runner; - -import java.util.List; -import java.util.Map; - -import org.apache.storm.pmml.model.ModelOutputs; -import org.apache.storm.tuple.Tuple; - -public interface ModelRunner { - - /** - * Creates and returns a map with the predicted scores that are to be emitted on each stream. - * The keys of this map are the stream ids, and the values the predicted scores. - * It's up to the implementation to guarantee that the streams ids match the stream ids defined in - * {@link ModelOutputs}. Namely, the set of keys of the {@code Map<String, List<Object>>} returned - * by this method should be a subset of {@link ModelOutputs#streams()} - * - * @return The map with the predicted scores that are to be emitted on each stream - */ - Map<String, List<Object>> scoredTuplePerStream(Tuple input); -} diff --git a/external/storm-pmml/src/main/java/org/apache/storm/pmml/runner/ModelRunnerFactory.java b/external/storm-pmml/src/main/java/org/apache/storm/pmml/runner/ModelRunnerFactory.java deleted file mode 100644 index 9e08b4e5d..000000000 --- a/external/storm-pmml/src/main/java/org/apache/storm/pmml/runner/ModelRunnerFactory.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.pmml.runner; - -import java.io.Serializable; - -public interface ModelRunnerFactory extends Serializable { - ModelRunner newModelRunner(); -} diff --git a/external/storm-pmml/src/main/java/org/apache/storm/pmml/runner/PmmlModelRunner.java b/external/storm-pmml/src/main/java/org/apache/storm/pmml/runner/PmmlModelRunner.java deleted file mode 100644 index 7d2ece8b8..000000000 --- a/external/storm-pmml/src/main/java/org/apache/storm/pmml/runner/PmmlModelRunner.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.pmml.runner; - -import org.apache.storm.tuple.Tuple; - -/** - * Runner for models defined using PMML. - * - * @param <I> type of the input source. For Storm it is typically a {@link Tuple} - * @param <R> the type of extracted raw input - * @param <P> the type of preprocessed input - * @param <S> the type of predicted scores - */ -public interface PmmlModelRunner<I, R, P, S> extends ModelRunner { - - /** - * Extracts from the tuple the raw inputs that are to be scored according to the predictive model. - * - * @param input source from which to extract raw inputs - * @return raw inputs - */ - R extractRawInputs(I input); - - /** - * Pre process inputs, i.e., remove missing fields, outliers, etc - * - * @param rawInputs that are to be preprocessed - * @return preprocessed output - */ - P preProcessInputs(R rawInputs); - - /** - * Compute the predicted scores from the pre-processed inputs in the step above. - * - * @param preProcInputs that are to be preprocessed - * @return predicted scores - */ - S predictScores(P preProcInputs); -} diff --git a/external/storm-pmml/src/main/java/org/apache/storm/pmml/runner/jpmml/JPmmlModelRunner.java b/external/storm-pmml/src/main/java/org/apache/storm/pmml/runner/jpmml/JPmmlModelRunner.java deleted file mode 100644 index ecdbbb4ae..000000000 --- a/external/storm-pmml/src/main/java/org/apache/storm/pmml/runner/jpmml/JPmmlModelRunner.java +++ /dev/null @@ -1,156 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.pmml.runner.jpmml; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; - -import org.apache.storm.pmml.model.ModelOutputs; -import org.apache.storm.pmml.runner.PmmlModelRunner; -import org.apache.storm.tuple.Tuple; -import org.dmg.pmml.FieldName; -import org.jpmml.evaluator.Evaluator; -import org.jpmml.evaluator.EvaluatorUtil; -import org.jpmml.evaluator.FieldValue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * JPMML implementation of {@link PmmlModelRunner}. It extracts the raw inputs from the tuple for all - * 'active fields', and builds a tuple with the predicted scores for the 'predicted fields' and 'output fields'. - * In this implementation all the declared streams will have the same scored tuple. - * - * <p>The 'predicted', 'active', and 'output' fields are extracted from the PMML model. - */ -public class JPmmlModelRunner implements PmmlModelRunner<Tuple, - Map<FieldName, Object>, - Map<FieldName, FieldValue>, - Map<FieldName, ?>> { - - private static final Logger LOG = LoggerFactory.getLogger(JPmmlModelRunner.class); - - private final Evaluator eval; // Jpmml evaluator - private final List<FieldName> activeFields; - private final List<FieldName> predictedFields; - private final List<FieldName> outputFields; - private final ModelOutputs modelOutputs; - - public JPmmlModelRunner(Evaluator evaluator, ModelOutputs modelOutputs) { - this.eval = evaluator; - this.modelOutputs = modelOutputs; - activeFields = evaluator.getActiveFields(); - predictedFields = eval.getPredictedFields(); - outputFields = eval.getOutputFields(); - } - - /** - * Extract raw inputs. - * @return The raw inputs extracted from the tuple for all 'active fields' - */ - @Override - public Map<FieldName, Object> extractRawInputs(Tuple tuple) { - LOG.debug("Extracting raw inputs from tuple: = [{}]", tuple); - final Map<FieldName, Object> rawInputs = new LinkedHashMap<>(); - for (FieldName activeField : activeFields) { - rawInputs.put(activeField, tuple.getValueByField(activeField.getValue())); - } - LOG.debug("Raw inputs = [{}]", rawInputs); - return rawInputs; - } - - @Override - public Map<FieldName, FieldValue> preProcessInputs(Map<FieldName, Object> rawInputs) { - LOG.debug("Pre processing raw inputs: = [{}]", rawInputs); - final Map<FieldName, FieldValue> preProcInputs = new LinkedHashMap<>(); - for (Map.Entry<FieldName, Object> rawEntry : rawInputs.entrySet()) { - preProcInputs.putIfAbsent(rawEntry.getKey(), EvaluatorUtil.prepare(eval, rawEntry.getKey(), rawEntry.getValue())); - } - LOG.debug("Pre processed inputs = [{}]", preProcInputs); - return preProcInputs; - } - - @Override - public Map<FieldName, ?> predictScores(Map<FieldName, FieldValue> preProcInputs) { - LOG.debug("Predicting scores for pre processed inputs: = [{}]", preProcInputs); - Map<FieldName, ?> predictedScores = eval.evaluate(preProcInputs); - LOG.debug("Predicted scores = [{}]", predictedScores); - return predictedScores; - } - - /** - * Retrieve scores. - * @return the predicted scores for the 'predicted fields' and 'output fields'. - * All the declared streams will have the same scored tuple. - */ - @Override - public Map<String, List<Object>> scoredTuplePerStream(Tuple input) { - final Map<FieldName, Object> rawInputs = extractRawInputs(input); - final Map<FieldName, FieldValue> preProcInputs = preProcessInputs(rawInputs); - final Map<FieldName, ?> predScores = predictScores(preProcInputs); - - return toValuesMap(predScores); - } - - // Sends the same tuple (list of scored/predicted values) to all the declared streams - private Map<String, List<Object>> toValuesMap(Map<FieldName, ?> predScores) { - final List<Object> scoredVals = new ArrayList<>(); - - for (FieldName predictedField : predictedFields) { - Object targetValue = predScores.get(predictedField); - scoredVals.add(EvaluatorUtil.decode(targetValue)); - } - - for (FieldName outputField : outputFields) { - Object targetValue = predScores.get(outputField); - scoredVals.add(EvaluatorUtil.decode(targetValue)); - } - - final Map<String, List<Object>> valuesMap = new HashMap<>(); - - for (String stream: modelOutputs.streams()) { - valuesMap.put(stream, scoredVals); - } - - return valuesMap; - } - - public Evaluator getEval() { - return eval; - } - - public List<FieldName> getActiveFields() { - return Collections.unmodifiableList(activeFields); - } - - public List<FieldName> getPredictedFields() { - return Collections.unmodifiableList(predictedFields); - } - - public List<FieldName> getOutputFields() { - return Collections.unmodifiableList(outputFields); - } - - public ModelOutputs getModelOutputs() { - return modelOutputs; - } -} diff --git a/external/storm-pmml/src/main/java/org/apache/storm/pmml/runner/jpmml/JpmmlFactory.java b/external/storm-pmml/src/main/java/org/apache/storm/pmml/runner/jpmml/JpmmlFactory.java deleted file mode 100644 index c0cca3a5b..000000000 --- a/external/storm-pmml/src/main/java/org/apache/storm/pmml/runner/jpmml/JpmmlFactory.java +++ /dev/null @@ -1,229 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.pmml.runner.jpmml; - -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.util.Map; -import java.util.Objects; - -import javax.xml.bind.JAXBException; - -import org.apache.storm.pmml.model.ModelOutputs; -import org.apache.storm.pmml.runner.ModelRunner; -import org.apache.storm.pmml.runner.ModelRunnerFactory; -import org.apache.storm.utils.Utils; -import org.dmg.pmml.IOUtil; -import org.dmg.pmml.PMML; -import org.jpmml.evaluator.Evaluator; -import org.jpmml.evaluator.ModelEvaluator; -import org.jpmml.evaluator.ModelEvaluatorFactory; -import org.jpmml.manager.PMMLManager; -import org.xml.sax.SAXException; - -/* - * This class consists exclusively of static factory methods that create instances that are essential to work with the - * Jpmml library. - */ -public class JpmmlFactory { - - /** - * Creates a new {@link PMML} object representing the PMML model defined in the XML {@link File} specified as argument. - */ - public static PMML newPmml(File file) throws JAXBException, SAXException, IOException { - Objects.requireNonNull(file); - return IOUtil.unmarshal(file); - } - - /** - * Creates a new {@link PMML} object representing the PMML model defined in the {@link InputStream} specified as argument. - */ - public static PMML newPmml(InputStream stream) throws JAXBException, SAXException, IOException { - Objects.requireNonNull(stream); - return IOUtil.unmarshal(stream); - } - - /** - * Creates a new {@link PMML} object representing the PMML model uploaded to the Blobstore with key specified as - * argument. Uses Storm config as returned by {@code Utils.readStormConfig()} to get the Blobstore client. - */ - public static PMML newPmml(String blobKey) throws JAXBException, SAXException, IOException { - return newPmml(blobKey, Utils.readStormConfig()); - } - - /** - * Creates a new {@link PMML} object representing the PMML model uploaded to the Blobstore with key specified as - * argument. Uses the specified configuration to get the Blobstore client. - */ - public static PMML newPmml(String blobKey, Map<String, Object> config) throws JAXBException, SAXException, IOException { - Objects.requireNonNull(blobKey); - Objects.requireNonNull(config); - return newPmml(getPmmlModelBlob(blobKey, config)); - } - - // ================== Get PMML Model from Blobstore ================== - - /** - * Returns PMML model from Blobstore. Uses Storm config as returned by {@code Utils.readStormConfig()}. - * @param blobKey key of PMML model in Blobstore - */ - public static InputStream getPmmlModelBlob(String blobKey) { - return getPmmlModelBlob(blobKey, Utils.readStormConfig()); - } - - /** - * Returns PMML model from Blobstore. - * @param blobKey key of PMML model in Blobstore - * @param config Configuration to use to get Blobstore client - */ - public static InputStream getPmmlModelBlob(String blobKey, Map<String, Object> config) { - Objects.requireNonNull(blobKey); - Objects.requireNonNull(config); - try { - return Utils.getClientBlobStore(config).getBlob(blobKey); - } catch (Exception e) { - throw new RuntimeException("Failed to download PMML Model from Blobstore using blob key [" - + blobKey - + "]", - e); - } - } - - // ================== Evaluator ================== - - /** - * Creates a new {@link Evaluator} object representing the PMML model defined in the {@link PMML} argument. - */ - public static Evaluator newEvaluator(PMML pmml) { - Objects.requireNonNull(pmml); - final PMMLManager pmmlManager = new PMMLManager(pmml); - return (ModelEvaluator<?>) pmmlManager.getModelManager(null, ModelEvaluatorFactory.getInstance()); - } - - /** - * Creates a new {@link Evaluator} object representing the PMML model defined in the XML {@link File} specified as - * argument. - */ - public static Evaluator newEvaluator(File file) throws IOException, JAXBException, SAXException { - Objects.requireNonNull(file); - return newEvaluator(newPmml(file)); - } - - /** - * Creates a new {@link Evaluator} object representing the PMML model defined in the XML {@link File} specified as - * argument. - */ - public static Evaluator newEvaluator(InputStream stream) throws IOException, JAXBException, SAXException { - Objects.requireNonNull(stream); - return newEvaluator(newPmml(stream)); - } - - /** - * Creates a new {@link Evaluator} object representing the PMML model uploaded to the Blobstore using the blob key - * specified as argument. Uses Storm config as returned by {@code Utils.readStormConfig()} to get the Blobstore - * client. - */ - public static Evaluator newEvaluator(String blobKey) throws IOException, JAXBException, SAXException { - Objects.requireNonNull(blobKey); - return newEvaluator(blobKey, Utils.readStormConfig()); - } - - /** - * Creates a new {@link Evaluator} object representing the PMML model uploaded to the Blobstore using the blob key - * specified as argument. Uses the specified configuration to get the Blobstore client. - */ - public static Evaluator newEvaluator(String blobKey, Map<String, Object> config) throws IOException, JAXBException, SAXException { - Objects.requireNonNull(blobKey); - Objects.requireNonNull(config); - return newEvaluator(newPmml(blobKey, config)); - } - - // ============ Factories ============ - - public static class ModelRunnerFromFile implements ModelRunnerFactory { - private final File model; - private final ModelOutputs outFields; - - public ModelRunnerFromFile(File model, ModelOutputs modelOutputs) { - Objects.requireNonNull(model); - Objects.requireNonNull(modelOutputs); - - this.model = model; - this.outFields = modelOutputs; - } - - /** - * Creates a {@link JPmmlModelRunner} writing to the default stream. - */ - @Override - public ModelRunner newModelRunner() { - try { - return new JPmmlModelRunner(JpmmlFactory.newEvaluator(model), outFields); - } catch (Exception e) { - throw new RuntimeException("Failed to create ModelRunner from model " + model, e); - } - } - } - - /** - * Creates a {@link JPmmlModelRunner} writing to the default stream. PMML Model is downloaded from Blobstore. - */ - public static class ModelRunnerFromBlobStore implements ModelRunnerFactory { - private final String blobKey; - private final ModelOutputs modelOutputs; - private final Map<String, Object> config; - - /** - * Uses Storm config as returned by {@code Utils.readStormConfig()}. - * @param blobKey key of PMML model in Blobstore - */ - public ModelRunnerFromBlobStore(String blobKey, ModelOutputs modelOutputs) { - this(blobKey, modelOutputs, Utils.readStormConfig()); - } - - /** - * Create a new {@code ModelRunnerFromBlobStore}. - * @param blobKey key of PMML model in Blobstore - * @param config Configuration to use to get Blobstore client - */ - public ModelRunnerFromBlobStore(String blobKey, ModelOutputs modelOutputs, Map<String, Object> config) { - Objects.requireNonNull(blobKey); - Objects.requireNonNull(modelOutputs); - Objects.requireNonNull(config); - - this.blobKey = blobKey; - this.modelOutputs = modelOutputs; - this.config = config; - } - - @Override - public ModelRunner newModelRunner() { - try { - return new JPmmlModelRunner(JpmmlFactory.newEvaluator(blobKey, config), modelOutputs); - } catch (Exception e) { - throw new RuntimeException(String.format("Failed to create ModelRunner from model in Blobstore " - + "using blob key [%s] and config [%s]", - blobKey, - config), - e); - } - } - } -} diff --git a/pom.xml b/pom.xml index c04ac0950..583e94607 100644 --- a/pom.xml +++ b/pom.xml @@ -499,7 +499,6 @@ <module>external/storm-kafka-migration</module> <module>external/storm-kafka-monitor</module> <module>external/storm-jms</module> - <module>external/storm-pmml</module> <module>external/storm-rocketmq</module> <module>external/storm-blobstore-migration</module> <module>integration-test</module> @@ -524,7 +523,6 @@ <module>examples/storm-hdfs-examples</module> <module>examples/storm-hive-examples</module> <module>examples/storm-elasticsearch-examples</module> - <module>examples/storm-pmml-examples</module> <module>examples/storm-jms-examples</module> <module>examples/storm-rocketmq-examples</module> <module>examples/storm-perf</module>
