http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/Application.java ---------------------------------------------------------------------- diff --git a/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/Application.java b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/Application.java new file mode 100644 index 0000000..8132c7d --- /dev/null +++ b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/Application.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.machinedata; + +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.examples.machinedata.data.MachineKey; +import org.apache.apex.examples.machinedata.operator.MachineInfoAveragingOperator; +import org.apache.apex.examples.machinedata.operator.MachineInfoAveragingPrerequisitesOperator; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.contrib.redis.RedisKeyValPairOutputOperator; +import com.datatorrent.lib.io.SmtpOutputOperator; + +/** + * <p> + * Resource monitor application. + * </p> + * + * @since 0.3.5 + */ +@ApplicationAnnotation(name = "MachineDataExample") +@SuppressWarnings("unused") +public class Application implements StreamingApplication +{ + + private static final Logger LOG = LoggerFactory.getLogger(Application.class); + + /** + * This function sets up the DAG for calculating the average + * + * @param dag the DAG instance + * @param conf the configuration instance + * @return MachineInfoAveragingPrerequisitesOperator + */ + private MachineInfoAveragingPrerequisitesOperator addAverageCalculation(DAG dag, Configuration conf) + { + MachineInfoAveragingPrerequisitesOperator prereqAverageOper = dag.addOperator("Aggregator", MachineInfoAveragingPrerequisitesOperator.class); + MachineInfoAveragingOperator averageOperator = dag.addOperator("AverageCalculator", MachineInfoAveragingOperator.class); + RedisKeyValPairOutputOperator<MachineKey, Map<String, String>> redisAvgOperator = dag.addOperator("Persister", new RedisKeyValPairOutputOperator<MachineKey, Map<String, String>>()); + dag.addStream("Average", averageOperator.outputPort, redisAvgOperator.input); + SmtpOutputOperator smtpOutputOperator = dag.addOperator("Alerter", new SmtpOutputOperator()); + dag.addStream("Aggregates", prereqAverageOper.outputPort, averageOperator.inputPort); + dag.addStream("Alerts", averageOperator.smtpAlert, smtpOutputOperator.input); + return prereqAverageOper; + } + + /** + * Create the DAG + */ + @Override + public void populateDAG(DAG dag, Configuration conf) + { + InputReceiver randomGen = dag.addOperator("Receiver", InputReceiver.class); + DimensionGenerator dimensionGenerator = dag.addOperator("DimensionsGenerator", DimensionGenerator.class); + dag.addStream("Events", randomGen.outputInline, dimensionGenerator.inputPort); + MachineInfoAveragingPrerequisitesOperator prereqAverageOper = addAverageCalculation(dag, conf); + dag.addStream("DimensionalData", dimensionGenerator.outputInline, prereqAverageOper.inputPort); + } + +}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/DimensionGenerator.java ---------------------------------------------------------------------- diff --git a/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/DimensionGenerator.java b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/DimensionGenerator.java new file mode 100644 index 0000000..6c717c2 --- /dev/null +++ b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/DimensionGenerator.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.machinedata; + +import org.apache.apex.examples.machinedata.data.MachineInfo; +import org.apache.apex.examples.machinedata.data.MachineKey; +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.common.util.BaseOperator; + + +/** + * <p> + * Information tuple generator with randomness. + * </p> + * + * @since 0.3.5 + */ +@SuppressWarnings("unused") +public class DimensionGenerator extends BaseOperator +{ + public transient DefaultOutputPort<MachineInfo> outputInline = new DefaultOutputPort<>(); + public transient DefaultOutputPort<MachineInfo> output = new DefaultOutputPort<>(); + private int threshold = 90; + + public final transient DefaultInputPort<MachineInfo> inputPort = new DefaultInputPort<MachineInfo>() + { + + @Override + public void process(MachineInfo tuple) + { + emitDimensions(tuple); + } + + }; + + @Override + public void setup(Context.OperatorContext context) + { + super.setup(context); + } + + /** + * This returns the threshold value set + * @return + */ + public int getThreshold() + { + return threshold; + } + + /** + * This function sets the threshold value. This value is used to check the maximum value for cpu/ram/hdd + * @param threshold + */ + public void setThreshold(int threshold) + { + this.threshold = threshold; + } + + /** + * This function takes in the tuple from upstream operator and generates tuples with different dimension combinations + * + * @param tuple + */ + private void emitDimensions(MachineInfo tuple) + { + MachineKey tupleKey = tuple.getMachineKey(); + + for (int i = 0; i < 64; i++) { + MachineKey machineKey = new MachineKey(tupleKey.getTimeKey(),tupleKey.getDay()); + if ((i & 1) != 0) { + machineKey.setCustomer(tupleKey.getCustomer()); + } + if ((i & 2) != 0) { + machineKey.setProduct(tupleKey.getProduct()); + } + if ((i & 4) != 0) { + machineKey.setOs(tupleKey.getOs()); + } + if ((i & 8) != 0) { + machineKey.setDeviceId(tupleKey.getDeviceId()); + } + if ((i & 16) != 0) { + machineKey.setSoftware1(tupleKey.getSoftware1()); + } + if ((i & 32) != 0) { + machineKey.setSoftware2(tupleKey.getSoftware2()); + } + + int cpu = tuple.getCpu(); + int ram = tuple.getRam(); + int hdd = tuple.getHdd(); + MachineInfo machineInfo = new MachineInfo(); + machineInfo.setMachineKey(machineKey); + machineInfo.setCpu((cpu < threshold) ? cpu : threshold); + machineInfo.setRam((ram < threshold) ? ram : threshold); + machineInfo.setHdd((hdd < threshold) ? hdd : threshold); + outputInline.emit(machineInfo); + output.emit(machineInfo); + } + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/InputReceiver.java ---------------------------------------------------------------------- diff --git a/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/InputReceiver.java b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/InputReceiver.java new file mode 100644 index 0000000..263db55 --- /dev/null +++ b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/InputReceiver.java @@ -0,0 +1,524 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.machinedata; + +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Calendar; +import java.util.Date; +import java.util.Random; +import java.util.TimeZone; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.examples.machinedata.data.MachineInfo; +import org.apache.apex.examples.machinedata.data.MachineKey; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.common.util.BaseOperator; + +/** + * <p> + * Information tuple generator with randomness. + * </p> + * + * @since 0.3.5 + */ +@SuppressWarnings("unused") +public class InputReceiver extends BaseOperator implements InputOperator +{ + private static final Logger logger = LoggerFactory.getLogger(InputReceiver.class); + + public transient DefaultOutputPort<MachineInfo> outputInline = new DefaultOutputPort<>(); + private final Random randomGen = new Random(); + + private int customerMin = 1; + private int customerMax = 5; + private int productMin = 4; + private int productMax = 6; + private int osMin = 10; + private int osMax = 12; + private int software1Min = 10; + private int software1Max = 12; + private int software2Min = 12; + private int software2Max = 14; + private int software3Min = 4; + private int software3Max = 6; + + private int deviceIdMin = 1; + private int deviceIdMax = 50; + + private int tupleBlastSize = 1001; + + private static final DateFormat minuteDateFormat = new SimpleDateFormat("HHmm"); + private static final DateFormat dayDateFormat = new SimpleDateFormat("d"); + + static { + TimeZone tz = TimeZone.getTimeZone("GMT"); + minuteDateFormat.setTimeZone(tz); + dayDateFormat.setTimeZone(tz); + + } + + @Override + public void setup(Context.OperatorContext context) + { + super.setup(context); + } + + @Override + public void beginWindow(long windowId) + { + super.beginWindow(windowId); + } + + @Override + public void emitTuples() + { + int count = 0; + Calendar calendar = Calendar.getInstance(); + Date date = calendar.getTime(); + String timeKey = minuteDateFormat.format(date); + String day = dayDateFormat.format(date); + + while (count < tupleBlastSize) { + randomGen.setSeed(System.currentTimeMillis()); + + int customerVal = genCustomerId(); + int productVal = genProductVer(); + int osVal = genOsVer(); + int software1Val = genSoftware1Ver(); + int software2Val = genSoftware2Ver(); + int software3Val = genSoftware3Ver(); + int deviceIdVal = genDeviceId(); + + int cpuVal = genCpu(calendar); + int ramVal = genRam(calendar); + int hddVal = genHdd(calendar); + + MachineKey machineKey = new MachineKey(timeKey, day); + + machineKey.setCustomer(customerVal); + machineKey.setProduct(productVal); + machineKey.setOs(osVal); + machineKey.setDeviceId(deviceIdVal); + machineKey.setSoftware1(software1Val); + machineKey.setSoftware2(software2Val); + machineKey.setSoftware3(software3Val); + MachineInfo machineInfo = new MachineInfo(); + machineInfo.setMachineKey(machineKey); + machineInfo.setCpu(cpuVal); + machineInfo.setRam(ramVal); + machineInfo.setHdd(hddVal); + + outputInline.emit(machineInfo); + + count++; + } + } + + private int genCustomerId() + { + int range = customerMax - customerMin + 1; + return customerMin + randomGen.nextInt(range); + } + + private int genProductVer() + { + int range = productMax - productMin + 1; + return productMin + randomGen.nextInt(range); + } + + private int genOsVer() + { + int range = osMax - osMin + 1; + return osMin + randomGen.nextInt(range); + } + + private int genSoftware3Ver() + { + int range = software3Max - software3Min + 1; + return software3Min + randomGen.nextInt(range); + } + + private int genDeviceId() + { + int range = deviceIdMax - deviceIdMin + 1; + return deviceIdMin + randomGen.nextInt(range); + } + + private int genSoftware1Ver() + { + int range = software1Max - software1Min + 1; + return software1Min + randomGen.nextInt(range); + } + + private int genSoftware2Ver() + { + int range = software2Max - software2Min + 1; + return software2Min + randomGen.nextInt(range); + } + + private int genCpu(Calendar cal) + { + int minute = cal.get(Calendar.MINUTE); + int second; + int range = minute / 2 + 19; + if (minute / 17 == 0) { + second = cal.get(Calendar.SECOND); + return (30 + randomGen.nextInt(range) + (minute % 7) - (second % 11)); + } else if (minute / 47 == 0) { + second = cal.get(Calendar.SECOND); + return (7 + randomGen.nextInt(range) + (minute % 7) - (second % 7)); + } else { + second = cal.get(Calendar.SECOND); + return (randomGen.nextInt(range) + (minute % 19) + (second % 7)); + } + } + + private int genRam(Calendar cal) + { + int minute = cal.get(Calendar.MINUTE); + int second; + int range = minute + 1; + if (minute / 23 == 0) { + second = cal.get(Calendar.SECOND); + return (20 + randomGen.nextInt(range) + (minute % 5) - (second % 11)); + } else if (minute / 37 == 0) { + second = cal.get(Calendar.SECOND); + return (11 + randomGen.nextInt(60) - (minute % 5) - (second % 11)); + } else { + second = cal.get(Calendar.SECOND); + return (randomGen.nextInt(range) + (minute % 17) + (second % 11)); + } + } + + private int genHdd(Calendar cal) + { + int minute = cal.get(Calendar.MINUTE); + int second; + int range = minute / 2 + 1; + if (minute / 37 == 0) { + second = cal.get(Calendar.SECOND); + return (25 + randomGen.nextInt(range) - minute % 7 - second % 11); + } else { + second = cal.get(Calendar.SECOND); + return (randomGen.nextInt(range) + minute % 23 + second % 11); + } + } + + /** + * This method returns the minimum value for customer + * + * @return + */ + public int getCustomerMin() + { + return customerMin; + } + + /** + * This method is used to set the minimum value for customer + * + * @param customerMin the minimum customer value + */ + public void setCustomerMin(int customerMin) + { + this.customerMin = customerMin; + } + + /** + * This method returns the max value for customer + * + * @return + */ + public int getCustomerMax() + { + return customerMax; + } + + /** + * This method is used to set the max value for customer + * + * @param customerMax the max customer value + */ + public void setCustomerMax(int customerMax) + { + this.customerMax = customerMax; + } + + /** + * This method returns the minimum value for product + * + * @return + */ + public int getProductMin() + { + return productMin; + } + + /** + * This method is used to set the minimum value for product + * + * @param productMin the minimum product value + */ + public void setProductMin(int productMin) + { + this.productMin = productMin; + } + + /** + * This method returns the max value for product + * + * @return + */ + public int getProductMax() + { + return productMax; + } + + /** + * This method is used to set the max value for product + * + * @param productMax the max product value + */ + public void setProductMax(int productMax) + { + this.productMax = productMax; + } + + /** + * This method returns the minimum value for OS + * + * @return + */ + public int getOsMin() + { + return osMin; + } + + /** + * This method is used to set the minimum value for OS + * + * @param osMin the min OS value + */ + public void setOsMin(int osMin) + { + this.osMin = osMin; + } + + /** + * This method returns the max value for OS + * + * @return + */ + public int getOsMax() + { + return osMax; + } + + /** + * This method is used to set the max value for OS + * + * @param osMax the max OS value + */ + public void setOsMax(int osMax) + { + this.osMax = osMax; + } + + /** + * This method returns the minimum value for software1 + * + * @return + */ + public int getSoftware1Min() + { + return software1Min; + } + + /** + * This method is used to set the minimum value for software1 + * + * @param software1Min the minimum software1 value + */ + public void setSoftware1Min(int software1Min) + { + this.software1Min = software1Min; + } + + /** + * This method returns the max value for software1 + * + * @return + */ + public int getSoftware1Max() + { + return software1Max; + } + + /** + * This method is used to set the max value for software1 + * + * @param software1Max the max software1 value + */ + public void setSoftware1Max(int software1Max) + { + this.software1Max = software1Max; + } + + /** + * This method returns the minimum value for software2 + * + * @return + */ + public int getSoftware2Min() + { + return software2Min; + } + + /** + * This method is used to set the minimum value for software2 + * + * @param software2Min the minimum software2 value + */ + public void setSoftware2Min(int software2Min) + { + this.software2Min = software2Min; + } + + /** + * This method returns the max value for software2 + * + * @return + */ + public int getSoftware2Max() + { + return software2Max; + } + + /** + * This method is used to set the max value for software2 + * + * @param software2Max the max software2 value + */ + public void setSoftware2Max(int software2Max) + { + this.software2Max = software2Max; + } + + /** + * This method returns the minimum value for software3 + * + * @return + */ + public int getSoftware3Min() + { + return software3Min; + } + + /** + * This method is used to set the minimum value for software3 + * + * @param software3Min the minimum software3 value + */ + public void setSoftware3Min(int software3Min) + { + this.software3Min = software3Min; + } + + /** + * This method returns the max value for software3 + * + * @return + */ + public int getSoftware3Max() + { + return software3Max; + } + + /** + * This method is used to set the max value for software3 + * + * @param software3Max the max software3 value + */ + public void setSoftware3Max(int software3Max) + { + this.software3Max = software3Max; + } + + /** + * This method returns the minimum value for deviceId + * + * @return + */ + public int getDeviceIdMin() + { + return deviceIdMin; + } + + /** + * This method is used to set the minimum value for deviceId + * + * @param deviceIdMin the minimum deviceId value + */ + public void setDeviceIdMin(int deviceIdMin) + { + this.deviceIdMin = deviceIdMin; + } + + /** + * This method returns the max value for deviceId + * + * @return + */ + public int getDeviceIdMax() + { + return deviceIdMax; + } + + /** + * This method is used to set the max value for deviceId + * + * @param deviceIdMax the max deviceId value + */ + public void setDeviceIdMax(int deviceIdMax) + { + this.deviceIdMax = deviceIdMax; + } + + /** + * @return the tupleBlastSize + */ + public int getTupleBlastSize() + { + return tupleBlastSize; + } + + /** + * @param tupleBlastSize the tupleBlastSize to set + */ + public void setTupleBlastSize(int tupleBlastSize) + { + this.tupleBlastSize = tupleBlastSize; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/data/AverageData.java ---------------------------------------------------------------------- diff --git a/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/data/AverageData.java b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/data/AverageData.java new file mode 100644 index 0000000..3c12335 --- /dev/null +++ b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/data/AverageData.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.machinedata.data; + + +/** + * This class stores the value of sum and the count of values summed. + * <p> + * AverageData class. + * </p> + * + * @since 0.3.5 + */ +public class AverageData +{ + + private long cpu; + private long hdd; + private long ram; + private long count; + + /** + * This is default constructor that sets the sum and count to 0 + */ + public AverageData() + { + + } + + /** + * This constructor takes the value of sum and count and initialize the local attributes to corresponding values + * + * @param count + * the value of count + */ + public AverageData(long cpu,long hdd,long ram, long count) + { + this.cpu = cpu; + this.ram = ram; + this.hdd = hdd; + this.count = count; + } + + public long getCpu() + { + return cpu; + } + + public void setCpu(long cpu) + { + this.cpu = cpu; + } + + public long getHdd() + { + return hdd; + } + + public void setHdd(long hdd) + { + this.hdd = hdd; + } + + public long getRam() + { + return ram; + } + + public void setRam(long ram) + { + this.ram = ram; + } + + /** + * This returns the value of count + * @return + */ + public long getCount() + { + return count; + } + + /** + * This method sets the value of count + * @param count + */ + public void setCount(long count) + { + this.count = count; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/data/MachineInfo.java ---------------------------------------------------------------------- diff --git a/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/data/MachineInfo.java b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/data/MachineInfo.java new file mode 100644 index 0000000..3952b70 --- /dev/null +++ b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/data/MachineInfo.java @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.machinedata.data; + +/** + * This class stores the cpu% usage, ram% usage, hdd% usage and key information about a particular machine + * <p> + * MachineInfo class. + * </p> + * + * @since 0.3.5 + */ +public class MachineInfo +{ + private MachineKey machineKey; + private int cpu; + private int ram; + private int hdd; + + /** + * This default constructor + */ + public MachineInfo() + { + } + + /** + * This constructor takes MachineKey as input and initialize local attributes + * + * @param machineKey + * the MachineKey instance + */ + public MachineInfo(MachineKey machineKey) + { + this.machineKey = machineKey; + } + + /** + * This constructor takes MachineKey, cpu usage, ram usage, hdd usage as input and initialize local attributes + * + * @param machineKey + * the MachineKey instance + * @param cpu + * the CPU% usage + * @param ram + * the RAM% usage + * @param hdd + * the HDD% usage + */ + public MachineInfo(MachineKey machineKey, int cpu, int ram, int hdd) + { + this.machineKey = machineKey; + this.cpu = cpu; + this.ram = ram; + this.hdd = hdd; + } + + /** + * This method returns the MachineKey + * + * @return + */ + public MachineKey getMachineKey() + { + return machineKey; + } + + /** + * This method sets the MachineKey + * + * @param machineKey + * the MachineKey instance + */ + public void setMachineKey(MachineKey machineKey) + { + this.machineKey = machineKey; + } + + /** + * This method returns the CPU% usage + * + * @return + */ + public int getCpu() + { + return cpu; + } + + /** + * This method sets the CPU% usage + * + * @param cpu + * the CPU% usage + */ + public void setCpu(int cpu) + { + this.cpu = cpu; + } + + /** + * This method returns the RAM% usage + * + * @return + */ + public int getRam() + { + return ram; + } + + /** + * This method sets the RAM% usage + * + * @param ram + * the RAM% usage + */ + public void setRam(int ram) + { + this.ram = ram; + } + + /** + * This method returns the HDD% usage + * + * @return + */ + public int getHdd() + { + return hdd; + } + + /** + * This method sets the HDD% usage + * + * @param hdd + * the HDD% usage + */ + public void setHdd(int hdd) + { + this.hdd = hdd; + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/data/MachineKey.java ---------------------------------------------------------------------- diff --git a/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/data/MachineKey.java b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/data/MachineKey.java new file mode 100644 index 0000000..2bf0a53 --- /dev/null +++ b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/data/MachineKey.java @@ -0,0 +1,381 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.machinedata.data; + +/** + * This class stores the information about various softwares, deviceIds, OS of the device + * <p> + * MachineKey class. + * </p> + * + * @since 0.3.5 + */ +public class MachineKey +{ + + private Integer customer; + private Integer product; + private Integer os; + private Integer software1; + private Integer software2; + private Integer software3; + private Integer deviceId; + private String timeKey; + private String day; + + /** + * This constructor takes the format in which time has to be captured and the day when this instance is created + * + * @param timeKey the format in which time has to be captured + * @param day the day when this instance is created + */ + public MachineKey(String timeKey, String day) + { + this.timeKey = timeKey; + this.day = day; + } + + /** + * This is default constructor + */ + public MachineKey() + { + } + + /** + * This constructor takes format in which time has to be captured, the day when this instance is created, the customer + * id, product Id on the device, OS version on the device, software1 version on the device, software2 version on the device, + * software3 version on the device, deviceId on the device, + * + * @param timeKey the format in which time has to be captured + * @param day the day when this instance is created + * @param customer the customer Id + * @param product product Id + * @param os OS version + * @param software1 software1 version + * @param software2 software2 version + * @param software3 software3 version + * @param deviceId deviceId + */ + public MachineKey(String timeKey, String day, Integer customer, Integer product, Integer os, Integer software1, Integer software2, Integer software3, Integer deviceId) + { + this.timeKey = timeKey; + this.day = day; + this.customer = customer; + this.product = product; + this.os = os; + this.software1 = software1; + this.software2 = software2; + this.software3 = software3; + this.deviceId = deviceId; + } + + /** + * This method returns the format in which the time is captured. The time is the time when this instance of MachineKey + * was generated. For e.g. HHmm to capture Hour and minute + * + * @return + */ + public String getTimeKey() + { + return timeKey; + } + + /** + * This method sets the format in which the time is captured. The time is the time when this instance of MachineKey + * was generated. For e.g. HHmm to capture Hour and minute + * + * @param timeKey + * the value of format + */ + public void setTimeKey(String timeKey) + { + this.timeKey = timeKey; + } + + /** + * This method returns the day of the month when this instance of MachineKey was generated + * + * @return + */ + public String getDay() + { + return day; + } + + /** + * This method sets the day of the month when this instance of MachineKey was generated + * + * @param day + * the day of the month + */ + public void setDay(String day) + { + this.day = day; + } + + /** + * This method returns the customer Id + * + * @return + */ + public Integer getCustomer() + { + return customer; + } + + /** + * This method sets the customer Id + * + * @param customer + * the customer Id + */ + public void setCustomer(Integer customer) + { + this.customer = customer; + } + + /** + * This method returns product on the device + * + * @return + */ + public Integer getProduct() + { + return product; + } + + /** + * This method sets the product on the device + * + * @param product + * the value of product + */ + public void setProduct(Integer product) + { + this.product = product; + } + + /** + * This method returns the OS version on the device + * + * @return + */ + public Integer getOs() + { + return os; + } + + /** + * This method sets the OS version on the device + * + * @param os + * OS version + */ + public void setOs(Integer os) + { + this.os = os; + } + + /** + * This method returns the version of the software1 on the device + * + * @return + */ + public Integer getSoftware1() + { + return software1; + } + + /** + * This method sets the version of the software1 on the device + * + * @param software1 the version of the software1 + */ + public void setSoftware1(Integer software1) + { + this.software1 = software1; + } + + /** + * This method returns the version of the software2 on the device + * + * @return + */ + public Integer getSoftware2() + { + return software2; + } + + /** + * This method sets the version of the software2 on the device + * + * @param software2 + * the version of the software2 + */ + public void setSoftware2(Integer software2) + { + this.software2 = software2; + } + + /** + * This method returns the version of the software3 on the device + * + * @return + */ + public Integer getSoftware3() + { + return software3; + } + + /** + * This method sets the version of the software3 on the device + * + * @param software3 + * the version of the software3 + */ + public void setSoftware3(Integer software3) + { + this.software3 = software3; + } + + @Override + public int hashCode() + { + int key = 0; + if (customer != null) { + key |= (1 << 31); + key ^= customer; + } + if (product != null) { + key |= (1 << 30); + key ^= product; + } + if (os != null) { + key |= (1 << 29); + key ^= os; + } + if (software1 != null) { + key |= (1 << 28); + key ^= software1; + } + if (software2 != null) { + key |= (1 << 27); + key ^= software2; + } + if (software3 != null) { + key |= (1 << 26); + key ^= software3; + } + if (deviceId != null) { + key |= (1 << 25); + key ^= deviceId; + } + if (timeKey != null) { + key |= (1 << 24); + key ^= timeKey.hashCode(); + } + if (day != null) { + key |= (1 << 23); + key ^= day.hashCode(); + } + + return key; + } + + @Override + public boolean equals(Object obj) + { + if (!(obj instanceof MachineKey)) { + return false; + } + MachineKey mkey = (MachineKey)obj; + return checkStringEqual(this.timeKey, mkey.timeKey) && checkStringEqual(this.day, mkey.day) && checkIntEqual(this.customer, mkey.customer) && checkIntEqual(this.product, mkey.product) && checkIntEqual(this.os, mkey.os) && checkIntEqual(this.software1, mkey.software1) && checkIntEqual(this.software2, mkey.software2) && checkIntEqual(this.software3, mkey.software3) && checkIntEqual(this.deviceId, mkey.deviceId); + } + + private boolean checkIntEqual(Integer a, Integer b) + { + if ((a == null) && (b == null)) { + return true; + } + if ((a != null) && a.equals(b)) { + return true; + } + return false; + } + + private boolean checkStringEqual(String a, String b) + { + if ((a == null) && (b == null)) { + return true; + } + if ((a != null) && a.equals(b)) { + return true; + } + return false; + } + + @Override + public String toString() + { + StringBuilder sb = new StringBuilder(timeKey); + if (customer != null) { + sb.append("|0:").append(customer); + } + if (product != null) { + sb.append("|1:").append(product); + } + if (os != null) { + sb.append("|2:").append(os); + } + if (software1 != null) { + sb.append("|3:").append(software1); + } + if (software2 != null) { + sb.append("|4:").append(software2); + } + if (software3 != null) { + sb.append("|5:").append(software3); + } + if (deviceId != null) { + sb.append("|6:").append(deviceId); + } + return sb.toString(); + } + + /** + * This method returns the deviceId of the device + * @return The deviceId + */ + public Integer getDeviceId() + { + return deviceId; + } + + /** + * This method sets the deviceId of the device + * + * @param deviceId + */ + public void setDeviceId(Integer deviceId) + { + this.deviceId = deviceId; + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/data/ResourceType.java ---------------------------------------------------------------------- diff --git a/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/data/ResourceType.java b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/data/ResourceType.java new file mode 100644 index 0000000..2fd457a --- /dev/null +++ b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/data/ResourceType.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.machinedata.data; + +import java.util.Map; + +import com.google.common.collect.Maps; + +/** + * This class captures the resources whose usage is collected for each device + * <p>ResourceType class.</p> + * + * @since 0.3.5 + */ +public enum ResourceType +{ + + CPU("cpu"), RAM("ram"), HDD("hdd"); + + private static Map<String, ResourceType> descToResource = Maps.newHashMap(); + + static { + for (ResourceType type : ResourceType.values()) { + descToResource.put(type.desc, type); + } + } + + private String desc; + + private ResourceType(String desc) + { + this.desc = desc; + } + + @Override + public String toString() + { + return desc; + } + + /** + * This method returns ResourceType for the given description + * @param desc the description + * @return + */ + public static ResourceType getResourceTypeOf(String desc) + { + return descToResource.get(desc); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/operator/CalculatorOperator.java ---------------------------------------------------------------------- diff --git a/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/operator/CalculatorOperator.java b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/operator/CalculatorOperator.java new file mode 100644 index 0000000..34a9514 --- /dev/null +++ b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/operator/CalculatorOperator.java @@ -0,0 +1,279 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.machinedata.operator; + +import java.io.Serializable; +import java.math.BigDecimal; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import javax.validation.constraints.Max; +import javax.validation.constraints.Min; + +import org.apache.apex.examples.machinedata.data.MachineInfo; +import org.apache.apex.examples.machinedata.data.MachineKey; +import org.apache.apex.examples.machinedata.data.ResourceType; +import org.apache.apex.examples.machinedata.util.DataTable; + +import com.google.common.base.Objects; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.StreamCodec; +import com.datatorrent.common.util.BaseOperator; + +import com.datatorrent.lib.codec.KryoSerializableStreamCodec; +import com.datatorrent.lib.util.KeyValPair; + +/** + * <p> + * CalculatorOperator class. + * </p> + * + * @since 0.3.5 + */ +public class CalculatorOperator extends BaseOperator +{ + + private final DataTable<MachineKey, ResourceType, List<Integer>> data = new DataTable<>(); + + @Min(1) + @Max(99) + private int kthPercentile = 95; // kth percentile + private boolean computePercentile; + private boolean computeSD; + private boolean computeMax; + + private int percentileThreshold = 80; + private int sdThreshold = 70; + private int maxThreshold = 99; + + public final transient DefaultInputPort<MachineInfo> dataPort = new DefaultInputPort<MachineInfo>() + { + @Override + public void process(MachineInfo tuple) + { + addDataToCache(tuple); + } + + /** + * Stream codec used for partitioning. + */ + @Override + public StreamCodec<MachineInfo> getStreamCodec() + { + return new MachineInfoStreamCodec(); + } + }; + + public final transient DefaultOutputPort<KeyValPair<MachineKey, Map<ResourceType, Double>>> percentileOutputPort = new DefaultOutputPort<>(); + + public final transient DefaultOutputPort<KeyValPair<MachineKey, Map<ResourceType, Double>>> sdOutputPort = new DefaultOutputPort<>(); + + public final transient DefaultOutputPort<KeyValPair<MachineKey, Map<ResourceType, Integer>>> maxOutputPort = new DefaultOutputPort<>(); + + public transient DefaultOutputPort<String> smtpAlert = new DefaultOutputPort<>(); + + private void addDataToCache(MachineInfo tuple) + { + MachineKey machineKey = tuple.getMachineKey(); + if (!data.containsRow(machineKey)) { + data.put(machineKey, ResourceType.CPU, Lists.<Integer>newArrayList()); + data.put(machineKey, ResourceType.RAM, Lists.<Integer>newArrayList()); + data.put(machineKey, ResourceType.HDD, Lists.<Integer>newArrayList()); + } + data.get(machineKey, ResourceType.CPU).add(tuple.getCpu()); + data.get(machineKey, ResourceType.RAM).add(tuple.getRam()); + data.get(machineKey, ResourceType.HDD).add(tuple.getHdd()); + } + + @Override + public void endWindow() + { + + if (computePercentile) { + for (MachineKey machineKey : data.rowKeySet()) { + Collections.sort(data.get(machineKey, ResourceType.CPU)); + Collections.sort(data.get(machineKey, ResourceType.RAM)); + Collections.sort(data.get(machineKey, ResourceType.HDD)); + + Map<ResourceType, Double> percentileData = Maps.newHashMap(); + percentileData.put(ResourceType.CPU, getKthPercentile(data.get(machineKey, ResourceType.CPU))); + percentileData.put(ResourceType.RAM, getKthPercentile(data.get(machineKey, ResourceType.RAM))); + percentileData.put(ResourceType.HDD, getKthPercentile(data.get(machineKey, ResourceType.HDD))); + percentileOutputPort.emit(new KeyValPair<>(machineKey, percentileData)); + + for (ResourceType resourceType : percentileData.keySet()) { + double percentileValue = percentileData.get(resourceType); + if (percentileValue > percentileThreshold) { + emitAlert(resourceType, machineKey, percentileValue, "Percentile"); + } + } + } + } + if (computeSD) { + for (MachineKey machineKey : data.rowKeySet()) { + + Map<ResourceType, Double> sdData = Maps.newHashMap(); + + for (ResourceType resourceType : ResourceType.values()) { + sdData.put(resourceType, getSD(data.get(machineKey, resourceType))); + } + sdOutputPort.emit(new KeyValPair<>(machineKey, sdData)); + + for (ResourceType resourceType : sdData.keySet()) { + double sdValue = sdData.get(resourceType); + if (sdValue > sdThreshold) { + emitAlert(resourceType, machineKey, sdValue, "SD"); + } + } + } + } + if (computeMax) { + for (MachineKey machineKey : data.rowKeySet()) { + + Map<ResourceType, Integer> maxData = Maps.newHashMap(); + maxData.put(ResourceType.CPU, Collections.max(data.get(machineKey, ResourceType.CPU))); + maxData.put(ResourceType.RAM, Collections.max(data.get(machineKey, ResourceType.RAM))); + maxData.put(ResourceType.HDD, Collections.max(data.get(machineKey, ResourceType.HDD))); + + maxOutputPort.emit(new KeyValPair<>(machineKey, maxData)); + + for (ResourceType resourceType : maxData.keySet()) { + double sdValue = maxData.get(resourceType).doubleValue(); + if (sdValue > maxThreshold) { + emitAlert(resourceType, machineKey, sdValue, "Max"); + } + } + } + } + data.clear(); + } + + private void emitAlert(ResourceType type, MachineKey machineKey, double alertVal, String prefix) + { + BigDecimal decimalVal = new BigDecimal(alertVal); + decimalVal = decimalVal.setScale(2, BigDecimal.ROUND_HALF_UP); + String alertTime = machineKey.getDay() + machineKey.getTimeKey(); + smtpAlert.emit(prefix + "-" + type.toString().toUpperCase() + " alert at " + alertTime + " " + type + " usage breached current usage: " + decimalVal.doubleValue() + "% threshold: " + percentileThreshold + "%\n\n" + machineKey); + } + + private double getKthPercentile(List<Integer> sorted) + { + + double val = (kthPercentile * sorted.size()) / 100.0; + if (val == (int)val) { + // Whole number + int idx = (int)val - 1; + return (sorted.get(idx) + sorted.get(idx + 1)) / 2.0; + } else { + int idx = (int)Math.round(val) - 1; + return sorted.get(idx); + } + } + + private double getSD(List<Integer> data) + { + int sum = 0; + for (int i : data) { + sum += i; + } + double avg = sum / (data.size() * 1.0); + double sd = 0; + for (Integer point : data) { + sd += Math.pow(point - avg, 2); + } + return Math.sqrt(sd); + } + + /** + * @param kVal the percentile which will be emitted by this operator + */ + public void setKthPercentile(int kVal) + { + this.kthPercentile = kVal; + } + + /** + * @param doCompute when true percentile will be computed + */ + public void setComputePercentile(boolean doCompute) + { + this.computePercentile = doCompute; + } + + /** + * @param doCompute when true standard deviation will be computed + */ + public void setComputeSD(boolean doCompute) + { + this.computeSD = doCompute; + } + + /** + * @param doCompute when true max will be computed + */ + public void setComputeMax(boolean doCompute) + { + this.computeMax = doCompute; + } + + /** + * @param threshold for percentile when breached will cause alert + */ + public void setPercentileThreshold(int threshold) + { + this.percentileThreshold = threshold; + } + + /** + * @param threshold for standard deviation when breached will cause alert + */ + public void setSDThreshold(int threshold) + { + this.sdThreshold = threshold; + } + + /** + * @param threshold for Max when breached will cause alert + */ + public void setMaxThreshold(int threshold) + { + this.maxThreshold = threshold; + } + + public static class MachineInfoStreamCodec extends KryoSerializableStreamCodec<MachineInfo> implements Serializable + { + public MachineInfoStreamCodec() + { + super(); + } + + @Override + public int getPartition(MachineInfo o) + { + return Objects.hashCode(o.getMachineKey().getCustomer(), o.getMachineKey().getOs(), o.getMachineKey().getProduct(), o.getMachineKey().getSoftware1(), o.getMachineKey().getSoftware2(), o.getMachineKey().getSoftware3()); + } + + private static final long serialVersionUID = 201411031403L; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/operator/MachineInfoAveragingOperator.java ---------------------------------------------------------------------- diff --git a/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/operator/MachineInfoAveragingOperator.java b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/operator/MachineInfoAveragingOperator.java new file mode 100644 index 0000000..1de676f --- /dev/null +++ b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/operator/MachineInfoAveragingOperator.java @@ -0,0 +1,216 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.machinedata.operator; + +import java.math.BigDecimal; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Calendar; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; + +import org.apache.apex.examples.machinedata.data.AverageData; +import org.apache.apex.examples.machinedata.data.MachineInfo; +import org.apache.apex.examples.machinedata.data.MachineKey; + +import com.google.common.collect.Maps; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.common.util.BaseOperator; + +import com.datatorrent.lib.util.KeyHashValPair; +import com.datatorrent.lib.util.KeyValPair; + +/** + * This class calculates the average for various resources across different devices for a given key + * <p>MachineInfoAveragingOperator class.</p> + * + * @since 0.9.0 + */ +@SuppressWarnings("unused") +public class MachineInfoAveragingOperator extends BaseOperator +{ + + public static final String CPU = "cpu"; + public static final String RAM = "ram"; + public static final String HDD = "hdd"; + public static final String DAY = "day"; + + private final transient Map<MachineKey, AverageData> dataMap = new HashMap<>(); + + public final transient DefaultOutputPort<KeyValPair<MachineKey, Map<String, String>>> outputPort = new DefaultOutputPort<>(); + + public transient DefaultOutputPort<String> smtpAlert = new DefaultOutputPort<>(); + + private int threshold = 95; + + /** + * Buffer all the tuples as is till end window gets called + */ + public final transient DefaultInputPort<KeyHashValPair<MachineKey, AverageData>> inputPort = new DefaultInputPort<KeyHashValPair<MachineKey, AverageData>>() + { + + @Override + public void process(KeyHashValPair<MachineKey, AverageData> tuple) + { + addTuple(tuple); + } + }; + + /** + * This method returns the threshold value + * + * @return + */ + public int getThreshold() + { + return threshold; + } + + /** + * This method sets the threshold value. If the average usage for any Resource is above this for a given key, then the alert is sent + * + * @param threshold the threshold value + */ + public void setThreshold(int threshold) + { + this.threshold = threshold; + } + + /** + * This adds the given tuple to the dataMap + * + * @param tuple input tuple + */ + private void addTuple(KeyHashValPair<MachineKey, AverageData> tuple) + { + MachineKey key = tuple.getKey(); + dataMap.put(key, tuple.getValue()); + } + + @Override + public void endWindow() + { + for (Map.Entry<MachineKey, AverageData> entry : dataMap.entrySet()) { + MachineKey key = entry.getKey(); + AverageData averageResultMap = entry.getValue(); + Map<String, String> averageResult = Maps.newHashMap(); + long count = averageResultMap.getCount(); + double average = averageResultMap.getCpu() / count; + averageResult.put(CPU, average + ""); + emitAlert(average, CPU, key); + average = averageResultMap.getHdd() / count; + averageResult.put(HDD, average + ""); + emitAlert(average, HDD, key); + average = averageResultMap.getRam() / count; + averageResult.put(RAM, average + ""); + emitAlert(average, RAM, key); + averageResult.put(DAY, key.getDay()); + outputPort.emit(new KeyValPair<>(key, averageResult)); + } + dataMap.clear(); + } + + private void emitAlert(double average, String resourceType, MachineKey key) + { + if (average > threshold) { + BigDecimal bd = new BigDecimal(average); + bd = bd.setScale(2, BigDecimal.ROUND_HALF_UP); + String stime = key.getDay() + key.getTimeKey(); + String skey = getKeyInfo(key); + smtpAlert.emit(resourceType.toUpperCase() + " alert at " + stime + " " + resourceType + " usage breached current usage: " + bd.doubleValue() + "% threshold: " + threshold + "%\n\n" + skey); + } + } + + /** + * This method is used to artificially generate alerts + * + * @param genAlert + */ + public void setGenAlert(boolean genAlert) + { + Calendar calendar = Calendar.getInstance(); + long timestamp = System.currentTimeMillis(); + calendar.setTimeInMillis(timestamp); + DateFormat minuteDateFormat = new SimpleDateFormat("HHmm"); + Date date = calendar.getTime(); + String timeKey = minuteDateFormat.format(date); + DateFormat dayDateFormat = new SimpleDateFormat("dd"); + String day = dayDateFormat.format(date); + + MachineKey alertKey = new MachineKey(timeKey, day); + alertKey.setCustomer(1); + alertKey.setProduct(5); + alertKey.setOs(10); + alertKey.setSoftware1(12); + alertKey.setSoftware2(14); + alertKey.setSoftware3(6); + + MachineInfo machineInfo = new MachineInfo(); + machineInfo.setMachineKey(alertKey); + machineInfo.setCpu(threshold + 1); + machineInfo.setRam(threshold + 1); + machineInfo.setHdd(threshold + 1); + + smtpAlert.emit("CPU Alert: CPU Usage threshold (" + threshold + ") breached: current % usage: " + getKeyInfo(alertKey)); + smtpAlert.emit("RAM Alert: RAM Usage threshold (" + threshold + ") breached: current % usage: " + getKeyInfo(alertKey)); + smtpAlert.emit("HDD Alert: HDD Usage threshold (" + threshold + ") breached: current % usage: " + getKeyInfo(alertKey)); + } + + /** + * This method returns the String for a given MachineKey instance + * + * @param key MachineKey instance that needs to be converted to string + * @return + */ + private String getKeyInfo(MachineKey key) + { + StringBuilder sb = new StringBuilder(); + if (key instanceof MachineKey) { + MachineKey mkey = (MachineKey)key; + Integer customer = mkey.getCustomer(); + if (customer != null) { + sb.append("customer: " + customer + "\n"); + } + Integer product = mkey.getProduct(); + if (product != null) { + sb.append("product version: " + product + "\n"); + } + Integer os = mkey.getOs(); + if (os != null) { + sb.append("os version: " + os + "\n"); + } + Integer software1 = mkey.getSoftware1(); + if (software1 != null) { + sb.append("software1 version: " + software1 + "\n"); + } + Integer software2 = mkey.getSoftware2(); + if (software2 != null) { + sb.append("software2 version: " + software2 + "\n"); + } + Integer software3 = mkey.getSoftware3(); + if (software3 != null) { + sb.append("software3 version: " + software3 + "\n"); + } + } + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/operator/MachineInfoAveragingPrerequisitesOperator.java ---------------------------------------------------------------------- diff --git a/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/operator/MachineInfoAveragingPrerequisitesOperator.java b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/operator/MachineInfoAveragingPrerequisitesOperator.java new file mode 100644 index 0000000..14c8d25 --- /dev/null +++ b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/operator/MachineInfoAveragingPrerequisitesOperator.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.machinedata.operator; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.apex.examples.machinedata.data.AverageData; +import org.apache.apex.examples.machinedata.data.MachineInfo; +import org.apache.apex.examples.machinedata.data.MachineKey; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; + +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.util.KeyHashValPair; + +/** + * This class calculates the partial sum and count for tuples generated by upstream Operator + * <p> MachineInfoAveragingPrerequisitesOperator class. </p> + * + * @since 0.3.5 + */ +public class MachineInfoAveragingPrerequisitesOperator extends BaseOperator +{ + + // Aggregate sum of all values seen for a key. + private Map<MachineKey, AverageData> sums = new HashMap<>(); + + public final transient DefaultOutputPort<KeyHashValPair<MachineKey, AverageData>> outputPort = new DefaultOutputPort<KeyHashValPair<MachineKey, AverageData>>() + { + @Override + public Unifier<KeyHashValPair<MachineKey, AverageData>> getUnifier() + { + MachineInfoAveragingUnifier unifier = new MachineInfoAveragingUnifier(); + return unifier; + } + }; + + public transient DefaultInputPort<MachineInfo> inputPort = new DefaultInputPort<MachineInfo>() + { + + @Override + public void process(MachineInfo tuple) + { + MachineKey key = tuple.getMachineKey(); + AverageData averageData = sums.get(key); + if (averageData == null) { + averageData = new AverageData(tuple.getCpu(), tuple.getHdd(), tuple.getRam(), 1); + sums.put(key, averageData); + } else { + averageData.setCpu(averageData.getCpu() + tuple.getCpu()); + averageData.setRam(averageData.getRam() + tuple.getRam()); + averageData.setHdd(averageData.getHdd() + tuple.getHdd()); + averageData.setCount(averageData.getCount() + 1); + } + } + }; + + @Override + public void endWindow() + { + + for (Map.Entry<MachineKey, AverageData> entry : sums.entrySet()) { + if (outputPort.isConnected()) { + outputPort.emit(new KeyHashValPair<>(entry.getKey(), entry.getValue())); + } + } + sums.clear(); + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/operator/MachineInfoAveragingUnifier.java ---------------------------------------------------------------------- diff --git a/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/operator/MachineInfoAveragingUnifier.java b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/operator/MachineInfoAveragingUnifier.java new file mode 100644 index 0000000..02aabe7 --- /dev/null +++ b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/operator/MachineInfoAveragingUnifier.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.machinedata.operator; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.apex.examples.machinedata.data.AverageData; +import org.apache.apex.examples.machinedata.data.MachineKey; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator.Unifier; + +import com.datatorrent.lib.util.KeyHashValPair; + +/** + * This class calculates the partial sum and count for a given key + * <p>MachineInfoAveragingUnifier class.</p> + * + * @since 0.9.0 + */ +public class MachineInfoAveragingUnifier implements Unifier<KeyHashValPair<MachineKey, AverageData>> +{ + + private Map<MachineKey, AverageData> sums = new HashMap<>(); + public final transient DefaultOutputPort<KeyHashValPair<MachineKey, AverageData>> outputPort = new DefaultOutputPort<>(); + + @Override + public void beginWindow(long arg0) + { + // TODO Auto-generated method stub + + } + + @Override + public void endWindow() + { + for (Map.Entry<MachineKey, AverageData> entry : sums.entrySet()) { + outputPort.emit(new KeyHashValPair<>(entry.getKey(), entry.getValue())); + } + sums.clear(); + + } + + @Override + public void setup(OperatorContext arg0) + { + // TODO Auto-generated method stub + + } + + @Override + public void teardown() + { + // TODO Auto-generated method stub + + } + + @Override + public void process(KeyHashValPair<MachineKey, AverageData> arg0) + { + MachineKey tupleKey = arg0.getKey(); + AverageData averageData = sums.get(tupleKey); + AverageData tupleValue = arg0.getValue(); + if (averageData == null) { + sums.put(tupleKey, tupleValue); + } else { + averageData.setCpu(averageData.getCpu() + tupleValue.getCpu()); + averageData.setRam(averageData.getRam() + tupleValue.getRam()); + averageData.setHdd(averageData.getHdd() + tupleValue.getHdd()); + averageData.setCount(averageData.getCount() + tupleValue.getCount()); + } + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/util/Combinatorics.java ---------------------------------------------------------------------- diff --git a/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/util/Combinatorics.java b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/util/Combinatorics.java new file mode 100644 index 0000000..0ff8985 --- /dev/null +++ b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/util/Combinatorics.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.machinedata.util; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Generate combinations of elements for the given array of elements. + * + * Implements nCr = n! / (r! * (n-r)!) + * + * @since 0.3.5 + */ +public class Combinatorics<T> +{ + + private T[] values; + private int size = -1; + private List<T> result; + private Map<Integer, List<T>> resultMap = new HashMap<Integer, List<T>>(); + private int resultMapSize = 0; + + /** + * Generates all possible combinations with all the sizes. + * + * @param values + */ + public Combinatorics(T[] values) + { + this.values = values; + this.size = -1; + this.result = new ArrayList<>(); + } + + /** + * Generates all possible combinations with the given size. + * + * @param values + * @param size + */ + public Combinatorics(T[] values, int size) + { + this.values = values; + this.size = size; + this.result = new ArrayList<>(); + } + + public Map<Integer, List<T>> generate() + { + + if (size == -1) { + size = values.length; + for (int i = 1; i <= size; i++) { + int[] tmp = new int[i]; + Arrays.fill(tmp, -1); + generateCombinations(0, 0, tmp); + } + } else { + int[] tmp = new int[size]; + Arrays.fill(tmp, -1); + generateCombinations(0, 0, tmp); + } + return resultMap; + } + + public void generateCombinations(int start, int depth, int[] tmp) + { + if (depth == tmp.length) { + for (int j = 0; j < depth; j++) { + result.add(values[tmp[j]]); + } + resultMap.put(++resultMapSize, result); + result = new ArrayList<>(); + return; + } + for (int i = start; i < values.length; i++) { + tmp[depth] = i; + generateCombinations(i + 1, depth + 1, tmp); + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/util/DataTable.java ---------------------------------------------------------------------- diff --git a/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/util/DataTable.java b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/util/DataTable.java new file mode 100644 index 0000000..a147268 --- /dev/null +++ b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/util/DataTable.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.machinedata.util; + +import java.util.Map; +import java.util.Set; + +import javax.annotation.Nullable; + +import com.google.common.collect.Maps; + +/** + * <p>DataTable class.</p> + * + * @since 0.3.5 + */ +public class DataTable<R,C,E> +{ + + //machineKey, [cpu,ram,hdd] -> value + private final Map<R,Map<C,E>> table = Maps.newHashMap(); + + public boolean containsRow(R rowKey) + { + return table.containsKey(rowKey); + } + + public void put(R rowKey,C colKey, E entry) + { + if (!containsRow(rowKey)) { + table.put(rowKey, Maps.<C,E>newHashMap()); + } + table.get(rowKey).put(colKey, entry); + } + + @Nullable + public E get(R rowKey, C colKey) + { + if (!containsRow(rowKey)) { + return null; + } + return table.get(rowKey).get(colKey); + } + + public Set<R> rowKeySet() + { + return table.keySet(); + } + + public void clear() + { + table.clear(); + } + + public Map<R,Map<C,E>> getTable() + { + return table; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/machinedata/src/main/resources/META-INF/properties.xml ---------------------------------------------------------------------- diff --git a/examples/machinedata/src/main/resources/META-INF/properties.xml b/examples/machinedata/src/main/resources/META-INF/properties.xml new file mode 100644 index 0000000..4ceb6b9 --- /dev/null +++ b/examples/machinedata/src/main/resources/META-INF/properties.xml @@ -0,0 +1,139 @@ +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<configuration> + <property> + <name>dt.attr.STREAMING_WINDOW_SIZE_MILLIS</name> + <value>1000</value> + </property> + <property> + <name>dt.application.MachineDataExample.operator.Receiver.attr.APPLICATION_WINDOW_COUNT + </name> + <value>5</value> + </property> + <property> + <name>dt.application.MachineDataExample.operator.Receiver.attr.PARTITIONER</name> + <value>com.datatorrent.common.partitioner.StatelessPartitioner:1</value> + </property> + <property> + <name>dt.application.MachineDataExample.operator.DimensionsGenerator.attr.APPLICATION_WINDOW_COUNT + </name> + <value>5</value> + </property> + <property> + <name>dt.application.MachineDataExample.stream.Events.locality + </name> + <value>CONTAINER_LOCAL</value> + </property> + <property> + <name>dt.application.MachineDataExample.operator.DimensionsGenerator.inputport.inputPort.attr.PARTITION_PARALLEL + </name> + <value>true</value> + </property> + <property> + <name>dt.application.MachineDataExample.operator.Aggregator.inputport.inputPort.attr.PARTITION_PARALLEL + </name> + <value>true</value> + </property> + <property> + <name>dt.application.MachineDataExample.operator.Aggregator.attr.APPLICATION_WINDOW_COUNT + </name> + <value>5</value> + </property> + <property> + <name>dt.application.MachineDataExample.operator.AverageCalculator.attr.APPLICATION_WINDOW_COUNT + </name> + <value>5</value> + </property> + <property> + <name>dt.application.MachineDataExample.operator.Persister.inputport.input.attr.PARTITION_PARALLEL + </name> + <value>true</value> + </property> + <property> + <name>dt.application.MachineDataExample.operator.Persister.attr.APPLICATION_WINDOW_COUNT + </name> + <value>5</value> + </property> + <property> + <name>dt.application.MachineDataExample.operator.Persister.store.dbIndex + </name> + <value>2</value> + </property> + <property> + <name>dt.application.MachineDataExample.operator.Persister.store.host + </name> + <value></value> + </property> + <property> + <name>dt.application.MachineDataExample.operator.Persister.store.port + </name> + <value>6379</value> + </property> + <property> + <name>dt.application.MachineDataExample.port.*.attr.QUEUE_CAPACITY</name> + <value>32000</value> + </property> + <property> + <name>dt.application.MachineDataExample.operator.Alerter.from + </name> + <value></value> + </property> + <property> + <name>dt.application.MachineDataExample.operator.Alerter.subject + </name> + <value>Alert!!!</value> + </property> + <property> + <name>dt.application.MachineDataExample.operator.Alerter.recipients.TO + </name> + <value></value> + </property> + <property> + <name>dt.application.MachineDataExample.operator.Alerter.content + </name> + <value>{}</value> + </property> + <property> + <name>dt.application.MachineDataExample.operator.Alerter.smtpHost + </name> + <value>localhost</value> + </property> + <property> + <name>dt.application.MachineDataExample.operator.Alerter.smtpPort + </name> + <value>25</value> + </property> + <property> + <name>dt.application.MachineDataExample.operator.Alerter.useSsl + </name> + <value>false</value> + </property> + <property> + <name>dt.application.MachineDataExample.operator.Aggregator.outputport.outputPort.attr.UNIFIER_LIMIT + </name> + <value>8</value> + </property> + <property> + <name>dt.application.MachineDataExample.stream.DimensionalData.locality + </name> + <value>THREAD_LOCAL</value> + </property> +</configuration> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/machinedata/src/test/java/org/apache/apex/examples/machinedata/CalculatorOperatorTest.java ---------------------------------------------------------------------- diff --git a/examples/machinedata/src/test/java/org/apache/apex/examples/machinedata/CalculatorOperatorTest.java b/examples/machinedata/src/test/java/org/apache/apex/examples/machinedata/CalculatorOperatorTest.java new file mode 100644 index 0000000..c69a2aa --- /dev/null +++ b/examples/machinedata/src/test/java/org/apache/apex/examples/machinedata/CalculatorOperatorTest.java @@ -0,0 +1,197 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.machinedata; + +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Calendar; +import java.util.Date; +import java.util.List; +import java.util.Map; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.apex.examples.machinedata.data.MachineInfo; +import org.apache.apex.examples.machinedata.data.MachineKey; +import org.apache.apex.examples.machinedata.data.ResourceType; +import org.apache.apex.examples.machinedata.operator.CalculatorOperator; + + +import com.google.common.collect.ImmutableList; + +import com.datatorrent.lib.testbench.CollectorTestSink; +import com.datatorrent.lib.util.KeyValPair; +import com.datatorrent.lib.util.TimeBucketKey; + +/** + * @since 0.3.5 + */ +public class CalculatorOperatorTest +{ + private static DateFormat minuteDateFormat = new SimpleDateFormat("HHmm"); + private static Logger LOG = LoggerFactory.getLogger(CalculatorOperatorTest.class); + + /** + * Test node logic emits correct results + */ + @Test + public void testNodeProcessing() throws Exception + { + CalculatorOperator calculatorOperator = new CalculatorOperator(); + calculatorOperator.setup(null); + + calculatorOperator.setComputePercentile(true); + calculatorOperator.setComputeMax(true); + calculatorOperator.setComputeSD(true); + + testPercentile(calculatorOperator); + } + + public void testPercentile(CalculatorOperator oper) + { + + CollectorTestSink sortSink = new CollectorTestSink(); + oper.percentileOutputPort.setSink(sortSink); + oper.setKthPercentile(50); + Calendar calendar = Calendar.getInstance(); + Date date = calendar.getTime(); + String timeKey = minuteDateFormat.format(date); + String day = calendar.get(Calendar.DAY_OF_MONTH) + ""; + + Integer vs = new Integer(1); + MachineKey mk = new MachineKey(timeKey, day, vs, vs, vs, vs, vs, vs, vs); + + oper.beginWindow(0); + + MachineInfo info = new MachineInfo(mk, 1, 1, 1); + oper.dataPort.process(info); + + info.setCpu(2); + oper.dataPort.process(info); + + info.setCpu(3); + oper.dataPort.process(info); + + oper.endWindow(); + + Assert.assertEquals("number emitted tuples", 1, sortSink.collectedTuples.size()); + for (Object o : sortSink.collectedTuples) { + LOG.debug(o.toString()); + KeyValPair<TimeBucketKey, Map<ResourceType, Double>> keyValPair = (KeyValPair<TimeBucketKey, Map<ResourceType, Double>>)o; + Assert.assertEquals("emitted value for 'cpu' was ", 2.0, keyValPair.getValue().get(ResourceType.CPU), 0); + Assert.assertEquals("emitted value for 'hdd' was ", 1.0, keyValPair.getValue().get(ResourceType.HDD), 0); + Assert.assertEquals("emitted value for 'ram' was ", 1.0, keyValPair.getValue().get(ResourceType.RAM), 0); + + } + LOG.debug("Done percentile testing\n"); + + } + + public void testStandarDeviation(CalculatorOperator oper) + { + CollectorTestSink sortSink = new CollectorTestSink(); + oper.sdOutputPort.setSink(sortSink); + Calendar calendar = Calendar.getInstance(); + Date date = calendar.getTime(); + String timeKey = minuteDateFormat.format(date); + String day = calendar.get(Calendar.DAY_OF_MONTH) + ""; + + Integer vs = new Integer(1); + MachineKey mk = new MachineKey(timeKey, day, vs, vs, vs, vs, vs, vs, vs); + + oper.beginWindow(0); + + MachineInfo info = new MachineInfo(mk, 1, 1, 1); + oper.dataPort.process(info); + + info.setCpu(2); + oper.dataPort.process(info); + + info.setCpu(3); + oper.dataPort.process(info); + + oper.endWindow(); + + Assert.assertEquals("number emitted tuples", 1, sortSink.collectedTuples.size()); + for (Object o : sortSink.collectedTuples) { + LOG.debug(o.toString()); + KeyValPair<TimeBucketKey, Map<ResourceType, Double>> keyValPair = (KeyValPair<TimeBucketKey, Map<ResourceType, Double>>)o; + Assert.assertEquals("emitted value for 'cpu' was ", getSD(ImmutableList.of(1, 2, 3)), keyValPair.getValue().get(ResourceType.CPU), 0); + Assert.assertEquals("emitted value for 'hdd' was ", getSD(ImmutableList.of(1, 1, 1)), keyValPair.getValue().get(ResourceType.HDD), 0); + Assert.assertEquals("emitted value for 'ram' was ", getSD(ImmutableList.of(1, 1, 1)), keyValPair.getValue().get(ResourceType.RAM), 0); + + } + LOG.debug("Done sd testing\n"); + + } + + private final double getSD(List<Integer> input) + { + int sum = 0; + for (int i : input) { + sum += i; + } + double avg = sum / (input.size() * 1.0); + double sd = 0; + for (Integer point : input) { + sd += Math.pow(point - avg, 2); + } + return Math.sqrt(sd); + } + + public void testMax(CalculatorOperator oper) + { + CollectorTestSink sortSink = new CollectorTestSink(); + oper.maxOutputPort.setSink(sortSink); + Calendar calendar = Calendar.getInstance(); + Date date = calendar.getTime(); + String timeKey = minuteDateFormat.format(date); + String day = calendar.get(Calendar.DAY_OF_MONTH) + ""; + + Integer vs = new Integer(1); + MachineKey mk = new MachineKey(timeKey, day, vs, vs, vs, vs, vs, vs, vs); + + oper.beginWindow(0); + + MachineInfo info = new MachineInfo(mk, 1, 1, 1); + oper.dataPort.process(info); + + info.setCpu(2); + oper.dataPort.process(info); + + info.setCpu(3); + oper.dataPort.process(info); + + oper.endWindow(); + + Assert.assertEquals("number emitted tuples", 1, sortSink.collectedTuples.size()); + for (Object o : sortSink.collectedTuples) { + LOG.debug(o.toString()); + KeyValPair<TimeBucketKey, Map<ResourceType, Double>> keyValPair = (KeyValPair<TimeBucketKey, Map<ResourceType, Double>>)o; + Assert.assertEquals("emitted value for 'cpu' was ", 3, keyValPair.getValue().get(ResourceType.CPU), 0); + Assert.assertEquals("emitted value for 'hdd' was ", 1, keyValPair.getValue().get(ResourceType.HDD), 0); + Assert.assertEquals("emitted value for 'ram' was ", 1, keyValPair.getValue().get(ResourceType.RAM), 0); + + } + LOG.debug("Done max testing\n"); + + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/machinedata/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/examples/machinedata/src/test/resources/log4j.properties b/examples/machinedata/src/test/resources/log4j.properties new file mode 100644 index 0000000..cf0d19e --- /dev/null +++ b/examples/machinedata/src/test/resources/log4j.properties @@ -0,0 +1,43 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +log4j.rootLogger=DEBUG,CONSOLE + +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n +log4j.appender.CONSOLE.threshold=${test.log.console.threshold} +test.log.console.threshold=DEBUG + +log4j.appender.RFA=org.apache.log4j.RollingFileAppender +log4j.appender.RFA.layout=org.apache.log4j.PatternLayout +log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n +log4j.appender.RFA.File=/tmp/app.log + +# to enable, add SYSLOG to rootLogger +log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender +log4j.appender.SYSLOG.syslogHost=127.0.0.1 +log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout +log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n +log4j.appender.SYSLOG.Facility=LOCAL1 + +log4j.logger.org=info +#log4j.logger.org.apache.commons.beanutils=warn +log4j.logger.com.datatorrent=debug +log4j.logger.org.apache.apex=debug http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mobile/pom.xml ---------------------------------------------------------------------- diff --git a/examples/mobile/pom.xml b/examples/mobile/pom.xml new file mode 100644 index 0000000..cb40887 --- /dev/null +++ b/examples/mobile/pom.xml @@ -0,0 +1,64 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <artifactId>malhar-examples-mobile</artifactId> + <packaging>jar</packaging> + + <name>Apache Apex Malhar Mobile Example</name> + <description></description> + + <parent> + <groupId>org.apache.apex</groupId> + <artifactId>malhar-examples</artifactId> + <version>3.7.0-SNAPSHOT</version> + </parent> + + <properties> + <skipTests>true</skipTests> + </properties> + + <dependencies> + <!-- add your dependencies here --> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-servlet</artifactId> + <version>8.1.10.v20130312</version> + <scope>test</scope> + <type>jar</type> + </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-websocket</artifactId> + <version>8.1.10.v20130312</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + <version>3.1</version> + <type>jar</type> + </dependency> + </dependencies> + +</project>
