http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/Application.java ---------------------------------------------------------------------- diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/Application.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/Application.java deleted file mode 100644 index 55b299f..0000000 --- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/Application.java +++ /dev/null @@ -1,84 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.machinedata; - -import java.util.Map; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.DAG; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.annotation.ApplicationAnnotation; - -import com.datatorrent.contrib.redis.RedisKeyValPairOutputOperator; -import com.datatorrent.demos.machinedata.data.MachineKey; -import com.datatorrent.demos.machinedata.operator.MachineInfoAveragingOperator; -import com.datatorrent.demos.machinedata.operator.MachineInfoAveragingPrerequisitesOperator; -import com.datatorrent.lib.io.SmtpOutputOperator; - -/** - * <p> - * Resource monitor application. - * </p> - * - * @since 0.3.5 - */ -@ApplicationAnnotation(name = "MachineDataDemo") -@SuppressWarnings("unused") -public class Application implements StreamingApplication -{ - - private static final Logger LOG = LoggerFactory.getLogger(Application.class); - - /** - * This function sets up the DAG for calculating the average - * - * @param dag the DAG instance - * @param conf the configuration instance - * @return MachineInfoAveragingPrerequisitesOperator - */ - private MachineInfoAveragingPrerequisitesOperator addAverageCalculation(DAG dag, Configuration conf) - { - MachineInfoAveragingPrerequisitesOperator prereqAverageOper = dag.addOperator("Aggregator", MachineInfoAveragingPrerequisitesOperator.class); - MachineInfoAveragingOperator averageOperator = dag.addOperator("AverageCalculator", MachineInfoAveragingOperator.class); - RedisKeyValPairOutputOperator<MachineKey, Map<String, String>> redisAvgOperator = dag.addOperator("Persister", new RedisKeyValPairOutputOperator<MachineKey, Map<String, String>>()); - dag.addStream("Average", averageOperator.outputPort, redisAvgOperator.input); - SmtpOutputOperator smtpOutputOperator = dag.addOperator("Alerter", new SmtpOutputOperator()); - dag.addStream("Aggregates", prereqAverageOper.outputPort, averageOperator.inputPort); - dag.addStream("Alerts", averageOperator.smtpAlert, smtpOutputOperator.input); - return prereqAverageOper; - } - - /** - * Create the DAG - */ - @Override - public void populateDAG(DAG dag, Configuration conf) - { - InputReceiver randomGen = dag.addOperator("Receiver", InputReceiver.class); - DimensionGenerator dimensionGenerator = dag.addOperator("DimensionsGenerator", DimensionGenerator.class); - dag.addStream("Events", randomGen.outputInline, dimensionGenerator.inputPort); - MachineInfoAveragingPrerequisitesOperator prereqAverageOper = addAverageCalculation(dag, conf); - dag.addStream("DimensionalData", dimensionGenerator.outputInline, prereqAverageOper.inputPort); - } - -}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/DimensionGenerator.java ---------------------------------------------------------------------- diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/DimensionGenerator.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/DimensionGenerator.java deleted file mode 100644 index 75c2a02..0000000 --- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/DimensionGenerator.java +++ /dev/null @@ -1,120 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.machinedata; - -import com.datatorrent.api.Context; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.common.util.BaseOperator; -import com.datatorrent.demos.machinedata.data.MachineInfo; -import com.datatorrent.demos.machinedata.data.MachineKey; - -/** - * <p> - * Information tuple generator with randomness. - * </p> - * - * @since 0.3.5 - */ -@SuppressWarnings("unused") -public class DimensionGenerator extends BaseOperator -{ - public transient DefaultOutputPort<MachineInfo> outputInline = new DefaultOutputPort<>(); - public transient DefaultOutputPort<MachineInfo> output = new DefaultOutputPort<>(); - private int threshold = 90; - - public final transient DefaultInputPort<MachineInfo> inputPort = new DefaultInputPort<MachineInfo>() - { - - @Override - public void process(MachineInfo tuple) - { - emitDimensions(tuple); - } - - }; - - @Override - public void setup(Context.OperatorContext context) - { - super.setup(context); - } - - /** - * This returns the threshold value set - * @return - */ - public int getThreshold() - { - return threshold; - } - - /** - * This function sets the threshold value. This value is used to check the maximum value for cpu/ram/hdd - * @param threshold - */ - public void setThreshold(int threshold) - { - this.threshold = threshold; - } - - /** - * This function takes in the tuple from upstream operator and generates tuples with different dimension combinations - * - * @param tuple - */ - private void emitDimensions(MachineInfo tuple) - { - MachineKey tupleKey = tuple.getMachineKey(); - - for (int i = 0; i < 64; i++) { - MachineKey machineKey = new MachineKey(tupleKey.getTimeKey(),tupleKey.getDay()); - if ((i & 1) != 0) { - machineKey.setCustomer(tupleKey.getCustomer()); - } - if ((i & 2) != 0) { - machineKey.setProduct(tupleKey.getProduct()); - } - if ((i & 4) != 0) { - machineKey.setOs(tupleKey.getOs()); - } - if ((i & 8) != 0) { - machineKey.setDeviceId(tupleKey.getDeviceId()); - } - if ((i & 16) != 0) { - machineKey.setSoftware1(tupleKey.getSoftware1()); - } - if ((i & 32) != 0) { - machineKey.setSoftware2(tupleKey.getSoftware2()); - } - - int cpu = tuple.getCpu(); - int ram = tuple.getRam(); - int hdd = tuple.getHdd(); - MachineInfo machineInfo = new MachineInfo(); - machineInfo.setMachineKey(machineKey); - machineInfo.setCpu((cpu < threshold) ? cpu : threshold); - machineInfo.setRam((ram < threshold) ? ram : threshold); - machineInfo.setHdd((hdd < threshold) ? hdd : threshold); - outputInline.emit(machineInfo); - output.emit(machineInfo); - } - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/InputReceiver.java ---------------------------------------------------------------------- diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/InputReceiver.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/InputReceiver.java deleted file mode 100644 index 85ec954..0000000 --- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/InputReceiver.java +++ /dev/null @@ -1,523 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.machinedata; - -import java.text.DateFormat; -import java.text.SimpleDateFormat; -import java.util.Calendar; -import java.util.Date; -import java.util.Random; -import java.util.TimeZone; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.datatorrent.api.Context; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.InputOperator; -import com.datatorrent.common.util.BaseOperator; -import com.datatorrent.demos.machinedata.data.MachineInfo; -import com.datatorrent.demos.machinedata.data.MachineKey; - -/** - * <p> - * Information tuple generator with randomness. - * </p> - * - * @since 0.3.5 - */ -@SuppressWarnings("unused") -public class InputReceiver extends BaseOperator implements InputOperator -{ - private static final Logger logger = LoggerFactory.getLogger(InputReceiver.class); - - public transient DefaultOutputPort<MachineInfo> outputInline = new DefaultOutputPort<>(); - private final Random randomGen = new Random(); - - private int customerMin = 1; - private int customerMax = 5; - private int productMin = 4; - private int productMax = 6; - private int osMin = 10; - private int osMax = 12; - private int software1Min = 10; - private int software1Max = 12; - private int software2Min = 12; - private int software2Max = 14; - private int software3Min = 4; - private int software3Max = 6; - - private int deviceIdMin = 1; - private int deviceIdMax = 50; - - private int tupleBlastSize = 1001; - - private static final DateFormat minuteDateFormat = new SimpleDateFormat("HHmm"); - private static final DateFormat dayDateFormat = new SimpleDateFormat("d"); - - static { - TimeZone tz = TimeZone.getTimeZone("GMT"); - minuteDateFormat.setTimeZone(tz); - dayDateFormat.setTimeZone(tz); - - } - - @Override - public void setup(Context.OperatorContext context) - { - super.setup(context); - } - - @Override - public void beginWindow(long windowId) - { - super.beginWindow(windowId); - } - - @Override - public void emitTuples() - { - int count = 0; - Calendar calendar = Calendar.getInstance(); - Date date = calendar.getTime(); - String timeKey = minuteDateFormat.format(date); - String day = dayDateFormat.format(date); - - while (count < tupleBlastSize) { - randomGen.setSeed(System.currentTimeMillis()); - - int customerVal = genCustomerId(); - int productVal = genProductVer(); - int osVal = genOsVer(); - int software1Val = genSoftware1Ver(); - int software2Val = genSoftware2Ver(); - int software3Val = genSoftware3Ver(); - int deviceIdVal = genDeviceId(); - - int cpuVal = genCpu(calendar); - int ramVal = genRam(calendar); - int hddVal = genHdd(calendar); - - MachineKey machineKey = new MachineKey(timeKey, day); - - machineKey.setCustomer(customerVal); - machineKey.setProduct(productVal); - machineKey.setOs(osVal); - machineKey.setDeviceId(deviceIdVal); - machineKey.setSoftware1(software1Val); - machineKey.setSoftware2(software2Val); - machineKey.setSoftware3(software3Val); - MachineInfo machineInfo = new MachineInfo(); - machineInfo.setMachineKey(machineKey); - machineInfo.setCpu(cpuVal); - machineInfo.setRam(ramVal); - machineInfo.setHdd(hddVal); - - outputInline.emit(machineInfo); - - count++; - } - } - - private int genCustomerId() - { - int range = customerMax - customerMin + 1; - return customerMin + randomGen.nextInt(range); - } - - private int genProductVer() - { - int range = productMax - productMin + 1; - return productMin + randomGen.nextInt(range); - } - - private int genOsVer() - { - int range = osMax - osMin + 1; - return osMin + randomGen.nextInt(range); - } - - private int genSoftware3Ver() - { - int range = software3Max - software3Min + 1; - return software3Min + randomGen.nextInt(range); - } - - private int genDeviceId() - { - int range = deviceIdMax - deviceIdMin + 1; - return deviceIdMin + randomGen.nextInt(range); - } - - private int genSoftware1Ver() - { - int range = software1Max - software1Min + 1; - return software1Min + randomGen.nextInt(range); - } - - private int genSoftware2Ver() - { - int range = software2Max - software2Min + 1; - return software2Min + randomGen.nextInt(range); - } - - private int genCpu(Calendar cal) - { - int minute = cal.get(Calendar.MINUTE); - int second; - int range = minute / 2 + 19; - if (minute / 17 == 0) { - second = cal.get(Calendar.SECOND); - return (30 + randomGen.nextInt(range) + (minute % 7) - (second % 11)); - } else if (minute / 47 == 0) { - second = cal.get(Calendar.SECOND); - return (7 + randomGen.nextInt(range) + (minute % 7) - (second % 7)); - } else { - second = cal.get(Calendar.SECOND); - return (randomGen.nextInt(range) + (minute % 19) + (second % 7)); - } - } - - private int genRam(Calendar cal) - { - int minute = cal.get(Calendar.MINUTE); - int second; - int range = minute + 1; - if (minute / 23 == 0) { - second = cal.get(Calendar.SECOND); - return (20 + randomGen.nextInt(range) + (minute % 5) - (second % 11)); - } else if (minute / 37 == 0) { - second = cal.get(Calendar.SECOND); - return (11 + randomGen.nextInt(60) - (minute % 5) - (second % 11)); - } else { - second = cal.get(Calendar.SECOND); - return (randomGen.nextInt(range) + (minute % 17) + (second % 11)); - } - } - - private int genHdd(Calendar cal) - { - int minute = cal.get(Calendar.MINUTE); - int second; - int range = minute / 2 + 1; - if (minute / 37 == 0) { - second = cal.get(Calendar.SECOND); - return (25 + randomGen.nextInt(range) - minute % 7 - second % 11); - } else { - second = cal.get(Calendar.SECOND); - return (randomGen.nextInt(range) + minute % 23 + second % 11); - } - } - - /** - * This method returns the minimum value for customer - * - * @return - */ - public int getCustomerMin() - { - return customerMin; - } - - /** - * This method is used to set the minimum value for customer - * - * @param customerMin the minimum customer value - */ - public void setCustomerMin(int customerMin) - { - this.customerMin = customerMin; - } - - /** - * This method returns the max value for customer - * - * @return - */ - public int getCustomerMax() - { - return customerMax; - } - - /** - * This method is used to set the max value for customer - * - * @param customerMax the max customer value - */ - public void setCustomerMax(int customerMax) - { - this.customerMax = customerMax; - } - - /** - * This method returns the minimum value for product - * - * @return - */ - public int getProductMin() - { - return productMin; - } - - /** - * This method is used to set the minimum value for product - * - * @param productMin the minimum product value - */ - public void setProductMin(int productMin) - { - this.productMin = productMin; - } - - /** - * This method returns the max value for product - * - * @return - */ - public int getProductMax() - { - return productMax; - } - - /** - * This method is used to set the max value for product - * - * @param productMax the max product value - */ - public void setProductMax(int productMax) - { - this.productMax = productMax; - } - - /** - * This method returns the minimum value for OS - * - * @return - */ - public int getOsMin() - { - return osMin; - } - - /** - * This method is used to set the minimum value for OS - * - * @param osMin the min OS value - */ - public void setOsMin(int osMin) - { - this.osMin = osMin; - } - - /** - * This method returns the max value for OS - * - * @return - */ - public int getOsMax() - { - return osMax; - } - - /** - * This method is used to set the max value for OS - * - * @param osMax the max OS value - */ - public void setOsMax(int osMax) - { - this.osMax = osMax; - } - - /** - * This method returns the minimum value for software1 - * - * @return - */ - public int getSoftware1Min() - { - return software1Min; - } - - /** - * This method is used to set the minimum value for software1 - * - * @param software1Min the minimum software1 value - */ - public void setSoftware1Min(int software1Min) - { - this.software1Min = software1Min; - } - - /** - * This method returns the max value for software1 - * - * @return - */ - public int getSoftware1Max() - { - return software1Max; - } - - /** - * This method is used to set the max value for software1 - * - * @param software1Max the max software1 value - */ - public void setSoftware1Max(int software1Max) - { - this.software1Max = software1Max; - } - - /** - * This method returns the minimum value for software2 - * - * @return - */ - public int getSoftware2Min() - { - return software2Min; - } - - /** - * This method is used to set the minimum value for software2 - * - * @param software2Min the minimum software2 value - */ - public void setSoftware2Min(int software2Min) - { - this.software2Min = software2Min; - } - - /** - * This method returns the max value for software2 - * - * @return - */ - public int getSoftware2Max() - { - return software2Max; - } - - /** - * This method is used to set the max value for software2 - * - * @param software2Max the max software2 value - */ - public void setSoftware2Max(int software2Max) - { - this.software2Max = software2Max; - } - - /** - * This method returns the minimum value for software3 - * - * @return - */ - public int getSoftware3Min() - { - return software3Min; - } - - /** - * This method is used to set the minimum value for software3 - * - * @param software3Min the minimum software3 value - */ - public void setSoftware3Min(int software3Min) - { - this.software3Min = software3Min; - } - - /** - * This method returns the max value for software3 - * - * @return - */ - public int getSoftware3Max() - { - return software3Max; - } - - /** - * This method is used to set the max value for software3 - * - * @param software3Max the max software3 value - */ - public void setSoftware3Max(int software3Max) - { - this.software3Max = software3Max; - } - - /** - * This method returns the minimum value for deviceId - * - * @return - */ - public int getDeviceIdMin() - { - return deviceIdMin; - } - - /** - * This method is used to set the minimum value for deviceId - * - * @param deviceIdMin the minimum deviceId value - */ - public void setDeviceIdMin(int deviceIdMin) - { - this.deviceIdMin = deviceIdMin; - } - - /** - * This method returns the max value for deviceId - * - * @return - */ - public int getDeviceIdMax() - { - return deviceIdMax; - } - - /** - * This method is used to set the max value for deviceId - * - * @param deviceIdMax the max deviceId value - */ - public void setDeviceIdMax(int deviceIdMax) - { - this.deviceIdMax = deviceIdMax; - } - - /** - * @return the tupleBlastSize - */ - public int getTupleBlastSize() - { - return tupleBlastSize; - } - - /** - * @param tupleBlastSize the tupleBlastSize to set - */ - public void setTupleBlastSize(int tupleBlastSize) - { - this.tupleBlastSize = tupleBlastSize; - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/AverageData.java ---------------------------------------------------------------------- diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/AverageData.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/AverageData.java deleted file mode 100644 index 3c74cc5..0000000 --- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/AverageData.java +++ /dev/null @@ -1,107 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.machinedata.data; - - -/** - * This class stores the value of sum and the count of values summed. - * <p> - * AverageData class. - * </p> - * - * @since 0.3.5 - */ -public class AverageData -{ - - private long cpu; - private long hdd; - private long ram; - private long count; - - /** - * This is default constructor that sets the sum and count to 0 - */ - public AverageData() - { - - } - - /** - * This constructor takes the value of sum and count and initialize the local attributes to corresponding values - * - * @param count - * the value of count - */ - public AverageData(long cpu,long hdd,long ram, long count) - { - this.cpu = cpu; - this.ram = ram; - this.hdd = hdd; - this.count = count; - } - - public long getCpu() - { - return cpu; - } - - public void setCpu(long cpu) - { - this.cpu = cpu; - } - - public long getHdd() - { - return hdd; - } - - public void setHdd(long hdd) - { - this.hdd = hdd; - } - - public long getRam() - { - return ram; - } - - public void setRam(long ram) - { - this.ram = ram; - } - - /** - * This returns the value of count - * @return - */ - public long getCount() - { - return count; - } - - /** - * This method sets the value of count - * @param count - */ - public void setCount(long count) - { - this.count = count; - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/MachineInfo.java ---------------------------------------------------------------------- diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/MachineInfo.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/MachineInfo.java deleted file mode 100644 index 6f02a24..0000000 --- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/MachineInfo.java +++ /dev/null @@ -1,158 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.machinedata.data; - -/** - * This class stores the cpu% usage, ram% usage, hdd% usage and key information about a particular machine - * <p> - * MachineInfo class. - * </p> - * - * @since 0.3.5 - */ -public class MachineInfo -{ - private MachineKey machineKey; - private int cpu; - private int ram; - private int hdd; - - /** - * This default constructor - */ - public MachineInfo() - { - } - - /** - * This constructor takes MachineKey as input and initialize local attributes - * - * @param machineKey - * the MachineKey instance - */ - public MachineInfo(MachineKey machineKey) - { - this.machineKey = machineKey; - } - - /** - * This constructor takes MachineKey, cpu usage, ram usage, hdd usage as input and initialize local attributes - * - * @param machineKey - * the MachineKey instance - * @param cpu - * the CPU% usage - * @param ram - * the RAM% usage - * @param hdd - * the HDD% usage - */ - public MachineInfo(MachineKey machineKey, int cpu, int ram, int hdd) - { - this.machineKey = machineKey; - this.cpu = cpu; - this.ram = ram; - this.hdd = hdd; - } - - /** - * This method returns the MachineKey - * - * @return - */ - public MachineKey getMachineKey() - { - return machineKey; - } - - /** - * This method sets the MachineKey - * - * @param machineKey - * the MachineKey instance - */ - public void setMachineKey(MachineKey machineKey) - { - this.machineKey = machineKey; - } - - /** - * This method returns the CPU% usage - * - * @return - */ - public int getCpu() - { - return cpu; - } - - /** - * This method sets the CPU% usage - * - * @param cpu - * the CPU% usage - */ - public void setCpu(int cpu) - { - this.cpu = cpu; - } - - /** - * This method returns the RAM% usage - * - * @return - */ - public int getRam() - { - return ram; - } - - /** - * This method sets the RAM% usage - * - * @param ram - * the RAM% usage - */ - public void setRam(int ram) - { - this.ram = ram; - } - - /** - * This method returns the HDD% usage - * - * @return - */ - public int getHdd() - { - return hdd; - } - - /** - * This method sets the HDD% usage - * - * @param hdd - * the HDD% usage - */ - public void setHdd(int hdd) - { - this.hdd = hdd; - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/MachineKey.java ---------------------------------------------------------------------- diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/MachineKey.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/MachineKey.java deleted file mode 100644 index 2b3bb1c..0000000 --- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/MachineKey.java +++ /dev/null @@ -1,381 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.machinedata.data; - -/** - * This class stores the information about various softwares, deviceIds, OS of the device - * <p> - * MachineKey class. - * </p> - * - * @since 0.3.5 - */ -public class MachineKey -{ - - private Integer customer; - private Integer product; - private Integer os; - private Integer software1; - private Integer software2; - private Integer software3; - private Integer deviceId; - private String timeKey; - private String day; - - /** - * This constructor takes the format in which time has to be captured and the day when this instance is created - * - * @param timeKey the format in which time has to be captured - * @param day the day when this instance is created - */ - public MachineKey(String timeKey, String day) - { - this.timeKey = timeKey; - this.day = day; - } - - /** - * This is default constructor - */ - public MachineKey() - { - } - - /** - * This constructor takes format in which time has to be captured, the day when this instance is created, the customer - * id, product Id on the device, OS version on the device, software1 version on the device, software2 version on the device, - * software3 version on the device, deviceId on the device, - * - * @param timeKey the format in which time has to be captured - * @param day the day when this instance is created - * @param customer the customer Id - * @param product product Id - * @param os OS version - * @param software1 software1 version - * @param software2 software2 version - * @param software3 software3 version - * @param deviceId deviceId - */ - public MachineKey(String timeKey, String day, Integer customer, Integer product, Integer os, Integer software1, Integer software2, Integer software3, Integer deviceId) - { - this.timeKey = timeKey; - this.day = day; - this.customer = customer; - this.product = product; - this.os = os; - this.software1 = software1; - this.software2 = software2; - this.software3 = software3; - this.deviceId = deviceId; - } - - /** - * This method returns the format in which the time is captured. The time is the time when this instance of MachineKey - * was generated. For e.g. HHmm to capture Hour and minute - * - * @return - */ - public String getTimeKey() - { - return timeKey; - } - - /** - * This method sets the format in which the time is captured. The time is the time when this instance of MachineKey - * was generated. For e.g. HHmm to capture Hour and minute - * - * @param timeKey - * the value of format - */ - public void setTimeKey(String timeKey) - { - this.timeKey = timeKey; - } - - /** - * This method returns the day of the month when this instance of MachineKey was generated - * - * @return - */ - public String getDay() - { - return day; - } - - /** - * This method sets the day of the month when this instance of MachineKey was generated - * - * @param day - * the day of the month - */ - public void setDay(String day) - { - this.day = day; - } - - /** - * This method returns the customer Id - * - * @return - */ - public Integer getCustomer() - { - return customer; - } - - /** - * This method sets the customer Id - * - * @param customer - * the customer Id - */ - public void setCustomer(Integer customer) - { - this.customer = customer; - } - - /** - * This method returns product on the device - * - * @return - */ - public Integer getProduct() - { - return product; - } - - /** - * This method sets the product on the device - * - * @param product - * the value of product - */ - public void setProduct(Integer product) - { - this.product = product; - } - - /** - * This method returns the OS version on the device - * - * @return - */ - public Integer getOs() - { - return os; - } - - /** - * This method sets the OS version on the device - * - * @param os - * OS version - */ - public void setOs(Integer os) - { - this.os = os; - } - - /** - * This method returns the version of the software1 on the device - * - * @return - */ - public Integer getSoftware1() - { - return software1; - } - - /** - * This method sets the version of the software1 on the device - * - * @param software1 the version of the software1 - */ - public void setSoftware1(Integer software1) - { - this.software1 = software1; - } - - /** - * This method returns the version of the software2 on the device - * - * @return - */ - public Integer getSoftware2() - { - return software2; - } - - /** - * This method sets the version of the software2 on the device - * - * @param software2 - * the version of the software2 - */ - public void setSoftware2(Integer software2) - { - this.software2 = software2; - } - - /** - * This method returns the version of the software3 on the device - * - * @return - */ - public Integer getSoftware3() - { - return software3; - } - - /** - * This method sets the version of the software3 on the device - * - * @param software3 - * the version of the software3 - */ - public void setSoftware3(Integer software3) - { - this.software3 = software3; - } - - @Override - public int hashCode() - { - int key = 0; - if (customer != null) { - key |= (1 << 31); - key ^= customer; - } - if (product != null) { - key |= (1 << 30); - key ^= product; - } - if (os != null) { - key |= (1 << 29); - key ^= os; - } - if (software1 != null) { - key |= (1 << 28); - key ^= software1; - } - if (software2 != null) { - key |= (1 << 27); - key ^= software2; - } - if (software3 != null) { - key |= (1 << 26); - key ^= software3; - } - if (deviceId != null) { - key |= (1 << 25); - key ^= deviceId; - } - if (timeKey != null) { - key |= (1 << 24); - key ^= timeKey.hashCode(); - } - if (day != null) { - key |= (1 << 23); - key ^= day.hashCode(); - } - - return key; - } - - @Override - public boolean equals(Object obj) - { - if (!(obj instanceof MachineKey)) { - return false; - } - MachineKey mkey = (MachineKey)obj; - return checkStringEqual(this.timeKey, mkey.timeKey) && checkStringEqual(this.day, mkey.day) && checkIntEqual(this.customer, mkey.customer) && checkIntEqual(this.product, mkey.product) && checkIntEqual(this.os, mkey.os) && checkIntEqual(this.software1, mkey.software1) && checkIntEqual(this.software2, mkey.software2) && checkIntEqual(this.software3, mkey.software3) && checkIntEqual(this.deviceId, mkey.deviceId); - } - - private boolean checkIntEqual(Integer a, Integer b) - { - if ((a == null) && (b == null)) { - return true; - } - if ((a != null) && a.equals(b)) { - return true; - } - return false; - } - - private boolean checkStringEqual(String a, String b) - { - if ((a == null) && (b == null)) { - return true; - } - if ((a != null) && a.equals(b)) { - return true; - } - return false; - } - - @Override - public String toString() - { - StringBuilder sb = new StringBuilder(timeKey); - if (customer != null) { - sb.append("|0:").append(customer); - } - if (product != null) { - sb.append("|1:").append(product); - } - if (os != null) { - sb.append("|2:").append(os); - } - if (software1 != null) { - sb.append("|3:").append(software1); - } - if (software2 != null) { - sb.append("|4:").append(software2); - } - if (software3 != null) { - sb.append("|5:").append(software3); - } - if (deviceId != null) { - sb.append("|6:").append(deviceId); - } - return sb.toString(); - } - - /** - * This method returns the deviceId of the device - * @return The deviceId - */ - public Integer getDeviceId() - { - return deviceId; - } - - /** - * This method sets the deviceId of the device - * - * @param deviceId - */ - public void setDeviceId(Integer deviceId) - { - this.deviceId = deviceId; - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/ResourceType.java ---------------------------------------------------------------------- diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/ResourceType.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/ResourceType.java deleted file mode 100644 index d474c5c..0000000 --- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/ResourceType.java +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.machinedata.data; - -import java.util.Map; - -import com.google.common.collect.Maps; - -/** - * This class captures the resources whose usage is collected for each device - * <p>ResourceType class.</p> - * - * @since 0.3.5 - */ -public enum ResourceType -{ - - CPU("cpu"), RAM("ram"), HDD("hdd"); - - private static Map<String, ResourceType> descToResource = Maps.newHashMap(); - - static { - for (ResourceType type : ResourceType.values()) { - descToResource.put(type.desc, type); - } - } - - private String desc; - - private ResourceType(String desc) - { - this.desc = desc; - } - - @Override - public String toString() - { - return desc; - } - - /** - * This method returns ResourceType for the given description - * @param desc the description - * @return - */ - public static ResourceType getResourceTypeOf(String desc) - { - return descToResource.get(desc); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/CalculatorOperator.java ---------------------------------------------------------------------- diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/CalculatorOperator.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/CalculatorOperator.java deleted file mode 100644 index 8f68dab..0000000 --- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/CalculatorOperator.java +++ /dev/null @@ -1,277 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.machinedata.operator; - -import java.io.Serializable; -import java.math.BigDecimal; -import java.util.Collections; -import java.util.List; -import java.util.Map; - -import javax.validation.constraints.Max; -import javax.validation.constraints.Min; - -import com.google.common.base.Objects; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.StreamCodec; -import com.datatorrent.common.util.BaseOperator; -import com.datatorrent.demos.machinedata.data.MachineInfo; -import com.datatorrent.demos.machinedata.data.MachineKey; -import com.datatorrent.demos.machinedata.data.ResourceType; -import com.datatorrent.demos.machinedata.util.DataTable; -import com.datatorrent.lib.codec.KryoSerializableStreamCodec; -import com.datatorrent.lib.util.KeyValPair; - -/** - * <p> - * CalculatorOperator class. - * </p> - * - * @since 0.3.5 - */ -public class CalculatorOperator extends BaseOperator -{ - - private final DataTable<MachineKey, ResourceType, List<Integer>> data = new DataTable<>(); - - @Min(1) - @Max(99) - private int kthPercentile = 95; // kth percentile - private boolean computePercentile; - private boolean computeSD; - private boolean computeMax; - - private int percentileThreshold = 80; - private int sdThreshold = 70; - private int maxThreshold = 99; - - public final transient DefaultInputPort<MachineInfo> dataPort = new DefaultInputPort<MachineInfo>() - { - @Override - public void process(MachineInfo tuple) - { - addDataToCache(tuple); - } - - /** - * Stream codec used for partitioning. - */ - @Override - public StreamCodec<MachineInfo> getStreamCodec() - { - return new MachineInfoStreamCodec(); - } - }; - - public final transient DefaultOutputPort<KeyValPair<MachineKey, Map<ResourceType, Double>>> percentileOutputPort = new DefaultOutputPort<>(); - - public final transient DefaultOutputPort<KeyValPair<MachineKey, Map<ResourceType, Double>>> sdOutputPort = new DefaultOutputPort<>(); - - public final transient DefaultOutputPort<KeyValPair<MachineKey, Map<ResourceType, Integer>>> maxOutputPort = new DefaultOutputPort<>(); - - public transient DefaultOutputPort<String> smtpAlert = new DefaultOutputPort<>(); - - private void addDataToCache(MachineInfo tuple) - { - MachineKey machineKey = tuple.getMachineKey(); - if (!data.containsRow(machineKey)) { - data.put(machineKey, ResourceType.CPU, Lists.<Integer>newArrayList()); - data.put(machineKey, ResourceType.RAM, Lists.<Integer>newArrayList()); - data.put(machineKey, ResourceType.HDD, Lists.<Integer>newArrayList()); - } - data.get(machineKey, ResourceType.CPU).add(tuple.getCpu()); - data.get(machineKey, ResourceType.RAM).add(tuple.getRam()); - data.get(machineKey, ResourceType.HDD).add(tuple.getHdd()); - } - - @Override - public void endWindow() - { - - if (computePercentile) { - for (MachineKey machineKey : data.rowKeySet()) { - Collections.sort(data.get(machineKey, ResourceType.CPU)); - Collections.sort(data.get(machineKey, ResourceType.RAM)); - Collections.sort(data.get(machineKey, ResourceType.HDD)); - - Map<ResourceType, Double> percentileData = Maps.newHashMap(); - percentileData.put(ResourceType.CPU, getKthPercentile(data.get(machineKey, ResourceType.CPU))); - percentileData.put(ResourceType.RAM, getKthPercentile(data.get(machineKey, ResourceType.RAM))); - percentileData.put(ResourceType.HDD, getKthPercentile(data.get(machineKey, ResourceType.HDD))); - percentileOutputPort.emit(new KeyValPair<>(machineKey, percentileData)); - - for (ResourceType resourceType : percentileData.keySet()) { - double percentileValue = percentileData.get(resourceType); - if (percentileValue > percentileThreshold) { - emitAlert(resourceType, machineKey, percentileValue, "Percentile"); - } - } - } - } - if (computeSD) { - for (MachineKey machineKey : data.rowKeySet()) { - - Map<ResourceType, Double> sdData = Maps.newHashMap(); - - for (ResourceType resourceType : ResourceType.values()) { - sdData.put(resourceType, getSD(data.get(machineKey, resourceType))); - } - sdOutputPort.emit(new KeyValPair<>(machineKey, sdData)); - - for (ResourceType resourceType : sdData.keySet()) { - double sdValue = sdData.get(resourceType); - if (sdValue > sdThreshold) { - emitAlert(resourceType, machineKey, sdValue, "SD"); - } - } - } - } - if (computeMax) { - for (MachineKey machineKey : data.rowKeySet()) { - - Map<ResourceType, Integer> maxData = Maps.newHashMap(); - maxData.put(ResourceType.CPU, Collections.max(data.get(machineKey, ResourceType.CPU))); - maxData.put(ResourceType.RAM, Collections.max(data.get(machineKey, ResourceType.RAM))); - maxData.put(ResourceType.HDD, Collections.max(data.get(machineKey, ResourceType.HDD))); - - maxOutputPort.emit(new KeyValPair<>(machineKey, maxData)); - - for (ResourceType resourceType : maxData.keySet()) { - double sdValue = maxData.get(resourceType).doubleValue(); - if (sdValue > maxThreshold) { - emitAlert(resourceType, machineKey, sdValue, "Max"); - } - } - } - } - data.clear(); - } - - private void emitAlert(ResourceType type, MachineKey machineKey, double alertVal, String prefix) - { - BigDecimal decimalVal = new BigDecimal(alertVal); - decimalVal = decimalVal.setScale(2, BigDecimal.ROUND_HALF_UP); - String alertTime = machineKey.getDay() + machineKey.getTimeKey(); - smtpAlert.emit(prefix + "-" + type.toString().toUpperCase() + " alert at " + alertTime + " " + type + " usage breached current usage: " + decimalVal.doubleValue() + "% threshold: " + percentileThreshold + "%\n\n" + machineKey); - } - - private double getKthPercentile(List<Integer> sorted) - { - - double val = (kthPercentile * sorted.size()) / 100.0; - if (val == (int)val) { - // Whole number - int idx = (int)val - 1; - return (sorted.get(idx) + sorted.get(idx + 1)) / 2.0; - } else { - int idx = (int)Math.round(val) - 1; - return sorted.get(idx); - } - } - - private double getSD(List<Integer> data) - { - int sum = 0; - for (int i : data) { - sum += i; - } - double avg = sum / (data.size() * 1.0); - double sd = 0; - for (Integer point : data) { - sd += Math.pow(point - avg, 2); - } - return Math.sqrt(sd); - } - - /** - * @param kVal the percentile which will be emitted by this operator - */ - public void setKthPercentile(int kVal) - { - this.kthPercentile = kVal; - } - - /** - * @param doCompute when true percentile will be computed - */ - public void setComputePercentile(boolean doCompute) - { - this.computePercentile = doCompute; - } - - /** - * @param doCompute when true standard deviation will be computed - */ - public void setComputeSD(boolean doCompute) - { - this.computeSD = doCompute; - } - - /** - * @param doCompute when true max will be computed - */ - public void setComputeMax(boolean doCompute) - { - this.computeMax = doCompute; - } - - /** - * @param threshold for percentile when breached will cause alert - */ - public void setPercentileThreshold(int threshold) - { - this.percentileThreshold = threshold; - } - - /** - * @param threshold for standard deviation when breached will cause alert - */ - public void setSDThreshold(int threshold) - { - this.sdThreshold = threshold; - } - - /** - * @param threshold for Max when breached will cause alert - */ - public void setMaxThreshold(int threshold) - { - this.maxThreshold = threshold; - } - - public static class MachineInfoStreamCodec extends KryoSerializableStreamCodec<MachineInfo> implements Serializable - { - public MachineInfoStreamCodec() - { - super(); - } - - @Override - public int getPartition(MachineInfo o) - { - return Objects.hashCode(o.getMachineKey().getCustomer(), o.getMachineKey().getOs(), o.getMachineKey().getProduct(), o.getMachineKey().getSoftware1(), o.getMachineKey().getSoftware2(), o.getMachineKey().getSoftware3()); - } - - private static final long serialVersionUID = 201411031403L; - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingOperator.java ---------------------------------------------------------------------- diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingOperator.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingOperator.java deleted file mode 100644 index bbfd547..0000000 --- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingOperator.java +++ /dev/null @@ -1,215 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.machinedata.operator; - -import java.math.BigDecimal; -import java.text.DateFormat; -import java.text.SimpleDateFormat; -import java.util.Calendar; -import java.util.Date; -import java.util.HashMap; -import java.util.Map; - -import com.google.common.collect.Maps; - -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.common.util.BaseOperator; - -import com.datatorrent.demos.machinedata.data.AverageData; -import com.datatorrent.demos.machinedata.data.MachineInfo; -import com.datatorrent.demos.machinedata.data.MachineKey; -import com.datatorrent.lib.util.KeyHashValPair; -import com.datatorrent.lib.util.KeyValPair; - -/** - * This class calculates the average for various resources across different devices for a given key - * <p>MachineInfoAveragingOperator class.</p> - * - * @since 0.9.0 - */ -@SuppressWarnings("unused") -public class MachineInfoAveragingOperator extends BaseOperator -{ - - public static final String CPU = "cpu"; - public static final String RAM = "ram"; - public static final String HDD = "hdd"; - public static final String DAY = "day"; - - private final transient Map<MachineKey, AverageData> dataMap = new HashMap<>(); - - public final transient DefaultOutputPort<KeyValPair<MachineKey, Map<String, String>>> outputPort = new DefaultOutputPort<>(); - - public transient DefaultOutputPort<String> smtpAlert = new DefaultOutputPort<>(); - - private int threshold = 95; - - /** - * Buffer all the tuples as is till end window gets called - */ - public final transient DefaultInputPort<KeyHashValPair<MachineKey, AverageData>> inputPort = new DefaultInputPort<KeyHashValPair<MachineKey, AverageData>>() - { - - @Override - public void process(KeyHashValPair<MachineKey, AverageData> tuple) - { - addTuple(tuple); - } - }; - - /** - * This method returns the threshold value - * - * @return - */ - public int getThreshold() - { - return threshold; - } - - /** - * This method sets the threshold value. If the average usage for any Resource is above this for a given key, then the alert is sent - * - * @param threshold the threshold value - */ - public void setThreshold(int threshold) - { - this.threshold = threshold; - } - - /** - * This adds the given tuple to the dataMap - * - * @param tuple input tuple - */ - private void addTuple(KeyHashValPair<MachineKey, AverageData> tuple) - { - MachineKey key = tuple.getKey(); - dataMap.put(key, tuple.getValue()); - } - - @Override - public void endWindow() - { - for (Map.Entry<MachineKey, AverageData> entry : dataMap.entrySet()) { - MachineKey key = entry.getKey(); - AverageData averageResultMap = entry.getValue(); - Map<String, String> averageResult = Maps.newHashMap(); - long count = averageResultMap.getCount(); - double average = averageResultMap.getCpu() / count; - averageResult.put(CPU, average + ""); - emitAlert(average, CPU, key); - average = averageResultMap.getHdd() / count; - averageResult.put(HDD, average + ""); - emitAlert(average, HDD, key); - average = averageResultMap.getRam() / count; - averageResult.put(RAM, average + ""); - emitAlert(average, RAM, key); - averageResult.put(DAY, key.getDay()); - outputPort.emit(new KeyValPair<>(key, averageResult)); - } - dataMap.clear(); - } - - private void emitAlert(double average, String resourceType, MachineKey key) - { - if (average > threshold) { - BigDecimal bd = new BigDecimal(average); - bd = bd.setScale(2, BigDecimal.ROUND_HALF_UP); - String stime = key.getDay() + key.getTimeKey(); - String skey = getKeyInfo(key); - smtpAlert.emit(resourceType.toUpperCase() + " alert at " + stime + " " + resourceType + " usage breached current usage: " + bd.doubleValue() + "% threshold: " + threshold + "%\n\n" + skey); - } - } - - /** - * This method is used to artificially generate alerts - * - * @param genAlert - */ - public void setGenAlert(boolean genAlert) - { - Calendar calendar = Calendar.getInstance(); - long timestamp = System.currentTimeMillis(); - calendar.setTimeInMillis(timestamp); - DateFormat minuteDateFormat = new SimpleDateFormat("HHmm"); - Date date = calendar.getTime(); - String timeKey = minuteDateFormat.format(date); - DateFormat dayDateFormat = new SimpleDateFormat("dd"); - String day = dayDateFormat.format(date); - - MachineKey alertKey = new MachineKey(timeKey, day); - alertKey.setCustomer(1); - alertKey.setProduct(5); - alertKey.setOs(10); - alertKey.setSoftware1(12); - alertKey.setSoftware2(14); - alertKey.setSoftware3(6); - - MachineInfo machineInfo = new MachineInfo(); - machineInfo.setMachineKey(alertKey); - machineInfo.setCpu(threshold + 1); - machineInfo.setRam(threshold + 1); - machineInfo.setHdd(threshold + 1); - - smtpAlert.emit("CPU Alert: CPU Usage threshold (" + threshold + ") breached: current % usage: " + getKeyInfo(alertKey)); - smtpAlert.emit("RAM Alert: RAM Usage threshold (" + threshold + ") breached: current % usage: " + getKeyInfo(alertKey)); - smtpAlert.emit("HDD Alert: HDD Usage threshold (" + threshold + ") breached: current % usage: " + getKeyInfo(alertKey)); - } - - /** - * This method returns the String for a given MachineKey instance - * - * @param key MachineKey instance that needs to be converted to string - * @return - */ - private String getKeyInfo(MachineKey key) - { - StringBuilder sb = new StringBuilder(); - if (key instanceof MachineKey) { - MachineKey mkey = (MachineKey)key; - Integer customer = mkey.getCustomer(); - if (customer != null) { - sb.append("customer: " + customer + "\n"); - } - Integer product = mkey.getProduct(); - if (product != null) { - sb.append("product version: " + product + "\n"); - } - Integer os = mkey.getOs(); - if (os != null) { - sb.append("os version: " + os + "\n"); - } - Integer software1 = mkey.getSoftware1(); - if (software1 != null) { - sb.append("software1 version: " + software1 + "\n"); - } - Integer software2 = mkey.getSoftware2(); - if (software2 != null) { - sb.append("software2 version: " + software2 + "\n"); - } - Integer software3 = mkey.getSoftware3(); - if (software3 != null) { - sb.append("software3 version: " + software3 + "\n"); - } - } - return sb.toString(); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingPrerequisitesOperator.java ---------------------------------------------------------------------- diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingPrerequisitesOperator.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingPrerequisitesOperator.java deleted file mode 100644 index cb5fa5a..0000000 --- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingPrerequisitesOperator.java +++ /dev/null @@ -1,87 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.machinedata.operator; - -import java.util.HashMap; -import java.util.Map; - -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; - -import com.datatorrent.common.util.BaseOperator; -import com.datatorrent.demos.machinedata.data.AverageData; -import com.datatorrent.demos.machinedata.data.MachineInfo; -import com.datatorrent.demos.machinedata.data.MachineKey; -import com.datatorrent.lib.util.KeyHashValPair; - -/** - * This class calculates the partial sum and count for tuples generated by upstream Operator - * <p> MachineInfoAveragingPrerequisitesOperator class. </p> - * - * @since 0.3.5 - */ -public class MachineInfoAveragingPrerequisitesOperator extends BaseOperator -{ - - // Aggregate sum of all values seen for a key. - private Map<MachineKey, AverageData> sums = new HashMap<>(); - - public final transient DefaultOutputPort<KeyHashValPair<MachineKey, AverageData>> outputPort = new DefaultOutputPort<KeyHashValPair<MachineKey, AverageData>>() - { - @Override - public Unifier<KeyHashValPair<MachineKey, AverageData>> getUnifier() - { - MachineInfoAveragingUnifier unifier = new MachineInfoAveragingUnifier(); - return unifier; - } - }; - - public transient DefaultInputPort<MachineInfo> inputPort = new DefaultInputPort<MachineInfo>() - { - - @Override - public void process(MachineInfo tuple) - { - MachineKey key = tuple.getMachineKey(); - AverageData averageData = sums.get(key); - if (averageData == null) { - averageData = new AverageData(tuple.getCpu(), tuple.getHdd(), tuple.getRam(), 1); - sums.put(key, averageData); - } else { - averageData.setCpu(averageData.getCpu() + tuple.getCpu()); - averageData.setRam(averageData.getRam() + tuple.getRam()); - averageData.setHdd(averageData.getHdd() + tuple.getHdd()); - averageData.setCount(averageData.getCount() + 1); - } - } - }; - - @Override - public void endWindow() - { - - for (Map.Entry<MachineKey, AverageData> entry : sums.entrySet()) { - if (outputPort.isConnected()) { - outputPort.emit(new KeyHashValPair<>(entry.getKey(), entry.getValue())); - } - } - sums.clear(); - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingUnifier.java ---------------------------------------------------------------------- diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingUnifier.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingUnifier.java deleted file mode 100644 index e0b67f3..0000000 --- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingUnifier.java +++ /dev/null @@ -1,91 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.machinedata.operator; - -import java.util.HashMap; -import java.util.Map; - -import com.datatorrent.api.Context.OperatorContext; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.Operator.Unifier; - -import com.datatorrent.demos.machinedata.data.AverageData; -import com.datatorrent.demos.machinedata.data.MachineKey; -import com.datatorrent.lib.util.KeyHashValPair; - -/** - * This class calculates the partial sum and count for a given key - * <p>MachineInfoAveragingUnifier class.</p> - * - * @since 0.9.0 - */ -public class MachineInfoAveragingUnifier implements Unifier<KeyHashValPair<MachineKey, AverageData>> -{ - - private Map<MachineKey, AverageData> sums = new HashMap<>(); - public final transient DefaultOutputPort<KeyHashValPair<MachineKey, AverageData>> outputPort = new DefaultOutputPort<>(); - - @Override - public void beginWindow(long arg0) - { - // TODO Auto-generated method stub - - } - - @Override - public void endWindow() - { - for (Map.Entry<MachineKey, AverageData> entry : sums.entrySet()) { - outputPort.emit(new KeyHashValPair<>(entry.getKey(), entry.getValue())); - } - sums.clear(); - - } - - @Override - public void setup(OperatorContext arg0) - { - // TODO Auto-generated method stub - - } - - @Override - public void teardown() - { - // TODO Auto-generated method stub - - } - - @Override - public void process(KeyHashValPair<MachineKey, AverageData> arg0) - { - MachineKey tupleKey = arg0.getKey(); - AverageData averageData = sums.get(tupleKey); - AverageData tupleValue = arg0.getValue(); - if (averageData == null) { - sums.put(tupleKey, tupleValue); - } else { - averageData.setCpu(averageData.getCpu() + tupleValue.getCpu()); - averageData.setRam(averageData.getRam() + tupleValue.getRam()); - averageData.setHdd(averageData.getHdd() + tupleValue.getHdd()); - averageData.setCount(averageData.getCount() + tupleValue.getCount()); - } - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/util/Combinatorics.java ---------------------------------------------------------------------- diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/util/Combinatorics.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/util/Combinatorics.java deleted file mode 100644 index 6c4256a..0000000 --- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/util/Combinatorics.java +++ /dev/null @@ -1,101 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.machinedata.util; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * Generate combinations of elements for the given array of elements. - * - * Implements nCr = n! / (r! * (n-r)!) - * - * @since 0.3.5 - */ -public class Combinatorics<T> -{ - - private T[] values; - private int size = -1; - private List<T> result; - private Map<Integer, List<T>> resultMap = new HashMap<Integer, List<T>>(); - private int resultMapSize = 0; - - /** - * Generates all possible combinations with all the sizes. - * - * @param values - */ - public Combinatorics(T[] values) - { - this.values = values; - this.size = -1; - this.result = new ArrayList<>(); - } - - /** - * Generates all possible combinations with the given size. - * - * @param values - * @param size - */ - public Combinatorics(T[] values, int size) - { - this.values = values; - this.size = size; - this.result = new ArrayList<>(); - } - - public Map<Integer, List<T>> generate() - { - - if (size == -1) { - size = values.length; - for (int i = 1; i <= size; i++) { - int[] tmp = new int[i]; - Arrays.fill(tmp, -1); - generateCombinations(0, 0, tmp); - } - } else { - int[] tmp = new int[size]; - Arrays.fill(tmp, -1); - generateCombinations(0, 0, tmp); - } - return resultMap; - } - - public void generateCombinations(int start, int depth, int[] tmp) - { - if (depth == tmp.length) { - for (int j = 0; j < depth; j++) { - result.add(values[tmp[j]]); - } - resultMap.put(++resultMapSize, result); - result = new ArrayList<>(); - return; - } - for (int i = start; i < values.length; i++) { - tmp[depth] = i; - generateCombinations(i + 1, depth + 1, tmp); - } - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/util/DataTable.java ---------------------------------------------------------------------- diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/util/DataTable.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/util/DataTable.java deleted file mode 100644 index f8f2d33..0000000 --- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/util/DataTable.java +++ /dev/null @@ -1,75 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.machinedata.util; - -import java.util.Map; -import java.util.Set; - -import javax.annotation.Nullable; - -import com.google.common.collect.Maps; - -/** - * <p>DataTable class.</p> - * - * @since 0.3.5 - */ -public class DataTable<R,C,E> -{ - - //machineKey, [cpu,ram,hdd] -> value - private final Map<R,Map<C,E>> table = Maps.newHashMap(); - - public boolean containsRow(R rowKey) - { - return table.containsKey(rowKey); - } - - public void put(R rowKey,C colKey, E entry) - { - if (!containsRow(rowKey)) { - table.put(rowKey, Maps.<C,E>newHashMap()); - } - table.get(rowKey).put(colKey, entry); - } - - @Nullable - public E get(R rowKey, C colKey) - { - if (!containsRow(rowKey)) { - return null; - } - return table.get(rowKey).get(colKey); - } - - public Set<R> rowKeySet() - { - return table.keySet(); - } - - public void clear() - { - table.clear(); - } - - public Map<R,Map<C,E>> getTable() - { - return table; - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/machinedata/src/main/resources/META-INF/properties.xml ---------------------------------------------------------------------- diff --git a/demos/machinedata/src/main/resources/META-INF/properties.xml b/demos/machinedata/src/main/resources/META-INF/properties.xml deleted file mode 100644 index afe8783..0000000 --- a/demos/machinedata/src/main/resources/META-INF/properties.xml +++ /dev/null @@ -1,139 +0,0 @@ -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - ---> -<configuration> - <property> - <name>dt.attr.STREAMING_WINDOW_SIZE_MILLIS</name> - <value>1000</value> - </property> - <property> - <name>dt.application.MachineDataDemo.operator.Receiver.attr.APPLICATION_WINDOW_COUNT - </name> - <value>5</value> - </property> - <property> - <name>dt.application.MachineDataDemo.operator.Receiver.attr.PARTITIONER</name> - <value>com.datatorrent.common.partitioner.StatelessPartitioner:1</value> - </property> - <property> - <name>dt.application.MachineDataDemo.operator.DimensionsGenerator.attr.APPLICATION_WINDOW_COUNT - </name> - <value>5</value> - </property> - <property> - <name>dt.application.MachineDataDemo.stream.Events.locality - </name> - <value>CONTAINER_LOCAL</value> - </property> - <property> - <name>dt.application.MachineDataDemo.operator.DimensionsGenerator.inputport.inputPort.attr.PARTITION_PARALLEL - </name> - <value>true</value> - </property> - <property> - <name>dt.application.MachineDataDemo.operator.Aggregator.inputport.inputPort.attr.PARTITION_PARALLEL - </name> - <value>true</value> - </property> - <property> - <name>dt.application.MachineDataDemo.operator.Aggregator.attr.APPLICATION_WINDOW_COUNT - </name> - <value>5</value> - </property> - <property> - <name>dt.application.MachineDataDemo.operator.AverageCalculator.attr.APPLICATION_WINDOW_COUNT - </name> - <value>5</value> - </property> - <property> - <name>dt.application.MachineDataDemo.operator.Persister.inputport.input.attr.PARTITION_PARALLEL - </name> - <value>true</value> - </property> - <property> - <name>dt.application.MachineDataDemo.operator.Persister.attr.APPLICATION_WINDOW_COUNT - </name> - <value>5</value> - </property> - <property> - <name>dt.application.MachineDataDemo.operator.Persister.store.dbIndex - </name> - <value>2</value> - </property> - <property> - <name>dt.application.MachineDataDemo.operator.Persister.store.host - </name> - <value></value> - </property> - <property> - <name>dt.application.MachineDataDemo.operator.Persister.store.port - </name> - <value>6379</value> - </property> - <property> - <name>dt.application.MachineDataDemo.port.*.attr.QUEUE_CAPACITY</name> - <value>32000</value> - </property> - <property> - <name>dt.application.MachineDataDemo.operator.Alerter.from - </name> - <value></value> - </property> - <property> - <name>dt.application.MachineDataDemo.operator.Alerter.subject - </name> - <value>Alert!!!</value> - </property> - <property> - <name>dt.application.MachineDataDemo.operator.Alerter.recipients.TO - </name> - <value></value> - </property> - <property> - <name>dt.application.MachineDataDemo.operator.Alerter.content - </name> - <value>{}</value> - </property> - <property> - <name>dt.application.MachineDataDemo.operator.Alerter.smtpHost - </name> - <value>localhost</value> - </property> - <property> - <name>dt.application.MachineDataDemo.operator.Alerter.smtpPort - </name> - <value>25</value> - </property> - <property> - <name>dt.application.MachineDataDemo.operator.Alerter.useSsl - </name> - <value>false</value> - </property> - <property> - <name>dt.application.MachineDataDemo.operator.Aggregator.outputport.outputPort.attr.UNIFIER_LIMIT - </name> - <value>8</value> - </property> - <property> - <name>dt.application.MachineDataDemo.stream.DimensionalData.locality - </name> - <value>THREAD_LOCAL</value> - </property> -</configuration> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/machinedata/src/site/conf/my-app-conf1.xml ---------------------------------------------------------------------- diff --git a/demos/machinedata/src/site/conf/my-app-conf1.xml b/demos/machinedata/src/site/conf/my-app-conf1.xml deleted file mode 100644 index f35873b..0000000 --- a/demos/machinedata/src/site/conf/my-app-conf1.xml +++ /dev/null @@ -1,27 +0,0 @@ -<?xml version="1.0" encoding="UTF-8" standalone="no"?> -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - ---> -<configuration> - <property> - <name>dt.attr.MASTER_MEMORY_MB</name> - <value>1024</value> - </property> -</configuration> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/machinedata/src/test/java/com/datatorrent/demos/machinedata/CalculatorOperatorTest.java ---------------------------------------------------------------------- diff --git a/demos/machinedata/src/test/java/com/datatorrent/demos/machinedata/CalculatorOperatorTest.java b/demos/machinedata/src/test/java/com/datatorrent/demos/machinedata/CalculatorOperatorTest.java deleted file mode 100644 index 0e397be..0000000 --- a/demos/machinedata/src/test/java/com/datatorrent/demos/machinedata/CalculatorOperatorTest.java +++ /dev/null @@ -1,196 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.machinedata; - -import java.text.DateFormat; -import java.text.SimpleDateFormat; -import java.util.Calendar; -import java.util.Date; -import java.util.List; -import java.util.Map; - -import org.junit.Assert; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.ImmutableList; - -import com.datatorrent.demos.machinedata.data.MachineInfo; -import com.datatorrent.demos.machinedata.data.MachineKey; -import com.datatorrent.demos.machinedata.data.ResourceType; -import com.datatorrent.demos.machinedata.operator.CalculatorOperator; -import com.datatorrent.lib.testbench.CollectorTestSink; -import com.datatorrent.lib.util.KeyValPair; -import com.datatorrent.lib.util.TimeBucketKey; - -/** - * @since 0.3.5 - */ -public class CalculatorOperatorTest -{ - private static DateFormat minuteDateFormat = new SimpleDateFormat("HHmm"); - private static Logger LOG = LoggerFactory.getLogger(CalculatorOperatorTest.class); - - /** - * Test node logic emits correct results - */ - @Test - public void testNodeProcessing() throws Exception - { - CalculatorOperator calculatorOperator = new CalculatorOperator(); - calculatorOperator.setup(null); - - calculatorOperator.setComputePercentile(true); - calculatorOperator.setComputeMax(true); - calculatorOperator.setComputeSD(true); - - testPercentile(calculatorOperator); - } - - public void testPercentile(CalculatorOperator oper) - { - - CollectorTestSink sortSink = new CollectorTestSink(); - oper.percentileOutputPort.setSink(sortSink); - oper.setKthPercentile(50); - Calendar calendar = Calendar.getInstance(); - Date date = calendar.getTime(); - String timeKey = minuteDateFormat.format(date); - String day = calendar.get(Calendar.DAY_OF_MONTH) + ""; - - Integer vs = new Integer(1); - MachineKey mk = new MachineKey(timeKey, day, vs, vs, vs, vs, vs, vs, vs); - - oper.beginWindow(0); - - MachineInfo info = new MachineInfo(mk, 1, 1, 1); - oper.dataPort.process(info); - - info.setCpu(2); - oper.dataPort.process(info); - - info.setCpu(3); - oper.dataPort.process(info); - - oper.endWindow(); - - Assert.assertEquals("number emitted tuples", 1, sortSink.collectedTuples.size()); - for (Object o : sortSink.collectedTuples) { - LOG.debug(o.toString()); - KeyValPair<TimeBucketKey, Map<ResourceType, Double>> keyValPair = (KeyValPair<TimeBucketKey, Map<ResourceType, Double>>)o; - Assert.assertEquals("emitted value for 'cpu' was ", 2.0, keyValPair.getValue().get(ResourceType.CPU), 0); - Assert.assertEquals("emitted value for 'hdd' was ", 1.0, keyValPair.getValue().get(ResourceType.HDD), 0); - Assert.assertEquals("emitted value for 'ram' was ", 1.0, keyValPair.getValue().get(ResourceType.RAM), 0); - - } - LOG.debug("Done percentile testing\n"); - - } - - public void testStandarDeviation(CalculatorOperator oper) - { - CollectorTestSink sortSink = new CollectorTestSink(); - oper.sdOutputPort.setSink(sortSink); - Calendar calendar = Calendar.getInstance(); - Date date = calendar.getTime(); - String timeKey = minuteDateFormat.format(date); - String day = calendar.get(Calendar.DAY_OF_MONTH) + ""; - - Integer vs = new Integer(1); - MachineKey mk = new MachineKey(timeKey, day, vs, vs, vs, vs, vs, vs, vs); - - oper.beginWindow(0); - - MachineInfo info = new MachineInfo(mk, 1, 1, 1); - oper.dataPort.process(info); - - info.setCpu(2); - oper.dataPort.process(info); - - info.setCpu(3); - oper.dataPort.process(info); - - oper.endWindow(); - - Assert.assertEquals("number emitted tuples", 1, sortSink.collectedTuples.size()); - for (Object o : sortSink.collectedTuples) { - LOG.debug(o.toString()); - KeyValPair<TimeBucketKey, Map<ResourceType, Double>> keyValPair = (KeyValPair<TimeBucketKey, Map<ResourceType, Double>>)o; - Assert.assertEquals("emitted value for 'cpu' was ", getSD(ImmutableList.of(1, 2, 3)), keyValPair.getValue().get(ResourceType.CPU), 0); - Assert.assertEquals("emitted value for 'hdd' was ", getSD(ImmutableList.of(1, 1, 1)), keyValPair.getValue().get(ResourceType.HDD), 0); - Assert.assertEquals("emitted value for 'ram' was ", getSD(ImmutableList.of(1, 1, 1)), keyValPair.getValue().get(ResourceType.RAM), 0); - - } - LOG.debug("Done sd testing\n"); - - } - - private final double getSD(List<Integer> input) - { - int sum = 0; - for (int i : input) { - sum += i; - } - double avg = sum / (input.size() * 1.0); - double sd = 0; - for (Integer point : input) { - sd += Math.pow(point - avg, 2); - } - return Math.sqrt(sd); - } - - public void testMax(CalculatorOperator oper) - { - CollectorTestSink sortSink = new CollectorTestSink(); - oper.maxOutputPort.setSink(sortSink); - Calendar calendar = Calendar.getInstance(); - Date date = calendar.getTime(); - String timeKey = minuteDateFormat.format(date); - String day = calendar.get(Calendar.DAY_OF_MONTH) + ""; - - Integer vs = new Integer(1); - MachineKey mk = new MachineKey(timeKey, day, vs, vs, vs, vs, vs, vs, vs); - - oper.beginWindow(0); - - MachineInfo info = new MachineInfo(mk, 1, 1, 1); - oper.dataPort.process(info); - - info.setCpu(2); - oper.dataPort.process(info); - - info.setCpu(3); - oper.dataPort.process(info); - - oper.endWindow(); - - Assert.assertEquals("number emitted tuples", 1, sortSink.collectedTuples.size()); - for (Object o : sortSink.collectedTuples) { - LOG.debug(o.toString()); - KeyValPair<TimeBucketKey, Map<ResourceType, Double>> keyValPair = (KeyValPair<TimeBucketKey, Map<ResourceType, Double>>)o; - Assert.assertEquals("emitted value for 'cpu' was ", 3, keyValPair.getValue().get(ResourceType.CPU), 0); - Assert.assertEquals("emitted value for 'hdd' was ", 1, keyValPair.getValue().get(ResourceType.HDD), 0); - Assert.assertEquals("emitted value for 'ram' was ", 1, keyValPair.getValue().get(ResourceType.RAM), 0); - - } - LOG.debug("Done max testing\n"); - - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/machinedata/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/demos/machinedata/src/test/resources/log4j.properties b/demos/machinedata/src/test/resources/log4j.properties deleted file mode 100644 index cf0d19e..0000000 --- a/demos/machinedata/src/test/resources/log4j.properties +++ /dev/null @@ -1,43 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -log4j.rootLogger=DEBUG,CONSOLE - -log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender -log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout -log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n -log4j.appender.CONSOLE.threshold=${test.log.console.threshold} -test.log.console.threshold=DEBUG - -log4j.appender.RFA=org.apache.log4j.RollingFileAppender -log4j.appender.RFA.layout=org.apache.log4j.PatternLayout -log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n -log4j.appender.RFA.File=/tmp/app.log - -# to enable, add SYSLOG to rootLogger -log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender -log4j.appender.SYSLOG.syslogHost=127.0.0.1 -log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout -log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n -log4j.appender.SYSLOG.Facility=LOCAL1 - -log4j.logger.org=info -#log4j.logger.org.apache.commons.beanutils=warn -log4j.logger.com.datatorrent=debug -log4j.logger.org.apache.apex=debug
